Toulouse, 12 September 2022
Subprocess is the bread-and-butter of software automation. In Python, the
standard library provides the subprocess module for this with many options
that make it suitable for a broad range of usages. In this article, we’ll
cover the specific topic of monitoring such subprocesses. The techniques for
this are numerous and some of them a bit advanced, so it will be an
opportunity to compare them. This will also be a pretext to study the
subprocess module (and its
asyncio counterpart) in detail.
What’s a subprocess?
A subprocess is the execution of a program or command as a process external to where it got launched. As such, the parent program (for instance, a Python application) will create and manage a child process for this external program.
There are two main ways to use such subprocesses:
- by running the external program until termination, or,
- by starting the external program as a daemon.
In Python, the subprocess module of the standard library provides entry points for these main use cases:
- the run() function which, as its name suggests, fulfills the first use case, and,
- the Popen class, which is used to create a process in more versatile ways, in particular the daemon use case.
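As a quick illustration of these two entry points (a minimal sketch using the POSIX echo command, assumed available):

```python
import subprocess

# First use case: run the command until termination and collect the result.
result = subprocess.run(["echo", "hello"], capture_output=True, text=True)

# Second use case: create the child with Popen and manage it ourselves.
proc = subprocess.Popen(["echo", "world"], stdout=subprocess.PIPE, text=True)
out, _ = proc.communicate()
```

run() blocks until the child exits, while a Popen object lets the parent interact with a still-running child (wait, poll, terminate, and so on).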
Popen comes from the popen(3) function, from the C standard
library, which “opens a process by creating a pipe, forking, and invoking the
shell”; we’ll get back to this later on.
The standard library also ships with asyncio.subprocess for Async I/O programs (we’ll demonstrate how to use this module for our monitoring topic later on).
One important aspect of subprocesses is communication, because one typically
wants to be able to pass data from/to the parent and child. In Python, this is
possible by using the stdin, stdout and stderr parameters of Popen (these are
also accepted by subprocess.run(), though this function also has an input
parameter). As their name suggests, these are standard I/O streams that are
normally attached to any program when started: one for input, one for output
and one for errors.
Looking at Popen documentation, we can notice that these parameters can take
different kinds of value: None (the default, meaning the child inherits the
parent's stream), PIPE, DEVNULL, an existing file descriptor or an existing
file object. Amongst them, the PIPE case is particularly interesting as it
provides an idiomatic way for communication between the parent (Python) and
the child through nice Python stream objects (which can be either read from or
written to).
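To illustrate a few of these values (a minimal sketch; a small command printing a line via the current Python interpreter stands in for a real program):

```python
import subprocess
import sys
import tempfile

cmd = [sys.executable, "-c", "print('to stdout')"]

# DEVNULL: the child's output is discarded.
subprocess.run(cmd, stdout=subprocess.DEVNULL, check=True)

# A file object: the output is written to that file.
with tempfile.TemporaryFile("w+") as f:
    subprocess.run(cmd, stdout=f, check=True)
    f.seek(0)
    content = f.read()

# PIPE: the output is exposed to the parent as a Python stream object.
proc = subprocess.Popen(cmd, stdout=subprocess.PIPE, text=True)
out, _ = proc.communicate()
```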
Monitoring subprocesses (and a guiding example)
As mentioned in the introduction, the topic of this article is not only subprocesses in Python, but rather their monitoring. The reader might wonder why we would want to monitor a subprocess and what that actually means. Before introducing the motivation for this topic, let's first define a guiding example in which we'll use the pg_basebackup program from PostgreSQL, which takes a backup of a database cluster (this is typically used to set up a streaming-replication standby server). The command is, in general, issued from a backup (or would-be standby) host and typically looks like:
```
$ pg_basebackup -d "host=primary.example.com user=admin" -D pgdata -v -P
pg_basebackup: initiating base backup, waiting for checkpoint to complete
pg_basebackup: checkpoint completed
pg_basebackup: write-ahead log start point: 0/E9000028 on timeline 1
pg_basebackup: starting background WAL receiver
pg_basebackup: created temporary replication slot "pg_basebackup_12956"
1808807/1808807 kB (100%), 1/1 tablespace
pg_basebackup: write-ahead log end point: 0/E9000138
pg_basebackup: waiting for background process to finish streaming ...
pg_basebackup: syncing data to disk ...
pg_basebackup: renaming backup_manifest.tmp to backup_manifest
pg_basebackup: base backup completed
```
For completeness (though it does not matter much to understand the following),
we used the following options:
- -d holds connection information to the source,
- -D indicates the target directory for backup files,
- -v triggers verbose mode and
- -P enables progress reporting.
In addition, this command might ask for a password to connect to the primary host.
This command might take a long time to complete when there is a lot of data to transfer, depending on the network connection or even on the actual state of the primary database cluster; that's typically one reason why we'd like to monitor it.
There are other reasons one might want to monitor a subprocess, for instance:
- let the user know live if the commands we are launching (and which can possibly be long) are running okay;
- and even if they are, let the user interrupt such a command if they wish (for instance, after reading a log message indicating something undesired).
More generally, when working with distributed systems, where components are typically waiting for others to get ready, programs may never end, even in case of deadlock. So letting the user know live about the situation is nice.
Interfaces for monitoring Python subprocesses
As mentioned earlier, subprocesses as represented by Popen objects expose
their standard streams. In general, we are mostly interested in stderr for
monitoring purposes. Getting back to the available interfaces for standard
streams, we have: parent's streams (i.e. child output will be redirected to
parent's one), pipes and files.
For many reasons, using parent’s streams is not a very good fit because we’re mixing streams of several programs into one and it can get confusing. Also, there is no easy way to control the rendering of child’s output when forwarded to the parent.
The file approach (i.e. creating file objects in the parent Python program and passing their file descriptor to the child) is better but working with local files does not always fit with some applications’ design which might rather rely on some external logging system (such as Sentry or Grafana Loki).
This leaves us with the pipe approach which is indeed very flexible because all data is processed in memory leaving us the possibility for post-processing, filtering and forwarding based on our own logic.
With all this set up, assuming we'll have access to the process's stderr, we'll
monitor it through Python logging in the form of a simple log function:
```python
import logging

logger = logging.getLogger(__name__)
logging.basicConfig(
    level=logging.DEBUG, format="%(asctime)s - %(message)s", datefmt="[%X]"
)


def log_stderr(program: str, line: str) -> None:
    logger.debug("%s: %s", program, line.rstrip())
```
Running our pg_basebackup command with subprocess.run() would look like:
```python
import subprocess

cmd = ["pg_basebackup", "-D", "pgdata", "-v", "-P"]
result = subprocess.run(cmd, capture_output=True, check=True, text=True)
```
There is typically no way to monitor what happens during child process
execution here. We're only left with the result return value, a
CompletedProcess instance, which we can use only after program termination:

```python
for errline in result.stderr.splitlines():
    log_stderr(cmd, errline)
```
When using Popen, we pass the PIPE special value as stdout and stderr
parameters. This will make Python create a "pipe" for each stream, as a
unidirectional data channel for interprocess communication (quoting
pipe(2)); this uses os.pipe() on POSIX systems. In addition, a Popen
object can be used as a context manager which will wait for the child process
to terminate while letting us run arbitrary code under the with block. So
our example would become:

```python
cmd = ["pg_basebackup", "-D", "pgdata", "--verbose", "--progress"]
with subprocess.Popen(
    cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True
) as proc:
    for line in proc.stderr:
        log_stderr(cmd, line)
```
Here we are able to monitor live child stderr while the process is running.
But using Popen as a context manager will also close streams at exit,
meaning that we need to process them within the context. Similarly, we need to
handle input ourselves, which would usually need a call to communicate().
All in all, the previous example would become:

```python
with subprocess.Popen(
    cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True
) as proc:
    for line in proc.stderr:
        log_stderr(cmd, line)
    stdout, stderr = proc.communicate()
result = subprocess.CompletedProcess(cmd, proc.returncode, stdout, stderr)
On par with subprocess.run(), we built a CompletedProcess result.
However, inspecting it would reveal that its stderr attribute is an empty
string. This is because proc.communicate() returned an empty string in
the second value of the tuple. In turn, this is because we actually already
consumed the stderr pipe in the for loop for monitoring. So if we'd like
to keep the original stderr while also monitoring it, we'd need to do it
ourselves:

```python
with subprocess.Popen(
    cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True
) as proc:
    errs = []
    for line in proc.stderr:
        log_stderr(cmd, line)
        errs.append(line)
    stdout, _ = proc.communicate()
result = subprocess.CompletedProcess(cmd, proc.returncode, stdout, "".join(errs))
```
Also note that the
communicate step comes after the monitoring loop,
meaning that this will not work if we need to pass input data. This issue is in
fact more general as we might be interested in doing concurrent things while
the process is running and while we’re monitoring.
Monitoring concurrently, with asyncio
So we want to keep our logging task while communicating with the process (or doing something else while it's running). Threading might be an option, but as this essentially involves I/O, we're instead heading towards the asyncio (standard) library which also provides built-in support for subprocesses. However, we'll see that things are not that straightforward.
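For comparison, here is what the threading option could look like (a sketch, not the approach pursued in this article; a small Python one-liner stands in for pg_basebackup):

```python
import subprocess
import sys
import threading

# Stand-in for a long-running command writing progress to stderr.
cmd = [
    sys.executable,
    "-c",
    "import sys; print('step 1', file=sys.stderr); print('step 2', file=sys.stderr)",
]

captured = []


def pump_stderr(pipe):
    # Consume the child's stderr line by line, leaving the main thread free.
    for line in pipe:
        captured.append(line.rstrip())


proc = subprocess.Popen(
    cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True
)
t = threading.Thread(target=pump_stderr, args=(proc.stderr,), daemon=True)
t.start()
stdout = proc.stdout.read()  # the main thread can still read stdout, send input, etc.
proc.wait()
t.join()
```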
Let's try it with asyncio:
```python
import asyncio.subprocess

proc = await asyncio.subprocess.create_subprocess_exec(
    *cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE
)


async def log_stderr() -> bytes:
    errs = []
    async for line in proc.stderr:
        logger.debug("%s: %s", cmd, line.decode().rstrip())
        errs.append(line)
    return b"".join(errs)


t = asyncio.create_task(log_stderr())
stdout, stderr = await asyncio.gather(proc.stdout.read(), t)
```
We're first building an asyncio Process with create_subprocess_exec().
Then we're spawning a task to log its stderr. And finally, we gather our
monitoring task with reading the process's stdout. Also note that all
communications involve bytes with the asyncio library.
So we have achieved concurrent monitoring of child's stderr, but using
proc.communicate() is still not possible because we're already reading
the stderr stream and asyncio does not permit several readers
to be attached to a pipe.
At this point, we have (at least) two options:
- override Process.communicate() to plug in our monitoring logic, or,
- use low-level asyncio, to get more control on Process handling of streams.
In order to be able to properly communicate with our asyncio Process while
monitoring its stderr, we need to dive into the low-level API of asyncio.
This involves using loop.subprocess_exec() and, while we gain more
flexibility, this also implies handling "protocols" and "transports"
ourselves. But luckily, this is documented, including nice
examples. Hold on!
Our starting point is loop.subprocess_exec(), which takes a protocol_factory argument to build a SubprocessProtocol responsible for handling communication with the child, as well as process exit. We'll leverage this to define a custom protocol which binds an extra reader to the stderr file descriptor (fd=2); it implements the SubprocessProtocol interface:
```python
class MyProtocol(asyncio.subprocess.SubprocessStreamProtocol):
    def __init__(self, reader, limit, loop):
        super().__init__(limit=limit, loop=loop)
        self._reader = reader

    def pipe_data_received(self, fd, data):
        """Called when the child process writes data into its stdout
        or stderr pipe.
        """
        super().pipe_data_received(fd, data)
        if fd == 2:
            self._reader.feed_data(data)

    def pipe_connection_lost(self, fd, exc):
        """Called when one of the pipes communicating with the child
        process is closed.
        """
        super().pipe_connection_lost(fd, exc)
        if fd == 2:
            if exc:
                self._reader.set_exception(exc)
            else:
                self._reader.feed_eof()
```
The reader argument to our previous protocol is simply an
asyncio.StreamReader instance. Thanks to our protocol, this reader will
receive the same data as the process' stderr pipe, sort of a clone. This will
be our handler for monitoring.
Finally, with a bit of plumbing:
```python
loop = asyncio.get_event_loop()
reader = asyncio.StreamReader(loop=loop)
protocol_factory = functools.partial(
    MyProtocol, reader, limit=2**16, loop=loop
)


async def log_stderr():
    async for line in reader:
        logger.debug("%s: %s", cmd, line.decode().rstrip())


transport, protocol = await loop.subprocess_exec(
    protocol_factory, *cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE,
)
proc = asyncio.subprocess.Process(transport, protocol, loop)
(out, err), _ = await asyncio.gather(proc.communicate(), log_stderr())
```
Running this program produces the following logs:
```
[10:50:55] - Using selector: EpollSelector
[10:50:55] - pg_basebackup: pg_basebackup: initiating base backup, waiting for checkpoint to complete
[10:50:55] - pg_basebackup: pg_basebackup: checkpoint completed
[10:50:55] - pg_basebackup: pg_basebackup: write-ahead log start point: 1/10000028 on timeline 1
[10:50:55] - pg_basebackup: pg_basebackup: starting background WAL receiver
[10:50:55] - pg_basebackup: pg_basebackup: created temporary replication slot "pg_basebackup_11979"
[10:50:55] - pg_basebackup: 0/1808798 kB (0%), 0/1 tablespace (pgdata/backup_label )
[10:50:56] - pg_basebackup: 109809/1808798 kB (6%), 0/1 tablespace (pgdata/base/49533/49540 )
[10:50:57] - pg_basebackup: 714149/1808798 kB (39%), 0/1 tablespace (pgdata/base/49533/49549 )
[10:50:58] - pg_basebackup: 1274532/1808798 kB (70%), 0/1 tablespace (pgdata/base/49533/49546 )
[10:50:59] - pg_basebackup: 1661093/1808798 kB (91%), 0/1 tablespace (pgdata/base/16387/17957 )
[10:50:59] - pg_basebackup: 1808807/1808807 kB (100%), 0/1 tablespace (pgdata/global/pg_control )
[10:50:59] - pg_basebackup: 1808807/1808807 kB (100%), 1/1 tablespace
[10:50:59] - pg_basebackup: pg_basebackup: write-ahead log end point: 1/10000138
[10:50:59] - pg_basebackup: pg_basebackup: waiting for background process to finish streaming ...
[10:50:59] - pg_basebackup: pg_basebackup: syncing data to disk ...
[10:51:00] - pg_basebackup: pg_basebackup: renaming backup_manifest.tmp to backup_manifest
[10:51:00] - pg_basebackup: pg_basebackup: base backup completed
```
With this approach, we can also do more interesting things than just logging. In
particular, reading the previous output from
pg_basebackup, we could
intercept the progress messages like “ 714149/1808798 kB (39%), 0/1 tablespace
[…]” and use them to report progress to the user.
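In isolation, that interception boils down to a small regular expression (the sample line is taken from the output above):

```python
import re

# One of the progress lines from the pg_basebackup output above.
line = "714149/1808798 kB (39%), 0/1 tablespace (pgdata/base/49533/49549 )"

# Capture the percentage between parentheses, if present.
m = re.search(r"\((\d+)%\)", line)
percent = int(m.group(1)) if m else None
```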
We’ll use the excellent rich for rendering log messages and report progress in separate panels:
```python
from rich.logging import RichHandler
from rich.progress import Progress, TaskID

logger = logging.getLogger(__name__)
logging.basicConfig(
    level=logging.DEBUG,
    format="%(message)s",
    datefmt="[%X]",
    handlers=[RichHandler(show_path=False)],
)

loop = asyncio.get_event_loop()
reader = asyncio.StreamReader(loop=loop)
protocol_factory = functools.partial(MyProtocol, reader, limit=2**16, loop=loop)


async def log_stderr(progress: Progress, taskid: TaskID) -> None:
    async for line in reader:
        m = re.search(r"\((\d+)%\)", line.decode())
        if m:
            # The matched value is an absolute percentage, so set the
            # completed amount rather than advancing by it.
            p = int(m.group(1))
            progress.update(taskid, completed=p)
        else:
            logger.debug("%s", line.decode().rstrip())


transport, protocol = await loop.subprocess_exec(
    protocol_factory, *cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE,
)
proc = asyncio.subprocess.Process(transport, protocol, loop)
with Progress() as progress:
    taskid = progress.add_task(str(cmd), total=100)
    (out, err), _ = await asyncio.gather(
        proc.communicate(), log_stderr(progress, taskid)
    )
```
See the screen cast: