Ivory Siege Tower

mobile construct built of thoughts and parentheses

Plumbum: Sanity for the Shell

[2023-06-19]

metaprogramming python shell source ssh st_dev workflow

Inside the crooked Python ecosystem, few straight things were ever made that I like and use often. One is Spack - the software package manager that promoted HPC Dependency Hell to Purgatory. The other, which I've discovered quite recently, is arguably the Python package for the job: it finally turns Python into an efficient glue for shell scripting and makes it fun again.

Such is Plumbum: the "shell combinators" toolkit. An actively maintained and highly usable package that allows using shell commands transparently within a Python program, without the need to reach for subprocess in most cases.

Sorry, Common Workflow Language, but it looks like you've been overthrown! Along with its dependencies, Plumbum allows one to quickly build a custom workflow or pipeline mechanism - and in this post I'm giving a live example of one such system, together with the concerns and problems that occurred while I was figuring it out.

§ Laying hands on Plumbum

The docs are clean and clear, and the table of contents reflects the package features well: shell scripting of both local and remote machines in a similar manner, plus built-in definitions for turning Python scripts into POSIX CLI executable commands without explicit use of the click package or its analogs.

First-class citizens here are commands exposed on machines, paths, and envs - the state of the shell as defined by environment variables. Nothing very different in principle from what one would expect from a system like that.
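
For instance, envs can be manipulated directly; the sketch below uses a made-up variable name together with the temporary-override context manager:

  from plumbum import local

  # Temporarily override an environment variable (SIEGE_MODE is made up here);
  # outside the `with` block the previous environment is restored.
  with local.env(SIEGE_MODE="ivory"):
      print(local["printenv"]("SIEGE_MODE"))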

Yet there are a few words to be said on the syntax. In a sense Plumbum defines a DSL for composing the package inhabitants, which is cool! But it can also look a bit confusing, since that dialect is defined within Python.

The first thing to mention is the overloading of some standard arithmetic and comparison operators. E.g. / works as a path concatenator (allowing one to drop the eye-sticking os.path idioms):

  from plumbum import local

  base = local.path("/tmp")
  return type(base / "siegetower")
: <class 'plumbum.path.local.LocalPath'>

Similarly, < and > perform IO redirection and | can be used to build pipes:

  from plumbum import local

  return (local["ls"][local.env.get("HOME")] | local["grep"]["Downloads"])()
: Downloads

Note that in the previous example there are already quite a few parentheses... It will get worse with command nesting, and it is in fact the price of describing composition in Python.

First of all, commands available on machines are defined with []:

  from plumbum import local
  ls = local["ls"]

  return type(ls)
: <class 'plumbum.machines.local.LocalCommand'>

This definition, by the way, would throw an exception in the unlikely event that ls is not found in the $PATH of the local machine.
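
A quick way to observe (and handle) that failure mode - the command name below is deliberately bogus:

  from plumbum import local, CommandNotFound

  try:
      local["definitely-not-a-real-command"]   # the $PATH lookup happens right here
  except CommandNotFound as exc:
      print("not found:", exc)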

That's true for all commands interfaced with plumbum. Make sure that:

  • either you call your script from a shell with a pre-configured environment, e.g. by sourcing all the needed configuration files that expose extra environment variables - this is easy to do on a local computer,

  • or have all the configuration needed written in ~/.profile - like in the case of module load commands on remote clusters.

The commands can be partially concretized with extra arguments, by adding extra [] with those arguments listed.

  from plumbum import local
  ls = local["ls"]["-lt"]

  ls("-a")                        # calls 'ls -lt -a' in a current working dir
  print(type(ls))
: <class 'plumbum.commands.base.BoundCommand'>

Notice how the type has changed to BoundCommand. Binding (concretizing) commands can be done further in a partial-application or currying style:

  from plumbum import local

  ls = local["ls"]
  ls = ls["-a"]
  ls = ls["-l"]
  ls = ls["-t"]

  return ls
: /usr/bin/ls -a -l -t

Finally, commands can be nested by binding them with other commands as arguments. If I want to run a physics calculator through a memory profiler I can do:

  from plumbum import local

  mprof = local["mprof"]["--include-children"]
  siesta = local["siesta"]

  bound = mprof[siesta]           # `bound()` will call a chain of nested commands

One can realize at this point that complex nested and pipelined compositions can be built. They can even be written as one-liners, but that means a lot of weird-looking nested lists, which is by no means good Python style.
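
For a taste of that, here is a one-liner combining nesting with a pipe (plain coreutils, nothing HPC-flavored):

  from plumbum import local

  # 'nice -n 10 ls -lt | head -n 3' as a single nested and piped expression
  chain = local["nice"]["-n", "10"][local["ls"]["-lt"]] | local["head"]["-n", "3"]
  print(chain())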

And as a rule of thumb (illustrated right after this list):

  • Binding a command with predefined args in [] does not result in calling it.

  • Calling a command (or a chain, or a pipe) is done by adding () as with regular Python functions.
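
Or, as a two-line illustration (du is just an arbitrary command here):

  from plumbum import local

  du = local["du"]["-sh"]   # bound only: nothing has been executed yet
  report = du("/tmp")       # the trailing () finally runs 'du -sh /tmp'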

§ Session setup workarounds on remotes

Plumbum does really well at providing transparent and uniform shell scripting on both local and remote machines. I found it possible to go as far as driving HPC environments through its interface with Paramiko, which can preserve the session among the commands being executed! But here's the catch...

The warning inset in the documentation section linked above shouts (at the moment of writing) that:

Piping and input/output redirection don’t really work with ParamikoMachine commands...

This will be solved in a future release; in the meanwhile, you can use the machine’s .session() method

The example given afterwards totally works, but note that it does not alter the environment of the remote shell. But what if I DO need to set up the remote shell environment after login? And what if the env is too complex to explicitly enumerate all the variables in the script - like in the case of dependencies provisioned with Spack that I usually spack load ...?

Well, here I ran into the first sign of Plumbum not being perfect. One can work with the machine.session() object of some remote machine as the documentation says, and there all the side effects like environment variables are preserved - but they are not reflected in the regular command interface of Plumbum. Which means giving up composable commands, for example, after which there is not much point in using it at all - one could use Paramiko directly and say hello to popen() and the other stuff we'd better have swept under the carpet.
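
For reference, this is roughly what the session-based escape hatch looks like (the host name and the variable are made up, and key-based authentication is assumed):

  from plumbum.machines.paramiko_machine import ParamikoMachine

  rem = ParamikoMachine("login.cluster.example")   # hypothetical host
  sess = rem.session()

  sess.run("export SIESTA_ROOT=/opt/siesta")       # hypothetical variable
  rc, out, err = sess.run("echo $SIESTA_ROOT")     # the session itself remembers it,
  print(out)                                       # but rem["..."] commands will not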

One can consult the internal documentation and see that machine.session() returns a new session object. What I couldn't find is how to affect the global session. Each machine instance has a ._session property (with an underscore, which means "private" by Python convention). But working through it does not help either: the regular machine["command"][...] declarations remain ignorant of the environment changes.

Workaround? Well, put all the needed settings in ~/.profile, as is quite common for login shell auto-configuration. And keep it light: if there is a significant delay in the shell wind-up, Paramiko will treat it as a connection error.

Fortunately for me, Spack comes with environment-modules pre-built and hints on the module loading incantations with:

  spack module tcl loads <package-name>

so I can expose all the needed dependencies in ~/.profile using those.

It's important to mention that Plumbum wraps the executable files it can find in $PATH. Shell functions like "source ..." or "module ..." are not such files - and will cause Plumbum to fail with an error.

Another problem (a bug?) with Plumbum + Paramiko on remotes is the absence of custom environment variables, even when they are set on login and present in machine.env. Like $LD_LIBRARY_PATH, which is needed to link shared libraries at runtime... That was annoying indeed!

But a workaround can be found in the logs, which show the whole lengthy command lines that are sent to the remotes: one can prepend the desired command with a call to env that sets the needed variable to the value read from that command's machine.env :)

The following works:

  def wrap_ld_library_path(command):
      machine = command.machine
      ld_lib_env = machine["env"]["LD_LIBRARY_PATH=%s" %
                                  machine.env.get("LD_LIBRARY_PATH")]

      return ld_lib_env[command]

and is actually an example of the usefulness of command composition. At the end of this rant I'll show an idiomatic approach to creating such command wrappers.

§ Creating simple workflow system with Plumbum

The following sections define a simple yet functional workflow system built on top of the Plumbum shell combinators package. I call it trivial_actions for how basic it is.

The key point of the whole demo is to show how easy it is to tailor the interoperation with shell commands to one's specific needs.

The code is available here under MIT license: trivial_actions.py

§ Header section definitions

Although simple, our workflow Action instances will be proper state machines and will also play nicely with Python's default logging subsystem (a minimal logging config is sketched right after the imports).

  import copy
  import logging

  from enum import Enum, auto
  from plumbum.commands.processes import ProcessExecutionError
  from plumbum.path import LocalPath
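
Not part of the system itself, but a minimal logging configuration (the format string is just my choice) makes the state transitions below visible on the console:

  logging.basicConfig(
      format="%(name)-12s %(levelname)-8s %(message)s",
      level=logging.DEBUG,
  )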

Enumeration of possible states:

  class State(Enum):
      NEW = auto()
      PREPARED = auto()
      RUNNING = auto()
      SUCCEEDED = auto()
      FAILED = auto()
      IGNORED = auto()

The purpose of the ShellCommandRunner class is to execute any given Plumbum command in a certain manner:

  • Errors on non-zero exit codes are suppressed - in case of a single shell command error we don't want the whole workflow to fail.

  • Instead, the exit_code and the contents of stdout and stderr are stored.

  • Each such command can be executed only once.

  class ShellCommandRunner():
      # A basic wrapper over shell commands.
      # Executes only once. Stores split `stdout` and `stderr`
      # in `self.out_log` and `self.err_log` vectors.
      def __init__(self, command, args, cwd="./"):
          self._exit_code = None
          self._out_log = []
          self._err_log = []

          self.args = args
          self.cwd = cwd

          self.command = command

      def add_arg(self, arg, value=None):
          if value is not None:
              self.args.append(f"{arg}={value}")
          else:
              self.args.append(arg)

      @property
      def exit_code(self):
          return self._exit_code

      @property
      def out_log(self):
          return copy.deepcopy(self._out_log)

      @property
      def err_log(self):
          return copy.deepcopy(self._err_log)

      def run(self):
          if self._exit_code is not None:
              return False, "Command already executed! Skipping."

          cmd = self.command[self.args].with_cwd(self.cwd)
          exit_code, stdout, stderr = cmd.run(retcode=None)

          self._exit_code = exit_code
          self._out_log = stdout.split("\n")
          self._err_log = stderr.split("\n")

          return True, "Attempted command execution."
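
A quick local sanity check of the runner (directory and arguments are arbitrary):

  from plumbum import local

  runner = ShellCommandRunner(local["ls"], ["-l"], cwd="/tmp")
  print(runner.run())        # (True, 'Attempted command execution.')
  print(runner.exit_code, runner.out_log[:2])
  print(runner.run())        # (False, 'Command already executed! Skipping.')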

§ Actions protocol declaration with metaclass

This metaclass declares the protocol for the Action behavior. Initially I created it because I supposed I would need several Action types: separate ones for local and remote shell command actions, to begin with. That was before I realised that Plumbum unifies the interface for all these command options and that I would need just one Action to fit all needs.

Still I'm keeping this declaration as a shorthand spec for Actions.

Each of them will operate as we are used to in Computational Physics, where some heavy, complex physics calculator does a whole lot of processing within a certain .path where a set of input files is presumably placed beforehand. That preparation is performed by the .prepare() method before each action is .run(). Optionally, a preceding .parent calculation can be linked in a chain of actions.

I wanted each subclass of Action to itself become a clear and short specification of a workflow stage. That's why .command is a Plumbum command to be specified at the class declaration. At the same time, each action instance obtains its own set of argument values for said command. And as the final shell command incantation could be quite different from how I'd like to parameterize the workflow stages, the class also specifies, via the .make_args_list() method, how to unpack its .args_source collection that is passed to each action instance on creation.

Likewise, .make_prefix() and .make_path() of the Action subclass concretize the pathname creation for each action. .make_prefix() is called that because its result is also reflected as a prefix in the logs, but in fact it produces the endpoint directory name.

  class ActionMeta(type):
      """Metaclass for definitions of workflow Action classes.
      Ensures presence of `default_attrs` and definition of
      `required_methods`."""

      default_attrs = {
          'parent': None,
          'path': LocalPath(""),
          'command': None,
          'args_source': [],
          'runner': None,
          'state': State.NEW,
          'logger': logging.getLogger(''),
      }

      required_methods = [
          'make_prefix',
          'make_path',
          'make_args_list',
          'change_state_on_prepare',
          'change_state_on_run',
          'prepare',
          'run',
      ]

      @staticmethod
      def _check_field(field_name, bases, fields):
          if field_name in fields:
              return True

          for base in bases:
              if hasattr(base, field_name):
                  return True

          return False

      def __new__(mcs, name, bases=None, fields=None):
          for attr_name in mcs.default_attrs:
              if not mcs._check_field(attr_name, bases, fields):
                  fields[attr_name] = mcs.default_attrs[attr_name]

          for method_name in mcs.required_methods:
              if not mcs._check_field(method_name, bases, fields):
                  raise Exception(f"Method {method_name} must be defined in {name}.")

          return type.__new__(mcs, name, bases, fields)
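
To see the check fire, one can call the metaclass directly with a deliberately incomplete set of fields (everything below is throwaway):

  try:
      ActionMeta("Incomplete", (), {"make_prefix": lambda self: "x"})
  except Exception as exc:
      print(exc)   # Method make_path must be defined in Incomplete.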

§ Definition of Action class

Implementation of the Action protocol.

The public methods .prepare() and .run() are short and meant to be redefined in concrete subclasses of Action. Especially .prepare(), since .run() as defined below will suit most cases.

In this simplistic approach the state of each action is coupled with its .path endpoint: if the path is already present at the time of the call to .prepare(), the state becomes IGNORED and subsequent execution is skipped entirely. Otherwise action.state cycles through the enumerated states from NEW on creation towards SUCCEEDED or FAILED.

And since all Action types in principle share the same .state life-cycle and should not expose it to the user, the common behavior is placed in the @Action.change_state_on_prepare and @Action.change_state_on_run decorator methods, so that the user only needs to place them on top of the redefined public methods.

Note the usage of Plumbum's machine.session() for constructing a pipe that writes out the execution logs. It's used so that the same code works on all kinds of Plumbum machines (local, remote, paramiko), and it's OK here because the state of the shell environment in question is not subject to change.

  class Action(metaclass=ActionMeta):
      "Specificates each Action stage for the TrivialActions workflow system."

      def make_prefix(self):
          return self.__class__.__name__.lower()

      def make_path(self):
          if self.parent is None:
              pwd = self.command.machine["pwd"]
              path = self.command.machine.path(pwd().strip())

              return path / self.make_prefix()

          return self.parent.path / self.make_prefix()

      def make_args_list(self):
          return self.args_source

      def __init__(self, args_source, parent=None):
          # First store the `args_source` collection
          # and link to `parent` if present.
          #
          # Other methods used after will depend on them.
          self.args_source = args_source
          self.parent = parent

          self.path = self.make_path()
          self.logger = logging.getLogger(self.make_prefix())

          # Action initialized.
          self.logger.debug("%-10s", self.state.name)
          self.logger.debug("Args source collection received: %s",
                            self.args_source)
          if self.parent is not None:
              self.logger.debug("Parent action linked: %s",
                                self.parent.make_prefix())

      @staticmethod
      def change_state_on_prepare(f):
          def wrapper(*args):
              # args[0] refers to self
              if args[0].command.machine.path(args[0].path).exists():
                  args[0].state = State.IGNORED
                  args[0].logger.info(
                      "%-10s Action-related path exists. Skipping.",
                      State.IGNORED.name)

                  return

              mkdir = args[0].command.machine["mkdir"]
              mkdir("-p", args[0].path)
              f(*args)
              args[0].state = State.PREPARED
              # Action-related path ready for execution.
              args[0].logger.info("%-10s", State.PREPARED.name)

          return wrapper

      @change_state_on_prepare
      def prepare(self):
          pass

      @staticmethod
      def change_state_on_run(f):
          def wrapper(*args):
              if args[0].state == State.PREPARED:
                  args[0].state = State.RUNNING
                  # Submitting the action command for execution.
                  args[0].logger.debug("%-10s", State.RUNNING.name)

                  f(*args)

                  # machine = args[0].command.machine
                  session = args[0].command.machine.session()
                  if args[0].runner.exit_code != 0:
                      args[0].state = State.FAILED
                      args[0].logger.error(
                          "%-10s Action command execution resulted in non-zero exit code.",
                          State.FAILED.name)
                      err_path = args[0].path / "err.log"
                      session.run("echo -n \"%s\" | tee %s" %
                                  ("\n".join(args[0].runner.err_log), err_path))
                      args[0].logger.error("Error log written at: %s", err_path)
                  else:
                      args[0].state = State.SUCCEEDED
                      # Action command successfully executed.
                      args[0].logger.info("%-10s", State.SUCCEEDED.name)

                  out_path = args[0].path / "out.log"
                  session.run("echo -n \"%s\" | tee %s" %
                              ("\n".join(args[0].runner.out_log), out_path))
                  args[0].logger.debug("Output written at: %s", out_path)

          return wrapper

      @change_state_on_run
      def run(self):
          self.runner = ShellCommandRunner(
              self.command,
              self.make_args_list(),
              cwd=self.path,
          )

          with self.command.machine.cwd(self.path):
              launch, message = self.runner.run()
              if launch:
                  self.logger.debug(message)
              else:
                  self.logger.warning(message)
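
Until then, here is a minimal hypothetical sketch of a concrete stage (the siesta executable and the input-file argument are assumptions on my side, not part of trivial_actions):

  from plumbum import local

  class SiestaRun(Action):
      # Hypothetical stage: run the 'siesta' calculator. The $PATH lookup
      # happens right at class definition time.
      command = local["siesta"]

      def make_args_list(self):
          # Expecting args_source to be a dict - my convention, not the library's.
          return [self.args_source["input_file"]]

  stage = SiestaRun({"input_file": "input.fdf"})
  stage.prepare()   # creates ./siestarun and moves the state to PREPARED
  stage.run()       # ends up SUCCEEDED or FAILED, or is skipped when IGNORED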

§ TODO: Link some actual examples

As soon as they go public.

§ Bonus: idiomatic Plumbum commands composition

Instances of the following class can serve as composers of Plumbum nested commands:

  class CommandComposer(object):
      @staticmethod
      def __compose_commands(*fs):
          def composition(x, **kws):
              for f in fs[::-1]:
                  x = f(x, **kws)
              return x
          return composition

      def __init__(self, *commands):
          self._composition = self.__compose_commands(*commands)
        
      def __call__(self, command, **kws):
          return self._composition(command, **kws)

Those are used as follows. First, command wrappers or nesting functions are to be defined. Each should accept a Plumbum command object together with arbitrary **kws keyword arguments. Each wrapper can filter **kws for specific entries; the protocol of which entries to expect should be established for each CommandComposer instance somewhere.

  # Sample nesting wrappers definitions:

  def wrap_mpirun(command, **kws):
      if "num_mpi_procs" in kws:
          mpirun = command.machine["mpirun"]["-np", kws["num_mpi_procs"]]
      else:
          mpirun = command.machine["mpirun"]

      return mpirun[command]


  def wrap_mprof(command, **kws):
      if "mprof_include_children" in kws and kws["mprof_include_children"]:
          mprof = command.machine["mprof"]["run", "--include-children"]
      elif "mprof_multiprocess" in kws and kws["mprof_multiprocess"]:
          mprof = command.machine["mprof"]["run", "--multiprocess"]
      else:
          mprof = command.machine["mprof"]["run"]

      return mprof[command]


  def wrap_ld_library_path(command, **kws):
      machine = command.machine
      ld_lib_env = machine["env"]["LD_LIBRARY_PATH=%s" %
                                  machine.env.get("LD_LIBRARY_PATH")]

      return ld_lib_env[command]

Each composer instance is created by enumerating the nesting functions in reverse order of application (that is, in the order the corresponding commands would be written on the console):

  hpc_wrapper = CommandComposer(
      wrap_ld_library_path,
      wrap_mprof,
      wrap_mpirun)

Finally, this single instance can be used to wrap the required commands:

  from plumbum import local

  command = hpc_wrapper(local["siesta"],
                        num_mpi_procs=4,
                        mprof_include_children=True)
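
The result is still an ordinary Plumbum command, so it can be bound with further arguments and finally invoked; input.fdf is just a placeholder name here:

  # Launch the wrapped chain, tolerating a non-zero exit code:
  exit_code, stdout, stderr = command["input.fdf"].run(retcode=None)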