Inside the crooked Python ecosystem few straight things were ever made that I like and often use. One is Spack - the software package manager that promoted the HPC Dependency Hell to Purgatory. The other, which I've discovered quite recently, is arguably the Python package for shell scripting: it finally turns Python into an efficient glue language and makes it fun again.
Such is Plumbum: the "shell combinators" library. An actively maintained and highly usable package that allows using shell commands transparently within a Python program, without the need to resort to subprocess in most cases.
Sorry, Common Workflow Language, but it looks like you've been overthrown! With Plumbum and its dependencies alone, one can quickly build a custom workflow or pipeline mechanism - and in this post I'm giving a live example of one such system, together with the concerns and problems that occurred while I was figuring it out.
§ Laying hands on Plumbum
The docs are clean and clear, and the table of contents reflects the package features well: shell scripting of both local and remote machines in a uniform manner, plus built-in definitions for turning Python scripts into POSIX CLI executable commands without explicit use of the click package or its analogs.
First-class citizens here are commands exposed on machines, paths, and envs - the state of the shell as defined by environment variables. Nothing very different in principle from what one would expect of a system like that.
Yet a few words should be said about the syntax. In a sense plumbum defines a DSL for composing the package's inhabitants, which is cool! But it can also look a bit confusing, since that dialect is defined within Python.
The first thing to mention is the overloading of some standard arithmetic and comparison operators. E.g. / works as a path concatenator (allowing one to drop the eyesore os.path idioms):
from plumbum import local
base = local.path("/tmp")
return type(base / "siegetower")
: <class 'plumbum.path.local.LocalPath'>
Similarly, < and > perform IO redirection, and | can be used to build pipes:
from plumbum import local
return (local["ls"][local.env.get("HOME")] | local["grep"]["Downloads"])()
: Downloads
Note that in the previous example there are already quite a few parentheses... It gets worse with command nesting, and it is in fact the price of describing composition in Python.
First, commands available on machines are defined with []:
from plumbum import local
ls = local["ls"]
return type(ls)
: <class 'plumbum.machines.local.LocalCommand'>
This definition, by the way, would throw an exception in the unlikely event that ls is not found in the $PATH of the local machine (see the sketch after the list below). That's true for all commands interfaced with plumbum. Make sure that:
- either you call your script from a shell with a pre-configured environment, e.g. by sourcing all the needed configuration files with extra environment variables exposed - this is easy to do on a local computer,
- or have all the needed configuration written in ~/.profile - as in the case of module load commands on remote clusters.
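For completeness, here is a sketch of guarding against the missing-command case; CommandNotFound is the exception plumbum raises on failed $PATH lookups, and the frobnicate binary is made up:

from plumbum import local, CommandNotFound

try:
    frobnicate = local["frobnicate"]  # hypothetical binary
except CommandNotFound:
    print("frobnicate is not in $PATH; check your shell configuration")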
Commands can be partially concretized with extra arguments by adding extra [] with those arguments listed:
from plumbum import local
ls = local["ls"]["-lt"]
ls("-a")  # calls 'ls -lt -a' in the current working dir
print(type(ls))
: <class 'plumbum.commands.base.BoundCommand'>
Notice that the type has changed to BoundCommand. Binding (concretizing) commands can be done further in a partial application or currying style:
from plumbum import local
ls = local["ls"]
ls = ls["-a"]
ls = ls["-l"]
ls = ls["-t"]
return ls
: /usr/bin/ls -a -l -t
Finally, commands can be nested by binding them with other commands as arguments. If I want to run a physics calculator through a memory profiler I can do:
from plumbum import local
mprof = local["mprof"]["--include-children"]
siesta = local["siesta"]
bound = mprof[siesta]  # `bound()` will call a chain of nested commands
One can realize at this point that complex nested and pipelined command compositions can be built - even written as one-liners - but that means a lot of weird-looking nested lists, which is by no means good Python style.
And as a rule of thumb:

Binding a command with predefined args in [] does not result in calling it. Calling a command (or a chain, or a pipe) is done by adding () as with regular Python functions.
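A tiny sketch of that distinction (nothing spawns a process until the final parentheses):

from plumbum import local

ls = local["ls"]["-l"]  # bound command: no process spawned yet
output = ls()           # only now is `ls -l` actually executed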
§ Session setup workarounds on remotes
Plumbum does really well in providing transparent and uniform shell scripting on both local and remote machines. I found it possible to go as far as scripting HPC environments using its Paramiko interface, which can preserve a session among the commands being executed! But there is a catch...
The warning inset on the documentation section linked above shouts (at the moment of writing) that:
Piping and input/output redirection don’t really work with ParamikoMachine commands... This will be solved in a future release; in the meanwhile, you can use the machine’s .session() method
The example given afterwards is totally working, but note that it does not alter the environment of the remote shell. But what if I DO need to set up the remote shell environment after login? And what if the env is too complex to explicitly enumerate all the variables in the script - as in the case of dependencies provisioned with Spack that I usually spack load ...?
Well, here I ran into the first problem with plumbum being not perfect. One can work with the machine.session() object of a remote machine, as the documentation says, and all side effects like environment variables will be preserved there - but they are not reflected in the regular command interface of plumbum. That means losing command composability, for example, after which there is not much point in using it at all - one could use paramiko directly and say hello to popen() and the other stuff we'd better have swept under the carpet.
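A sketch of the mismatch, assuming a reachable host (the hostname here is made up):

from plumbum.machines.paramiko_machine import ParamikoMachine

rem = ParamikoMachine("cluster.example.org")  # hypothetical host
sess = rem.session()
sess.run("export MY_VAR=42")
print(sess.run("echo $MY_VAR")[1])              # "42" - preserved within the session
print(rem["printenv"]("MY_VAR", retcode=None))  # empty - the command interface is unaware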
One can consult the internal documentation and see that machine.session() returns a new session object. What I couldn't find is how to affect the global session. Each machine instance has a ._session property (with an underscore, which means "private" by Python convention). But working through it does not help: the regular machine["command"][...] declarations remain ignorant of the environment changes.
Workaround? Well, put all the needed settings in ~/.profile, as is quite common for login shell auto-configuration. And only the light stuff: if there is a significant delay in shell wind-up, Paramiko will treat it as a connection error.
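If slow logins cannot be avoided entirely, one mitigation might be to raise the connection timeout (connect_timeout is a real ParamikoMachine parameter; the host is again made up):

from plumbum.machines.paramiko_machine import ParamikoMachine

rem = ParamikoMachine("cluster.example.org", connect_timeout=30)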
Fortunately for me, Spack comes with environment-modules pre-built and hints at the module loading incantations with:

spack module tcl loads <package-name>

so I can expose all the needed dependencies in ~/.profile using those.
It's important to mention that Plumbum wraps the executable files it can find in $PATH. Shell functions like "source ..." or "module ..." are not such files - and will cause Plumbum to fail with an error.
Another problem (a bug?) with Plumbum + Paramiko on remotes is the absence of custom environment variables, even when they are set on login and present in machine.env - like $LD_LIBRARY_PATH, which is needed to link shared libraries at runtime... That was annoying indeed!
But a workaround can be found in the logs, which show the whole lengthy command lines that are sent to remotes: one can prepend the desired command with a call to env that sets the needed variable to the value read from that command's machine.env :)
The following works:
def wrap_ld_library_path(command):
    machine = command.machine
    ld_lib_env = machine["env"]["LD_LIBRARY_PATH=%s"
                                % machine.env.get("LD_LIBRARY_PATH")]
    return ld_lib_env[command]
and is actually an example of the usefulness of command composition. At the end of this rant I'll show an idiomatic approach to creating such command wrappers.
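Usage might look as follows (a sketch assuming a siesta binary in $PATH):

from plumbum import local

siesta = local["siesta"]
wrapped = wrap_ld_library_path(siesta)
wrapped()  # runs: env LD_LIBRARY_PATH=<value from machine.env> siesta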
§ Creating simple workflow system with Plumbum
The following sections define a simple yet functional workflow system built on top of the Plumbum shell combinators package. I call it trivial_actions for how basic it is.
The key point of the whole demo is to demonstrate the ease of tailoring interoperation with shell commands in a way that suits one's specific needs.
The code is available here under the MIT license: trivial_actions.py
§ Header section definitions
Although simple, our workflow Action instances will be proper state machines and will also play nicely with Python's default logging subsystem.
import copy
import logging
from enum import Enum, auto

from plumbum.commands.processes import ProcessExecutionError
from plumbum.path import LocalPath
Enumeration of possible states:
class State(Enum):
    NEW = auto()
    PREPARED = auto()
    RUNNING = auto()
    SUCCEEDED = auto()
    FAILED = auto()
    IGNORED = auto()
The purpose of the ShellCommandRunner class is to execute any given Plumbum command in a certain manner:
- Errors on non-zero exit codes are suppressed - in case of a single shell command error we don't want the whole workflow to fail.
- Rather, the exit_code and the contents of stdout and stderr are stored.
- Each such command can be executed only once.
class ShellCommandRunner():
    # A basic wrapper over shell commands.
    # Executes only once. Stores split `stdout` and `stderr`
    # in `self.out_log` and `self.err_log` vectors.
    def __init__(self, command, args, cwd="./"):
        self._exit_code = None
        self._out_log = []
        self._err_log = []
        self.args = args
        self.cwd = cwd
        self.command = command

    def add_arg(self, arg, value=None):
        if value is not None:
            self.args.append(f"{arg}={value}")
        else:
            self.args.append(arg)

    @property
    def exit_code(self):
        return self._exit_code

    @property
    def out_log(self):
        return copy.deepcopy(self._out_log)

    @property
    def err_log(self):
        return copy.deepcopy(self._err_log)

    def run(self):
        if self._exit_code is not None:
            return False, "Command already executed! Skipping."
        cmd = self.command[self.args].with_cwd(self.cwd)
        exit_code, stdout, stderr = cmd.run(retcode=None)
        self._exit_code = exit_code
        self._out_log = stdout.split("\n")
        self._err_log = stderr.split("\n")
        return True, "Attempted command execution."
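A hypothetical usage sketch:

from plumbum import local

runner = ShellCommandRunner(local["ls"], ["-l"], cwd="/tmp")
ok, msg = runner.run()
print(runner.exit_code, runner.out_log[:3])
ok, msg = runner.run()  # the second call is refused
print(msg)              # Command already executed! Skipping.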
§ Actions protocol declaration with metaclass
This metaclass declares the protocol for the Action behavior. Initially I created it because I supposed I would need several Action types: separate ones for local and remote shell command actions, to begin with. That was before I realised that Plumbum unifies the interface for all these command options, and that I would need just one Action to fit all needs. Still, I'm keeping this declaration as a shorthand spec for Actions.
Each of them will operate as we are used to in Computational Physics, where some heavy, complex physics calculator does a whole lot of processing within a certain .path where a set of input files has presumably been placed beforehand. That preparation is to be performed by the .prepare() method before each action is .run(). In a chain of actions, a preceding .parent calculation can optionally be linked.
I wanted each subclass of Action itself to become a clear and short specification of a workflow stage. That's why .command is a Plumbum command to be specified at class declaration. At the same time, each action instance is to obtain its own set of argument values for that command. And since the final shell command incantation can be quite different from how I'd like to parameterize the workflow stages, the class also specifies, via the .make_args_list() method, how to unpack its .args_source collection that is passed to each action instance on creation.
Likewise, .make_prefix() and .make_path() of the Action subclass concretize the pathname creation for each action. .make_prefix() is so called here because its result is also reflected as a prefix in the logs, though in fact it outputs the endpoint directory name.
class ActionMeta(type):
    """Metaclass for definitions of workflow Action classes.
    Ensures presence of `default_attrs` and definition of
    `required_methods`."""
    default_attrs = {
        'parent': None,
        'path': LocalPath(""),
        'command': None,
        'args_source': [],
        'runner': None,
        'state': State.NEW,
        'logger': logging.getLogger(''),
    }
    required_methods = [
        'make_prefix',
        'make_path',
        'make_args_list',
        'change_state_on_prepare',
        'change_state_on_run',
        'prepare',
        'run',
    ]

    @staticmethod
    def _check_field(field_name, bases, fields):
        if field_name in fields:
            return True
        for base in bases:
            if hasattr(base, field_name):
                return True
        return False

    def __new__(mcs, name, bases=None, fields=None):
        for attr_name in mcs.default_attrs:
            if not mcs._check_field(attr_name, bases, fields):
                fields[attr_name] = mcs.default_attrs[attr_name]
        for method_name in mcs.required_methods:
            if not mcs._check_field(method_name, bases, fields):
                raise Exception(f"Method {method_name} must be defined in {name}.")
        return type.__new__(mcs, name, bases, fields)
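As a quick sanity check (a sketch), a class created through the metaclass without the required methods is rejected:

# The empty fields dict lacks make_prefix & co., so this raises.
try:
    Broken = ActionMeta("Broken", (), {})
except Exception as e:
    print(e)  # Method make_prefix must be defined in Broken.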
§ Definition of Action class
Implementation of the Action protocol.

The public methods .prepare() and .run() are short and meant to be redefined in concrete subclasses of Action - especially .prepare(), since .run() as defined below will suit most cases.
In this simplistic approach the state of each action is coupled with its .path endpoint: if it is already present at the time of the call to .prepare(), the state will become IGNORED and subsequent execution will be skipped entirely. Otherwise action.state will cycle through the enumerated states, from NEW on creation towards SUCCEEDED or FAILED.
And since all Action types in principle have the same .state life-cycle and should not expose it to the user, the common behavior is placed in the @Action.change_state_on_prepare and @Action.change_state_on_run decorator methods, so the user only needs to place them on top of the redefined public methods.
Note the usage of Plumbum's machine.session() for constructing a pipe that outputs execution logs. It's used so that the code works on all kinds of Plumbum machines (local, remote, paramiko), and it's OK here because the state of the shell environment in question is not subject to change.
class Action(metaclass=ActionMeta):
    "Specifies each Action stage for the TrivialActions workflow system."

    def make_prefix(self):
        return self.__class__.__name__.lower()

    def make_path(self):
        if self.parent is None:
            pwd = self.command.machine["pwd"]
            path = self.command.machine.path(pwd().strip())
            return path / self.make_prefix()
        return self.parent.path / self.make_prefix()

    def make_args_list(self):
        return self.args_source

    def __init__(self, args_source, parent=None):
        # First store the `args_source` collection
        # and link to `parent` if present.
        #
        # Other methods used after will depend on them.
        self.args_source = args_source
        self.parent = parent
        self.path = self.make_path()
        self.logger = logging.getLogger(self.make_prefix())
        # Action initialized.
        self.logger.debug("%-10s", self.state.name)
        self.logger.debug("Args source collection received: %s",
                          self.args_source)
        if self.parent is not None:
            self.logger.debug("Parent action linked: %s",
                              self.parent.make_prefix())

    @staticmethod
    def change_state_on_prepare(f):
        def wrapper(*args):
            # args[0] refers to self
            if args[0].command.machine.path(args[0].path).exists():
                args[0].state = State.IGNORED
                args[0].logger.info(
                    "%-10s Action-related path exists. Skipping.",
                    State.IGNORED.name)
                return
            mkdir = args[0].command.machine["mkdir"]
            mkdir("-p", args[0].path)
            f(*args)
            args[0].state = State.PREPARED
            # Action-related path ready for execution.
            args[0].logger.info("%-10s", State.PREPARED.name)
        return wrapper

    @change_state_on_prepare
    def prepare(self):
        pass

    @staticmethod
    def change_state_on_run(f):
        def wrapper(*args):
            if args[0].state == State.PREPARED:
                args[0].state = State.RUNNING
                # Submitting the action command for execution.
                args[0].logger.debug("%-10s", State.RUNNING.name)
                f(*args)
                session = args[0].command.machine.session()
                if args[0].runner.exit_code != 0:
                    args[0].state = State.FAILED
                    args[0].logger.error(
                        "%-10s Action command execution resulted in non-zero exit code.",
                        State.FAILED.name)
                    err_path = args[0].path / "err.log"
                    session.run("echo -n \"%s\" | tee %s"
                                % ("\n".join(args[0].runner.err_log), err_path))
                    args[0].logger.error("Error log written at: %s", err_path)
                else:
                    args[0].state = State.SUCCEEDED
                    # Action command successfully executed.
                    args[0].logger.info("%-10s", State.SUCCEEDED.name)
                    out_path = args[0].path / "out.log"
                    session.run("echo -n \"%s\" | tee %s"
                                % ("\n".join(args[0].runner.out_log), out_path))
                    args[0].logger.debug("Output written at: %s", out_path)
        return wrapper

    @change_state_on_run
    def run(self):
        self.runner = ShellCommandRunner(
            self.command,
            self.make_args_list(),
            cwd=self.path,
        )
        with self.command.machine.cwd(self.path):
            launch, message = self.runner.run()
            if launch:
                self.logger.debug(message)
            else:
                self.logger.warning(message)
§ TODO: Link some actual examples
As soon as they go public.
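In the meantime, a minimal illustrative stage might look like this (hypothetical, not taken from the linked file):

from plumbum import local

class SystemInfo(Action):
    command = local["uname"]

    def make_args_list(self):
        return list(self.args_source)

stage = SystemInfo(["-a"])
stage.prepare()     # creates ./systeminfo, state becomes PREPARED
stage.run()         # runs `uname -a` there; out.log is written on success
print(stage.state)  # State.SUCCEEDED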
§ Bonus: idiomatic Plumbum commands composition
Instances of the following class can serve as composers of Plumbum nested commands:
class CommandComposer(object):
    @staticmethod
    def __compose_commands(*fs):
        def composition(x, **kws):
            for f in fs[::-1]:
                x = f(x, **kws)
            return x
        return composition

    def __init__(self, *commands):
        self._composition = self.__compose_commands(*commands)

    def __call__(self, command, **kws):
        return self._composition(command, **kws)
These are used as follows. First, command wrappers or nesting functions are to be defined. Each should accept a Plumbum command object together with arbitrary **kws keyword arguments. Each wrapper can filter **kws for specific entries; the protocol of which entries can be expected should be established somewhere for each CommandComposer instance.
# Sample nesting wrappers definitions:
def wrap_mpirun(command, **kws):
    if "num_mpi_procs" in kws:
        mpirun = command.machine["mpirun"]["-np", kws["num_mpi_procs"]]
    else:
        mpirun = command.machine["mpirun"]
    return mpirun[command]


def wrap_mprof(command, **kws):
    if "mprof_include_children" in kws and kws["mprof_include_children"]:
        mprof = command.machine["mprof"]["run", "--include-children"]
    elif "mprof_multiprocess" in kws and kws["mprof_multiprocess"]:
        mprof = command.machine["mprof"]["run", "--multiprocess"]
    else:
        mprof = command.machine["mprof"]["run"]
    return mprof[command]


def wrap_ld_library_path(command, **kws):
    machine = command.machine
    ld_lib_env = machine["env"]["LD_LIBRARY_PATH=%s"
                                % machine.env.get("LD_LIBRARY_PATH")]
    return ld_lib_env[command]
Each composer instance is created by enumerating the nesting functions in reverse order of application (i.e. in the order the corresponding commands would be written in the console):
hpc_wrapper = CommandComposer(
    wrap_ld_library_path,
    wrap_mprof,
    wrap_mpirun)
Finally, this single instance can be used to alter the required commands:
from plumbum import local

command = hpc_wrapper(local["siesta"],
                      num_mpi_procs=4,
                      mprof_include_children=True)
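Printing the resulting command shows the nesting (the exact binary paths depend on the machine; this output is an approximation):

print(command)
# roughly: /usr/bin/env LD_LIBRARY_PATH=... /usr/bin/mprof run --include-children
#          /usr/bin/mpirun -np 4 /usr/bin/siesta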