IPython Parallel 7 adds a Cluster API for starting/stopping clusters.
This is the new implementation of ipcluster,
which can be more easily re-used in Python programs.
The ipcluster script is now built on this same API.
Controllers and Engines are started with "Launchers", which are objects representing a running process.
Each Cluster has a profile_dir (an IPython profile directory) and a cluster_id.
The combination of profile_dir and cluster_id uniquely identifies a cluster.
You can have many clusters in one profile, but each must have a distinct cluster id.
To create a cluster, instantiate a Cluster object:
import ipyparallel as ipp
cluster = ipp.Cluster()
cluster
To start the cluster:
await cluster.start_controller()
cluster
engine_set_id = await cluster.start_engines(n=4)
cluster
As you can see, all methods on the Cluster object are async by default.
Every async method also has a _sync variant, if you don't want to / can't use asyncio.
engine_set_2 = cluster.start_engines_sync(n=2)
engine_set_2
At this point, we have a cluster with a controller and six engines in two groups.
There is also a start_cluster method that starts the controller and one engine set, for convenience:
engine_set_id = await cluster.start_cluster(n=4)
We can get a client object connected to the cluster with connect_client()
rc = await cluster.connect_client()
rc.wait_for_engines(6)
rc.ids
And we can use our classic apply_async(...).get_dict() pattern to get a dict by engine id of hostname, pid for each engine:
def identify():
    import os
    import socket

    return {"host": socket.gethostname(), "pid": os.getpid()}
rc[:].apply_async(identify).get_dict()
We can send signals to engine sets by id
(sending signals to just one engine is still a work in progress)
import signal
import time
ar = rc[:].apply_async(time.sleep, 100)
# oops! I meant 1!
await cluster.signal_engines(signal.SIGINT)
ar.get()
Now it's time to clean up. Every start_ method has a corresponding stop_ method.
We can stop one engine set at a time with stop_engines:
await cluster.stop_engines(engine_set_2)
Or stop the whole cluster
await cluster.stop_cluster()
Cluster can also be used as a context manager, in which case entering the context starts the cluster, `as` returns a connected client, and exiting the context shuts the cluster down. This makes it a lot easier to scope an IPython cluster for the duration of a computation and ensure that it is cleaned up when you are done.
import os

with ipp.Cluster(n=4) as rc:
    engine_pids = rc[:].apply_async(os.getpid).get_dict()
engine_pids
It can also be async
async with ipp.Cluster(n=2) as rc:
    engine_pids = rc[:].apply_async(os.getpid).get_dict()
engine_pids
IPython's mechanism for launching controllers and engines is called Launchers.
These are in ipyparallel.cluster.launcher.
There are two kinds of Launcher: a controller launcher, which starts a controller, and an engine set launcher, which starts n engines.
You can use abbreviations to access the launchers that ship with IPython Parallel, such as 'MPI', 'Local', or 'SGE', or you can pass classes themselves (or their import strings, such as 'mymodule.MyEngineSetLauncher').
I'm going to start a cluster with engines using MPI:
import os
os.environ["OMPI_MCA_rmaps_base_oversubscribe"] = "1"
cluster = ipp.Cluster(n=4, engines='MPI')
await cluster.start_cluster()
rc = await cluster.connect_client()
rc.wait_for_engines(4)
rc.ids
Now I'm going to run a test that will exercise another new feature: restarting engines.
def uhoh():
    import time
    from mpi4py import MPI

    rank = MPI.COMM_WORLD.rank
    if rank == 0:
        print("rank 0: oh no.")
        1 / 0
    print(f"rank {rank}: barrier")
    MPI.COMM_WORLD.barrier()
ar = rc[:].apply_async(uhoh)
ar.get(timeout=2)
Uh oh! We are stuck in barrier because engine 0 failed.
Let's try interrupting and getting the errors:
import signal
await cluster.signal_engines(signal.SIGINT)
ar.get(timeout=2)
It didn't work! This is because MPI.barrier isn't actually interruptible 😢.
We are going to have to resort to more drastic measures, and restart the engines:
await cluster.restart_engines()
rc.wait_for_engines(4)
rc.ids
We are now back to having 4 responsive engines. Their IPP engine id may have changed, but I can get back to using them.
def get_rank():
    from mpi4py import MPI

    return MPI.COMM_WORLD.rank
rank_map = rc[:].apply_async(get_rank).get_dict()
rank_map
Finally, clean everything up
await cluster.stop_cluster()
A Cluster object writes its state to disk,
in a file accessible as cluster.cluster_file.
By default, this will be $PROFILE_DIR/security/ipcluster-$cluster_id.json.
Cluster objects can load state from a dictionary with Cluster.from_dict(d)
or from a JSON file containing that information with Cluster.from_file().
The default arguments for from_file are to use the current IPython profile (default: 'default')
and an empty cluster id,
so if you start a cluster with ipcluster start, you can connect to it immediately with
cluster = ipp.Cluster.from_file()
import ipyparallel as ipp
cluster = ipp.Cluster.from_file()
cluster
ipp.ClusterManager provides an API for collecting/discovering/loading all the clusters on your system.
By default, it loads clusters from all of your IPython profiles, but it can be confined to one profile or use explicit profile directories.
clusters = ipp.ClusterManager().load_clusters()
clusters
This is the class that powers the new ipcluster list
!ipcluster list
!ipcluster stop --profile mpi --cluster-id abc-123
!ipcluster list
The same operation can be done from the Python API:
cluster = ipp.Cluster.from_file(profile="default", cluster_id="")
await cluster.stop_cluster()
!ipcluster list