Neal Hughes bio photo

Neal Hughes

Economist.
Public Toddler servant.

Email Facebook Github Stackoverflow Strava

One of the cool things about Cython is that it supports multi-threaded code, via OpenMP. While Python allows for message passing (i.e., multiple processes) shared memory (i.e., multi-threading) is not possible due to the Global Interpreter Lock (see this earlier post).

Relative to message passing, multi-threading is fast (and has lower memory requirements). The catch is that you can run into concurrency problems: where different threads need to access the same memory locations at the same time. As such, multi-threading is best suited to performing large numbers of simple calculations: where the order in which the calculations are executed doesn’t matter.

Recently, this post received some some discussion on stackoverflow. The main thing to remember is to place this line at the top of your Cython code as per this earlier post.

#cython: boundscheck=False, wraparound=False, nonecheck=False

The stackoverflow discussion also demonstrates how performance can improve with different load scheduling options. From my experience it is hard to predict which load scheduling method will achieve best performance and the differences often aren’t that great. However, in the below example it does make a difference, so it is probably a good idea to experiment with this in practice.

A simple Cython example

The perfect use case is applying a function element wise over a large array. Lets say we want to apply the function \( f\) to some array \( X\)

$$ f(x) = \begin{cases} e^x & \mbox{if } x > 0.5 \\ 0 & \mbox{if } otherwise \end{cases} $$

Below I have a Python version and a Cython version of \( f\) in the file thread_demo.pyx

import numpy as np
from math import exp 
from libc.math cimport exp as c_exp

def array_f(X):
    
    Y = np.zeros(X.shape)
    index = X > 0.5
    Y[index] = np.exp(X[index])

    return Y

def c_array_f(double[:] X):

    cdef int N = X.shape[0]
    cdef double[:] Y = np.zeros(N)
    cdef int i

    for i in range(N):
        if X[i] > 0.5:
            Y[i] = c_exp(X[i])
        else:
            Y[i] = 0

    return Y

Next we compile this file with a standard setup.py (like we did here). Now lets test these functions in IPython:

from thread_demo import *
import numpy as np
X = -1 + 2*np.random.rand(10000000) 
%timeit array_f(X)
1 loops, best of 3: 222 ms per loop
%timeit c_array_f(X)
10 loops, best of 3: 87.5 ms per loop

In this case, the Cython version is not dramatically faster than Python, because our function is simple enough to ‘vectorise’ via numpy.

Now to create a multi-threaded version of c_array_f we just need to replace range() in the first loop with the multi-treaded version prange() from cython.parallel:

from cython.parallel import prange

def c_array_f_multi(double[:] X):

    cdef int N = X.shape[0]
    cdef double[:] Y = np.zeros(N)
    cdef int i

    for i in prange(N, nogil=True):
        if X[i] > 0.5:
            Y[i] = c_exp(X[i])
        else:
            Y[i] = 0

    return Y

Too easy. This tells the compiler to run the loop across multiple CPU cores. In this case, we have no concurrency problems: the order in which the loop is executed doesn’t matter. Now we just need to make a few changes to our setup.py file, in order to compile our code with OpenMP:

from distutils.core import setup
from Cython.Build import cythonize
from distutils.extension import Extension
from Cython.Distutils import build_ext

ext_modules=[
    Extension("thread_demo",
              ["thread_demo.pyx"],
              libraries=["m"],
              extra_compile_args = ["-O3", "-ffast-math", "-march=native", "-fopenmp" ],
              extra_link_args=['-fopenmp']
              ) 
]

setup( 
  name = "thread_demo",
  cmdclass = {"build_ext": build_ext},
  ext_modules = ext_modules
)

Now lets give it a go

%timeit c_array_f_multi(X)
10 loops, best of 3: 22.4 ms per loop

So with four cores we get just under a 4 times speed up.

A few more details

So far I’ve glossed over a few important details. Here are some basic things to consider (for more see the documentation).

Firstly, notice the nogil argument in prange(N, nogil=True). In order to run multi-threaded code we need to turn off the GIL. This means that you can’t have Python code inside your multi-threaded loop or compilation will fail. It also means that any functions called inside the loop need to be defined nogil, for example:

cdef inline double f(double x) nogil:
    if x > 0.5:
        return c_exp(x)
    else:
        return 0

def c_array_f_multi(double[:] X):

    cdef int N = X.shape[0]
    cdef double[:] Y = np.zeros(N)
    cdef int i

    for i in prange(N, nogil=True):
        Y[i] = f(X[i])
    return Y

prange() takes a few other arguments including num_threads: which will default to the number of cores on your system and schedule: which has to do with load balancing. The simplest option here is ‘static’ which just breaks the loop into equal chunks. This is fine if all the steps compute in around the same time. If not, one thread may finish before the others leaving resources idle. In this case, you might try ‘dynamic’ (see the docs for detail).

The other key issue is memory management, that is working out which variables should be shared between threads and which should be private or ‘thread local’. The good thing with Cython is that all of this detail is - in true Python style - magically inferred from your code.

The basic idea is that variables that are only read from are shared, while variables assigned to are private. An important special case are ‘reduction’ variables. Any variable with an in-place operator (i.e., `+=’) is automatically taken to be a reduction variable, which means that the thread local values are combined after all threads have completed. This is useful if you need to compute a sum:

 def c_array_f_multi_sum(double[:] X):

    cdef int N = X.shape[0]
    cdef double Ysum = 0
    cdef int i

    for i in prange(N, nogil=True):
        Ysum += f(X[i])
    return Ysum

def array_f_sum(X):
    
    Y = np.zeros(X.shape)
    
    index = X > 0.5
    Y[index] = np.exp(X[index])

    return np.sum(Y)
from thread_demo import *
X = -1 + 2*np.random.rand(10000000) 
array_f_sum(X)
5349424.6891543046
c_array_f_multi_sum(X)
5349424.689154312

Note that if we instead put Ysum = Ysum + f(X[i]) we would have got a compiler error.

Radial Basis Function (RBF) example

Recall the problem of evaluating a RBF approximation scheme from this post. Below we try out a multi-threaded version:

def rbf_network_multithread(double[:, :] X,  double[:] beta, double theta):

    cdef int N = X.shape[0]
    cdef int D = X.shape[1]
    cdef double[:] Y = np.zeros(N)
    cdef int i, j, d
    cdef double r = 0

    for i in prange(N, nogil=True):
        for j in range(N):
            r = 0
            for d in range(D):
                r += (X[j, d] - X[i, d])**2 
            r = r**0.5
            Y[i] += beta[j] * c_exp(-(r * theta)**2)

    return Y
D = 5
N = 1000
X = np.array([np.random.rand(N) for d in range(D)]).T
beta = np.random.rand(N)
theta = 10
%timeit rbf_network(X, beta, theta)
10 loops, best of 3: 45.2 ms per loop
%timeit rbf_network_multi(X, beta, theta)
100 loops, best of 3: 8.67 ms per loop

For some reason this achieves a better than 4 times speed up!

Bottom line

Of course, the real challenge is in identifying / producing code segments that are suited to multi-threading. In many cases, blindly running a loop in parallel will either fail or lead to a decrease in performance if the threads get in each others way. But for simple applications at least, running multiple threads in Cython is as easy as adding prange() to your loops.