Toulouse, 12 September 2022
Subprocesses are the bread and butter of software automation. In Python, the standard library provides the subprocess module for this, with many options that make it suitable for a broad range of use cases. In this article, we’ll cover the specific topic of monitoring such subprocesses. The techniques for this are numerous and some of them a bit advanced, so it will be an opportunity to compare them. This will also be a pretext to study the subprocess module (and its asyncio counterpart) in detail.
What’s a subprocess?
A subprocess is the execution of a program or command as a process external to the one that launched it. As such, the parent program (for instance, a Python application) creates and manages a child process for this external program.
There are two main ways to use such subprocesses:
- by running the external program until termination, or,
- by starting the external program as a daemon.
In Python, the subprocess module of the standard library provides entry points for these main use cases:
- the run() function which, as its name suggests, fulfills the first use case, and,
- the Popen class, which is used to create a process in more versatile ways, in particular the daemon use case.
The name Popen comes from the popen(3) function of the C standard library, which “opens a process by creating a pipe, forking, and invoking the shell”; we’ll get back to this later on.
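To make the distinction concrete, here is a minimal sketch of both entry points (the ls and sleep commands are only illustrative assumptions; any external program would do):

import subprocess

# Use case 1: run an external program until termination and collect its results.
result = subprocess.run(["ls", "-l"], capture_output=True, text=True)
print(result.returncode)

# Use case 2: start a long-running (daemon-like) program and keep a handle on it.
proc = subprocess.Popen(["sleep", "60"])
# ... do other work while it runs ...
proc.terminate()
proc.wait()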
The standard library also ships with asyncio.subprocess for Async I/O programs (we’ll demonstrate how to use this module for our monitoring topic later on).
Communication
One important aspect of subprocesses is communication, because one typically wants to be able to pass data between the parent and the child. In Python, this is possible by using the stdin, stdout and stderr parameters of subprocess.Popen (or subprocess.run(), though this function also has a convenient input parameter). As their names suggest, these are the standard I/O streams that are normally attached to any program when started: one for input, one for output and one for errors.
Looking at the Popen documentation, we can notice that these parameters can take different kinds of values: PIPE, DEVNULL, an existing file descriptor (a positive integer), an existing file object, and None.
Amongst them, the PIPE case is particularly interesting as it provides an idiomatic way for communication between the parent (Python) and the child through nice Python stream objects (which can be either read from or written to).
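As an illustration, here is a minimal sketch of pipe-based communication, assuming a POSIX system where the tr command is available: we feed data to the child’s stdin and read its transformed output back from stdout, both set up as pipes.

import subprocess

# Both standard streams are pipes; communicate() writes the input and
# collects the output for us.
proc = subprocess.Popen(
    ["tr", "a-z", "A-Z"],
    stdin=subprocess.PIPE,
    stdout=subprocess.PIPE,
    text=True,
)
out, _ = proc.communicate(input="hello subprocess\n")
print(out)  # HELLO SUBPROCESS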
Monitoring subprocesses (and a guiding example)
As mentioned in the introduction, the topic of this article is not only subprocesses in Python, but rather their monitoring. The reader might wonder why we would want to monitor a subprocess and what that actually means. Before introducing the motivation for this topic, let’s first define a guiding example in which we’ll use the pg_basebackup program from PostgreSQL, which takes a backup of a database cluster (this is typically used to set up a streaming-replication standby server). The command is, in general, issued from a backup (or would-be standby) host and typically looks like:
$ pg_basebackup -d "host=primary.example.com user=admin" -D pgdata -v -P
pg_basebackup: initiating base backup, waiting for checkpoint to complete
pg_basebackup: checkpoint completed
pg_basebackup: write-ahead log start point: 0/E9000028 on timeline 1
pg_basebackup: starting background WAL receiver
pg_basebackup: created temporary replication slot "pg_basebackup_12956"
1808807/1808807 kB (100%), 1/1 tablespace
pg_basebackup: write-ahead log end point: 0/E9000138
pg_basebackup: waiting for background process to finish streaming ...
pg_basebackup: syncing data to disk ...
pg_basebackup: renaming backup_manifest.tmp to backup_manifest
pg_basebackup: base backup completed
For completeness (though it does not matter much to understand the following), we used the following options: -d holds connection information to the source database cluster, -D indicates the target directory for backup files, -v triggers verbose mode and -P enables progress reporting.
In addition, this command might ask for a password to connect to the primary host.
This command might take a long time to complete when there is a lot of data to transfer, depending on the network connection or even on the actual state of the primary database cluster; that’s typically one reason why we’d like to monitor it.
There are other reasons one might want to monitor a subprocess, for instance:
- let the user know, live, whether the commands we are launching (which can possibly be long) are running okay;
- even if they are, let the user interrupt such a command if they wish (for instance, upon reading a log message indicating something undesired).
More generally, when working with distributed systems, where components typically wait for others to get ready, programs may never terminate, even in case of a deadlock or similar situations. So letting the user know live about the situation is valuable.
Interfaces for monitoring Python subprocesses
As mentioned earlier, subprocesses as represented by Popen objects expose their standard streams. In general, we are mostly interested in stderr for monitoring purposes. Getting back to the available interfaces for standard streams, we have: the parent’s streams (i.e. child output is redirected to the parent’s), pipes, and files.
For many reasons, using the parent’s streams is not a very good fit because we’re mixing the streams of several programs into one, which can get confusing. Also, there is no easy way to control the rendering of the child’s output when forwarded to the parent.
The file approach (i.e. creating file objects in the parent Python program and passing their file descriptors to the child) is better, but working with local files does not always fit an application’s design, which might rather rely on some external logging system (such as Sentry or Grafana Loki).
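For reference, the file approach could look like the following sketch (the log file name is only an assumption):

import subprocess

# The child writes its stderr directly into a local file opened by the parent.
with open("pg_basebackup.log", "w") as logfile:
    subprocess.run(
        ["pg_basebackup", "-D", "pgdata", "-v", "-P"],
        stderr=logfile,
        check=True,
    )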
This leaves us with the pipe approach, which is indeed very flexible because all data is processed in memory, leaving us the possibility of post-processing, filtering and forwarding based on our own logic.
Accordingly, we proceed by using either the capture_output=True parameter of subprocess.run() and/or stderr=subprocess.PIPE in subprocess.Popen or subprocess.run().
With all this set up, assuming we’ll have access to the process’s stderr, we’ll monitor it through Python logging in the form of a simple log function:
import logging

logger = logging.getLogger(__name__)
logging.basicConfig(level=logging.DEBUG, format="%(asctime)s - %(message)s", datefmt="[%X]")


def log_stderr(program: str, line: str) -> None:
    logger.debug("%s: %s", program, line.rstrip())
Using subprocess.run()
Running our pg_basebackup command with subprocess.run() would look like:
import subprocess
cmd = ["pg_basebackup", "-D", "pgdata", "-v", "-P"]
result = subprocess.run(cmd, capture_output=True, check=True, text=True)
There is typically no way to monitor what happens during child process execution here. We’re only left with the result return value, a CompletedProcess instance, which we can use only after program termination:
for errline in result.stderr.splitlines():
    log_stderr(cmd[0], errline)
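Note also that, with check=True, a failing command raises subprocess.CalledProcessError before we even reach this loop. The exception carries the captured stderr as well, so one possible variant (a sketch, not part of the original code) is to log it from the exception handler:

try:
    result = subprocess.run(cmd, capture_output=True, check=True, text=True)
except subprocess.CalledProcessError as e:
    # The captured stderr is available on the exception when capture_output is used.
    for errline in e.stderr.splitlines():
        log_stderr(cmd[0], errline)
    raise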
Using subprocess.Popen
When using Popen, we pass the PIPE special value as the stdout and stderr parameters. This will make Python create a “pipe” for each stream, as a unidirectional data channel for interprocess communication (quoting pipe(2)); this uses os.pipe() on POSIX systems. In addition, a Popen object can be used as a context manager, which will wait for the child process to terminate while letting us run arbitrary code under the with block. So typically:
cmd = ["pg_basebackup", "-D", "pgdata", "--verbose", "--progress"]
with subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True) as proc:
    for line in proc.stderr:
        log_stderr(cmd[0], line)
Here we are able to monitor the child’s stderr live while the process is running.
However, using Popen as a context manager will also close the streams at exit, meaning that we need to process them within the context. Similarly, we need to handle input ourselves, which would usually need a call to Popen.communicate(). All in all, the previous example would become:
with subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True) as proc:
    for line in proc.stderr:
        log_stderr(cmd[0], line)
    stdout, stderr = proc.communicate()
result = subprocess.CompletedProcess(cmd, proc.returncode, stdout, stderr)
On par with subprocess.run(), we built a CompletedProcess result. However, inspecting it would reveal that its stderr attribute is an empty string. This is because proc.communicate() returned an empty string as the second value of the tuple. In turn, this is because we had already consumed the stderr pipe in the for loop used for monitoring. So if we’d like to keep the original stderr while also monitoring it, we’d need to do it ourselves:
with subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True) as proc:
    errs = []
    for line in proc.stderr:
        log_stderr(cmd[0], line)
        errs.append(line)
    stdout, _ = proc.communicate()
# Lines read from the pipe keep their trailing newline, so a plain join
# reconstructs the original stderr text.
result = subprocess.CompletedProcess(cmd, proc.returncode, stdout, "".join(errs))
Also note that the communicate step comes after the monitoring loop, meaning that this will not work if we need to pass input data. This issue is in fact more general, as we might be interested in doing concurrent things while the process is running and while we’re monitoring it.
Monitoring concurrently, with asyncio
So we want to keep our logging task while communicating with the process (or doing something else while it’s running). Threading might be an option but, as this essentially involves I/O, we’re instead heading towards the asyncio (standard) library, which also provides built-in support for subprocesses. However, we’ll see that things are not that straightforward.
Let’s try that:
import asyncio.subprocess

proc = await asyncio.subprocess.create_subprocess_exec(
    *cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE
)


async def log_stderr() -> bytes:
    errs = []
    async for line in proc.stderr:
        logger.debug("%s: %s", cmd[0], line.decode().rstrip())
        errs.append(line)
    return b"".join(errs)


t = asyncio.create_task(log_stderr())
stdout, stderr = await asyncio.gather(proc.stdout.read(), t)
We’re first building an asyncio Process with create_subprocess_exec(). Then we’re spawning a task to log its stderr. And finally, we gather our monitoring task with reading the process’s stdout. Also note that all communications involve bytes with the asyncio library.
So we have achieved concurrent monitoring of the child’s stderr, but using proc.communicate() is still not possible because we’re already reading stderr through log_stderr() and asyncio does not permit several readers to be attached to a pipe.
At this point, we have (at least) two options:
- re-implement Process.communicate() to plug in our monitoring logic (a rough sketch follows this list), or,
- use low-level asyncio, to get more control on how Process handles streams.
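For completeness, the first option could look roughly like the sketch below; communicate_with_monitoring is a hypothetical helper name, it assumes the process was created with stdin=PIPE as well, and it reuses the logger and cmd defined earlier. The rest of this article follows the second option.

async def communicate_with_monitoring(proc, input_data=None):
    # Feed stdin (if any), read stdout until EOF and log stderr, all
    # concurrently, mimicking what Process.communicate() does internally.
    async def feed_stdin():
        if input_data is not None:
            proc.stdin.write(input_data)
            await proc.stdin.drain()
        proc.stdin.close()

    async def monitor_stderr():
        errs = []
        async for line in proc.stderr:
            logger.debug("%s: %s", cmd[0], line.decode().rstrip())
            errs.append(line)
        return b"".join(errs)

    _, stdout, stderr = await asyncio.gather(
        feed_stdin(), proc.stdout.read(), monitor_stderr()
    )
    await proc.wait()
    return stdout, stderr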
Low-level asyncio
In order to be able to properly communicate with our asyncio Process while monitoring its stderr, we need to dive into the low-level API of asyncio. This involves using loop.subprocess_exec() and, while we gain more flexibility, this also implies handling “protocols” and “transports” ourselves. But luckily, this is documented, including nice examples. Hold on!
Our starting point is loop.subprocess_exec(), which takes a protocol_factory argument to build a SubprocessProtocol responsible for handling communication with the child, as well as process exit. We’ll leverage this to define a custom protocol which binds an extra reader to the stderr file descriptor (fd=2); it implements the SubprocessProtocol interface:
class MyProtocol(asyncio.subprocess.SubprocessStreamProtocol):
    def __init__(self, reader, limit, loop):
        super().__init__(limit=limit, loop=loop)
        self._reader = reader

    def pipe_data_received(self, fd, data):
        """Called when the child process writes data into its stdout
        or stderr pipe.
        """
        super().pipe_data_received(fd, data)
        if fd == 2:
            self._reader.feed_data(data)

    def pipe_connection_lost(self, fd, exc):
        """Called when one of the pipes communicating with the child
        process is closed.
        """
        super().pipe_connection_lost(fd, exc)
        if fd == 2:
            if exc:
                self._reader.set_exception(exc)
            else:
                self._reader.feed_eof()
The reader argument to our previous protocol is simply an asyncio.StreamReader instance. Thanks to our protocol, this reader will receive the same data as the process’s stderr pipe, sort of a clone. This will be our handler for monitoring.
Finally, with a bit of plumbing:
import functools

loop = asyncio.get_event_loop()
reader = asyncio.StreamReader(loop=loop)
protocol_factory = functools.partial(
    MyProtocol, reader, limit=2**16, loop=loop
)


async def log_stderr():
    async for line in reader:
        logger.debug("%s: %s", cmd[0], line.decode().rstrip())


transport, protocol = await loop.subprocess_exec(
    protocol_factory,
    *cmd,
    stdout=subprocess.PIPE, stderr=subprocess.PIPE,
)
proc = asyncio.subprocess.Process(transport, protocol, loop)
(out, err), _ = await asyncio.gather(proc.communicate(), log_stderr())
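Note that the snippet above uses await at the top level, so it has to live inside a coroutine. One possible way to run it as a script is sketched below: run_and_monitor is just a name chosen for this example, and it reuses the MyProtocol class and logger defined earlier.

async def run_and_monitor(cmd):
    loop = asyncio.get_running_loop()
    reader = asyncio.StreamReader(loop=loop)
    protocol_factory = functools.partial(MyProtocol, reader, limit=2**16, loop=loop)

    async def log_stderr():
        async for line in reader:
            logger.debug("%s: %s", cmd[0], line.decode().rstrip())

    transport, protocol = await loop.subprocess_exec(
        protocol_factory, *cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE
    )
    proc = asyncio.subprocess.Process(transport, protocol, loop)
    (out, err), _ = await asyncio.gather(proc.communicate(), log_stderr())
    return out, err


out, err = asyncio.run(run_and_monitor(cmd))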
Running this program produces the following logs:
[10:50:55] - Using selector: EpollSelector
[10:50:55] - pg_basebackup: pg_basebackup: initiating base backup, waiting for checkpoint to complete
[10:50:55] - pg_basebackup: pg_basebackup: checkpoint completed
[10:50:55] - pg_basebackup: pg_basebackup: write-ahead log start point: 1/10000028 on timeline 1
[10:50:55] - pg_basebackup: pg_basebackup: starting background WAL receiver
[10:50:55] - pg_basebackup: pg_basebackup: created temporary replication slot "pg_basebackup_11979"
[10:50:55] - pg_basebackup: 0/1808798 kB (0%), 0/1 tablespace (pgdata/backup_label )
[10:50:56] - pg_basebackup: 109809/1808798 kB (6%), 0/1 tablespace (pgdata/base/49533/49540 )
[10:50:57] - pg_basebackup: 714149/1808798 kB (39%), 0/1 tablespace (pgdata/base/49533/49549 )
[10:50:58] - pg_basebackup: 1274532/1808798 kB (70%), 0/1 tablespace (pgdata/base/49533/49546 )
[10:50:59] - pg_basebackup: 1661093/1808798 kB (91%), 0/1 tablespace (pgdata/base/16387/17957 )
[10:50:59] - pg_basebackup: 1808807/1808807 kB (100%), 0/1 tablespace (pgdata/global/pg_control )
[10:50:59] - pg_basebackup: 1808807/1808807 kB (100%), 1/1 tablespace
[10:50:59] - pg_basebackup: pg_basebackup: write-ahead log end point: 1/10000138
[10:50:59] - pg_basebackup: pg_basebackup: waiting for background process to finish streaming ...
[10:50:59] - pg_basebackup: pg_basebackup: syncing data to disk ...
[10:51:00] - pg_basebackup: pg_basebackup: renaming backup_manifest.tmp to backup_manifest
[10:51:00] - pg_basebackup: pg_basebackup: base backup completed
Fancy processing
With this approach, we can also do more interesting things than just logging. In particular, reading the previous output from pg_basebackup, we could intercept the progress messages like “714149/1808798 kB (39%), 0/1 tablespace […]” and use them to report progress to the user.
We’ll use the excellent rich library for rendering log messages and reporting progress in separate panels:
import re

from rich.logging import RichHandler
from rich.progress import Progress, TaskID

logger = logging.getLogger(__name__)
logging.basicConfig(
    level=logging.DEBUG,
    format="%(message)s",
    datefmt="[%X]",
    handlers=[RichHandler(show_path=False)],
)

loop = asyncio.get_event_loop()
reader = asyncio.StreamReader(loop=loop)
protocol_factory = functools.partial(MyProtocol, reader, limit=2**16, loop=loop)


async def log_stderr(progress: Progress, taskid: TaskID) -> None:
    async for line in reader:
        m = re.search(r"\((\d+)%\)", line.decode())
        if m:
            p = int(m.group(1))
            # pg_basebackup reports a cumulative percentage, so set the task's
            # completed amount rather than advancing by p.
            progress.update(taskid, completed=p)
        else:
            logger.debug("%s", line.decode().rstrip())


transport, protocol = await loop.subprocess_exec(
    protocol_factory,
    *cmd,
    stdout=subprocess.PIPE,
    stderr=subprocess.PIPE,
)
proc = asyncio.subprocess.Process(transport, protocol, loop)

with Progress() as progress:
    taskid = progress.add_task(str(cmd[0]), total=100)
    (out, err), _ = await asyncio.gather(
        proc.communicate(), log_stderr(progress, taskid)
    )
See the screen cast: