KISS

Keep It Simple Stupid

Simple concurrency example in Python


Hello folks! It’s great to see you here!

Today’s article is about a simple concurrency example in Python. Keep on reading, it’ll be fun…

As a short intro, I’m currently taking Coursera’s Algorithms: Design and Analysis, Part 1 class, which includes programming assignments to make sure you understand the algorithms. My language of choice is Python, for its easy syntax and rich set of features. To be more precise, I’m using Python 3.

The assignment was to compute the minimum cut of a graph. The basic idea is to run the randomized contraction algorithm approximately n * n * log(n) times and take the minimum of the results. I’m not going to describe my implementation here (and I can’t while the class is running); I’ll just say that running the algorithm n * log(n) times takes about 8 minutes on one core.
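To get a feel for these numbers, here is a rough sketch of how the repetition count grows with n. The function names are mine and not part of the assignment, and the failure bound assumes the usual textbook analysis of the contraction algorithm, in which a single run finds a given minimum cut with probability at least 1 / C(n, 2).

```python
from math import ceil, comb, log

def required_runs(n):
    """Approximate number of repetitions: n * n * log(n), rounded up."""
    return ceil(n * n * log(n))

def failure_bound(n, runs):
    """Upper bound on the chance that all `runs` independent trials miss a
    given min cut, assuming each succeeds with probability >= 1 / C(n, 2)."""
    return (1 - 1 / comb(n, 2)) ** runs

if __name__ == '__main__':
    for n in (10, 100, 1000):
        runs = required_runs(n)
        print(n, runs, failure_bound(n, runs))
```

The repetition count grows quickly with n, which is why spreading the runs over several cores is attractive.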

Obviously, one solution is to use multiple threads to crunch the numbers concurrently. So I open the Python docs for the threading module and am reminded of the following:

CPython implementation detail: In CPython, due to the Global Interpreter Lock, only one thread can execute Python code at once…

… which means CPython’s threads won’t help with CPU-bound work like ours, so we’ll go with multiprocessing instead. The gist is very simple: the main process creates N child processes, each of which computes a 1/N share of the work and sends its result to the parent. Once the parent has gathered all the results, it produces the final answer. Let N be the number of cores available. Take a look at my source, with mocks instead of the actual implementation:

python_concurrency.py
#!/usr/bin/env python3

import sys
from random import randrange
from math import log, ceil, trunc
import copy
from multiprocessing import cpu_count, Process, SimpleQueue

def minCut(g):
    '''Runs the random contraction algorithm and returns the min cut (# of
    edges) [MOCK].'''

    a = 1
    for i in range(1000):
        a *= 1.1

    return randrange(10, 100)


# message ids
MSG_PROGRESS, MSG_END = range(2)

def minCutIterative(g, n, queue=None):
    '''Calculates the min cut of the graph found by running n iterations.

    Returns the result by writing it to queue and by simply returning it.
    Also reports, once in a while, the number of iterations processed since
    the latest report.
    '''

    min_cut = 10 ** 6
    last_report = -1
    report_period = randrange(15, 51)
    for i in range(0, n):
        if queue is not None and i % report_period == 0:
            queue.put((MSG_PROGRESS, i - last_report))
            last_report = i

        mc = minCut(g)
        if mc < min_cut:
            min_cut = mc

    if queue is not None:
        # n - 1 is the last loop index; using n keeps this safe when n == 0
        queue.put((MSG_PROGRESS, n - 1 - last_report))
        queue.put((MSG_END, min_cut))

    return min_cut


def main():
    g = {}
    n = 1000

    process_count = cpu_count()
    print('Found', process_count, 'cores')

    run_count = ceil(n ** 2)
    run_count_per_process = ceil(run_count / process_count)
    run_count = run_count_per_process * process_count
    print('Will run {} times ({} per process)'.format(run_count,
        run_count_per_process))

    # queue to get results from the subprocesses. since we don't care which
    # process got which value, we'll use one queue
    queue = SimpleQueue()
    plist = []
    # start worker processes
    for i in range(0, process_count):
        p = Process(target=minCutIterative, args=(g, run_count_per_process,
            queue))
        plist.append(p)
        p.start()

    print('Working, please wait…')

    total_iterations = 0
    min_cuts = []
    # monitor progress and get all the numbers
    while True:
        (msg, val) = queue.get()
        if msg == MSG_PROGRESS:
            total_iterations += val
            progress = trunc(total_iterations / run_count * 100)
            print('\r[{0:10s}] {1}%'.format('#' * (progress // 10), progress), end='')

        elif msg == MSG_END:
            min_cuts.append(val)
            if len(min_cuts) == process_count:
                break

    print('\nGathered min cuts:', min_cuts)
    min_cut = min(min_cuts)
    print('Final min cut:', min_cut)

    # join the child processes. even though it's not strictly necessary, the
    # multiprocessing programming guide advises us to join zombie processes
    for proc in plist:
        proc.join()

if __name__ == '__main__':
    main()

A run on one core takes 52 seconds, versus only 15 seconds with 8 cores (actually, only 4 of them are physical). With my real program, CPU usage now reaches 800%.
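As a quick sanity check on those timings, the speedup and per-core efficiency can be computed as below. The helper names are hypothetical, and dividing by 4 rather than 8 is my own reading of "only 4 are physical".

```python
def speedup(serial_seconds, parallel_seconds):
    """How many times faster the parallel run is than the serial one."""
    return serial_seconds / parallel_seconds

def efficiency(serial_seconds, parallel_seconds, workers):
    """Speedup divided by the number of workers (1.0 would be ideal)."""
    return speedup(serial_seconds, parallel_seconds) / workers

if __name__ == '__main__':
    print(round(speedup(52, 15), 2))        # timings from the run above
    print(round(efficiency(52, 15, 4), 2))  # relative to 4 physical cores
    print(round(efficiency(52, 15, 8), 2))  # relative to 8 logical cores
```

A speedup of roughly 3.5 is close to what 4 physical cores can deliver, so the extra logical cores seem to add little for this kind of workload.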

Now, to the code. The minCut function just mocks the real work by multiplying a number many times. The main function prepares the data, determines the number of iterations for each subprocess, creates a SimpleQueue to receive messages from the workers, starts the processes, repeatedly waits for data, and then prints the final result. Finally, minCutIterative is the function that each child process runs.
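For comparison, when progress reporting is not needed, the same split-and-gather structure can be sketched with multiprocessing.Pool. This is an alternative to the Process plus SimpleQueue approach in the source above, not what it does. The names split_runs, best_of, and parallel_min are hypothetical, and the worker is a seeded mock, just like minCut.

```python
from multiprocessing import Pool, cpu_count
from random import Random

def split_runs(total_runs, workers):
    """Return (runs per worker, adjusted total), rounding the share up."""
    per_worker = -(-total_runs // workers)  # integer ceiling division
    return per_worker, per_worker * workers

def best_of(args):
    """Worker [MOCK]: the minimum of `runs` pseudo-random 'cut sizes'."""
    seed, runs = args
    rng = Random(seed)  # per-worker seed so workers do not repeat each other
    return min(rng.randrange(10, 100) for _ in range(runs))

def parallel_min(total_runs, workers=None):
    """Split the runs across a pool of processes and return the overall minimum."""
    workers = workers or cpu_count()
    per_worker, _ = split_runs(total_runs, workers)
    jobs = [(seed, per_worker) for seed in range(workers)]
    with Pool(workers) as pool:
        return min(pool.map(best_of, jobs))
```

As with main() in the source, parallel_min should be called from under an if __name__ == '__main__': guard, so that child processes can import the module safely on platforms that spawn new interpreters rather than fork.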

The SimpleQueue is used here so that the workers can periodically report how many iterations they have performed. To distinguish a progress message from a result message, the MSG_PROGRESS and MSG_END constants are used as the first element of each message tuple. With this, the main process receives the data and can draw a nice progress bar in the terminal:

[####      ] 44%
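The message handling can be isolated from the processes and tried on a plain list of tuples, as in this small sketch. The names render_bar and consume are mine; the constants and the bar format mirror the source above, except that the percentage is computed with integer arithmetic.

```python
MSG_PROGRESS, MSG_END = range(2)

def render_bar(done, total):
    """Return a 10-slot text progress bar such as '[####      ] 44%'."""
    percent = done * 100 // total  # integer math avoids float rounding
    return '[{0:10s}] {1}%'.format('#' * (percent // 10), percent)

def consume(messages, total, worker_count):
    """Process (msg_id, value) tuples until every worker has sent MSG_END.

    Returns (iterations seen, list of per-worker results).
    """
    done, results = 0, []
    for msg, val in messages:
        if msg == MSG_PROGRESS:
            done += val
            print('\r' + render_bar(done, total), end='')
        elif msg == MSG_END:
            results.append(val)
            if len(results) == worker_count:
                break
    print()
    return done, results
```

With a real queue, something like consume(iter(queue.get, None), run_count, process_count) should work in place of the list, since the loop stops by itself once the last MSG_END arrives.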

That’s it, really easy! If you have any questions/suggestions, please contact me.

Later!
