Data movement is where IPython's naive model suffers the most.
But knowing how your engines are distributed across hosts lets you make smarter decisions about data movement than a simple rc[:].push.
I ran this example with a cluster on a 64-core remote VM, so communication between the client and controller is over the public internet, while communication between the controller and engines is local.
This is an example of 'broadcasting' a numpy array using memmapped files, to reduce the amount of expensive network traffic when several engines are on the same host.
import socket
import numpy as np
import ipyparallel as ipp
# connect to the running cluster
rc = ipp.Client(profile="mpi")
# a coalescing broadcast view of all engines
eall = rc.broadcast_view(coalescing=True)
First, create a map of engine id to hostname
engine_hosts = eall.apply_async(socket.gethostname).get_dict()
engine_hosts
Next, reverse that to create a map of hostname to engine ids
host_engines = {}
for eid, host in engine_hosts.items():
    if host not in host_engines:
        host_engines[host] = []
    host_engines[host].append(eid)
host_engines
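Since every engine here lives on the single 64-core VM, host_engines should map that one hostname to all of the engine ids, something like {'worker-vm': [0, 1, 2, ..., 63]} (hostname hypothetical).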
Now we can measure our baseline overhead: how long does it take to round-trip an empty task on all engines? We shouldn't expect anything to take less time than this.
%time _ = eall.apply_sync(lambda : None)
Now let's look at how long it takes to send data in the simplest possible way: pushing a separate copy of the array directly to every engine.
data = np.random.random((512, 512))
%%time
ar = rc[:].push({'data': data}, block=False)
ar.wait_interactive()
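Note the traffic this generates: a 512x512 array of float64 is 2 MiB, so if the 64-core VM runs one engine per core, this push carries 64 separate copies (roughly 128 MiB) from the client across the public internet.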
Here we get to the broadcast implementation. Instead of sending the array directly to every engine via IPP, we:

1. send the array to one engine per host, which writes it to a file on disk
2. load that file as a memmapped array on every engine on the host

This results in the same data on all engines, but only one send per remote host instead of one per remote engine.
%px import numpy as np
@ipp.interactive
def array_to_file(A, name):
    """write an array to a temporary file, return its filename"""
    import tempfile

    with tempfile.NamedTemporaryFile(suffix='.npy', delete=False) as tf:
        np.save(tf, A)
        data_path = tf.name
    if name:
        # also stash the path in the engine's namespace under `name`
        globals()[name] = data_path
    return data_path
@ipp.interactive
def load_memmap(name, path, mode='r+'):
    """load a file on disk into the interactive namespace as a memmapped array"""
    # np.load with mmap_mode preserves the dtype and shape recorded by np.save;
    # a raw np.memmap would misinterpret the .npy header as data
    globals()[name] = np.load(path, mmap_mode=mode)
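As a quick sanity check, the two helpers should round-trip an array through disk unchanged. This sketch runs locally on the client (A_mm is a name of my choosing):
A = np.arange(6, dtype=np.float64).reshape(2, 3)
path = array_to_file(A, None)  # name=None: just return the path
load_memmap('A_mm', path)      # defines A_mm in our namespace
assert np.array_equal(A_mm, A)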
def bcast_memmap(data, name, client, host_engines):
    """broadcast a numpy array efficiently

    - sends data to each remote host only once
    - loads with memmap everywhere
    """
    memmap_path_name = f"_bcast_array_{name}"

    # actually push the data, just once to each machine
    one_per_host = client.broadcast_view(
        [engines[0] for engines in host_engines.values()], coalescing=True
    )
    send_ar = one_per_host.apply_async(array_to_file, data, name=memmap_path_name)
    # wait for the writes to finish; one file path comes back per host
    paths = send_ar.get()

    # load the data on all engines into a memmapped array,
    # giving each host's engines the path that was written on that host
    async_results = []
    for path, engines in zip(paths, host_engines.values()):
        view = client.broadcast_view(engines, coalescing=True)
        async_results.append(view.apply_async(load_memmap, name, path))
    return async_results
%%time
ars = bcast_memmap(data, 'data', rc, host_engines)
for ar in ars:
    ar.wait_interactive()
So that's a lot quicker! And it uses a lot less memory in both the client and the scheduler.
%px np.linalg.norm(data, 2)
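To double-check that every engine ended up with the same array, we can compare norms (a sketch; data_norm is a helper name of my choosing):
@ipp.interactive
def data_norm():
    return np.linalg.norm(data, 2)

local_norm = np.linalg.norm(data, 2)
remote_norms = eall.apply_sync(data_norm)
assert np.allclose(remote_norms, local_norm)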
You can also do the same thing with MPI.
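For instance, since this cluster was started with profile="mpi", the engines can redistribute the array amongst themselves with mpi4py: send one copy to a single engine, then let MPI fan it out locally. A minimal sketch, assuming engine 0 is MPI rank 0 (worth verifying on a real cluster) and using a hypothetical name mpi_data:
# push one copy of the array to engine 0
rc[0].push({'mpi_data': data}, block=True)

%%px
from mpi4py import MPI

comm = MPI.COMM_WORLD
if comm.rank != 0:
    # non-root ranks allocate a receive buffer of matching shape and dtype
    mpi_data = np.empty((512, 512), dtype=np.float64)
# in-place broadcast from rank 0 to everyone
comm.Bcast(mpi_data, root=0)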