dask.distributed is a cool library for doing distributed execution. You should check it out if you haven't already.
In many cases, dask.distributed should replace using IPython Parallel if you primarily use the LoadBalancedView.
However, you may already have infrastructure for deploying and managing IPython engines, and IPython Parallel's interactive debugging features can still be useful.
Any IPython cluster can become a dask cluster at any time, and be used simultaneously via both APIs.
You can turn your IPython cluster into a distributed cluster by calling Client.become_dask():
import ipyparallel as ipp
rc = ipp.Cluster(n=4).start_and_connect_sync()
dask_client = rc.become_dask(ncores=1)
dask_client
This will start a dask Scheduler, start a dask Worker on each engine, and return a connected dask Client.
By default, distributed Workers use threads to run on all cores of a machine.
In this case, since I already have one engine per core,
I tell distributed to run one thread per Worker with ncores=1 (newer versions of distributed call this argument nthreads).
We can now use our IPython cluster with distributed:
from distributed import progress

def square(x):
    return x ** 2

def neg(x):
    return -x

A = dask_client.map(square, range(1000))
B = dask_client.map(neg, A)
total = dask_client.submit(sum, B)
progress(total)
total.result()
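As a sanity check, the same pipeline in plain Python produces the value the cluster should return:

```python
# Plain-Python equivalent of the distributed pipeline above
A = [x ** 2 for x in range(1000)]   # square each input
B = [-x for x in A]                 # negate each square
total = sum(B)                      # sum of the negated squares
print(total)  # -332833500
```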
I could also let distributed do its multithreading thing, and run one multi-threaded Worker per engine.
First, I need to get a mapping of one engine per host:
import socket
engine_hosts = rc[:].apply_async(socket.gethostname).get_dict()
engine_hosts
I can reverse this mapping, to get a list of engines on each host:
host_engines = {}
for engine_id, host in engine_hosts.items():
    if host not in host_engines:
        host_engines[host] = []
    host_engines[host].append(engine_id)
host_engines
Now I can get one engine per host:
one_engine_per_host = [engines[0] for engines in host_engines.values()]
one_engine_per_host
Here's a concise but more opaque version that accomplishes the same thing (note it keeps the last engine seen on each host, rather than the first):
one_engine_per_host = list({host: eid for eid, host in engine_hosts.items()}.values())
one_engine_per_host
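To make the difference between the two versions concrete, here they are run against a small, hypothetical engine_hosts mapping (the hostnames are made up). Both select one engine per host; the loop keeps the first engine seen on each host, while the dict comprehension keeps the last:

```python
# Hypothetical mapping, shaped like apply_async(socket.gethostname).get_dict()
engine_hosts = {0: "node-a", 1: "node-a", 2: "node-b", 3: "node-b"}

# Loop version: collect engines per host, then take the first of each
host_engines = {}
for engine_id, host in engine_hosts.items():
    host_engines.setdefault(host, []).append(engine_id)
first_per_host = [engines[0] for engines in host_engines.values()]

# Comprehension version: later engines overwrite earlier ones per host
last_per_host = list({host: eid for eid, host in engine_hosts.items()}.values())

print(first_per_host)  # [0, 2]
print(last_per_host)   # [1, 3]
```

Either list is a valid choice of one engine per host; which particular engine you get per host just differs.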
I can now stop the first distributed cluster, and start a new one on just these engines, letting distributed allocate threads:
rc.stop_distributed()
dask_client = rc.become_dask(one_engine_per_host)
dask_client
And submit the same pipeline of tasks again:
A = dask_client.map(square, range(100))
B = dask_client.map(neg, A)
total = dask_client.submit(sum, B)
progress(total)
To explore debugging with IPython, restart the distributed cluster once more:
rc.stop_distributed()
dask_client = rc.become_dask(one_engine_per_host)
dask_client
Let's set the %px magics to only run on our one engine per host:
view = rc[one_engine_per_host]
view.block = True
view.activate()
Let's submit some work that's going to fail somewhere in the middle:
from IPython.display import display
from distributed import progress
def shift5(x):
    return x - 5

def inverse(x):
    return 1 / x
shifted = dask_client.map(shift5, range(1, 10))
inverted = dask_client.map(inverse, shifted)
total = dask_client.submit(sum, inverted)
display(progress(total))
total.result()
We can see which task failed:
[f for f in inverted if f.status == "error"]
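You don't need the cluster to see why this fails: shift5(5) returns 0, so inverse divides by zero. A quick local check:

```python
def shift5(x):
    return x - 5

def inverse(x):
    return 1 / x

# Only x == 5 produces a zero after shifting, so exactly one task errors
failing = [x for x in range(1, 10) if shift5(x) == 0]
print(failing)  # [5]

try:
    inverse(shift5(5))
except ZeroDivisionError as e:
    print("task would fail with:", e)
```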
When IPython starts a worker on each engine,
it stores it in the dask_worker variable in the engine's namespace.
This lets us query the worker interactively.
We can check out the current data resident on each worker:
%%px
dask_worker.data
Now that we can poke around with each Worker, we can have a slightly easier time figuring out what went wrong.