KNN - ML from Scratch - Part 1

Gurudev Ilangovan

June 18, 2019


For my Machine Learing class at A&M, I implemented common ML algorithms from the ground up. It’s amazing how easy it is to implement an ML technique in python. I thought that I’d share my implementations in a series of short blog posts (I am new to python, so please cut me some slack.). In my opinion, there are three main advantages to this endeavour:

  • The understanding we get when we implement something from scratch is unmatched. A lot of the times, we think we understand a concept but when we implement it, we get a slew of doubts, thereby forcing us to really get the details right.
  • It helps us appreciate the design of libraries better and understand why things are designed a certain way. It helps truly get the API of a library with enduring success like scikit-learn.
  • The satisfaction when we see the identical results convinces, that these libraries do not do something like the algorithm we study but actually implement the very algorithm we study, only better.

Techniques and Design

In this blog, I am sharing the code for

  1. A small module to scale training and testing dataset features, the right way.
  2. A K Nearest Neighbour classifier

Typically the features are converted to the same scale before we use a distance based classifier like KNN to classify. This is to ensure that all features are given equal importance. The standardizing can be done by min-max scaling or z-scaling.

In a package like scikit learn, each step is modularized. We can then aggregate multiple steps using pipelines (See last section). This means that if we do not scale features manually, the algorithm executes KNN without translating all features to the same scale. I, however, decided to make scaling part of my KNN.

Also when I watched Jeremy Howard’s excellent ML course, I found one of his tips extremely useful. He keeps a function simple and keeps calling another function and passing the buck to it. Eventually, the code just writes itself. Very sage advice!

Setup (Libraries and Data)

import pandas as pd
import numpy as np
import scipy
import collections

Since this series of posts concentrates on the implementations, I won’t delve into the data and results per se. I’ll just briefly explain the dataset and the classification task for demonstration purposes. The dataset shows the response - area (in square feet) affected by volcanos and different features corresponding to it. We can change this to a classification problem by predicting whether the area was affected by volcanoes or not (area > 0).

df_train_org = pd.read_csv("../data/ml_scratch_1_train.csv")
df_test_org = pd.read_csv("../data/ml_scratch_1_test.csv")

tr_X = df_train_org.loc[:, df_train_org.columns != 'area']
ts_X = df_test_org.loc[:, tr_X.columns]
tr_y = (df_train_org['area'] > 0)*1
ts_y = (df_test_org['area'] > 0)*1
   X  Y  month  day  FFMC   DMC     DC  ISI  temp  RH  wind  rain  area
0  7  5      3    5  86.2  26.2   94.3  5.1   8.2  51   6.7   0.0   0.0
1  7  4     10    2  90.6  35.4  669.1  6.7  18.0  33   0.9   0.0   0.0
2  7  4     10    6  90.6  43.7  686.9  6.7  14.6  33   1.3   0.0   0.0
3  8  6      3    5  91.7  33.3   77.5  9.0   8.3  97   4.0   0.2   0.0
4  8  6      3    7  89.3  51.3  102.2  9.6  11.4  99   1.8   0.0   0.0


This is a basic template for a standardizer that can do both Z-scaling and min-max scaling. The advantage of having such a standardizer is that we can store the mean, sd, min and max values of the training set and use it when we standardize the test set. This is very important as we cannot standardize the test set using the test set summary stats.

class Standardize():

    def __init__(self, X):
        self.X = X

    def normalize_mappings(self):
        mu = self.X.mean()
        sd = self.X.std()
        return {'mu': mu, 'sd': sd}

    def min_max_mappings(self):
        mins = self.X.min()
        maxs = self.X.max()
        return {'mins': mins, 'maxs': maxs}

    def standardize(self, method="normalize", X=None):
        if X is None:
            X = self.X
        if method == "normalize":
            X = (X - self.normalize_mappings['mu'])/self.normalize_mappings['sd']
        elif method == "min_max":
            X = ((X - self.min_max_mappings['mins'])
                 / (self.min_max_mappings['maxs'] - self.min_max_mappings['mins']))
        return X

K Nearest Neighbours

Creating a KNN classifier

We create a KNN classifier using 3 parameters:

  1. Number of nearest neighbours, k.
  2. When we calculate the distance, the norm to be used, norm.
  3. The standardization to be used, standardization

Fitting a classifier

The fit method for a KNN classifier is fairly minimal. We

  1. Create (and store) a standarizer using the training data passed.
  2. Standardize the training data using the stored standardizer and store the standardized training data.
  3. Store the labels


The predict method is the most important method because this is where we need to do most work. The steps are listed out:

  1. First, we standardize the test data using the standardizer we stored during the fit procedure. This ensures, test data is standardized according to training data mappings.
  2. Next, we get the distances of test data points from all training data points (dist_matrix). This distance matrix has dimensions n_test x n_train. We use a scipy function called distance_matrix for this and we pass the norm to this function. I tried implementing my own distance matrix and though, I got the right answer, the process was painfully slow.
  3. Once we get the distance matrix, for each observation in test data, we find the indices of the k closest training observations (get_nearest_neighbour_indices).
  4. Since we have the labels for all indices, for each test data observtion, we count the labels among the k closest neighbours (count_votes).
  5. Once we count the votes for each observation, we just have to pick the class with the most votes (tag_winner). In case of a tie, ideally a class should be picked at random but in the current implementaion, the first class returned by collections.Counter is returned. For KNNs, it anyway better to give an odd k, so I didn’t spend too much time on it.
  6. Thus for each test point, a class is returned.


This is a simple function to get the accuracy of the predicted values.


class Knn():

    def __init__(self, 
                 norm=2.0 ,

        self.k = k
        self.norm = norm
        self.standardization = standardization

    def fit(self, tr_X, tr_y):
        """Pass training data as X and y """
        self.standardizer = Standardize(X=tr_X)
        self.tr_X = self.standardizer.standardize(method=self.standardization)
        self.tr_y = tr_y

    def predict(self, ts_X):
        ts_X = self.standardizer.standardize(
            method=self.standardization, X=ts_X)
        d = self._dist_matrix(ts_X)
        neighbours = self._get_nearest_neighbour_indices(d)
        counts = self._count_votes(neighbours)
        return self._tag_winner(counts)

    def _dist_matrix(self, ts_X):
        dists = scipy.spatial.distance_matrix(ts_X, self.tr_X, p=self.norm)
        dists = np.vstack(dists)
        return dists

    def _get_nearest_neighbour_indices(self, dist_matrix):
        return np.argpartition(dist_matrix, kth=self.k, axis=1)[:, :self.k]

    def _count_votes(self, indice_matrix):
        return np.apply_along_axis(lambda x: collections.Counter(self.tr_y.iloc[x]), 1, indice_matrix)

    def _tag_winner(self, vote_mat):
        count_max = np.vectorize(lambda x: x.most_common(1)[0][0])
        return count_max(vote_mat)

    def score(self, ts_X, ts_y):
        return (ts_y == self.predict(ts_X)).mean()

    def __repr__(self):
        return (f'Knn(k={self.k}, norm={self.norm}, standardization="{self.standardization}"')
model_knn = Knn(k=13, norm=2, standardization="min_max"), tr_y)
model_knn.score(ts_X, ts_y)

Verifying the results using scikit-learn

from sklearn.neighbors import KNeighborsClassifier
from sklearn.preprocessing import MinMaxScaler, Normalizer
from sklearn import pipeline
model_std_knn = pipeline.Pipeline([('scaling', MinMaxScaler()),
                       ('knn', KNeighborsClassifier(n_neighbors=13))]), tr_y)
model_std_knn.score(ts_X, ts_y)

As we can see, the results match!

Thank you for reading. Comments welcome!

comments powered by Disqus