Basic imports
import os, sys, time
import numpy as np
from IPython.display import display
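# IPython 4.0 and later ship this package separately as ipyparallel
# (import ipyparallel as parallel)
from IPython import parallel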
rc = parallel.Client()
dview = rc[:]
This time, we create a LoadBalancedView:
lview = rc.load_balanced_view()
lview
LoadBalancedViews behave very much like a DirectView on a single engine:
Each call to apply() results in a single remote computation,
and the result (or AsyncResult) of that call is returned directly,
rather than in a list, as in the multi-engine DirectView.
e0 = rc[0]
from numpy.linalg import norm
A = np.linspace(0, 2*np.pi, 1024)
e0.apply_sync(norm, A, 2)
lview.apply_sync(norm, A, 2)
However, unlike the DirectView of a single engine, you are letting the IPython Scheduler decide which engine should do the work:
e0.apply_sync(os.getpid)
for i in range(2*len(rc.ids)):
    print(lview.apply_sync(os.getpid))
The LoadBalancedView also has a load-balanced version of the builtin map():
lview.block = True
# list() keeps the comparison below valid where map() returns an iterator
serial_result = list(map(lambda x:x**10, range(32)))
parallel_result = lview.map(lambda x:x**10, range(32))
serial_result == parallel_result
Just like apply(), map() can be made non-blocking, either by setting block=False or by calling map_async():
amr = lview.map_async(lambda x:x**10, range(32))
amr
for n in amr:
    print(n)
AsyncResults with multiple results are actually iterable before their results arrive.
This means that you can perform map/reduce operations on elements as they come in:
lview.block = False
# scatter 'id', so id=0,1,2 on engines 0,1,2
dv = rc[:]
dv.scatter('id', rc.ids, flatten=True)
print(dv['id'])
# create a Reference to `id`. This will be a different value on each engine
ref = parallel.Reference('id')
tic = time.time()
ar = dv.apply(time.sleep, ref)
# each iteration blocks until the next engine's sleep has finished
for i,r in enumerate(ar):
    print("%i: %.3f" % (i, time.time()-tic))
    sys.stdout.flush()
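The map/reduce idea above can be sketched without a cluster. The helper below is hypothetical (`running_total` is not part of IPython); it accepts any iterable and updates a reduction each time a value arrives. It assumes that an AsyncResult from map_async() can be passed in the same way as the plain generator used here as a stand-in.

```python
def running_total(results):
    """Yield the cumulative sum after each result is consumed.

    `results` may be any iterable. An AsyncResult from map_async() is
    assumed to work here too, yielding each value as it becomes ready.
    """
    total = 0
    for value in results:
        total += value
        yield total


# Stand-in for lview.map_async(lambda x: x**10, range(4)):
# a plain generator, so this sketch runs without any engines.
stand_in = (x**10 for x in range(4))
totals = list(running_total(stand_in))
print(totals)
```

Because the reduction consumes values one at a time, partial totals are available while later tasks are still running.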
Now we submit a batch of tasks of increasing duration and watch where they run, iterating through the results as they arrive.
def sleep_here(t):
    """sleep here for a time, return where it happened"""
    import time
    time.sleep(t)
    # `id` resolves to the engine-side global scattered above
    return id
amr = lview.map(sleep_here, [.01*t for t in range(32)])
tic = time.time()
for i,r in enumerate(amr):
    print("task %i on engine %i: %.3f" % (i, r, time.time()-tic))
    sys.stdout.flush()
Unlike DirectView.map(), which always results in one task per engine,
LoadBalancedView.map() defaults to one task per item in the sequence. This
can be changed by specifying the chunksize keyword argument.
amr = lview.map(sleep_here, [.01*t for t in range(32)], chunksize=4)
tic = time.time()
for i,r in enumerate(amr):
    print("task %i on engine %i: %.3f" % (i, r, time.time()-tic))
    sys.stdout.flush()
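To see how chunksize changes the number of tasks, here is a rough sketch in plain Python. The helper `split_into_chunks` is hypothetical and only illustrates the arithmetic; the exact partitioning IPython performs internally may differ, especially when the sequence length is not a multiple of chunksize.

```python
def split_into_chunks(seq, chunksize):
    """Split seq into consecutive pieces of at most chunksize items."""
    seq = list(seq)
    return [seq[i:i + chunksize] for i in range(0, len(seq), chunksize)]


durations = [.01 * t for t in range(32)]

# Default behaviour described above: one task per item.
one_per_item = split_into_chunks(durations, 1)
# With chunksize=4, each task carries four items.
four_per_task = split_into_chunks(durations, 4)

print("%i tasks vs. %i tasks" % (len(one_per_item), len(four_per_task)))
```

Fewer, larger tasks reduce scheduling overhead per item, at the cost of coarser load balancing.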
def area(w,h):
    return w*h
widths = range(1,4)
heights = range(6,10)
areas = []
for w in widths:
    for h in heights:
        areas.append(area(w,h))
areas
%loadpy soln/nestedloop.py
Validate the result:
print(areas)
print(list(ar))
areas == list(ar)
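One possible way to turn the nested loop into a single map call is sketched below. It is not necessarily what soln/nestedloop.py contains. The builtin map() stands in for the load-balanced call, on the assumption that `lview.map(area, ws, hs)` accepts multiple sequences the same way; the names `pairs`, `ws`, `hs`, and `flat_areas` are introduced only for this illustration.

```python
from itertools import product


def area(w, h):
    return w * h


widths = range(1, 4)
heights = range(6, 10)

# product() yields (w, h) pairs in the same order as the nested loop.
pairs = list(product(widths, heights))
ws = [w for w, h in pairs]
hs = [h for w, h in pairs]

# Builtin map() as a stand-in for lview.map(area, ws, hs).
flat_areas = list(map(area, ws, hs))

# Reference result computed the nested way, for comparison.
nested_areas = [area(w, h) for w in widths for h in heights]
print(flat_areas == nested_areas)
```

Flattening the two loops into parallel argument lists gives the scheduler one independent task per (w, h) pair.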
Load balancing provides much more control than we have time to discuss here.