Commit 3540a970 authored by Belhorn, Matt

Adds Jupyter notebook describing use of MPI cluster engines on Rhea.

parent ba87737e
%% Cell type:markdown id: tags:

# Interactive parallel processing in Jupyter notebooks on Rhea

## Install `ipyparallel`

The first thing to do is to install and enable `ipyparallel`:

- Activate the venv
- `pip install ipyparallel`
- Enable the extensions:
    - `jupyter nbextension install --sys-prefix --py ipyparallel`
    - `jupyter nbextension enable --sys-prefix --py ipyparallel`
    - `jupyter serverextension enable --sys-prefix --py ipyparallel`
- `ipython profile create --parallel --profile=<NAME>`
    - Engine launches default to \<NAME\> = 'default'
    - While it is tempting to use the default profile, doing so is risky (see below).

A `default` profile is automatically created in the ipython configuration directory, which is normally `$HOME/.ipython`. This documentation refers to the ipython config dir as `IPYTHONDIR`. The default profile is located at `IPYTHONDIR/profile_default` and is used by all default ipython and jupyter instances on all resources, so it should be left unmodified. This walkthrough instead uses a non-default, user-generated profile named `mpi` as an example. It can be created by calling:

```
$ ipython profile create --parallel --profile=mpi
```


## To use MPI

Instruct the profile to use `mpiexec` to start the cluster engines. The default MPI size for cluster engines should be no greater than the number of physical cores on a single node, ignoring HyperThreading; on Rhea this is 16. An unmodified profile will detect 32 hyperthreaded cores and use that as the default. Both changes are made by editing the following lines (which may be commented out) in `IPYTHONDIR/profile_mpi/ipcluster_config.py`:

```
c.IPClusterEngines.engine_launcher_class = 'MPIEngineSetLauncher'
c.IPClusterEngines.n = 16
```
It is also necessary to tell ipyparallel to use the hostfile generated for the PBS job by changing the following setting in `ipcluster_config.py` (the config file is ordinary Python, so `environ` must be imported):

```
from os import environ  # the config file is plain Python, so import what it references

c.MPILauncher.mpi_args = ['--hostfile', environ['PBS_NODEFILE']]
```

### Greater MPI Communicator Sizes

Setting `n=16` is a failsafe value for when the cluster engines are started on the same node as the *ipyparallel cluster controller*, which is typically the node local to the jupyter server instance. It is possible to launch an engine cluster with a greater MPI size, but the controller must be able to listen for connections from other nodes.

To do this, the PBS job that launches the server must request additional nodes:

```
#PBS -l nodes=4 # for instance
```

**This will consume allocation faster and is very wasteful if most of the job's walltime is spent idle.**

The cluster controller, by default, listens only for local connections. This must be changed so that MPI ranks spawned on the other nodes in the job can connect to the controller. Change the following settings in `IPYTHONDIR/profile_mpi/ipcontroller_config.py`:

```
c.RegistrationFactory.ip = u'*'
c.HubFactory.engine_ip = u'*'
```
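
Once the cluster has been started (see below), a quick sanity check from the notebook is to list the distinct hosts the engines are running on. A minimal sketch (not part of the original notebook):

``` python
import ipyparallel as ipp

def node_name():
    import platform
    return platform.node()

# One hostname per engine; more than one distinct name confirms the controller
# accepted connections from engines on other nodes.
client = ipp.Client(profile='mpi')
print(sorted(set(client[:].apply_sync(node_name))))
```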


# Using `ipyparallel`

## Starting the 'cluster'

Use the `IPython Clusters` tab on the Jupyter home view to start a cluster profile configured for MPI.

## Connecting to the cluster in a Notebook

Notebooks are given their own single-threaded, non-MPI Python kernel, and code run in the notebook is nominally run on this local kernel. Code and cells intended to run under MPI parallelism must be directed to a cluster of engines through an `ipyparallel.Client` instance that runs in the local kernel and is attached to the cluster engines. Such a client is created in the local kernel by running:

%% Cell type:code id: tags:

``` python
import ipyparallel as ipp
client = ipp.Client(profile='mpi', debug=False)
print "Engine IDs available to this client:\n {0}".format(client.ids)
```

%% Output

    Engine IDs available to this client:
     [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32, 33, 34, 35, 36, 37, 38, 39, 40, 41, 42, 43, 44, 45, 46, 47, 48, 49, 50, 51, 52, 53, 54, 55, 56, 57, 58, 59]

%% Cell type:markdown id: tags:

The client uses semi-synchronous messaging (provided by ZMQ) to send execution messages to the attached cluster and receive response results. [**Messages are generally non-blocking**](https://ipyparallel.readthedocs.io/en/latest/asyncresult.html).

The client is iterable and can be indexed to obtain *direct multiplexer views* of individual attached engines; slicing the client yields multi-engine views. Parallel code must be sent to a view.

%% Cell type:code id: tags:

``` python
view = client.direct_view() # All engines, all the time
subset_view = client[:8]    # First 8 current engines.
single_view = client[0]     # First current engine
```

%% Cell type:markdown id: tags:

## Running code on a view

Once a view is created, two provided decorators, `@DirectView.remote` and `@DirectView.parallel`, can be used to define functions that take advantage of the view's engines. Remote functions (`@remote`) are executed on the view's engines, within the full MPI environment, rather than in the local Python kernel:

%% Cell type:code id: tags:

``` python
# DirectView.remote decorates functions that run on cluster engines.
@view.remote(block=True)
def ranks_in_view():
    from mpi4py import MPI
    comm = MPI.COMM_WORLD
    return comm.Get_rank()

@subset_view.remote(block=True)
def ranks_in_subset_view():
    from mpi4py import MPI
    comm = MPI.COMM_WORLD
    return comm.Get_rank()

print ranks_in_view()
print ranks_in_subset_view()
```

%% Output

    [4, 0, 2, 1, 7, 12, 8, 3, 5, 10, 14, 13, 11, 15, 9, 6, 16, 18, 17, 22, 20, 24, 26, 21, 28, 27, 29, 30, 19, 23, 25, 31, 57, 49, 55, 32, 42, 38, 51, 53, 48, 56, 54, 58, 59, 50, 52, 41, 33, 37, 39, 47, 36, 35, 43, 34, 40, 45, 46, 44]
    [4, 0, 2, 1, 7, 12, 8, 3]

%% Cell type:markdown id: tags:

Whereas parallel functions (`@parallel`) distribute element-wise operations on sequence types across the cluster:

%% Cell type:code id: tags:

``` python
@subset_view.parallel(block=True)
def what_parallel_do(a, b):
    return '{0}{1}'.format(a, ''.join(b))

import string
def letters_and_numbers(n):
    return ((string.ascii_lowercase * (1+(n // 26)))[:n],
            [str(i) for i in range(n)])
```

%% Cell type:code id: tags:

``` python
a, b = letters_and_numbers(8)
what_parallel_do(a, b)
```

%% Output

    ['a0', 'b1', 'c2', 'd3', 'e4', 'f5', 'g6', 'h7']

%% Cell type:markdown id: tags:

This is easily confused with `map` operations when the sequence lengths are less than or equal to the number of parallel processing elements:

%% Cell type:code id: tags:

``` python
what_parallel_do.map(a, b)
```

%% Output

    ['a0', 'b1', 'c2', 'd3', 'e4', 'f5', 'g6', 'h7']

%% Cell type:markdown id: tags:

but **`@parallel` is not an element-wise map**. It applies the operation to subsequences of the inputs distributed over the engines and then combines the results. Map operations should use the `map` method explicitly:

%% Cell type:code id: tags:

``` python
a, b = letters_and_numbers(10)
print what_parallel_do(a, b)
print what_parallel_do.map(a, b)
```

%% Output

    ['ab01', 'cd23', 'e4', 'f5', 'g6', 'h7', 'i8', 'j9']
    ['a0', 'b1', 'c2', 'd3', 'e4', 'f5', 'g6', 'h7', 'i8', 'j9']

%% Cell type:markdown id: tags:

Beyond remote and parallel functions, any arbitrary local function can be run on cluster engines using the `apply`, `execute`, and `map` methods.
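
Of these, only `execute` is not demonstrated elsewhere in this notebook; it takes a string of code and runs it in each engine's namespace. A minimal sketch (the variable name is illustrative only):

``` python
# Run a code string on every engine, then read the resulting variable back.
view.execute('import os; engine_cwd = os.getcwd()', block=True)
print(view['engine_cwd'])   # one working directory per engine
```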

Instructions passed to the cluster are generally run asynchronously but can be made blocking by setting the `DirectView.block` attribute to `True`.

%% Cell type:code id: tags:

``` python
view.block = True
subset_view.block = True
single_view.block = True
```

%% Cell type:markdown id: tags:

Otherwise, each method provides `*_sync` and `*_async` variants that execute synchronously or asynchronously, respectively. Asynchronous execution adds additional complexity and will be discussed separately from this introduction.
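
As a minimal sketch of the asynchronous pattern (not from the original notebook): `apply_async` returns an `AsyncResult` immediately, and calling `.get()` blocks until every engine has replied.

``` python
# Submit work without waiting, then poll or block for the results.
async_result = view.apply_async(lambda: 6 * 7)
print(async_result.ready())   # may be False immediately after submission
print(async_result.get())     # blocks, then returns one result per engine
```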

The `apply` method (and its synchronous and asynchronous variants) will run an arbitrary function on each engine in the view:

%% Cell type:code id: tags:

``` python
def hello(*args):
    import os
    import platform as plat
    msg = "Hello from process {pid} on {host} given '{args}' args."
    return msg.format(pid=os.getpid(), host=plat.node(), args=args)

view.apply(hello, 1, 2)
```

%% Output

    ["Hello from process 59377 on rhea6 given '(1, 2)' args.",
     "Hello from process 59373 on rhea6 given '(1, 2)' args.",
     "Hello from process 59375 on rhea6 given '(1, 2)' args.",
     "Hello from process 59374 on rhea6 given '(1, 2)' args.",
     "Hello from process 59380 on rhea6 given '(1, 2)' args.",
     "Hello from process 59385 on rhea6 given '(1, 2)' args.",
     "Hello from process 59381 on rhea6 given '(1, 2)' args.",
     "Hello from process 59376 on rhea6 given '(1, 2)' args.",
     "Hello from process 59378 on rhea6 given '(1, 2)' args.",
     "Hello from process 59383 on rhea6 given '(1, 2)' args.",
     "Hello from process 59387 on rhea6 given '(1, 2)' args.",
     "Hello from process 59386 on rhea6 given '(1, 2)' args.",
     "Hello from process 59384 on rhea6 given '(1, 2)' args.",
     "Hello from process 59388 on rhea6 given '(1, 2)' args.",
     "Hello from process 59382 on rhea6 given '(1, 2)' args.",
     "Hello from process 59379 on rhea6 given '(1, 2)' args.",
     "Hello from process 77967 on rhea7 given '(1, 2)' args.",
     "Hello from process 77969 on rhea7 given '(1, 2)' args.",
     "Hello from process 77968 on rhea7 given '(1, 2)' args.",
     "Hello from process 77973 on rhea7 given '(1, 2)' args.",
     "Hello from process 77971 on rhea7 given '(1, 2)' args.",
     "Hello from process 77975 on rhea7 given '(1, 2)' args.",
     "Hello from process 77977 on rhea7 given '(1, 2)' args.",
     "Hello from process 77972 on rhea7 given '(1, 2)' args.",
     "Hello from process 77979 on rhea7 given '(1, 2)' args.",
     "Hello from process 77978 on rhea7 given '(1, 2)' args.",
     "Hello from process 77980 on rhea7 given '(1, 2)' args.",
     "Hello from process 77981 on rhea7 given '(1, 2)' args.",
     "Hello from process 77970 on rhea7 given '(1, 2)' args.",
     "Hello from process 77974 on rhea7 given '(1, 2)' args.",
     "Hello from process 77976 on rhea7 given '(1, 2)' args.",
     "Hello from process 77982 on rhea7 given '(1, 2)' args.",
     "Hello from process 106153 on rhea29 given '(1, 2)' args.",
     "Hello from process 106145 on rhea29 given '(1, 2)' args.",
     "Hello from process 106151 on rhea29 given '(1, 2)' args.",
     "Hello from process 110027 on rhea9 given '(1, 2)' args.",
     "Hello from process 110037 on rhea9 given '(1, 2)' args.",
     "Hello from process 110033 on rhea9 given '(1, 2)' args.",
     "Hello from process 106147 on rhea29 given '(1, 2)' args.",
     "Hello from process 106149 on rhea29 given '(1, 2)' args.",
     "Hello from process 106144 on rhea29 given '(1, 2)' args.",
     "Hello from process 106152 on rhea29 given '(1, 2)' args.",
     "Hello from process 106150 on rhea29 given '(1, 2)' args.",
     "Hello from process 106154 on rhea29 given '(1, 2)' args.",
     "Hello from process 106155 on rhea29 given '(1, 2)' args.",
     "Hello from process 106146 on rhea29 given '(1, 2)' args.",
     "Hello from process 106148 on rhea29 given '(1, 2)' args.",
     "Hello from process 110036 on rhea9 given '(1, 2)' args.",
     "Hello from process 110028 on rhea9 given '(1, 2)' args.",
     "Hello from process 110032 on rhea9 given '(1, 2)' args.",
     "Hello from process 110034 on rhea9 given '(1, 2)' args.",
     "Hello from process 110042 on rhea9 given '(1, 2)' args.",
     "Hello from process 110031 on rhea9 given '(1, 2)' args.",
     "Hello from process 110030 on rhea9 given '(1, 2)' args.",
     "Hello from process 110038 on rhea9 given '(1, 2)' args.",
     "Hello from process 110029 on rhea9 given '(1, 2)' args.",
     "Hello from process 110035 on rhea9 given '(1, 2)' args.",
     "Hello from process 110040 on rhea9 given '(1, 2)' args.",
     "Hello from process 110041 on rhea9 given '(1, 2)' args.",
     "Hello from process 110039 on rhea9 given '(1, 2)' args."]

%% Cell type:markdown id: tags:

Likewise, `map` will execute a function in parallel over input sequences.

%% Cell type:code id: tags:

``` python
subset_view.map(lambda x: x**2, range(10))
```

%% Output

    [0, 1, 4, 9, 16, 25, 36, 49, 64, 81]

%% Cell type:markdown id: tags:

## Namespaces and Data Movement

Objects in the notebook's global namespace must be copied to the engine namespaces to become available as engine globals. Data movement is generally done explicitly, although some objects, such as remote functions and their arguments, are automatically serialized and copied to the cluster.

%% Cell type:code id: tags:

``` python
local = 1

@single_view.remote(block=True)
def scope_and_serialization_demo(passed):
    from mpi4py import MPI
    comm = MPI.COMM_WORLD
    rank = comm.Get_rank()
    size = comm.Get_size()
    engine = "rank-{r:0{w}d}: ".format(r=rank, w=len(str(size)))
    state = "{var} == '{value}'"
    try:
        found = local
    except NameError:
        found = 'UNDEFINED'
    return engine + "; ".join(
        (state.format(var=' local', value=found),
         state.format(var='passed', value=passed)))

scope_and_serialization_demo(local)
```

%% Output

    "rank-08:  local == 'UNDEFINED'; passed == '1'"

%% Cell type:markdown id: tags:

Objects and data can be moved between the local notebook and remote engine namespaces through the `DirectView`'s `dict`-like item accessors and methods such as `apply`, `get`, `update`, `scatter`, and `gather`, much in the same way ordinary namespaces can be altered through the `__dict__` attribute.

%% Cell type:code id: tags:

``` python
# Copy `local` to engines' namespace:
single_view['local'] = local
scope_and_serialization_demo(local)
```

%% Output

    "rank-08:  local == '1'; passed == '1'"

%% Cell type:markdown id: tags:

Sequences can be evenly distributed from the interactive session to the cluster engines by means of the `scatter` method, and collected back into the notebook namespace using the `gather` method.

%% Cell type:code id: tags:

``` python
subset_view.scatter('scattered_list', range(16))
print subset_view['scattered_list']
subset_view.gather('scattered_list')
```

%% Output

    [[0, 1], [2, 3], [4, 5], [6, 7], [8, 9], [10, 11], [12, 13], [14, 15]]

    [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15]

%% Cell type:markdown id: tags:

These methods copy the data and cannot be used to mutate objects in-place across namespaces. It is also important to note that these operations are not the same as their MPI counterparts; communication between ranks on the cluster must be done through MPI (a sketch follows the mutation examples below).

%% Cell type:code id: tags:

``` python
a = [1,2]
def mutate_a():
    for i, j in enumerate(a):
        a[i] = j + 1

print a
mutate_a()
print a
```

%% Output

    [1, 2]
    [2, 3]

%% Cell type:code id: tags:

``` python
subset_view.scatter('a', range(16))
a = subset_view.gather('a') # Copy-op, not shared memory
print a
subset_view.apply(mutate_a); # Mutates only engine namespace
print a
print subset_view.gather('a')
```

%% Output

    [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15]
    [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15]
    [1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16]

%% Cell type:markdown id: tags:

Likewise, mutating the local copy does not affect the copies on the engines:

%% Cell type:code id: tags:

``` python
subset_view.scatter('a', range(16))
a = subset_view.gather('a')
print a
mutate_a() # Mutates only local namespace
print a
print subset_view.gather('a')
```

%% Output

    [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15]
    [1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16]
    [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15]
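
%% Cell type:markdown id: tags:

To communicate *between* ranks, rather than copying data through the client, use MPI directly inside a remote function. A minimal sketch (not part of the original notebook), assuming the engines were launched under MPI as configured above:

``` python
@view.remote(block=True)
def sum_of_ranks():
    from mpi4py import MPI
    comm = MPI.COMM_WORLD
    # allreduce combines a contribution from every rank; each rank adds its rank id.
    return comm.allreduce(comm.Get_rank(), op=MPI.SUM)

print(sum_of_ranks())
```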

%% Cell type:markdown id: tags:

# Where to go from here?

The [ipyparallel package](https://github.com/ipython/ipyparallel) has an excellent set of examples that cover many more features and topics for running parallel Python code interactively. Some notable features not discussed here but covered in the upstream documentation include:

* The parallel cell magic `%px`, used to run lines and cells directly on the engines (must read; a short sketch follows this list).
* Threads.
* The `run()` operator for launching Python scripts in parallel interactively.
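
A short sketch of the `%px` magic (hedged; it assumes the view's `activate()` method has been used to register the magics, as described in the ipyparallel documentation):

``` python
# After activate(), %px runs a line on every engine in the view instead of locally.
view.activate()
%px from mpi4py import MPI
%px print(MPI.COMM_WORLD.Get_rank())
```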