Load-balancing with IPython.parallel

Basic imports

In [1]:
import os,sys,time
import numpy as np

from IPython.display import display
from IPython import parallel
rc = parallel.Client()
dview = rc[:]

This time, we create a LoadBalancedView:

In [2]:
lview = rc.load_balanced_view()
lview
Out[2]:
<LoadBalancedView None>

LoadBalancedViews behave very much like a DirectView on a single engine:

Each call to apply() results in a single remote computation, and the result (or AsyncResult) of that call is returned directly, rather than in a list, as in the multi-engine DirectView.

In [3]:
e0 = rc[0]
In [6]:
from numpy.linalg import norm
A = np.linspace(0, 2*np.pi, 1024)

e0.apply_sync(norm, A, 2)
Out[6]:
116.1115241640244
In [7]:
lview.apply_sync(norm, A, 2)
Out[7]:
116.1115241640244

However, unlike the DirectView of a single engine, you are letting the IPython Scheduler decide which engine should do the work:

In [8]:
e0.apply_sync(os.getpid)
Out[8]:
74967
In [9]:
for i in range(2*len(rc.ids)):
    print lview.apply_sync(os.getpid)
74968
74973
74967
74974
74968
74973
74967
74974

Map

The LoadBalancedView also has a load-balanced version of the builtin map():

In [10]:
lview.block = True

serial_result   =       map(lambda x:x**10, range(32))
parallel_result = lview.map(lambda x:x**10, range(32))

serial_result == parallel_result
Out[10]:
True

Just like apply(), map can be made non-blocking, either by setting block=False or by calling map_async():

In [11]:
amr = lview.map_async(lambda x:x**10, range(32))
amr
Out[11]:
<AsyncMapResult: <lambda>>

Map results are iterable!

In [12]:
for n in amr:
    print n
0
1
1024
59049
1048576
9765625
60466176
282475249
1073741824
3486784401
10000000000
25937424601
61917364224
137858491849
289254654976
576650390625
1099511627776
2015993900449
3570467226624
6131066257801
10240000000000
16679880978201
26559922791424
41426511213649
63403380965376
95367431640625
141167095653376
205891132094649
296196766695424
420707233300201
590490000000000
819628286980801

AsyncResults with multiple results are actually iterable before their results arrive.

This means that you can perform map/reduce operations on elements as they come in:

In [13]:
lview.block = False
In [14]:
# scatter 'id', so id=0,1,2 on engines 0,1,2
dv = rc[:]
dv.scatter('id', rc.ids, flatten=True)
print dv['id']

# create a Reference to `id`. This will be a different value on each engine
ref = parallel.Reference('id')

tic = time.time()
ar = dv.apply(time.sleep, ref)
for i,r in enumerate(ar):
    print "%i: %.3f"%(i, time.time()-tic)
    sys.stdout.flush()
[0, 1, 2, 3]
0: 0.030
1: 1.017
2: 2.028
3: 3.026
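
The idea of reducing over results as they arrive can be tried without a running cluster. The following is a minimal sketch that uses the standard library's multiprocessing.pool.ThreadPool as a stand-in for the engines; it is an analogy, not IPython.parallel itself, and slow_square is a hypothetical task invented for the illustration.

```python
import time
from multiprocessing.pool import ThreadPool


def slow_square(x):
    """Hypothetical stand-in task: sleep briefly, then return x squared."""
    time.sleep(0.01 * x)
    return x * x


pool = ThreadPool(4)  # four worker threads stand in for four engines
try:
    running_total = 0
    # imap yields results in submission order, each one as soon as it is
    # ready, so the reduction proceeds while later tasks are still running.
    for value in pool.imap(slow_square, range(8)):
        running_total += value
finally:
    pool.close()
    pool.join()

print(running_total)
```

As with iterating over an AsyncResult, the loop body runs for the first item while the remaining items are still being computed.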

Now we submit a batch of tasks of increasing duration and watch where they run, iterating through the results as they arrive.

In [15]:
def sleep_here(t):
    """sleep here for a time, return where it happened"""
    import time
    time.sleep(t)
    # `id` is the engine-side global scattered above, not the builtin id()
    return id

amr = lview.map(sleep_here, [.01*t for t in range(32)])
tic = time.time()
for i,r in enumerate(amr):
    print "task %i on engine %i: %.3f" % (i, r, time.time()-tic)
    sys.stdout.flush()
task 0 on engine 2: 0.001
task 1 on engine 3: 0.001
task 2 on engine 1: 0.001
task 3 on engine 0: 0.002
task 4 on engine 2: 0.043
task 5 on engine 3: 0.043
task 6 on engine 1: 0.096
task 7 on engine 0: 0.108
task 8 on engine 2: 0.110
task 9 on engine 3: 0.179
task 10 on engine 1: 0.205
task 11 on engine 0: 0.207
task 12 on engine 2: 0.245
task 13 on engine 3: 0.307
task 14 on engine 1: 0.361
task 15 on engine 0: 0.370
task 16 on engine 2: 0.459
task 17 on engine 3: 0.488
task 18 on engine 1: 0.544
task 19 on engine 0: 0.582
task 20 on engine 2: 0.670
task 21 on engine 3: 0.719
task 22 on engine 1: 0.777
task 23 on engine 0: 0.829
task 24 on engine 2: 0.923
task 25 on engine 3: 0.990
task 26 on engine 1: 1.051
task 27 on engine 0: 1.107
task 28 on engine 2: 1.205
task 29 on engine 3: 1.298
task 30 on engine 1: 1.355
task 31 on engine 0: 1.441

Unlike DirectView.map(), which always results in one task per engine, LoadBalancedView.map() defaults to one task per item in the sequence. This can be changed by specifying the chunksize keyword argument.

In [17]:
amr = lview.map(sleep_here, [.01*t for t in range(32)], chunksize=4)
tic = time.time()
for i,r in enumerate(amr):
    print "task %i on engine %i: %.3f" % (i, r, time.time()-tic)
    sys.stdout.flush()
task 0 on engine 2: 0.087
task 1 on engine 2: 0.088
task 2 on engine 2: 0.088
task 3 on engine 2: 0.089
task 4 on engine 3: 0.255
task 5 on engine 3: 0.256
task 6 on engine 3: 0.256
task 7 on engine 3: 0.257
task 8 on engine 1: 0.425
task 9 on engine 1: 0.425
task 10 on engine 1: 0.426
task 11 on engine 1: 0.427
task 12 on engine 0: 0.576
task 13 on engine 0: 0.577
task 14 on engine 0: 0.577
task 15 on engine 0: 0.578
task 16 on engine 2: 0.798
task 17 on engine 2: 0.798
task 18 on engine 2: 0.799
task 19 on engine 2: 0.799
task 20 on engine 3: 1.124
task 21 on engine 3: 1.125
task 22 on engine 3: 1.125
task 23 on engine 3: 1.126
task 24 on engine 1: 1.452
task 25 on engine 1: 1.452
task 26 on engine 1: 1.453
task 27 on engine 1: 1.453
task 28 on engine 0: 1.764
task 29 on engine 0: 1.764
task 30 on engine 0: 1.765
task 31 on engine 0: 1.765
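
The grouping in the output above, where results arrive four at a time from the same engine, is consistent with the 32 items being split into consecutive chunks of four. The sketch below illustrates that partitioning in plain Python. It is an assumption inferred from the output, not the scheduler's actual code, and split_into_chunks is a hypothetical helper.

```python
def split_into_chunks(seq, chunksize):
    """Group consecutive items of seq into lists of at most chunksize items."""
    seq = list(seq)
    return [seq[i:i + chunksize] for i in range(0, len(seq), chunksize)]


delays = [.01 * t for t in range(32)]
tasks = split_into_chunks(delays, 4)

print(len(tasks))     # number of tasks after chunking
print(len(tasks[0]))  # number of items in the first task
```

Under this reading, chunksize=4 turns 32 single-item tasks into 8 tasks. Fewer tasks means less scheduling overhead, at the cost of coarser load balancing.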

Example

Parallelize nested loops

Often we want to run a function with a variety of combinations of arguments. A useful skill is the ability to express a nested loop in terms of a map.

In [15]:
def area(w,h):
    return w*h


widths = range(1,4)
heights = range(6,10)

areas = []
for w in widths:
    for h in heights:
        areas.append(area(w,h))
areas
Out[15]:
[6, 7, 8, 9, 12, 14, 16, 18, 18, 21, 24, 27]

`itertools.product` and `zip` will be helpful.
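
As a rough illustration of the hint, here is one way the nested loop might be flattened with the builtin map. This is a sketch only, not the contents of soln/nestedloop.py; the names pairs, ws, hs and mapped_areas are chosen for this example.

```python
from itertools import product


def area(w, h):
    return w * h


widths = range(1, 4)
heights = range(6, 10)

# product() yields (w, h) pairs in the same order as the nested loops;
# zip(*pairs) transposes them into one tuple of widths and one of heights.
pairs = list(product(widths, heights))
ws, hs = zip(*pairs)

mapped_areas = list(map(area, ws, hs))
print(mapped_areas)
```

Because a view's map() accepts multiple sequences in the same way as the builtin, the same ws and hs could presumably be passed to lview.map(area, ws, hs) to run the calls on the engines.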

In [17]:
%loadpy soln/nestedloop.py

Validate the result:

In [19]:
print areas
print list(ar)
areas == list(ar)
[6, 7, 8, 9, 12, 14, 16, 18, 18, 21, 24, 27]
[6, 7, 8, 9, 12, 14, 16, 18, 18, 21, 24, 27]
Out[19]:
True

Further Information

Load-balancing gives you far more control than we have time to cover here:

  • Full DAG task dependency (in time and/or destination)
  • Functional dependencies to confine tasks to engines with appropriate capabilities
  • Specify subsets of engines for each task
  • Scheduling timeouts
  • Retries of failed tasks
  • TaskScheduler.hwm for greedy assignment of tasks to hide network latency behind computation
  • and more!