Why a Message Passing Interface?
When designing a parallel processing program, there is usually a need to pass messages between the simultaneously running program instances so that they can synchronize with each other. It is also fairly common for one instance of the running program to be designated the master or controller, so that it can manage the other instances and know when all of the other instances have completed.
In the world of cluster computing, rather than rely on proprietary or bespoke methods of passing messages between parallel running programs, the Message Passing Interface (MPI) Forum was established to provide a standardized interface that is consistent across different computing architectures and operating systems.
Open MPI is one implementation of that standard interface, which -- to quote its web site -- is developed and maintained by a consortium of academic, research, and industry partners. Another implementation is MPICH, which has been around a bit longer, and there are numerous discussions on the Internet about which one might be better under which scenarios. There are others as well, but the gist is that because they are all built on the same MPI interface, a parallel program may be coded once and run on any MPI-compliant implementation without modification. The only caveat is that an implementation may not include 100% of the functionality of the MPI standard, or may adhere to a different version of the standard. Backward compatibility between MPI versions is usually maintained, although new features continue to be added.
In the Linux and scientific cluster computing community, albeit somewhat subjectively, Open MPI appears to be fairly common and popular, so that is the MPI implementation used in these examples. It is also quick and easy to install on a Raspberry Pi, and does not require any kernel modifications.
Installing Open MPI
Now that Slurm has been installed and is working, Open MPI can be installed across all of the nodes in the cluster with a single command:
srun --nodes=4 sudo apt-get install openmpi-bin libopenmpi-dev -y
Testing Open MPI
The MPI test program included here will run on four (4) nodes (hydra01 => hydra04), and each node will run four (4) processes, to match the number of cores per node. Each process (16 in total) will run the /usr/bin/hostname command and output the result. The first process on the first node is designated the controller. To test the message passing interface, each process sends the output of its hostname command to the controller, and the controller displays every message received. Once the controller has received fifteen (15) messages -- one message per process, minus itself -- it will exit. The directory conventions followed in this example are:
- /data/slurm/src ... shall contain source code
- /data/slurm/bin ... shall contain compiled code and executable scripts
- /data/slurm/run ... shall contain batch scripts for submission (and output)
1. Create directories to organize program source and executables on shared cluster storage:
$ sudo mkdir -p /data/slurm/bin
$ sudo mkdir -p /data/slurm/src
$ sudo mkdir -p /data/slurm/run
$ sudo chown pi:pi /data/slurm/bin /data/slurm/src /data/slurm/run
2. Create a sample Open MPI test program in C, and compile it using the Open MPI GCC wrapper:
$ cd /data/slurm/src
$ vi test_mpi.c
/* test_mpi.c - Test the Open MPI Interface */
#include <mpi.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>

int main(int argc, char** argv) {
    int node, size, count;
    int maxlen = 1024;
    char local[maxlen];
    char remote[maxlen];
    FILE *pipe;
    char *command = "/usr/bin/hostname";
    MPI_Status status;

    /* Initialize MPI and get node number and world size */
    MPI_Init(&argc, &argv);
    MPI_Comm_rank(MPI_COMM_WORLD, &node);
    MPI_Comm_size(MPI_COMM_WORLD, &size);

    /* Run the hostname command and capture the output */
    pipe = popen(command, "r");
    if (pipe == NULL) {
        printf("[%02d]: Command failed: %s\n", node, command);
        exit(1);
    }
    while (fgets(local, sizeof(local), pipe) != NULL) {
        printf("[%02d]: %s %s", node, command, local);
    }
    pclose(pipe);

    /* Node zero is the controller here */
    count = size - 1;
    if (node == 0) {
        printf("*** Start controller on [%02d]:%s", node, local);
        while (count) {
            MPI_Recv(remote, maxlen, MPI_CHAR, MPI_ANY_SOURCE,
                     MPI_ANY_TAG, MPI_COMM_WORLD, &status);
            printf("[%02d]: Received %s", node, remote);
            count--;
        }
        printf("*** End controller on [%02d]:%s\n", node, local);
    } else {
        /* Send the trailing '\0' too, so the controller receives a complete string */
        MPI_Send(local, strlen(local) + 1, MPI_CHAR, 0, 0, MPI_COMM_WORLD);
    }

    /* Clean-up */
    MPI_Finalize();
    return 0;
}
$ mpicc -o ../bin/test_mpi test_mpi.c
3. Create a Slurm batch job to run the Open MPI test program on all nodes in the cluster:
Note: The batch job is run through the bash shell, and the #SBATCH comments are interpreted as parameters to the sbatch command: the --nodes parameter defines the number of nodes that the job runs on, and the --ntasks-per-node parameter defines the number of tasks per node (which typically matches the number of cores per node)
$ cd /data/slurm/run
$ vi test_mpi.sh
#!/bin/bash
#SBATCH --nodes=4
#SBATCH --ntasks-per-node=4
cd $SLURM_SUBMIT_DIR
mpirun ../bin/test_mpi
4. Submit the Slurm batch job with the sbatch command, and monitor it with the squeue command
$ sbatch test_mpi.sh
Submitted batch job 208
$ squeue
JOBID PARTITION NAME USER ST TIME NODES NODELIST(REASON)
208 all test_mpi pi R 0:02 4 hydra[01-04]
5. Once completed, display the batch job output file slurm-###.out
(replace ### with the batch job number output from the sbatch command)
Note: The output shown has been sorted for readability; the actual order of the lines will be mixed
$ cat slurm-208.out
*** Start controller on [00]:hydra01
[00]: Received hydra01
[00]: Received hydra01
[00]: Received hydra01
[00]: Received hydra02
[00]: Received hydra02
[00]: Received hydra02
[00]: Received hydra02
[00]: Received hydra03
[00]: Received hydra03
[00]: Received hydra03
[00]: Received hydra03
[00]: Received hydra04
[00]: Received hydra04
[00]: Received hydra04
[00]: Received hydra04
[00]: /usr/bin/hostname hydra01
[01]: /usr/bin/hostname hydra01
[02]: /usr/bin/hostname hydra01
[03]: /usr/bin/hostname hydra01
[04]: /usr/bin/hostname hydra02
[05]: /usr/bin/hostname hydra02
[06]: /usr/bin/hostname hydra02
[07]: /usr/bin/hostname hydra02
[08]: /usr/bin/hostname hydra03
[09]: /usr/bin/hostname hydra03
[10]: /usr/bin/hostname hydra03
[11]: /usr/bin/hostname hydra03
[12]: /usr/bin/hostname hydra04
[13]: /usr/bin/hostname hydra04
[14]: /usr/bin/hostname hydra04
[15]: /usr/bin/hostname hydra04
*** End controller on [00]:hydra01
Using Python with MPI
In addition to native C, as used in the last example, there are a number of other languages for which MPI wrappers are available. These wrappers permit MPI messaging to be used from within those languages.
Python is one of the most popular languages today according to various surveys, and in recent years it has only grown in popularity. There are several MPI wrappers available for Python, and MPI for Python (mpi4py) appears to be fairly popular and well covered on the Internet. It is also actively maintained, and makes most of the functionality of Open MPI available within the Python programming language.
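As a minimal sketch of how mpi4py is used (for illustration only; the file name below is hypothetical, and this is not part of the test program that follows), the lowercase comm.send and comm.recv methods pickle and unpickle arbitrary Python objects behind the scenes, so, unlike the C interface, no buffer lengths or MPI datatypes need to be specified:
# mpi4py_sketch.py - illustrative sketch only (hypothetical file name)
# Run with at least two processes, e.g.: mpirun -n 2 python3 mpi4py_sketch.py
from mpi4py import MPI

comm = MPI.COMM_WORLD         # MPI_Init is called implicitly on import
rank = comm.Get_rank()

if rank == 0:
    # The lowercase recv unpickles whatever Python object was sent
    message = comm.recv(source=MPI.ANY_SOURCE, tag=MPI.ANY_TAG)
    print("rank 0 received:", message)
elif rank == 1:
    # Any picklable object can be sent: strings, dictionaries, lists, etc.
    comm.send({"rank": rank, "greeting": "hello"}, dest=0, tag=0)
The Python test program below uses this same send/recv style to pass each node's hostname string back to the controller.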
Python is a much higher-level language than C, and is popular in many DevOps projects, which fits nicely with cluster computing. To install Python with an MPI wrapper, and to run an example Python program that mimics the prior MPI test program written in C:
1. Install the latest Python interpreter and the MPI for Python wrapper across the cluster using Slurm:
$ srun --nodes=4 sudo apt-get install python3 python3-pip -y
$ srun --nodes=4 python3 -m pip install mpi4py
2. Create a sample Open MPI Python test program:
$ cd /data/slurm/bin
$ vi test_mpi_python.py
# test_mpi_python.py - Test the Open MPI Interface from Python
from mpi4py import MPI
from subprocess import Popen, PIPE

command = "/usr/bin/hostname"

# Get node number and world size (MPI_Init is called implicitly by import)
comm = MPI.COMM_WORLD
node = comm.Get_rank()
size = comm.Get_size()

# Run the hostname command and capture the output
with Popen([command], stdout=PIPE) as proc:
    local = proc.stdout.read().decode("UTF-8")
if local == "":
    print("[%02d]: Command failed: %s" % (node, command))
else:
    print("[%02d]: %s %s" % (node, command, local), end="")

# Node zero is the controller here
count = size - 1
if node == 0:
    print("*** Start controller on [%02d]:%s" % (node, local), end="")
    while count > 0:
        remote = comm.recv(source=MPI.ANY_SOURCE, tag=MPI.ANY_TAG, status=None)
        print("[%02d]: Received %s" % (node, remote), end="")
        count -= 1
    print("*** End controller on [%02d]:%s\n" % (node, local), end="")
else:
    comm.send(local, dest=0, tag=0)

# Clean-up (MPI_Finalize is called implicitly when the script exits)
exit(0)
3. Create a Slurm batch job to run the Open MPI Python test program on all nodes in the cluster:
Reminder: The --nodes parameter defines the number of nodes that the job runs on, and the --ntasks-per-node parameter defines the number of tasks per node (which typically matches the number of cores per node)
$ cd /data/slurm/run
$ vi test_mpi_python.sh
#!/bin/bash
#SBATCH --nodes=4
#SBATCH --ntasks-per-node=4
cd $SLURM_SUBMIT_DIR
mpirun python3 ../bin/test_mpi_python.py
4. Submit the Slurm batch job with the sbatch command, and monitor it with the squeue command
$ sbatch test_mpi_python.sh
Submitted batch job 208
$ squeue
JOBID PARTITION NAME USER ST TIME NODES NODELIST(REASON)
208 all test_mpi pi R 0:02 4 hydra[01-04]
5. Once completed, display the batch job output file slurm-###.out, where ### is the job number; the output should match that of the test program written in C
Using Java with MPI
The Open MPI project includes an optional Java wrapper, but unfortunately the pre-built Open MPI binary packaged for Raspberry Pi OS (Debian 11) does not include that wrapper. Therefore, it is necessary to download and build Open MPI from source.
Note: The only file missing from the pre-built Open MPI package is the JAR library (mpi.jar) containing the Java interface layer on top of Open MPI. That pre-built library is available for download from this site, along with these sample Open MPI test programs; if you do not wish to build Open MPI from source, you can start with step 6, otherwise follow steps 1 through 5
To build the MPI Java wrapper from the Open MPI source (these examples use version 4.1.0; the source version should match the version of Open MPI that is installed, which can be verified with the command mpirun --version):
1. Install the Java Development Kit (OpenJDK 11 is suggested) on all cluster nodes
$ srun --nodes=4 sudo apt-get install openjdk-11-jdk -y
2. Download the source from the Open MPI web site on the first (hydra01) node using a browser:
Note: The version installed in this example is 4.1.0:
https://www.open-mpi.org/software/ompi/v4.1/
Click the download link: openmpi-4.1.0.tar.gz
3. Build Open MPI from source:
$ sudo tar xvf openmpi-4.1.0.tar.gz -C /data
$ sudo chown -R pi:pi /data/openmpi-4.1.0
$ cd /data/openmpi-4.1.0
$ ./configure --enable-mpi-java
$ make
4. Copy the newly built MPI Java library to the expected location:
$ sudo cp \
/data/openmpi-4.1.0/ompi/mpi/java/java/mpi.jar \
/usr/lib/arm-linux-gnueabihf/openmpi/lib/
5. Link the Java Runtime to the expected location
Note: Running the command mpijavac --showme will display where the wrapper expects to find the javac command
$ cd /usr/lib/jvm
$ sudo ln -s java-11-openjdk-armhf default-java
6. Create a sample Open MPI Java test program, and compile it using the Open MPI javac wrapper:
$ cd /data/slurm/src
$ vi test_mpi_java.java
/* test_mpi_java.java - Test the Open MPI Interface from Java */
import mpi.MPI;
import mpi.MPIException;
import mpi.Status;
import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.io.IOException;

public class test_mpi_java {
    public static void main(String[] args) throws MPIException {
        String command = "/usr/bin/hostname";

        /* Initialize MPI and get node number and world size */
        MPI.Init(args);
        int node = MPI.COMM_WORLD.getRank();
        int size = MPI.COMM_WORLD.getSize();
        String nodeStr = String.format("%02d", node);

        /* Run the hostname command and capture the output */
        Runtime run = Runtime.getRuntime();
        Process process = null;
        try {
            process = run.exec(command);
        } catch (IOException ioe) {
            System.out.println("[" + nodeStr + "]: Command failed: " + command);
            System.exit(1);
        }
        BufferedReader br = new BufferedReader(
            new InputStreamReader(process.getInputStream()));
        StringBuffer local = new StringBuffer();
        String line;
        try {
            while ((line = br.readLine()) != null) local.append(line + "\n");
        } catch (IOException ioe) {
            local.append("[" + nodeStr + "]: Command failed: " + command);
        }
        System.out.print("[" + nodeStr + "]: " + command + " " + local);

        /* Node zero is the controller here */
        int count = size - 1;
        if (node == 0) {
            System.out.print("*** Start controller on [" + nodeStr + "]:" + local);
            while (count > 0) {
                byte[] remote = new byte[100];
                Status status = MPI.COMM_WORLD.recv(remote, remote.length, MPI.BYTE,
                    MPI.ANY_SOURCE, MPI.ANY_TAG);
                System.out.print("[" + nodeStr + "]: Received " +
                    new String(remote, 0, status.getCount(MPI.BYTE)));
                count--;
            }
            System.out.print("*** End controller on [" + nodeStr + "]:" + local);
        } else {
            byte[] message = local.toString().getBytes();
            MPI.COMM_WORLD.send(message, message.length, MPI.BYTE, 0, 0);
        }

        /* Clean-up */
        MPI.Finalize();
    }
}
$ mpijavac -d ../bin test_mpi_java.java
7. Create a Slurm batch job to run the Open MPI Java test program on all nodes in the cluster:
Reminder: The --nodes parameter defines the number of nodes that the job runs on, and the --ntasks-per-node parameter defines the number of tasks per node (which typically matches the number of cores per node)
$ cd /data/slurm/run
$ vi test_mpi_java.sh
#!/bin/bash
#SBATCH --nodes=4
#SBATCH --ntasks-per-node=4
cd $SLURM_SUBMIT_DIR
export CLASSPATH=.:../bin
mpirun java test_mpi_java
8. Submit the Slurm batch job with the sbatch command, and monitor it with the squeue command
$ sbatch test_mpi_java.sh
Submitted batch job 208
$ squeue
JOBID PARTITION NAME USER ST TIME NODES NODELIST(REASON)
208 all test_mpi pi R 0:02 4 hydra[01-04]
9. Once completed, display the batch job output file slurm-###.out, where ### is the job number; the output should match that of the test program written in C