Programming Examples¶
Remote command example (sequential mode)¶
The following example shows how to send a command on some nodes, how to get a specific buffer and how to get gathered buffers:
from ClusterShell.Task import task_self
task = task_self()
task.run("/bin/uname -r", nodes="green[36-39,133]")
print task.node_buffer("green37")
for buf, nodes in task.iter_buffers():
print nodes, buf
if task.max_retcode() != 0:
print "An error occurred (max rc = %s)" % task.max_retcode()
Result:
2.6.32-431.el6.x86_64
['green37', 'green38', 'green36', 'green39'] 2.6.32-431.el6.x86_64
['green133'] 3.10.0-123.20.1.el7.x86_64
Max return code is 0
Remote command example with live output (event-based mode)¶
The following example shows how to use the event-based programmation model by
installing an EventHandler and listening for EventHandler.ev_read()
(we've got a line to read) and EventHandler.ev_hup()
(one command has
just completed) events. The goal here is to print standard outputs of uname
-a
commands during their execution and also to notify the user of any
erroneous return codes:
from ClusterShell.Task import task_self
from ClusterShell.Event import EventHandler
class MyHandler(EventHandler):
def ev_read(self, worker, node, sname, msg):
print "%s: %s" % (node, msg)
def ev_hup(self, worker, node, rc):
if rc != 0:
print "%s: returned with error code %s" % (node, rc)
task = task_self()
# Submit command, install event handler for this command and run task
task.run("/bin/uname -a", nodes="fortoy[32-159]", handler=MyHandler())
check_nodes.py example script¶
The following script is available as an example in the source repository and is usually packaged with ClusterShell:
#!/usr/bin/python
# check_nodes.py: ClusterShell simple example script.
#
# This script runs a simple command on remote nodes and report node
# availability (basic health check) and also min/max boot dates.
# It shows an example of use of Task, NodeSet and EventHandler objects.
# Feel free to copy and modify it to fit your needs.
#
# Usage example: ./check_nodes.py -n node[1-99]
import optparse
from datetime import date, datetime
import time
from ClusterShell.Event import EventHandler
from ClusterShell.NodeSet import NodeSet
from ClusterShell.Task import task_self
class CheckNodesResult(object):
"""Our result class"""
def __init__(self):
"""Initialize result class"""
self.nodes_ok = NodeSet()
self.nodes_ko = NodeSet()
self.min_boot_date = None
self.max_boot_date = None
def show(self):
"""Display results"""
if self.nodes_ok:
print "%s: OK (boot date: min %s, max %s)" % \
(self.nodes_ok, self.min_boot_date, self.max_boot_date)
if self.nodes_ko:
print "%s: FAILED" % self.nodes_ko
class CheckNodesHandler(EventHandler):
"""Our ClusterShell EventHandler"""
def __init__(self, result):
"""Initialize our event handler with a ref to our result object."""
EventHandler.__init__(self)
self.result = result
def ev_read(self, worker, node, sname, msg):
"""Read event from remote nodes"""
# this is an example to demonstrate remote result parsing
bootime = " ".join(msg.strip().split()[2:])
date_boot = None
for fmt in ("%Y-%m-%d %H:%M",): # formats with year
try:
# datetime.strptime() is Python2.5+, use old method instead
date_boot = datetime(*(time.strptime(bootime, fmt)[0:6]))
except ValueError:
pass
for fmt in ("%b %d %H:%M",): # formats without year
try:
date_boot = datetime(date.today().year, \
*(time.strptime(bootime, fmt)[1:6]))
except ValueError:
pass
if date_boot:
if not self.result.min_boot_date or \
self.result.min_boot_date > date_boot:
self.result.min_boot_date = date_boot
if not self.result.max_boot_date or \
self.result.max_boot_date < date_boot:
self.result.max_boot_date = date_boot
self.result.nodes_ok.add(node)
else:
self.result.nodes_ko.add(node)
def ev_close(self, worker, timedout):
"""Worker has finished (command done on all nodes)"""
if timedout:
nodeset = NodeSet.fromlist(worker.iter_keys_timeout())
self.result.nodes_ko.add(nodeset)
self.result.show()
def main():
""" Main script function """
# Initialize option parser
parser = optparse.OptionParser()
parser.add_option("-d", "--debug", action="store_true", dest="debug",
default=False, help="Enable debug mode")
parser.add_option("-n", "--nodes", action="store", dest="nodes",
default="@all", help="Target nodes (default @all group)")
parser.add_option("-f", "--fanout", action="store", dest="fanout",
default="128", help="Fanout window size (default 128)",
type=int)
parser.add_option("-t", "--timeout", action="store", dest="timeout",
default="5", help="Timeout in seconds (default 5)",
type=float)
options, _ = parser.parse_args()
# Get current task (associated to main thread)
task = task_self()
nodes_target = NodeSet(options.nodes)
task.set_info("fanout", options.fanout)
if options.debug:
print "nodeset : %s" % nodes_target
task.set_info("debug", True)
# Create ClusterShell event handler
handler = CheckNodesHandler(CheckNodesResult())
# Schedule remote command and run task (blocking call)
task.run("who -b", nodes=nodes_target, handler=handler, \
timeout=options.timeout)
if __name__ == '__main__':
main()
Using NodeSet with Parallel Python Batch script using SLURM¶
The following example shows how to use the NodeSet class to expand
$SLURM_NODELIST
environment variable in a Parallel Python batch script
launched by SLURM. This variable may contain folded node sets. If ClusterShell
is not available system-wide on your compute cluster, you need to follow
Installing ClusterShell as user using PIP first.
Example of SLURM pp.sbatch
to submit using sbatch pp.sbatch
:
#!/bin/bash
#SBATCH -N 2
#SBATCH --ntasks-per-node 1
# run the servers
srun ~/.local/bin/ppserver.py -w $SLURM_CPUS_PER_TASK -t 300 &
sleep 10
# launch the parallel processing
python -u ./pp_jobs.py
Example of a pp_jobs.py
script:
#!/usr/bin/env python
import os, time
import pp
from ClusterShell.NodeSet import NodeSet
# get the nodelist form Slurm
nodeset = NodeSet(os.environ['SLURM_NODELIST'])
# start the servers (ncpus=0 will make sure that none is started locally)
# casting nodelist to tuple/list will correctly expand $SLURM_NODELIST
job_server = pp.Server(ncpus=0, ppservers=tuple(nodelist))
# make sure the servers have enough time to start
time.sleep(5)
# test function to execute on the remove nodes
def test_func():
print os.uname()
# start the jobs
job_1 = job_server.submit(test_func,(),(),("os",))
job_2 = job_server.submit(test_func,(),(),("os",))
# retrive the results
print job_1()
print job_2()
# Cleanup
job_server.print_stats()
job_server.destroy()