Toulouse, 19 September 2022

Psycopg, the PostgreSQL database adapter for Python, recently added support for libpq pipeline mode, bringing a significant performance boost, especially when network latency is high. In this article, we’ll briefly describe how it works from the user’s perspective and under the hood, while also providing a few implementation details.


Psycopg 3.1, released in late August, is the second major release of the Psycopg 3 project, a complete rewrite of the venerable psycopg2. Supporting the libpq pipeline mode involved significant changes to the query processing logic of the driver. The challenge was to keep it compatible with the “normal” query mode, so as to leave the API almost unchanged and thus bring the performance benefits to users without exposing the complexity of the batch query mode.

For the impatient, head over to the pipeline mode documentation of Psycopg: it is self-contained and nicely explains the details of the client/server communication, as well as how things work from the user’s perspective.

Using the pipeline mode in Psycopg

Connection objects gained a pipeline() method to enable the pipeline mode through a context manager (with statement), so using it is as simple as:

conn = psycopg.connect()
with conn.pipeline():
   # do work

What is the pipeline mode for?

The Postgres documentation contains advice on when the pipeline mode is useful. One notable case is when the application performs many write operations (INSERT, UPDATE, DELETE).

For instance, let’s consider the following schema:

CREATE TABLE t (x numeric, d timestamp, p boolean)

and assume an application does a lot of queries like:

INSERT INTO t (x, d, p) VALUES ($1, $2, $3)

with distinct values, x possibly being a large integer (n!, n<1000). Maybe the application could use batch inserts such as executemany(), maybe not (e.g. because it needs to perform other operations between inserts, like querying another resource): it does not matter much here.

Let’s put this together in a little demo.py Python program:

import math
import sys
from datetime import datetime
import psycopg

def create_table(conn: psycopg.Connection) -> None:
    conn.execute("DROP TABLE IF EXISTS t")
    conn.execute("CREATE UNLOGGED TABLE t (x numeric, d timestamp, p boolean)")

def do_insert(conn: psycopg.Connection, *, pipeline: bool, count: int = 1000) -> None:
    query = "INSERT INTO t (x, d, p) VALUES (%s, %s, %s)"
    for n in range(count):
        params = (math.factorial(n), datetime.now(), pipeline)
        conn.execute(query, params, prepare=True)

with psycopg.connect(autocommit=True) as conn:
    create_table(conn)
    if "--pipeline" in sys.argv:
        with conn.pipeline():
            do_insert(conn, pipeline=True)
    else:
        do_insert(conn, pipeline=False)
    row_count = conn.execute("select count(*) from t").fetchone()[0]
    print(f"→ {row_count} rows")

We’ll run our script as python demo.py [--pipeline], the --pipeline flag enabling the pipeline mode. Note that we passed prepare=True to Connection.execute() so that the statement gets prepared server-side, since we will execute the same query many times.

In general, each INSERT query is fast to execute server-side. Without the pipeline mode, the client typically issues the query and then waits for its result (even though it is unused here): the client/server round-trip time is thus likely to be much larger than the (server-side) execution time. With the pipeline mode, we basically save these round-trips most of the time.
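To get a rough idea of the difference, we can time both invocations of the script, for instance with a small timing harness like the following (a hypothetical helper, not part of the demo; actual figures depend on network latency and server load):

import subprocess
import sys
import time

# Run demo.py in both modes and print the elapsed wall-clock time.
for flags in ([], ["--pipeline"]):
    start = time.perf_counter()
    subprocess.run([sys.executable, "demo.py", *flags], check=True)
    mode = "pipeline" if flags else "normal"
    print(f"{mode}: {time.perf_counter() - start:.2f}s")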

Interlude: tracing

When working on optimizing client/server communication, it’s essential to be able to monitor this communication at a reasonably low level. From Psycopg’s perspective, the boundary is the libpq. Fortunately, the library provides a tracing mechanism through the PQtrace function and friends.

The output of this function looks like (example taken from the PostgreSQL test suite):

F	68	Parse	 "select_one" "SELECT $1, '42', $1::numeric, interval '1 sec'" 1 NNNN
F	16	Describe	 S "select_one"
F	4	Sync
B	4	ParseComplete
B	10	ParameterDescription	 1 NNNN
B	113	RowDescription	 4 "?column?" NNNN 0 NNNN 4 -1 0 "?column?" NNNN 0 NNNN 65535 -1 0 "numeric" NNNN 0 NNNN 65535 -1 0 "interval" NNNN 0 NNNN 16 -1 0
B	5	ReadyForQuery	 I
F	10	Query	 "BEGIN"
B	10	CommandComplete	 "BEGIN"
B	5	ReadyForQuery	 T
F	43	Query	 "DECLARE cursor_one CURSOR FOR SELECT 1"
B	19	CommandComplete	 "DECLARE CURSOR"
B	5	ReadyForQuery	 T
F	16	Describe	 P "cursor_one"
F	4	Sync
B	33	RowDescription	 1 "?column?" NNNN 0 NNNN 4 -1 0
B	5	ReadyForQuery	 T
F	4	Terminate

Each row contains the “direction indicator” (F for messages from client to server or B for messages from server to client), the message length, the message type, and its content. This example shows messages from the Extended Query protocol.

In Psycopg, we have access to the low-level PGconn object, representing the libpq connection, through the Connection.pgconn attribute.

Here’s how to enable tracing to stderr, for our demo.py program above:

from contextlib import contextmanager
from typing import Iterator
from psycopg import pq

@contextmanager
def trace_to_stderr(conn: psycopg.Connection) -> Iterator[None]:
    """Enable tracing of the client/server communication to STDERR."""
    conn.pgconn.trace(sys.stderr.fileno())
    conn.pgconn.set_trace_flags(pq.Trace.SUPPRESS_TIMESTAMPS | pq.Trace.REGRESS_MODE)
    try:
        yield
    finally:
        conn.pgconn.untrace()

def do_insert(conn: psycopg.Connection, *, pipeline: bool, count: int = 1000) -> None:
    # ...
    with trace_to_stderr(conn):
        for _ in range(count):
            conn.execute(query, params, prepare=True)

To pipeline or not to pipeline

If we run our demo script (without pipeline mode), we’ll typically get the following output:

F	69	Parse	 "_pg3_0" "INSERT INTO t (x, d, p) VALUES ($1, $2, $3)" 3 NNNN NNNN NNNN
F	4	Sync
B	4	ParseComplete
B	5	ReadyForQuery	 I
F	49	Bind	 "" "_pg3_0" 3 1 1 1 3 2 '\x00\x01' 8 '\x00\x02\xffffff8b\xffffff8fp~WN' 1 '\x00' 1 0
F	6	Describe	 P ""
F	9	Execute	 "" 0
F	4	Sync
B	4	BindComplete
B	4	NoData
B	15	CommandComplete	 "INSERT 0 1"
B	5	ReadyForQuery	 I
F	49	Bind	 "" "_pg3_0" 3 1 1 1 3 2 '\x00\x01' 8 '\x00\x02\xffffff8b\xffffff8fp~^\xffffff80' 1 '\x00' 1 0
F	6	Describe	 P ""
F	9	Execute	 "" 0
F	4	Sync
B	4	BindComplete
B	4	NoData
B	15	CommandComplete	 "INSERT 0 1"
B	5	ReadyForQuery	 I
[ ... and so forth ~1000 more times ... ]

We indeed see the client/server round-trips: for each query, a sequence of F messages followed by a sequence of B messages.

The first message sequence, Parse + ParseComplete, corresponds to the statement preparation. Subsequent ones only contain Bind/Describe/Execute client messages followed by the server response.

Now using the pipeline mode (run the script with --pipeline), we get the following trace:

F	69	Parse	 "_pg3_0" "INSERT INTO t (x, d, p) VALUES ($1, $2, $3)" 3 NNNN NNNN NNNN
F	49	Bind	 "" "_pg3_0" 3 1 1 1 3 2 '\x00\x01' 8 '\x00\x02\xffffff8b\xffffff8f\xffffff82W\xfffffffe\xffffffd0' 1 '\x01' 1 0
F	6	Describe	 P ""
F	9	Execute	 "" 0
F	49	Bind	 "" "_pg3_0" 3 1 1 1 3 2 '\x00\x01' 8 '\x00\x02\xffffff8b\xffffff8f\xffffff82X\x00\xffffffc0' 1 '\x01' 1 0
F	6	Describe	 P ""
F	9	Execute	 "" 0
F	49	Bind	 "" "_pg3_0" 3 1 1 1 3 2 '\x00\x02' 8 '\x00\x02\xffffff8b\xffffff8f\xffffff82X\x01\xffffff81' 1 '\x01' 1 0
F	6	Describe	 P ""
F	9	Execute	 "" 0
[ ... ~300 more of those ... ]
B	4	ParseComplete
B	4	BindComplete
B	4	NoData
B	15	CommandComplete	 "INSERT 0 1"
B	4	BindComplete
B	4	NoData
B	15	CommandComplete	 "INSERT 0 1"
B	4	BindComplete
B	4	NoData
B	15	CommandComplete	 "INSERT 0 1"
[ ... ~300 more of those ... ]
F	383	Bind	 "" "_pg3_3" 3 1 1 1 3 336 '\x00\xffffffa4\x00\xffffffa3\x00\x00\x00\x00\x00\x00\x19k\x18\xffffff8c\x0c\xffffffa3!\x13\x17\xfffffffe!*\x03\xffffff90\x1f\xffffff94\x19V\x1cN$\xffffff8d\x1c[\x1fB\x1e\xffffffbb\x10e\x0f\x05\x0e\xffffff85\x13\xffffffd0\x0dK\x011\x03h\x08_$o!u\x07\xffffffb5\x0a\xffffffa6!\xffffffde\x04\xffffffc6\x0d\xffffffd5\x1d\xffffffa8\x1a\xffffffc9\x12s\x02\xfffffff8\x15\xffffffa0\x04%$\xfffffff0\x1f\xffffffd8\x12\xfffffff5\x17\xffffffd8\x05\xffffff96"\xffffffe7\x03\xfffffff5\x1a\xfffffff3\x1a\x19\x0fR\x19w\x1d\xffffffc5\x0f\xffffffe0\x05r\x03G\x0a\x1e\x062\x06\x07\x06\xffffff9e\x17\xffffffab\x11Y\x1eg\x1c\xffffff82\x15\xffffffb0\x09\xffffffdc\x03\xffffff8b\x0e\xffffffe9\x14\xffffffca\x05E\x08n\x07\xffffffc1\x08\xffffffc2\x11\xffffffc9\x05\x1f$*\x08\xffffffc6\x0b\xffffff8a\x04\xffffffb9 ,$\xffffffd3\x0cR\x12\xffffffb7\x08x\x0d\xffffffa8$,\x1d\x03\x05\x0b\x0a\xffffffcb\x06\x00\x03\xfffffff1\x14\xfffffffa\x0az\x06\xffffff81!/\x1c\x14\x11\xffffffab\x1a\xffffffb4\x12I\x03\xffffffff\x1cn\x10\xffffffe3\x15\xffffff89\x06\xffffffe3\x08B"\x19\x02\xffffff88\x1a\xffffff87\x00\x0d\x1d,\x0b\xffffffe6"\xffffffeb%k\x1e\x18\x08@\x0a\xffffff9f\x10\x1c!G\x14\xffffffff\x05\xffffffe5&o\x0ep\x0d\x01\x06p\x08\xffffffa3#O\x06d!\xffffffaf\x03\xffffffce\x00\x02\x0e\xffffff9e\x19\xffffffd6 \x1c"y'\x07\x00\xffffffbb\x1f\xffffff99%.\x0a\xffffffa7%#\x1e\xfffffffa\x0a\xffffff84\x1cl\x19P\x18\x19\x12\xffffff99\x11\x16\x18\xffffffdb\x1c\xffffffe1\x0f\xfffffff0\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00' 8 '\x00\x02\xffffff8b\xffffff8f\xffffff82X\xffffff9f\xffffff97' 1 '\x01' 1 0
F	6	Describe	 P ""
F	9	Execute	 "" 0
F	383	Bind	 "" "_pg3_3" 3 1 1 1 3 336 '\x00\xffffffa4\x00\xffffffa3\x00\x00\x00\x00\x00\xffffffcd\x19\x0a\x16\xffffffd6\x09\xffffffdf\x16\xffffff86\x04t\x0b4 \xffffffff\x12\xffffff8c&M\x00\x1f\x1b\xffffff81\x10\x00"\xffffffb0\x17\xffffffd8\x18\xffffffe5\x14\x11\x12|\x0b+\x14\xffffffed\x19\x07\x15\xfffffff3\x1d:\x1d\xffffffb2\x19\xffffffca\x0d\xffffffe2\x06\xffffff99&\x1e\x18w#\xffffffeb$H\x1b1\x09\xffffffbc\x01N$\xffffffc1\x15\xffffffc6 \xffffffa1\x18)\x0e\xffffff9c"\xffffffcd\x08r\x0d\xffffffa4\x01F\x01'\x05'%V\x00\xfffffff4 \xffffffac\x10\xffffffac\x02\x12\x14U!*\x04\xffffffc8\x1d\xffffffd9\x15w\x12\xffffffb0\x0e\x11%\xffffffba\x18\xffffffc7\x11\xffffff9f\x1d\xffffffbc\x1aL\x18\xffffffc4\x07\x02\x18\xffffffd0\x07\xffffffc6\x1c\xffffffa2!\xffffffa7"U\x11\xffffffd8\x15\xffffffde&e\x0d\xffffffae\x09\x00\x0b9#G\x1a\xffffff9f\x0f\xffffffb8\x14N\x13\xffffffa4\x18\xfffffffa\x1b<\x1fk\x0cT\x15\x1f#5\x1b\\x1d\xffffff8c\x19\x08\x12'\x06\x0e%\x0c\x01C$\x0c\x0d\xffffffa9'\x00\x18b\x08s\x1c\x06 k \xffffffc0\x13v\x17D\x10\xfffffff7'\x00\x0b\x02\x13\xffffffa2\x1c'\x11\xffffffb2\x1d5$v\x0d}\x08}!c\x1b\xfffffff2$\x18\x1fi\x07\xffffffe0\x03E#\x01\x18\xffffffe7\x1cP\x13"\x1eh\x02\xffffffee\x0ay\x01\x1b\x1ev#7\x1b\xfffffff9$\xffffff83\x19\x18\x1e^\x07\xfffffff0\x11n\x17M\x03\xffffff85$\xffffffcc\x1e\xffffffc2%R\x12\x06\x09Q\x03\xffffffad\x18\xffffffac$@\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00' 8 '\x00\x02\xffffff8b\xffffff8f\xffffff82X\xffffffa6\x1c' 1 '\x01' 1 0
F	6	Describe	 P ""
F	9	Execute	 "" 0
F	385	Bind	 "" "_pg3_3" 3 1 1 1 3 338 '\x00\xffffffa5\x00\xffffffa4\x00\x00\x00\x00\x00\x06\x14D\x08k\x0c\xffffffbe\x04\xffffffd9\x1e\xffffffb2\x05\xffffffbe$\xffffffcf\x1e\xffffffa9\x152\x1f\xfffffffa\x002\x08N"\x09\x14!\x142\x01\xffffff93!\xffffff83\x00\xffffff86\x19H \xffffffb4\x04\xffffffad\x05\xffffffb4\x07\xfffffff3\x00+\x0b\xffffff82\x1a\x0f\x16Z\x0d\xffffff9c\x16\x1e\x13\xfffffff5\x11\xffffffa4\x1a;&\xfffffff6\x18 \x0b\x0d\x1c6\x1f\xffffffa1\x02\xfffffffb\x16\xffffffe6\x10}\x15X\x1b>\x0d\x17\x0d\xffffffe4 \xffffffe9&\xffffffa5\x1d\xffffffbd\x05\xfffffff3\x0b\xffffff9c\x1f\xffffffef\x00\xfffffffe\x05X :\x09C\x08\x12\x19\xfffffff2\x07\x1f\x06\xfffffffb\x03j\x00\xffffffe4\x0c\xffffff91\x10\xffffff94&l"\xffffffc3\x0e?\x04&\x0f+\x04\xffffffd1\x18q )\x13\x0d\x17\x10\x00\xffffffcf\x01\xffffffcb\x04\x03\x0b\xffffffe3\x01\xffffffe2\x16\xffffff8c\x1e\xfffffff5\x0f\xffffffee\x1b\xffffffcf\x01z&\x03\x02o\x10\xffffffd0\x1c\xffffffaf\x01\xfffffffb\x1f7\x05\xffffffcb\x0cL\x06r\x19&\x0a{\x15\x0a"\xffffffa1\x14\x05"N\x17\x0a\x11E\x04\x18\x1e\xffffffcd%\x0a\x1f\xfffffffd\x1b\xffffff87\x13\xffffff99\x0d\xffffff89\x0d\xffffff8e\x12\xffffff9a\x18g\x01\xfffffff8#\x1b\x12=#\xffffff97%\xffffff99\x1f\xffffffae$v#d#\xffffff8a\x15\xffffffed\x03G\x04P\x1e[\x0b`\x1d\x7f\x1e\xffffff9a&\xffffff9d&\xffffffe6\x08\xffffffcb\x1f.\x01M\x0c\xffffff82\x19\xfffffffe\x11F\x10\xffffffbd\x12#\x03\xffffffa5\x17\x1b\x18\xfffffff5\x18\xffffffd8"<\x0a\xffffff99\x17\xffffffba!1\x09\xffffffa2\x06\xffffffe0\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00' 8 '\x00\x02\xffffff8b\xffffff8f\xffffff82X\xffffffa6\xffffffe6' 1 '\x01' 1 0
F	6	Describe	 P ""
F	9	Execute	 "" 0
[ ... ]

We can see that the client sends more than 900 messages before the server replies (with the same number of messages). Clearly, this can have a huge impact on performance, especially when network latency matters. Indeed, this runs twice as fast, even though the Postgres server is on localhost!

What actually happens is that the client sends as many queries as possible, until the server decides it cannot manage more (in general because its output buffer is full, here typically because of the large integers we’re inserting), at which point the server sends back the results of all queries; rinse and repeat. Instead of producing small and frequent client/server round-trips, the pipeline mode optimizes network communication by producing large and infrequent ones. The “downside” (remember we got a 2x speed-up) is that the client program generally needs to handle more data in memory.

How does it work?

As mentioned earlier, the entry point for the pipeline mode is the pipeline() method of the Connection object, which enters and exits the pipeline mode. But what does this mean? Well, basically, it involves calling the underlying PQenterPipelineMode and PQexitPipelineMode functions.
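In terms of the low-level pq wrappers, this amounts to something like the following simplified sketch (the real implementation also tracks pipeline nesting and drives result processing):

# Roughly what the `with conn.pipeline():` block does at the libpq level.
conn.pgconn.enter_pipeline_mode()     # wraps PQenterPipelineMode
try:
    ...  # queue commands, emit synchronization points, fetch results
finally:
    conn.pgconn.exit_pipeline_mode()  # wraps PQexitPipelineMode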

These calls alone, however, do not tell much about how things work in Psycopg.

To actually understand how things work, we need to step back and read the libpq pipeline mode documentation, whose section “Interleaving Result Processing and Query Dispatch” states:

The client application should generally maintain a queue of work remaining to be dispatched and a queue of work that has been dispatched but not yet had its results processed. When the socket is writable it should dispatch more work. When the socket is readable it should read results and process them, matching them up to the next entry in its corresponding results queue.

As often with PostgreSQL, everything is there, though the paragraph is somewhat enigmatic. It does, in fact, describe the heart of the algorithm used by the Psycopg driver (although it took us a while to grasp all the details implied by these few sentences…).

Socket communication

When the socket is writable it should dispatch more work. When the socket is readable it should read results […].

In Psycopg, socket communication for exchanging libpq messages is implemented through waiting functions and generators, tied together by the I/O layer (either blocking or async): this is explained in detail in Daniele’s blog post.

What matters most for the pipeline mode is the generator part, as it is responsible for dispatching queries to, and reading results from, the socket. In contrast with the normal query mode, where these steps are handled sequentially by independent logic, the pipeline mode needs to interleave result processing and query dispatch: this is implemented by the pipeline_communicate() generator, sketched after the list below. Without going too much into the details, we can notice that:

  • the function takes a queue of “commands”, e.g. pgconn.send_query_params() or similar,
  • it continuously waits for the socket to be Read- or Write-ready, or both (ready = yield Wait.RW),
  • when the socket is Read-ready (if ready & Ready.R:), results are fetched (calling pgconn.get_result()),
  • when the socket is Write-ready (if ready & Ready.W:), commands are sent (pgconn.flush() first flushes previously sent data out of libpq’s buffer, then the next pending command is called),
  • and this repeats until the queue of commands is empty.
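Here is a condensed sketch of that loop. This is not Psycopg’s actual code (the real generator notably also handles pipeline synchronization results and keeps reading until all results arrived): pgconn stands for a psycopg.pq.PGconn-like object, and the Wait/Ready constants mimic those used by Psycopg’s waiting functions.

from enum import IntFlag
from typing import Any, Callable, Deque, Generator, List

class Wait(IntFlag):
    R = 1
    W = 2
    RW = R | W

class Ready(IntFlag):
    R = 1
    W = 2

def pipeline_communicate(
    pgconn: Any, commands: Deque[Callable[[], None]]
) -> Generator[Wait, Ready, List[Any]]:
    results: List[Any] = []
    while True:
        # Ask the I/O layer to wait until the socket is read- or write-ready.
        ready = yield Wait.RW
        if ready & Ready.R:
            pgconn.consume_input()
            # Fetch as many results as available without blocking.
            while not pgconn.is_busy():
                res = pgconn.get_result()
                if res is None:
                    break
                results.append(res)
        if ready & Ready.W:
            # Flush libpq's output buffer; once done, send one more command
            # (e.g. a pgconn.send_query_params() call).
            if pgconn.flush() == 0 and commands:
                commands.popleft()()
        if not commands:
            return results  # simplified: pending results are ignored here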

Queueing work, processing results

Around the pipeline_communicate() generator described above, we need to manage the queue of commands as well as the queue of results pending processing. The first part, filling the commands queue, is simply handled by stacking commands instead of calling them directly, while keeping a reference to the cursor used for execute(). The second part involves handling the output of the pipeline_communicate() generator, a list of PGresult items. Each fetched result item:

  • is possibly bound back to its respective cursor (the one its execute() call originated from),
  • might trigger an error if its status is non-OK (e.g. FATAL_ERROR).

All this is handled in methods of the BasePipeline class (see the underscore-prefixed methods at the end).
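As an illustration, the bookkeeping could look like the following hypothetical sketch (MiniPipeline is not the actual BasePipeline class; it only shows the two queues at work):

from collections import deque
from functools import partial

from psycopg import pq

class MiniPipeline:
    def __init__(self, pgconn: pq.PGconn) -> None:
        self.pgconn = pgconn
        self.command_queue: deque = deque()  # callables dispatching a query
        self.result_queue: deque = deque()   # cursors awaiting a PGresult

    def queue_execute(self, cursor, query: bytes, params) -> None:
        # Stack the command instead of calling it right away, and remember
        # which cursor the future result will be bound to.
        self.command_queue.append(
            partial(self.pgconn.send_query_params, query, params)
        )
        self.result_queue.append(cursor)

    def process_result(self, result) -> None:
        # Bind a fetched result back to its cursor, or raise on error.
        cursor = self.result_queue.popleft()
        if result.status == pq.ExecStatus.FATAL_ERROR:
            raise RuntimeError(result.error_message.decode())
        cursor._results = [result]  # simplified binding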

Integration with high-level features: transactions

Besides the low-level logic described above, implementing the pipeline mode in Psycopg implied handling some Psycopg-specific features, such as transactions.

Transactions need special attention because of how error handling works in the pipeline mode. There are a few distinct cases to handle properly, depending on whether the pipeline uses an implicit transaction or contains explicit transactions. But the general rule is that, when an error occurs, the pipeline enters an aborted state, meaning that subsequent commands are skipped and that prior statements may or may not be persisted (depending on whether explicit transactions are used).

Consider the following statements, executed within a pipeline:

BEGIN;  -- transaction 1
INSERT INTO s VALUES ('abc');
COMMIT;
BEGIN;  -- transaction 2
INSERT INTO no_such_table VALUES ('x');
ROLLBACK;
BEGIN;  -- transaction 3
INSERT INTO s VALUES ('xyz');
COMMIT;

SELECT * from s;
-> abc

The INSERT INTO no_such_table statement produces an error, putting the pipeline in aborted state; accordingly, the following explicit ROLLBACK is not executed, and the next statements (“transaction 3”) are also skipped.

Another example:

BEGIN;  -- main transaction
INSERT INTO s VALUES ('abc');
BEGIN;  -- sub-transaction
INSERT INTO no_such_table VALUES ('x');
ROLLBACK;
INSERT INTO s VALUES ('xyz');
COMMIT;

SELECT * from s;
-> []

Here, still due to the same error in INSERT INTO no_such_table, the final COMMIT statement is not executed and the main (outer) transaction is not committed (even though the inner sub-transaction is explicitly rolled back).

That’s typically something the user of a high-level driver would not want.

In Psycopg, transactions are managed explicitly through the transaction() context manager method of Connection objects. So, to preserve a consistent behaviour, its logic needed to be adapted to the pipeline mode. This was achieved by leveraging synchronization points (PQpipelineSync) and nested pipelines.

Nested pipelines

In the libpq, there is no such thing as a nested pipeline: the connection can only enter the pipeline mode once. What’s referred to as a “nested pipeline” in Psycopg is the operation of “isolating” a sequence of commands in a pipeline session through synchronization points. By doing so, we work around the surprising behaviour described above (where a committed transaction got rolled back). Here’s what happens:

with conn.pipeline():  # emits PQenterPipelineMode
  conn.execute(...)
  with conn.pipeline():  # emits PQpipelineSync
    conn.execute(...)
  # exiting the inner 'with' block emits PQpipelineSync
# exiting the outermost 'with' block emits PQexitPipelineMode

The PQpipelineSync operation resets the pipeline state, thus allowing subsequent commands to be run independently of whether previous ones succeeded or not. (It also triggers results to be sent back from the server, but that’s another matter.)
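At the pq level, emitting such a synchronization point boils down to a single call; a sketch, assuming the low-level wrapper exposed by Psycopg 3.1:

conn.pgconn.pipeline_sync()  # wraps PQpipelineSync; resets the aborted state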

Pipelined transactions

By using nested pipelines for Psycopg transactions, we essentially follow the “logical unit of work” pattern mentioned in the libpq pipeline mode documentation:

Pipelines should be scoped to logical units of work, usually (but not necessarily) one transaction per pipeline.

(Except that we don’t strictly use one pipeline per transaction, but rather a nested one.)

In practice, this means that it is safe to use a with transaction(): block within a pipeline session, as the semantics of both the transaction and the pipeline are preserved: the transaction either succeeds or fails as a whole, and it only gets executed if previous commands in the pipeline session succeeded:

with conn.pipeline():
  conn.execute(...)
  try:
      with conn.transaction():  # implicit nested pipeline (with conn.pipeline())
          conn.execute(...)
  finally:
      # This will be executed independently of whether the previous
      # transaction succeeded or not.
      conn.execute(...)

So, back to the (second) example above, written with Psycopg:

>>> from psycopg import errors
>>> with psycopg.connect(autocommit=True) as conn:
...     with conn.pipeline():
...         with conn.transaction():
...             conn.execute("INSERT INTO s VALUES (%s)", ("abc",))
...             try:
...                 with conn.transaction():
...                     conn.execute("INSERT INTO no_such_table VALUES (%s)", ("x",))
...             except errors.UndefinedTable:
...                 pass
...             conn.execute("INSERT INTO s VALUES (%s)", ("xyz",))
...     conn.execute("SELECT * FROM s").fetchall()
[('abc',), ('xyz',)]

We indeed get the inner transaction rolled back and the outer one committed, just like without the pipeline mode.

That’s an implementation detail: the user does not need to know about it, as the overall behaviour is hopefully natural.


Supporting the libpq pipeline mode in Psycopg was a large milestone; it required months of work, with a lot of thinking and testing. There is probably more to say about it, like how it transparently manages automatic prepared statements, or how executemany() was optimized to use the pipeline mode implicitly (try adapting the demo script above to use it; hint: there is no need for the with conn.pipeline(): block, as sketched below). And be sure to read Psycopg’s pipeline mode documentation!
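For instance, the demo’s do_insert() could be replaced with something along these lines (a sketch reusing demo.py’s imports; in Psycopg 3.1, executemany() batches its work in a pipeline by itself when the server supports it):

# Same inserts via executemany(); Psycopg 3.1 uses the pipeline mode
# internally here, so no explicit `with conn.pipeline():` is needed.
def do_insert_many(conn: psycopg.Connection, count: int = 1000) -> None:
    query = "INSERT INTO t (x, d, p) VALUES (%s, %s, %s)"
    params = [(math.factorial(n), datetime.now(), True) for n in range(count)]
    with conn.cursor() as cur:
        cur.executemany(query, params)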

