124. Design Patterns

Design patterns are standard ways to structure code, particularly when using object-oriented programming. One or more design patterns may be used in any application.

Based on Mastering Python Design Patterns by Ayeva and Kasampalis

See also https://en.wikipedia.org/wiki/Software_design_pattern

Creational Patterns

  1. Factory centralises creation of object instances into one or more functions, so all object instance creation can easily be found. It decouples the creation of an object from the code that uses the object (a minimal sketch is shown after this list).

  2. Abstract factory extends factory method (which centralises object instance creation). It contains a collection of object instance factories which may be chosen from (e.g. forms styled for Windows or Mac depending on OS).

  3. Builder constructs a complex object from component objects (often where order of construction is important) – e.g. build web page from component parts (objects). The Director controls the construction. Returns a single final object (built from component objects).

  4. Prototype creates objects by cloning an existing object. It can also be useful for creating an archive copy of an object at a given point in time. Python's copy module (copy.copy and copy.deepcopy) supports this.

  5. Singleton is a class which allows only one instance of the class (e.g. for storing the global state of a program, or for a coordinating object, or controlling accesses to a shared resource).
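
To give a flavour of how these patterns may look in Python, below is a minimal factory sketch. The shape classes and the shape_factory function are illustrative examples, not taken from the book.

# Minimal factory sketch: object creation is centralised in one function,
# so client code never calls the concrete classes directly.

class Circle:
    def draw(self):
        return 'Drawing a circle'

class Square:
    def draw(self):
        return 'Drawing a square'

def shape_factory(shape_name):
    """Create and return a shape object based on a text name."""
    shapes = {'circle': Circle, 'square': Square}
    return shapes[shape_name]()

shape = shape_factory('circle')
print(shape.draw())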

Structural Design Patterns

  1. Adapter creates an interface between two otherwise incompatible objects (e.g. interfacing between old and new software without needing to change either the old or the new object architecture, or when using software where the source code is hidden). It may be implemented with a dictionary that maps calls such as new_object.method() to the equivalent old_object method.

  2. Decorator extends the function of a class, object or method. It is especially useful when many different functions require the same extension, e.g. timing a function, validating inputs, monitoring/logging, or adding GUI components (a minimal sketch is shown after this list).

  3. Bridge provides an interface between general code and specific use cases, e.g. processing data with a bridge that allows multiple sources of data, device drivers allowing generic output targeted to specific devices, or using alternative themed GUI implementations.

  4. Facade provides a simplified application interface to the client, hiding the underlying complex application. Client code may then call just one class/method. Alternatively, in a complex system, parts of the system may communicate with each other through a limited number of facades. Facades also help to keep client code independent of the underlying code.

  5. Flyweight minimises memory usage by sharing resources as much as possible. A flyweight object contains state-independent immutable data (intrinsic data) that is used by other objects. Flyweight objects should not contain state-dependent mutable data (extrinsic data). Look for what data is common across objects and extract that to a flyweight object.

  6. Model-View-Controller (MVC) uses the principle of Separation of Concerns (SoC) where an application is divided into distinct sections. MVC describes application of SoC to OOP. 1) Model: the core of the program (data, logic, rules, state). 2) View: visual representation of the model, such as a GUI, keyboard or mouse input, text output, chart output, etc. 3) Controller: the link between the model and the view. MVC allows separation of view and model via different controllers, allowing for different views in different circumstances. MVC may also make use of different adaptors.

  7. Proxy uses a surrogate object to access an actual object. Examples are 1) remote proxy which represents an object that is held elsewhere, 2) virtual proxy which uses lazy implementation to delay creation of an object until and if actually required, 3) protection proxy controls access to a sensitive object, and 4) smart (reference) proxy which performs extra actions as an object is accessed (e.g. counting number of times object accessed, or thread-safety checking).
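
As an example of a structural pattern in Python, below is a minimal decorator sketch that adds timing to any function. The time_it and slow_sum names are illustrative.

# Minimal decorator sketch: wrap any function with timing code without
# changing the function itself.

import time
from functools import wraps

def time_it(func):
    @wraps(func)
    def wrapper(*args, **kwargs):
        start = time.time()
        result = func(*args, **kwargs)
        print(f'{func.__name__} took {time.time() - start:.4f} seconds')
        return result
    return wrapper

@time_it
def slow_sum(n):
    """Sum the numbers 0 to n-1."""
    return sum(range(n))

print(slow_sum(1_000_000))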

Behavioural Design Patterns

  1. Chain of Responsibility is used when it is not known which object will respond to a request. The request is sent to multiple objects (e.g. nodes on a network). An object will then decide whether to accept the request, forward the request, or reject the request. This chain continues until all required actions are completed. The Chain of Responsibility creates a flow of requests to achieve a task. The flow of requests is worked out at run time, rather than the route being pre-defined. The client only needs to know how to communicate with the head of the chain.

  2. Command encapsulates all required actions in a command object, often including the ability to undo. GUI buttons may also use Command to execute an option or display tool tips. Macros may be an example of Command: a sequence of steps to perform.

  3. Observer (or Publish/Subscribe) defines a one-to-many dependency between objects where a state change in one object results in all its dependents being notified and updated automatically (a minimal sketch is shown after this list).

  4. State allows an object to alter its behaviour when its internal state changes; the object will appear to change its behaviour. For example, in a game a monster might go from sleep to wake to attack. Python has the module state_machine to assist in state machine programming.

  5. Interpreter provides a simple language that gives advanced users of a system more control over the system.

  6. Strategy defines a family of algorithms, encapsulates each one, and makes them interchangeable. Strategy lets the algorithm vary independently from clients that use it. For example Python may select a specific sort method depending on the data.

  7. Memento supports history and undo, and enables restore to a previous state.

  8. Iterator provides a way to access the elements of an aggregate object sequentially without exposing its underlying representation.

  9. Template defines the skeleton of an algorithm in an operation, deferring some steps to subclasses. The Template Method lets subclasses redefine certain steps of an algorithm without changing the algorithm’s structure.
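
Below is a minimal observer (publish/subscribe) sketch. The Subject and Display class names are illustrative.

# Minimal observer sketch: observers register with a subject and are
# notified automatically whenever the subject's state changes.

class Subject:
    def __init__(self):
        self._observers = []
        self._state = None

    def attach(self, observer):
        self._observers.append(observer)

    def set_state(self, state):
        self._state = state
        # Notify all registered observers of the state change
        for observer in self._observers:
            observer.update(state)

class Display:
    def __init__(self, name):
        self.name = name

    def update(self, state):
        print(f'{self.name} received new state: {state}')

subject = Subject()
subject.attach(Display('Display A'))
subject.attach(Display('Display B'))
subject.set_state(42)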

Microservices and patterns for cloud

  1. The microservices approach breaks an application down into smaller services, each of which may be developed and deployed separately. Frequently each microservice will run in a container that also holds all the required dependencies.

  2. Retry is common in microservices, and allows for temporary unavailability of a dependent service. The request may be retried, either immediately or after waiting a few seconds (a minimal sketch is shown after this list).

  3. Circuit Breaker monitors a service and blocks further calls to that service if its reliability is too low.

  4. Cache-Aside holds commonly used data in memory rather than re-reading from disc.

  5. Throttling limits the number of requests a client can send.
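
Below is a minimal retry sketch. The FlakyService class and the retry settings are illustrative.

# Minimal retry sketch: re-attempt a call a limited number of times,
# waiting between attempts, before giving up.

import time

class FlakyService:
    """Stand-in for a dependent service that fails on its first two calls."""
    def __init__(self):
        self.calls = 0

    def request(self):
        self.calls += 1
        if self.calls < 3:
            raise ConnectionError('Service temporarily unavailable')
        return 'OK'

def call_with_retry(func, max_attempts=5, wait_seconds=1):
    """Call func, retrying after a short wait if it raises ConnectionError."""
    for attempt in range(1, max_attempts + 1):
        try:
            return func()
        except ConnectionError as error:
            print(f'Attempt {attempt} failed: {error}')
            if attempt == max_attempts:
                raise
            time.sleep(wait_seconds)

service = FlakyService()
print(call_with_retry(service.request))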

123. Time for some fun. A quick game of ‘pong’

This code requires the installation of pygame (pip install pygame).

# To run this game install pygame with `pip install pygame`

import pygame
import random
from time import sleep

class Ball():
    """Class to define a ball"""
    
    def __init__(self, screen_width, screen_height):
        self.size = 10
        self.x = random.randint(0, int(screen_width * 0.8))
        self.y = random.randint(0, int(screen_height * 0.8))
        self.x_vel = 1
        self.y_vel = 1
        self.screen_width = screen_width
        self.screen_height = screen_height
        return
        

    def detect_collision(self, paddle_y, paddle_height, paddle_width):
        """"Method to detect screen edge, detetct losing,
        and detect hitting the paddle """
        
        lose = 0
        hit = 0

        # Check if ball has hit the left hand side of the screen
        if self.x <= 0:
            # Mark as lose (will be overridden if ball hits paddle)
            lose = 1

        # Check if ball has hit the right hand side of the screen
        if self.x + self.size >= self.screen_width:
            # Reverse x velocity
            self.x_vel = -self.x_vel

        # Check if ball has hit top or bottom of screen
        if self.y <= 0 or self.y + self.size >= self.screen_height:
            # Reverse y velocity
            self.y_vel = -self.y_vel
            
        # Paddle collision detection
        
        ball_centre_y = self.y + (self.size/2)
        
        if (self.x <= paddle_width and ball_centre_y > paddle_y and 
                ball_centre_y < paddle_y + paddle_height):
            
            # Ball hits paddle. Record and reverse x velocity
            lose = 0
            hit = 1
            self.x_vel *= -1   
            
            # identify region of paddle hit (and adjust y velocity)
            region_fraction = (ball_centre_y - paddle_y) / paddle_height
            if region_fraction <= 0.25:
                self.y_vel = -2
            elif region_fraction <= 0.5:
                self.y_vel = -1
            elif region_fraction <= 0.75:
                self.y_vel = 1
            else:
                self.y_vel = 2           
        
        # Return if game lost or if paddle has hit ball
        return lose, hit
    
   
    def move_ball(self, paddle_y, paddle_height, paddle_width):
        """Method to move ball"""
        
        # Move ball
        self.x += self.x_vel
        self.y += self.y_vel

        # Check for collision with screen edges or paddle
        lose, hit = self.detect_collision(
            paddle_y, paddle_height, paddle_width)
        
        return lose, hit

class Game():
    """
    Class to initiate and run game
    """

    def __init__(self):
        """
        Constructor method for game
        """

        # Initialise pygame
        pygame.init()

        # Set initial lives
        self.lives = 3    

        # Hit counter (number of times the paddle hits the ball)
        self.hits = 0

        # Set delay between game loops (will reduce during the game)
        self.delay = int(5)

        # Set text font
        # Use pygame.font.get_fonts() after pygame.init() to see available fonts
        self.font = pygame.font.SysFont('dejavusansmono', 18)
         
        # Set window width and height
        self.screen_width = 800
        self.screen_height = 600

        # Initiate pygame window
        self.win = (pygame.display.set_mode(
            (self.screen_width, self.screen_height)))
        pygame.display.set_caption('Moving blocks')
        
        # Repeat game while lives remain 
        while self.lives > 0:

            # Initialise ball
            self.ball = Ball(self.screen_width, self.screen_height)
            
            # Initialise paddle
            self.paddle = Paddle(self.screen_width, self.screen_height)
            
            # Initiate game loop (ends with life lost)
            self.continue_loop = True
            self.loop_game()
            
            # Once game loop is finished subtract a life
            self.lives -= 1

            # Call display to show when miss
            self.display_on_miss()

        # Quit game
        pygame.quit()
        
        return
        
    
    def check_events(self):
        """
        Check for close game window
        """
        for event in pygame.event.get():
            if event.type == pygame.QUIT:
                # Quit pygame and exit the program cleanly
                pygame.quit()
                raise SystemExit
        
        return 
    

    def display_on_miss(self):
        # Clear screen
        self.win.fill((0, 0, 0))
        
        # Display message if lives are left
        if self.lives > 0:
            text_string = 'You missed! Remaining lives: ' + str(self.lives)
            self.win.blit(self.font.render(
                text_string, True, (255,0,0)), (250, 250))
            
        # Display message if no lives left    
        else:
            text_string = 'Game over. Number of hits: ' + str(self.hits)
            self.win.blit(self.font.render(
                text_string, True, (255,0,0)), (250, 250))

        # Render new screen
        pygame.display.update()

        # Display for two seconds before continuing
        sleep (2)

        # Display message for all
        text_string = 'Press any key to continue.'
        self.win.blit(self.font.render(
            text_string, True, (255,0,0)), (250, 280))

        # Render new screen
        pygame.display.update()

        # Wait for key press
        pygame.event.clear()
        pygame.event.wait()
        
        return


    def loop_game(self):
        """
        Main game loop
        """

        # Reset game loop delay (controls speed of movement)
        self.delay = int(5)

        # Record number of hits for this ball (used to increase game speed)
        self.hits_this_ball = 0

        while self.continue_loop:

            # Loop delay (longer delay leads to slower movement)
            pygame.time.delay(self.delay)
            
            # Check events (for game close)
            self.check_events()
            
            # Clear screen to all black
            self.win.fill((0, 0, 0))
            
            # Move ball (and check for miss or hit with paddle)
            miss, hit = self.ball.move_ball(
                self.paddle.y, self.paddle.height, self.paddle.width)

            # Increment hits and reduce loop pause every 5 hits
            self.hits += hit
            self.hits_this_ball += hit
            if hit == 1 and self.hits_this_ball % 5 == 0 and self.delay > 0:
                self.delay -= 1

            # Set loop to stop if ball has gone out of play
            if miss == 1:
                self.continue_loop = False
            
            # Move paddle
            self.paddle.move()
            
            # Redraw ball
            pygame.draw.rect(self.win,(255, 255, 255), 
                (self.ball.x, self.ball.y, self.ball.size, self.ball.size))
            
            # Redraw paddle
            pygame.draw.rect(self.win, (255, 255, 255),
                (self.paddle.x, self.paddle.y, 
                self.paddle.width, self.paddle.height))
                
            # Display lives left and hits (top right of screen)

            text_string = 'Lives left: ' + str (self.lives)
            self.win.blit(
                self.font.render(
                    text_string, True, (255,0,0)), (650, 15))

            text_string = 'Hits: ' + str (self.hits)
            self.win.blit(
                self.font.render(
                    text_string, True, (255,0,0)), (650, 35))
            
            # Render new screen
            pygame.display.update()
            
        return
      
class Paddle():
    """Paddle class"""
    
    def __init__(self, screen_width, screen_height):
        self.width = 5
        self.height = 100 
        self.screen_width = screen_width
        self.screen_height = screen_height
        self.x = 0
        self.y = int(screen_height/2) 
        self.velocity = 3
        
        return
        
    def move(self):
        """Move paddle"""
        # Move paddle if key is held down
        keys = pygame.key.get_pressed()
        
        if keys[pygame.K_UP] and self.y > 0:
            self.y -= self.velocity
            
        if keys[pygame.K_DOWN] and self.y + self.height < self.screen_height:
            self.y += self.velocity
        
        return
               
game = Game()    

122: Oversampling to correct for imbalanced data using naive sampling or SMOTE

Machine learning models can have poor performance for minority classes (where one or more classes represent only a small proportion of the overall data set compared with a dominant class). One method of improving performance is to balance out the number of examples between different classes. Here two methods are described:

  1. Resampling from the minority classes to give the same number of examples as the majority class.
  2. SMOTE (Synthetic Minority Over-sampling Technique): creating synthetic data points that lie part-way between a minority-class point and one of its near neighbours.

Both methods use the imbalanced-learn (imblearn) package. See: https://imbalanced-learn.org

Install with: pip install -U imbalanced-learn, or conda install -c conda-forge imbalanced-learn

Reference

N. V. Chawla, K. W. Bowyer, L. O. Hall, W. P. Kegelmeyer, “SMOTE: synthetic minority over-sampling technique,” Journal of Artificial Intelligence Research, 16, 321-357, 2002

Create dummy data

First we will create some unbalanced dummy data: Classes 0, 1 and 2 will represent 1%, 5% and 94% of the data respectively.

from sklearn.datasets import make_classification
X, y = make_classification(n_samples=5000, n_features=2, n_informative=2,
                           n_redundant=0, n_repeated=0, n_classes=3,
                           n_clusters_per_class=1, weights=[0.01, 0.05, 0.94],
                           class_sep=0.8, random_state=0)

Count instances of each class.

from collections import Counter
print(sorted(Counter(y).items()))

OUT:
[(0, 64), (1, 262), (2, 4674)]

Define function to plot data

import matplotlib.pyplot as plt

def plot_classes(X,y):
    colours = ['k','b','g']
    point_colours = [colours[val] for val in y]
    X1 = X[:,0]
    X2 = X[:,1]
    plt.scatter(X1, X2, facecolor = point_colours, edgecolor = 'k')
    plt.show()

Plot the data using the function

plot_classes(X,y)

Oversample with naive sampling to match numbers in each class

With naive resampling we repeatedly sample at random from the minority classes and add the new samples to the existing data set, leading to multiple copies of minority-class points. This builds up the number of minority class samples.

from imblearn.over_sampling import RandomOverSampler
ros = RandomOverSampler(random_state=0)
X_resampled, y_resampled = ros.fit_resample(X, y)

Count instances of each class in the augmented data.

from collections import Counter
print(sorted(Counter(y_resampled).items()))

OUT:
[(0, 4674), (1, 4674), (2, 4674)]

Plot augmented data (it looks the same as the original as points are overlaid).

plot_classes(X_resampled,y_resampled)

SMOTE with continuous variables

SMOTE (synthetic minority oversampling technique) works by finding near neighbours within a minority class, producing a new synthetic point part-way between an existing point and one of its neighbours, and adding that new point to the sample. The example shown is in two dimensions, but SMOTE will work across multiple dimensions (features). SMOTE therefore helps to ‘fill in’ the feature space occupied by minority classes.

from imblearn.over_sampling import SMOTE
X_resampled, y_resampled = SMOTE().fit_resample(X, y)

# Count instances of each class
from collections import Counter
print(sorted(Counter(y_resampled).items()))

OUT:
[(0, 4674), (1, 4674), (2, 4674)]

Plot augmented data (note minority class data points now exist in new spaces).
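
plot_classes(X_resampled, y_resampled)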

SMOTE with mixed continuous and binary/categorical values

It is not possible to calculate a ‘mid point’ between two points of binary or categorical data. An extension to the SMOTE method (SMOTE-NC) allows for binary or categorical data by taking the most commonly occurring category among the nearest neighbours of a minority-class point.

# create a synthetic data set with continuous and categorical features
import numpy as np
rng = np.random.RandomState(42)
n_samples = 50
X = np.empty((n_samples, 3), dtype=object)
X[:, 0] = rng.choice(['A', 'B', 'C'], size=n_samples).astype(object)
X[:, 1] = rng.randn(n_samples)
X[:, 2] = rng.randint(3, size=n_samples)
y = np.array([0] * 20 + [1] * 30)

Count instances of each class

print(sorted(Counter(y).items()))

OUT:
[(0, 20), (1, 30)]

Show last 10 original data points

print (X[-10:])

OUT:
[['A' 1.4689412854323924 2]
 ['C' -1.1238983345400366 0]
 ['C' 0.9500053955071801 2]
 ['A' 1.7265164685753638 1]
 ['A' 0.4578850770000152 0]
 ['C' -1.6842873783658814 0]
 ['B' 0.32684522397001387 0]
 ['A' -0.0811189541586873 2]
 ['B' 0.46779475326315173 1]
 ['B' 0.7361223506692577 0]]

Use SMOTENC to create new data points.

from imblearn.over_sampling import SMOTENC
smote_nc = SMOTENC(categorical_features=[0, 2], random_state=0)
X_resampled, y_resampled = smote_nc.fit_resample(X, y)

Count instances of each class

print(sorted(Counter(y_resampled).items()))

OUT:
[(0, 30), (1, 30)]

Show last 10 values of X (SMOTE data points are added to the end of the original data set)

print (X_resampled[-10:])

[['C' -1.0600505672469849 1]
 ['C' -0.36965644259183145 1]
 ['A' 0.1453826708354494 2]
 ['C' -1.7442827953859052 2]
 ['C' -1.6278053447258838 2]
 ['A' 0.5246469549655818 2]
 ['B' -0.3657680728116921 2]
 ['A' 0.9344237230779993 2]
 ['B' 0.3710891618824609 2]
 ['B' 0.3327240726719727 2]]

121. More list comprehension examples

Example 1 – double the numbers

Standard loop approach:

foo = [1, 2, 3, 4]
bar = []

for x in foo:
    bar.append(x * 2)
    
print(bar)

Out:

[2, 4, 6, 8]

Using list comprehension:

foo = [1, 2, 3, 4]
bar = [x * 2 for x in foo]
print(bar)

Out:

[2, 4, 6, 8]

Example 2 – convert Celsius to Fahrenheit

This example calls a function from within the list comprehension.

Define the function:

def convert_celsius_to_fahrenheit(deg_celsius):
    """
    Convert degrees Celsius to Fahrenheit
    Returns float value - temp in Fahrenheit
    Keyword arguments:
        deg_celsius -- temp in degrees Celsius
    """
    return (9/5) * deg_celsius + 32

Standard loop approach:

#list of temps in degree celsius to convert to fahrenheit
celsius = [39.2, 36.5, 37.3, 41.0]

#standard for loop approach
fahrenheit = []
for x in celsius:
    fahrenheit.append(convert_celsius_to_fahrenheit(x))

print('Using standard for loop: {}'.format(fahrenheit))

Out:

Using standard for loop: [102.56, 97.7, 99.14, 105.8]

Using list comprehension

fahrenheit = [convert_celsius_to_fahrenheit(x) for x in celsius]
print('Using list comprehension: {}'.format(fahrenheit))

Out:

Using list comprehension: [102.56, 97.7, 99.14, 105.8]

Example 3 – convert the strings to different data types

This example also makes use of the zip function, which allows you to iterate through two lists at the same time.

inputs = ["1", "3.142", "True", "spam"]
converters = [int, float, bool, str]

values_with_correct_data_types = [t(s) for (s, t) in zip(inputs, converters)]
print(values_with_correct_data_types)

Out:

[1, 3.142, True, 'spam']

Example 4 – Using if statements within a list comprehension

The example filters a list of file names to Python files only.

unfiltered_files = ['test.py', 'names.csv', 'fun_module.py', 'prog.config']

# Standard loop form
python_files = []
# filter the files using a standard for loop 
for file in unfiltered_files:
    if file[-2:] == 'py':
        python_files.append(file)
        
print('using standard for loop: {}'.format(python_files))

#list comprehension
python_files = [file for file in unfiltered_files if file[-2:] == 'py']

print('using list comprehension {}'.format(python_files))

Out:

using standard for loop: ['test.py', 'fun_module.py']
using list comprehension ['test.py', 'fun_module.py']

Example 5 – List comprehension to create a list of lists

list_of_lists = []

# Standard loop form
for i in range(5):
    sub_list = []
    for j in range(3):
        sub_list.append(i * j)
    list_of_lists.append(sub_list)

print(list_of_lists)


# List comprehension
list_of_lists = [[i * j for j in range(3)] for i in range(5)]

print(list_of_lists)

Out:

[[0, 0, 0], [0, 1, 2], [0, 2, 4], [0, 3, 6], [0, 4, 8]]
[[0, 0, 0], [0, 1, 2], [0, 2, 4], [0, 3, 6], [0, 4, 8]]

Example 6: Iterate over all items in a list of lists

The code converts a list of lists to a single list of items.
We call this ‘flattening’ the list.

list_of_lists = [[8, 2, 1], [9, 1, 2], [4, 5, 100]]

# Standard loop form
flat_list = []
for row in list_of_lists:
    for col in row:
        flat_list.append(col)

print(flat_list)

# List comprehension:
flat_list = [item for sublist in list_of_lists for item in sublist]
print(flat_list)

Out:

[8, 2, 1, 9, 1, 2, 4, 5, 100]
[8, 2, 1, 9, 1, 2, 4, 5, 100]

120. Generating log normal samples from provided arithmetic mean and standard deviation of original population

The log normal distribution is frequently a useful distribution for mimicking process times in healthcare pathways (or many other non-automated processes). The distribution has a right skew which may frequently occur when some clinical process step has some additional complexity to it compared to the ‘usual’ case.

To sample from a log normal distribution we need to convert the mean and standard deviation that were calculated from the original non-logged population into the mu and sigma of the underlying normal distribution.

(For maximum computational efficiency, when calling the function repeatedly with the same mean and standard deviation, you may wish to split this into two functions: one to calculate mu and sigma, which needs calling only once, and the other to sample from the log normal distribution given mu and sigma. A sketch of this split is shown after the function below.)

For more on the maths see:

https://blogs.sas.com/content/iml/2014/06/04/simulate-lognormal-data-with-specified-mean-and-variance.html

import numpy as np

def generate_lognormal_samples(mean, stdev, n=1):
    """
    Returns n samples taken from a lognormal distribution, based on mean and
    standard deviation calculated from the original non-logged population.
    
    Converts mean and standard deviation to the underlying normal distribution
    mu and sigma based on calculations described at:
        https://blogs.sas.com/content/iml/2014/06/04/simulate-lognormal-data-
        with-specified-mean-and-variance.html
        
    Returns a numpy array of floats if n > 1, otherwise return a float
    """
    
    # Calculate mu and sigma of underlying lognormal distribution
    phi = (stdev ** 2 + mean ** 2) ** 0.5
    mu = np.log(mean ** 2 / phi)
    sigma = (np.log(phi ** 2 / mean ** 2)) ** 0.5
    
    # Generate lognormal population
    generated_pop = np.random.lognormal(mu, sigma, n)
    
    # Convert single sample (if n=1) to a float, otherwise leave as array
    generated_pop = \
        generated_pop[0] if len(generated_pop) == 1 else generated_pop
        
    return generated_pop
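
Below is a sketch of the two-function split suggested above, for use when sampling repeatedly with the same mean and standard deviation. The function names calculate_mu_sigma and sample_lognormal are illustrative.

import numpy as np

def calculate_mu_sigma(mean, stdev):
    """Convert mean and standard deviation of the original (non-logged)
    population into mu and sigma of the underlying normal distribution.
    Needs calling only once per mean/stdev pair."""
    phi = (stdev ** 2 + mean ** 2) ** 0.5
    mu = np.log(mean ** 2 / phi)
    sigma = (np.log(phi ** 2 / mean ** 2)) ** 0.5
    return mu, sigma

def sample_lognormal(mu, sigma, n=1):
    """Sample n values from a lognormal distribution with pre-calculated
    mu and sigma. Returns a float if n=1, otherwise a numpy array."""
    samples = np.random.lognormal(mu, sigma, n)
    return samples[0] if n == 1 else samples

# Example use: convert once, then sample many times
mu, sigma = calculate_mu_sigma(10, 10)
samples = sample_lognormal(mu, sigma, n=100000)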

Test the function

We will generate a population of 100,000 samples with a given mean and standard deviation (these would be calculated on the non-logged population), and test the resulting generated population has the same mean and standard deviation.

mean = 10
stdev = 10
generated_pop = generate_lognormal_samples(mean, stdev, 100000)
print ('Mean:', generated_pop.mean())
print ('Standard deviation:', generated_pop.std())

Out:

Mean: 10.043105926813356
Standard deviation: 9.99527575740651

Plot a histogram of the generated population:

import matplotlib.pyplot as plt
%matplotlib inline
bins = np.arange(0,51,1)
plt.hist(generated_pop, bins=bins)
plt.show()

Generating a single sample

The function will return a single number if no n is given in the function call:

print (generate_lognormal_samples(mean, stdev))

Out: 6.999376449335125

119: Optimising scikit-learn machine learning models with grid search or randomized search

Machine learning models have many hyper-parameters (parameters set before a model is fitted, and which remain constant throughout model fitting). Optimising model hyper-parameters may involve many model runs with alternative hyper-parameters. In scikit-learn, this may be performed in an automated fashion using Grid Search (which explores all combinations of provided hyper-parameters) or Randomized Search (which randomly selects combinations to test).

Grid search and randomized search perform this optimisation using k-fold cross-validation, which avoids potential bias from a single training/test split.

Here we will revisit a previous example of machine learning, using Random Forests to predict whether a person has breast cancer. We will then use Grid Search to optimise performance, using the ‘f1’ performance score (https://en.wikipedia.org/wiki/F1_score) as an accuracy score that balances the importance of false negatives and false positives.

First we will look at how we previously built the Random Forests model.

(See https://pythonhealthcare.org/2018/04/17/72-machine-learning-random-forests/ for previous post on Random Forest method)

# import required modules

from sklearn import datasets
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler
import numpy as np
import pandas as pd

def calculate_diagnostic_performance (actual_predicted):
    """ Calculate diagnostic performance.
    
    Takes a NumPy array of ones and zeros, with two columns: actual and predicted
    
    Note that some statistics are repeats with different names
    (precision = positive_predictive_value and recall = sensitivity).
    Both names are returned
    
    Returns a dictionary of results:
        
    1) accuracy: proportion of test results that are correct    
    2) sensitivity: proportion of true +ve identified
    3) specificity: proportion of true -ve identified
    4) positive likelihood: increased probability of true +ve if test +ve
    5) negative likelihood: reduced probability of true +ve if test -ve
    6) false positive rate: proportion of false +ves in true -ve patients
    7) false negative rate:  proportion of false -ves in true +ve patients
    8) positive predictive value: chance of true +ve if test +ve
    9) negative predictive value: chance of true -ve if test -ve
    10) precision = positive predictive value 
    11) recall = sensitivity
    12) f1 = (2 * precision * recall) / (precision + recall)
    13) positive rate = rate of true +ve (not strictly a performance measure)
    """
    # Calculate results
    actual_positives = actual_predicted[:, 0] == 1
    actual_negatives = actual_predicted[:, 0] == 0
    test_positives = actual_predicted[:, 1] == 1
    test_negatives = actual_predicted[:, 1] == 0
    test_correct = actual_predicted[:, 0] == actual_predicted[:, 1]
    accuracy = np.average(test_correct)
    true_positives = actual_positives & test_positives
    true_negatives = actual_negatives & test_negatives
    sensitivity = np.sum(true_positives) / np.sum(actual_positives)
    specificity = np.sum(true_negatives) / np.sum(actual_negatives)
    positive_likelihood = sensitivity / (1 - specificity)
    negative_likelihood = (1 - sensitivity) / specificity
    false_positive_rate = 1 - specificity
    false_negative_rate = 1 - sensitivity
    positive_predictive_value = np.sum(true_positives) / np.sum(test_positives)
    negative_predictive_value = np.sum(true_negatives) / np.sum(test_negatives)
    precision = positive_predictive_value
    recall = sensitivity
    f1 = (2 * precision * recall) / (precision + recall)
    positive_rate = np.mean(actual_predicted[:,1])
    
    # Add results to dictionary
    performance = {}
    performance['accuracy'] = accuracy
    performance['sensitivity'] = sensitivity
    performance['specificity'] = specificity
    performance['positive_likelihood'] = positive_likelihood
    performance['negative_likelihood'] = negative_likelihood
    performance['false_positive_rate'] = false_positive_rate
    performance['false_negative_rate'] = false_negative_rate
    performance['positive_predictive_value'] = positive_predictive_value
    performance['negative_predictive_value'] = negative_predictive_value
    performance['precision'] = precision
    performance['recall'] = recall
    performance['f1'] = f1
    performance['positive_rate'] = positive_rate

    return performance

def load_data ():
    """Load the data set. Here we load the Breast Cancer Wisconsin (Diagnostic)
    Data Set. Data could be loaded from other sources though the structure
    should be compatible with this data set, that is an object with the 
    following attributes:
        .data (holds feature data)
        .feature_names (holds feature titles)
        .target_names (holds outcome classification names)
        .target (holds classification as zero-based number)
        .DESCR (holds text-based description of data set)"""
    
    data_set = datasets.load_breast_cancer()
    return data_set

def normalise (X_train,X_test):
    """Normalise X data, so that training set has mean of zero and standard
    deviation of one"""
    
    # Initialise a new scaling object for normalising input data
    sc=StandardScaler() 
    # Set up the scaler just on the training set
    sc.fit(X_train)
    # Apply the scaler to the training and test sets
    X_train_std=sc.transform(X_train)
    X_test_std=sc.transform(X_test)
    return X_train_std, X_test_std


def print_diagnostic_results (performance):
    """Iterate through, and print, the performance metrics dictionary"""
    
    print('\nMachine learning diagnostic performance measures:')
    print('-------------------------------------------------')
    for key, value in performance.items():
        print (key,'= %0.3f' %value) # print 3 decimal places
    return

def print_feature_importances(model, features):
    print ()
    print ('Feature importances:')
    print ('--------------------')
    df = pd.DataFrame()
    df['feature'] = features
    df['importance'] = model.feature_importances_
    df = df.sort_values('importance', ascending = False)
    print (df)
    return

def split_data (data_set, split=0.25):
    """Extract X and y data from data_set object, and split into training and
    test data. Split defaults to 75% training, 25% test if not other value 
    passed to function"""
    
    X=data_set.data
    y=data_set.target
    X_train,X_test,y_train,y_test=train_test_split(
        X,y,test_size=split, random_state=0)
    return X_train,X_test,y_train,y_test

def test_model(model, X, y):
    """Return predicted y given X (attributes)"""
    
    y_pred = model.predict(X)
    test_results = np.vstack((y, y_pred)).T
    return test_results

def train_model (X, y):
    """Train the model. Note n_jobs=-1 uses all cores on a computer"""
    from sklearn.ensemble import RandomForestClassifier
    model = RandomForestClassifier(n_jobs=-1)
    model.fit (X,y)
    return model

###### Main code #######

# Load data
data_set = load_data()

# Split data into training and test sets
X_train,X_test,y_train,y_test = split_data(data_set, 0.25)

# Normalise data (not needed for Random Forests)
# X_train_std, X_test_std = normalise(X_train,X_test)

# Train model
model = train_model(X_train, y_train)

# Produce results for test set
test_results = test_model(model, X_test, y_test)

# Measure performance of test set predictions
performance = calculate_diagnostic_performance(test_results)

# Print performance metrics
print_diagnostic_results(performance)

Out:

Machine learning diagnostic performance measures:

accuracy = 0.951
sensitivity = 0.944
specificity = 0.962
positive_likelihood = 25.028
negative_likelihood = 0.058
false_positive_rate = 0.038
false_negative_rate = 0.056
positive_predictive_value = 0.977
negative_predictive_value = 0.911
precision = 0.977
recall = 0.944
f1 = 0.960
positive_rate = 0.608

Optimise with grid search

NOTE: Grid search may take considerable time to run!

Grid search enables us to perform an exhaustive search of hyper-parameters (those model parameters that are constant in any one model). We define which hyper-parameters we wish to change, and what values we wish to try. All combinations are tested. Tests are performed using k-fold cross-validation, which re-runs the model with different train/test splits (this avoids bias in our train/test split, but does increase the time required). You may wish to time a small grid search first, so you have a better idea of how many parameter combinations you can realistically look at.

We pass four arguments to the grid search method:

  1. The range of values for the hyper-parameters, defined in a dictionary.

  2. The machine learning model to use.

  3. The number of k-fold splits to use (cv); a value of 5 will give five 80:20 training/test splits, with each sample being present in the test set once.

  4. The accuracy score to use. In a classification model ‘accuracy’ is common. For a regression model using scoring='neg_mean_squared_error' is common (for grid search an accuracy score must be a ‘utility function’ rather than a ‘cost function’, that is, higher values are better).

If the best model uses a value at one extreme of the provided hyper-parameter ranges then it is best to expand the range of that hyper-parameter to be sure an optimum has been found.

More info on grid search: https://scikit-learn.org/stable/modules/grid_search.html

An alternative approach is randomised hyper-parameter searching. See https://scikit-learn.org/stable/modules/generated/sklearn.model_selection.RandomizedSearchCV.html

# Use Grid search to optimise
# n_jobs is set to -1 to use all cores on the CPU

from sklearn.model_selection import GridSearchCV
param_grid = {'n_estimators': [10, 30, 100, 300, 1000, 3000],
              'bootstrap': [True, False],
              'min_samples_split': [2, 4, 6, 8, 10],
              'n_jobs': [-1]}

# Grid search will use k-fold cross-validation (CV is number of splits)
# Grid search also needs a utility function (higher is better) rather than
# a cost function (lower is better); 'accuracy' meets this requirement

from sklearn.ensemble import RandomForestClassifier
forest_grid = RandomForestClassifier()
grid_search = GridSearchCV(forest_grid, param_grid, cv=10,
                           scoring='accuracy')

grid_search.fit(X_train, y_train); #';' suppresses printed output

Show optimised model hyper-parameters:

# show best parameters
# If best parameters are at the extremes of the searches then extend the range

grid_search.best_params_

Out:

{'bootstrap': True, 'min_samples_split': 6, 'n_estimators': 30, 'n_jobs': -1}

# Or, show full description
grid_search.best_estimator_

Out:

RandomForestClassifier(bootstrap=True, class_weight=None, criterion='gini',
            max_depth=None, max_features='auto', max_leaf_nodes=None,
            min_impurity_decrease=0.0, min_impurity_split=None,
            min_samples_leaf=1, min_samples_split=6,
            min_weight_fraction_leaf=0.0, n_estimators=30, n_jobs=-1,
            oob_score=False, random_state=None, verbose=0,
            warm_start=False)

Now we will use the optimised model. We could copy the text above (from the output of grid_search.best_estimator_), or we can use grid_search.best_estimator_ directly.

# Use optimised model
model = grid_search.best_estimator_
model.fit (X_train, y_train);

Test optimised model:

test_results = test_model(model, X_test, y_test)

# Measure performance of test set predictions
performance = calculate_diagnostic_performance(test_results)

# Print performance metrics
print_diagnostic_results(performance)

Out:

Machine learning diagnostic performance measures:

accuracy = 0.972
sensitivity = 0.967
specificity = 0.981
positive_likelihood = 51.233
negative_likelihood = 0.034
false_positive_rate = 0.019
false_negative_rate = 0.033
positive_predictive_value = 0.989
negative_predictive_value = 0.945
precision = 0.989
recall = 0.967
f1 = 0.978
positive_rate = 0.615

Our accuracy has now increased from 95.1% to 97.2%.

When the number of parameter combinations becomes unreasonably large for grid search, an alternative is to use randomized search, which will select parameter values randomly from the ranges given. The number of combinations tried is given by the argument n_iter.

Below is an example where we expand the number of hyper-parameters varied (giving too many combinations for a full grid search) and use randomized search to test 50 different combinations.

For more details see https://scikit-learn.org/stable/modules/generated/sklearn.model_selection.RandomizedSearchCV.html

# Use Randomized search to optimise
# n_jobs is set to -1 to use all cores on the CPU

from sklearn.model_selection import RandomizedSearchCV

param_grid = {'n_estimators': [10, 30, 100, 300, 1000, 3000],
              'bootstrap': [True, False],
              'min_samples_split': range(2, 11),
              'max_depth': range(1, 30),
              'n_jobs': [-1]}

n_iter_search = 50

from sklearn.ensemble import RandomForestClassifier
forest_grid = RandomForestClassifier()
random_search = RandomizedSearchCV(forest_grid, param_grid, cv=10,
                           n_iter=n_iter_search, scoring='accuracy')

random_search.fit(X_train, y_train); #';' suppresses printed output
# show best parameters
# If best parameters are at the extremes of the searches then extend the range

random_search.best_params_

Out:

{'n_jobs': -1,
 'n_estimators': 100,
 'min_samples_split': 2,
 'max_depth': 29,
 'bootstrap': False}

# Or, show full description
random_search.best_estimator_

Out:

RandomForestClassifier(bootstrap=False, class_weight=None, criterion='gini',
            max_depth=29, max_features='auto', max_leaf_nodes=None,
            min_impurity_decrease=0.0, min_impurity_split=None,
            min_samples_leaf=1, min_samples_split=2,
            min_weight_fraction_leaf=0.0, n_estimators=100, n_jobs=-1,
            oob_score=False, random_state=None, verbose=0,
            warm_start=False)

Now we will train a model with the optimised hyper-parameters, and test against the test set.

# Use optimised model
model = random_search.best_estimator_
model.fit (X_train, y_train);

test_results = test_model(model, X_test, y_test)

# Measure performance of test set predictions
performance = calculate_diagnostic_performance(test_results)

# Print performance metrics
print_diagnostic_results(performance)

Out:

Machine learning diagnostic performance measures:

accuracy = 0.986
sensitivity = 0.989
specificity = 0.981
positive_likelihood = 52.411
negative_likelihood = 0.011
false_positive_rate = 0.019
false_negative_rate = 0.011
positive_predictive_value = 0.989
negative_predictive_value = 0.981
precision = 0.989
recall = 0.989
f1 = 0.989
positive_rate = 0.629

So although randomized search does not explore all combinations, because we can increase the number of hyper-parameters explored compared with grid search, we have increased our accuracy to 98.6%.

118: Python basics – saving python objects to disk with pickle

Sometimes we may wish to save Python objects to disc (for example if we have performed a lot of processing to get to a certain point). We can use Python’s pickle module to save and reload any Python object. Here we will save and reload a NumPy array, and then save and reload a collection of different objects.

Saving a single python object

Here we will use pickle to save a single object, a NumPy array.

import pickle 
import numpy as np

# Create array of random numbers:
my_array= np.random.rand(2,4)
print (my_array)

Out:

[[0.6383297  0.45250192 0.09882854 0.84896196]
 [0.97006917 0.29206495 0.92500062 0.52965801]]

# Save using pickle
filename = 'pickled_array.p'
with open(filename, 'wb') as filehandler:
    pickle.dump(my_array, filehandler)

Reload and print pickled array:

filename = 'pickled_array.p'
with open(filename, 'rb') as filehandler: 
    reloaded_array = pickle.load(filehandler)

print ('Reloaded array:')
print (reloaded_array)

Out:

Reloaded array:
[[0.6383297  0.45250192 0.09882854 0.84896196]
 [0.97006917 0.29206495 0.92500062 0.52965801]]

Using a tuple to save multiple objects

We can use pickle to save a collection of objects grouped together as a list, a dictionary, or a tuple. Here we will save a collection of objects as a tuple.

# Create an array, a list, and a dictionary
my_array = np.random.rand(2,4)
my_list =['A', 'B', 'C']
my_dictionary = {'name': 'Bob', 'Age': 42}
# Save all items in a tuple
items_to_save = (my_array, my_list, my_dictionary)
filename = 'pickled_tuple_of_objects.p'
with open(filename, 'wb') as filehandler:
    pickle.dump(items_to_save, filehandler)

Reload pickled tuple, unpack the objects, and print them.

filename = 'pickled_tuple_of_objects.p'
with open(filename, 'rb') as filehandler:
    reloaded_tuple = pickle.load(filehandler)


reloaded_array = reloaded_tuple[0]
reloaded_list = reloaded_tuple[1]
reloaded_dict = reloaded_tuple[2]

print ('Reloaded array:')
print (reloaded_array)
print ('\nReloaded list:')
print (reloaded_list)
print ('\n Reloaded dictionary')
print (reloaded_dict)

Out:

Reloaded array:
[[0.40193978 0.55173167 0.89411291 0.84625061]
 [0.86540981 0.27835353 0.43359222 0.31579122]]

Reloaded list:
['A', 'B', 'C']

 Reloaded dictionary
{'name': 'Bob', 'Age': 42}