Showing posts with label Replication performance. Show all posts

Monday 7 November 2022

Logical Replication Improvements in PostgreSQL-15

There are various areas in PostgreSQL like Partitioning, Logical Replication, Parallel Query, Vacuum, etc. which improve with each new version. In this blog, I'll summarize the various enhancements in Logical Replication that users can see in the recently released PostgreSQL 15. You can read about the enhancements made in this area in the previous release in one of my previous blogs.

Allow replication of prepared transactions:

In the last release, we allowed logical decoding of prepared transactions, and with this release, we added support for replicating prepared transactions in built-in logical replication. Previously, we sent the changes of a prepared transaction only after COMMIT PREPARED had been done. Users can enable replication at PREPARE time with the following syntax:

CREATE PUBLICATION mypub FOR ALL TABLES;

CREATE SUBSCRIPTION mysub CONNECTION 'dbname=postgres' PUBLICATION mypub WITH (two_phase = true);

The key advantages of this feature are:

(a) Reduces the lag to replicate data by replicating it at PREPARE time instead of waiting till the COMMIT PREPARED

(b) Provides the base to build conflict-free logical replication, because if the prepare fails on subscriber nodes, one can roll it back on the publisher node as well.

The key implementation points:

(a) The replication of prepared transactions is enabled once the initial sync for all the tables is finished.

(b) To avoid conflicts in the prepared transaction during APPLY, we use the prepare identifier as pg_gid_<subscriber-id>_<transaction-id>.

(c) It is not allowed to change this option with ALTER SUBSCRIPTION command.

(d) ALTER SUBSCRIPTION REFRESH PUBLICATION is allowed with copy_data=false once the two_phase is enabled for a subscription.
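
As a rough illustration of the flow above, the following sketch prepares a transaction on the publisher and inspects it on the subscriber before committing it. The table mytab (assumed here to accept a single integer value) and the identifier 'demo_tx' are made-up names for this example, and it assumes max_prepared_transactions is set above zero on both nodes.

```sql
-- Subscriber: check whether two-phase is active for the subscription.
-- subtwophasestate: 'd' = disabled, 'p' = pending enablement, 'e' = enabled.
SELECT subname, subtwophasestate FROM pg_subscription WHERE subname = 'mysub';

-- Publisher: prepare a transaction (mytab is a hypothetical table).
BEGIN;
INSERT INTO mytab VALUES (1);
PREPARE TRANSACTION 'demo_tx';

-- Subscriber: once replicated, the prepared transaction is listed here under a
-- generated identifier of the form pg_gid_<subscriber-id>_<transaction-id>.
SELECT gid, prepared FROM pg_prepared_xacts;

-- Publisher: finish the transaction; the subscriber then commits its copy.
COMMIT PREPARED 'demo_tx';
```

If subtwophasestate still shows 'p', the initial table sync has most likely not finished yet, which matches point (a) above.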

For a detailed description of this feature, see the blog.

Allow replication of all tables in the schema:

Previously, users had to list all the tables of a particular schema while creating a publication if they wanted to publish all tables of that schema. If they later created more tables in that schema, those also had to be added to the publication separately, which was inconvenient. This feature makes it much easier by allowing users to specify just the schema name when they want all tables of the schema to be published. The syntax to specify the schema name (TABLES IN SCHEMA) is as follows:

CREATE PUBLICATION mypub FOR TABLES IN SCHEMA mysch;

CREATE PUBLICATION mypub FOR TABLE mytab, TABLES IN SCHEMA mysch;

Note that it is allowed to specify schemas with individual tables from other schemas. Users can add the schemas to existing publications with the following syntax:

ALTER PUBLICATION mypub ADD TABLES IN SCHEMA mysch;

Note that adding schemas to a publication that is already subscribed to by some subscribers will require an ALTER SUBSCRIPTION … REFRESH PUBLICATION action on the subscriber side in order to become effective.
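
To check what a schema-level publication currently covers, something like the following can be used. This is only a sketch built on the system catalogs and reuses the mypub and mysub names from the examples above.

```sql
-- Publisher: list the tables currently published through mypub,
-- including those picked up via TABLES IN SCHEMA.
SELECT pubname, schemaname, tablename
FROM pg_publication_tables
WHERE pubname = 'mypub';

-- Publisher: list the schemas attached to each publication.
SELECT p.pubname, n.nspname
FROM pg_publication_namespace pn
JOIN pg_publication p ON p.oid = pn.pnpubid
JOIN pg_namespace n ON n.oid = pn.pnnspid;

-- Subscriber: pick up tables added to the publication since the last refresh.
ALTER SUBSCRIPTION mysub REFRESH PUBLICATION;
```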

For a detailed description of this feature, see the blog.

Allow specifying row filters for logical replication of tables:

This feature allows specifying an additional WHERE clause after each table in the publication definition. Rows that don't satisfy this WHERE clause will be filtered out. This allows a set of tables to be partially replicated. The row filter is per table. The WHERE clause must be enclosed in parentheses. Users can define row filters with the following command:

CREATE PUBLICATION mypub FOR TABLE mytab1 WHERE (c1 > 10 and c2 < 20), mytab2 WHERE (c3 LIKE 'bob');

Users are allowed to specify row filters for existing tables in publication with the command:

ALTER PUBLICATION mypub SET TABLE mytab1 WHERE (c1 > 10 and c2 < 20), mytab2 WHERE (c3 LIKE 'bob');

This can help distribute data among nodes, improve performance by sending data selectively, and hide some sensitive data.

Key points to note about this feature:

(a) The row filter WHERE clause for a table added to a publication that publishes UPDATES and/or DELETES must contain only columns that are covered by REPLICA IDENTITY.

(b) The row filter WHERE clause for a table added to a publication that publishes INSERT can use any column.

(c) Row filters are ignored for TRUNCATE TABLE commands.

(d) If the row filter evaluates to NULL, it is regarded as "false", i.e. the corresponding row won't be replicated.

(e) The WHERE clause only allows simple expressions that don't have user-defined functions, user-defined operators, user-defined types, user-defined collations, non-immutable built-in functions, or references to system columns.

(f) During initial table synchronization, only data that satisfies the row filters is copied to the subscriber.

(g) For partitioned tables, the publication parameter publish_via_partition_root determines if it uses the partition's row filter (if the parameter is false, the default) or the root partitioned table's row filter.
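
The points above can be tried out with a small sketch like the one below. The table orders and the publications orders_eu and orders_low_ids are hypothetical names chosen for this example.

```sql
-- Hypothetical table; the primary key serves as the default replica identity.
CREATE TABLE orders (id int PRIMARY KEY, region text, amount numeric);

-- Insert-only publication: the filter may reference any column.
CREATE PUBLICATION orders_eu FOR TABLE orders WHERE (region = 'EU')
    WITH (publish = 'insert');

-- This publication also publishes UPDATE and DELETE (the default), so its
-- filter sticks to the replica identity column.
CREATE PUBLICATION orders_low_ids FOR TABLE orders WHERE (id < 1000);

INSERT INTO orders VALUES (1, 'EU', 10.0);  -- satisfies both filters
INSERT INTO orders VALUES (2, NULL, 20.0);  -- region = 'EU' yields NULL, so orders_eu does not publish it

-- Inspect the filters stored for each published table.
SELECT pubname, tablename, rowfilter FROM pg_publication_tables;
```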

For a detailed description of this feature, see docs and blog.

Allow specifying column lists for logical replication of tables:

This feature allows specifying an optional column list when adding a table to logical replication. Columns not included in this list are not sent to the subscriber, allowing the schema on the subscriber to be a subset of the publisher schema. The choice of columns can be based on behavioral or performance reasons. Users can define column lists with the following syntax:

CREATE PUBLICATION mypub FOR TABLE mytab1 (c1, c2), mytab2 (c3);

Users are allowed to specify column lists for existing tables in publication with the command:

ALTER PUBLICATION mypub SET TABLE mytab1 (c1, c2);

Key points to note about this feature:

(a) If a publication publishes UPDATES and/or DELETES, any column list must include the table's replica identity columns.

(b) If a publication publishes only INSERT operations, then the column list may omit replica identity columns.

(c) Column lists are ignored for TRUNCATE TABLE commands.

(d) A column list can contain only simple column references.

(e) A column list can't be specified if the publication also publishes FOR TABLES IN SCHEMA.

(f) During initial data synchronization, only the published columns are copied.

(g) For partitioned tables, the publication parameter publish_via_partition_root determines which column list is used. If publish_via_partition_root is true, the root partitioned table's column list is used. Otherwise, if publish_via_partition_root is false (the default), each partition's column list is used.
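
A minimal sketch of a column list in use is shown below. The table customers and the publication customers_pub are hypothetical names, and the second CREATE TABLE is meant to be run on the subscriber.

```sql
-- Publisher: publish only id and name; email and notes are never sent.
CREATE TABLE customers (id int PRIMARY KEY, name text, email text, notes text);
CREATE PUBLICATION customers_pub FOR TABLE customers (id, name);

-- Subscriber: the target table only needs the published columns.
CREATE TABLE customers (id int PRIMARY KEY, name text);

-- Publisher: inspect which columns are published for each table.
SELECT pubname, tablename, attnames
FROM pg_publication_tables
WHERE pubname = 'customers_pub';
```

The list includes id because the publication publishes UPDATE and DELETE by default, so the replica identity column has to be part of it.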

For a detailed description of this feature, see docs and blog.

Allows logical replication to run as the owner of the subscription:

Previously, the subscription's APPLY process ran with the privileges of a superuser, but from PostgreSQL 15 onwards it runs with the privileges of the subscription owner. This prevents logical replication workers from performing insert, update, delete, truncate, or copy commands on tables unless the subscription owner has permission to do so. Only superusers, roles with bypassrls, and table owners can replicate into tables with row-level security policies.

The purpose of this work is to allow subscriptions to be managed by non-superusers and protect servers with subscriptions from malicious activity on the publisher side.
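
As a small sketch of what this means in practice, one can look up the owner of each subscription and grant that role the privileges the apply worker needs on a target table. The role repl_owner and the table mytab are hypothetical names used only for illustration.

```sql
-- Subscriber: see which role owns each subscription.
SELECT s.subname, r.rolname AS owner
FROM pg_subscription s
JOIN pg_roles r ON r.oid = s.subowner;

-- Subscriber: allow the (hypothetical) owner role to apply changes to mytab.
GRANT INSERT, UPDATE, DELETE, TRUNCATE ON mytab TO repl_owner;
```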

Conflict Resolution:

Conflicts can happen for various reasons, like a PRIMARY KEY violation, a schema difference, etc., while applying transactions on the subscriber. By default, PostgreSQL will keep retrying the operation on an error. Before PostgreSQL 15, users had the following options: (a) Manually remove the conflicting data to allow replication to proceed. (b) Use pg_replication_origin_advance() to advance the LSN to a location beyond the failed transaction so that, on restart, replication starts from a point after the conflicting transaction.

It is quite inconvenient for users to use either of these methods. With option (a), users are forced to remove/change data on subscribers even though they want the corresponding data from the publisher to be ignored. To use option (b), users need to find the LSN of the failing transaction, probably by using pg_waldump or some other tool on the publisher side, and the origin information is not apparent either, as it is generated internally for the purpose of replication. While using pg_replication_origin_advance(), if users set the wrong LSN by mistake (either that of a future commit or that of some operation in the middle of the transaction), the system can omit data that it was not supposed to omit, leading to an inconsistent replica.

The other problem is that the system will keep retrying to apply the transaction even when it can't succeed without user intervention, and users don't have any way to stop it apart from manually disabling the subscription by using ALTER SUBSCRIPTION mysub DISABLE;

In PostgreSQL 15, we tried to make the use of pg_replication_origin_advance() easier by providing the required information and by providing a similar but more robust way. The other feature it provides is to allow subscriptions to be disabled on error.

We introduced a new subscription option 'disable_on_error', which allows a subscription to be automatically disabled if any errors are detected by subscription workers during data replication from the publisher. Users can specify this option either during CREATE SUBSCRIPTION or in the ALTER SUBSCRIPTION command.

CREATE SUBSCRIPTION mysub CONNECTION '…' PUBLICATION mypub WITH (disable_on_error = true);

ALTER SUBSCRIPTION mysub SET (disable_on_error = true);

Then, we extended the error context information of subscription worker error by adding (a) Finish LSN. It will indicate commit_lsn for committed transactions, and prepare_lsn for prepared transactions. (b) Replication origin name. This will contain the name of the replication origin that keeps track of replication progress and is created automatically with the subscription definition.

The extended error context information can make the use of pg_replication_origin_advance() easier for users.

Then, we also introduced a more robust way to skip the conflicting transactions by using the command: ALTER SUBSCRIPTION mysub SKIP (lsn = '0/1566D10'); This is more robust because it prevents users from setting a wrong LSN by performing checks against the specified LSN. We ensure that the specified LSN is the same as the finish LSN of the first transaction sent by the publisher after the restart. The first transaction successfully applied to the subscriber will clear the specified LSN. We also ensure that the specified LSN is greater than the origin's current LSN.
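
Put together, recovering from a conflict could look roughly like the sketch below. It assumes the subscription was created with disable_on_error = true and reuses the sample LSN from the text above; in practice the LSN comes from the finish LSN reported in the error context in the subscriber's log.

```sql
-- Subscriber: confirm the subscription was disabled after the error.
SELECT subname, subenabled, subdisableonerr, subskiplsn
FROM pg_subscription
WHERE subname = 'mysub';

-- Subscriber: ask the apply worker to skip the transaction that finishes
-- at the LSN reported in the error context ('0/1566D10' is a sample value).
ALTER SUBSCRIPTION mysub SKIP (lsn = '0/1566D10');

-- Subscriber: resume replication; the matching transaction is skipped.
ALTER SUBSCRIPTION mysub ENABLE;

-- To withdraw a pending skip request instead, the LSN can be reset:
-- ALTER SUBSCRIPTION mysub SKIP (lsn = NONE);
```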

For a detailed description of this feature, see blogs [1] and [2].

pg_stat_subscription_stats:

A new view that shows stats about errors that occurred during the application of logical replication changes or during initial table synchronization. See docs for more information on this.
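
For example, the error counters can be read and reset as sketched below, again using the mysub subscription from the earlier examples.

```sql
-- Subscriber: errors seen while applying changes and during initial table sync.
SELECT subname, apply_error_count, sync_error_count, stats_reset
FROM pg_stat_subscription_stats;

-- Subscriber: reset the counters for one subscription.
SELECT pg_stat_reset_subscription_stats(oid)
FROM pg_subscription
WHERE subname = 'mysub';
```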

Communication improvements between publisher and subscriber:

PostgreSQL has made enhancements in communication to (a) avoid sending the transaction BEGIN/END messages when all the transaction data is filtered, and (b) prevent replication from restarting due to timeouts while processing large transactions where most or all of the data is filtered.

Before PostgreSQL 15, we used to send BEGIN/END messages for empty transactions (where all changes are skipped/filtered), which wasted a lot of CPU cycles and network bandwidth on building and transmitting such messages. To avoid sending messages for empty transactions, we now send the BEGIN message only with the first change transmitted from publisher to subscriber, and we send the END (COMMIT) message only if BEGIN was sent. To avoid any delays in synchronous replication, we send a keepalive message after skipping an empty transaction and process its feedback.

While processing long transactions where most of the changes are filtered because, say, the particular operations are not published, the publisher doesn't send any communication to the subscriber, so the subscriber times out after a certain threshold, leading to a restart of replication. To fix this, we now periodically send keepalive messages in such cases.

For a detailed description of this work, see the blog.

I believe this is a good mix of improvements for logical replication in PostgreSQL 15 which will help users. Your feedback here or on PostgreSQL mailing lists is welcome!

Wednesday 15 September 2021

Logical Replication Improvements In PostgreSQL-14

In the upcoming release of PostgreSQL-14, we will see multiple enhancements in Logical Replication which I hope will further increase its usage. This blog is primarily to summarize and briefly explain all the enhancements in Logical Replication.

Decoding of large transactions:

Allow streaming large in-progress transactions to subscribers. Before PostgreSQL-14, transactions were streamed only at commit time, which led to a large apply lag for large transactions. With this feature, the apply lag is reduced, and in certain scenarios that leads to a big performance win. I have explained this feature in detail in my previous blog.

Performance of logical decoding:

Reduced the CPU usage and improved the decoding performance of transactions having a lot of DDLs. It has been observed that decoding of a transaction containing truncation of a table with 1000 partitions now finishes in 1s, whereas before this work it used to take 4-5 minutes. Before explaining how we achieved this performance gain, let me briefly describe what an invalidation message is in PostgreSQL, as that is important to understand this optimization. These are messages to flush invisible system cache entries in each backend session. We normally execute them at the command end in the backend which generated them and send them at the transaction end via a shared queue to other backends for processing. They are normally generated for insert/delete/update operations on system catalogs, which happen for DDL operations.

While decoding, we used to execute all the invalidations of an entire transaction at each command end, as we had no way of knowing which invalidations happened before that command. Due to this, transactions involving large amounts of DDLs used to take more time and also led to high CPU usage. But now we know the specific invalidations at each command end, so we execute only the required invalidations. This work has been accomplished by commit d7eb52d718.

Initial table sync:

The initial table synchronization involves copying the initial snapshot of the table by the table sync worker, after which the table is brought up to a synchronized state with the main apply worker. This whole work used to be done in a single transaction using a temporary replication slot, which has major drawbacks: (a) The slot will hold the WAL till the entire sync is complete. (b) Any error during the sync phase will roll back the entire copy, which is painful for large copies. (c) There is a risk of exceeding the CID limit.

We made the following improvements to overcome these drawbacks:
(a) Allowed multiple transactions in the tablesync phase.
(b) Used permanent slots and origins to track the progress of tablesync.

This work has been explained in detail in the blog. This work has been accomplished by commit ce0fdbfe97.
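
The progress of the initial sync can be followed from the catalogs, for example with the sketch below. The slot-name pattern in the second query is an assumption based on the generated names of tablesync slots, which contain "sync".

```sql
-- Subscriber: per-table sync state.
-- srsubstate: 'i' = initialize, 'd' = data is being copied,
--             'f' = finished table copy, 's' = synchronized, 'r' = ready.
SELECT srrelid::regclass AS table_name, srsubstate, srsublsn
FROM pg_subscription_rel;

-- Publisher: slots created for tablesync while a copy is in progress
-- (name pattern is an assumption).
SELECT slot_name, active, restart_lsn
FROM pg_replication_slots
WHERE slot_name LIKE 'pg%sync%';
```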

Logical decoding of two-phase commits:

This will allow us to decode the transactions at prepare time and send the same to the output plugin instead of doing it at commit time. This will allow the plugins to decipher the transaction at prepare time and route it to another node if required. This has two advantages (a) allows two-phase distributed transactions across multiple nodes via logical replication, (b) reduces the apply-lag by sending and replaying the transaction on another node at prepare time.
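
A minimal sketch of this with the test_decoding plugin is given below. The table and slot names are hypothetical, and it assumes wal_level = logical and max_prepared_transactions above zero.

```sql
-- Hypothetical table for the demonstration.
CREATE TABLE twophase_test (data text);

-- The fourth argument enables two-phase decoding for the slot
-- (the third one says whether the slot is temporary).
SELECT 'init' FROM pg_create_logical_replication_slot('twophase_slot', 'test_decoding', false, true);

BEGIN;
INSERT INTO twophase_test VALUES ('5');
PREPARE TRANSACTION 'test_prepared1';

-- The changes are decoded here, at prepare time, before the commit.
SELECT data FROM pg_logical_slot_get_changes('twophase_slot', NULL, NULL);

COMMIT PREPARED 'test_prepared1';

-- Only the commit of the prepared transaction is left to decode now.
SELECT data FROM pg_logical_slot_get_changes('twophase_slot', NULL, NULL);

-- Clean up.
SELECT pg_drop_replication_slot('twophase_slot');
DROP TABLE twophase_test;
```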

This work has been explained in detail in the blog. This work has been accomplished by commits [1][2][3].

Monitor logical decoding:

Replication slots are used to keep state about replication streams originating from this cluster. Their primary purpose is to prevent the premature removal of WAL. In the context of logical replication, a slot represents a stream of changes that can be replayed to a client in the order they were made on the origin server. Each slot streams a sequence of changes from a single database.

We have added a system view pg_stat_replication_slots to report replication slot activity. This can be used to monitor the amount of data streamed to the output plugin or subscriber and the amount spilled to disk. Additionally, users can monitor the total amount of transaction data decoded for sending to the decoding output plugin while decoding changes from WAL for this slot. Note that this includes data that is streamed and/or spilled. The function pg_stat_reset_replication_slot() resets slot statistics.

Example:

CREATE TABLE stats_test(data text); 
SET logical_decoding_work_mem to '64kB';
SELECT 'init' FROM pg_create_logical_replication_slot('slot_stats', 'test_decoding');
INSERT INTO stats_test SELECT 'serialize-topbig--1:'||g.i FROM generate_series(1, 5000) g(i);
SELECT count(*) FROM pg_logical_slot_peek_changes('slot_stats', NULL, NULL, 'skip-empty-xacts', '1');

SELECT slot_name, spill_txns, spill_count, spill_bytes, total_txns, total_bytes FROM pg_stat_replication_slots; 
 slot_name  | spill_txns | spill_count | spill_bytes | total_txns | total_bytes
------------+------------+-------------+-------------+------------+-------------
 slot_stats |          1 |          12 |      763893 |          1 |      763893
(1 row) 

DROP TABLE stats_test; 

SELECT pg_drop_replication_slot('slot_stats'); 

Allow publications to be easily added and removed:

Before this release, if the user needed to add/remove publications to/from a subscription, she needed to mention all the existing publications along with it. Consider a case where a subscription is subscribed to two publications and we want to add an additional publication to it; the user then needs to mention all three (two previous and one new) while doing ALTER SUBSCRIPTION. The same is explained with an example below:

Initial Subscription
CREATE SUBSCRIPTION mysub CONNECTION 'host=localhost port=5432 dbname=postgres' PUBLICATION mypub1, mypub2;

Add a new Publication mypub3.
ALTER SUBSCRIPTION mysub SET PUBLICATION mypub1, mypub2, mypub3;

This could be inconvenient for users, especially if there are many existing publications to which a subscription is subscribed. We have added a new way to make this easier by supporting ADD/DROP of individual publications. See docs for the syntax. With the new way, in the above case, to add a new publication, the user needs to perform:
ALTER SUBSCRIPTION mysub ADD PUBLICATION mypub3;

This work has been accomplished by commit 82ed7748b7.
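
For completeness, here is a short sketch of both forms, using the publication names from the example above. The last query just shows the resulting publication list of the subscription.

```sql
-- Subscriber: add one publication without repeating the existing ones.
ALTER SUBSCRIPTION mysub ADD PUBLICATION mypub3;

-- Subscriber: remove one publication, leaving the others in place.
ALTER SUBSCRIPTION mysub DROP PUBLICATION mypub1;

-- Subscriber: check which publications the subscription now uses.
SELECT subname, subpublications FROM pg_subscription WHERE subname = 'mysub';
```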

Binary transfer mode:

This feature provides an option during Create/Alter Subscription to allow data from publishers to be sent in binary format. The default value of this option is false. Even when this option is enabled, only data types that have binary send and receive functions will be transferred in binary. When doing cross-version replication, if the subscriber lacks a binary receive function for the type, the data transfer will fail, and this option can't be used. This mode is generally faster. Example to enable binary mode:

CREATE SUBSCRIPTION mysub CONNECTION 'host=localhost port=5432 dbname=postgres' PUBLICATION mypub1 WITH (binary = true);

This work has been accomplished by commit 9de77b5453.

Allow to get messages via pgoutput:

Provide a “messages” option to the pgoutput plugin. This allows logical decoding messages (i.e. generated via pg_logical_emit_message) to be sent to the slot consumer. This is useful for pgoutput plugin users that use it for Change Data Capture. An example of the same is given below:

SELECT pg_create_logical_replication_slot('pgout_slot','pgoutput');

SELECT pg_logical_emit_message(true, 'pgoutput', 'a transactional message');

SELECT get_byte(data, 1), encode(substr(data, 24, 23), 'escape')  FROM pg_logical_slot_peek_binary_changes('pgout_slot', NULL, NULL, 'proto_version', '1', 'publication_names', 'pgout_slot', 'messages', 'true') OFFSET 1 LIMIT 1;

 get_byte |         encode
----------+-------------------------
        1 | a transactional message
(1 row)

SELECT pg_drop_replication_slot('pgout_slot');

While getting changes, the publication_names is not required for logical decoding messages but is specified just so that the function doesn't give an error. You can refer to logical replication message formats in the PostgreSQL docs. This work has been accomplished by commit ac4645c015.

Saturday 17 July 2021

Logical Replication Of In-Progress Transactions

Logical Replication was introduced in PostgreSQL-10 and has been improved with each version since then. Logical Replication is a method to replicate data selectively, unlike physical replication where the data of the entire cluster is copied. It can be used to build a multi-master or bi-directional replication solution. One of the main differences as compared with physical replication was that it allowed replicating a transaction only at commit time. This leads to apply lag for large transactions, where we need to wait to transfer the data till the transaction is finished. In the upcoming PostgreSQL-14 release, we are introducing a mechanism to stream large in-progress transactions. We have seen replication performance improve by 2 or more times due to this for large transactions, especially due to early filtering. See the performance test results reported on hackers and in another blog on the same topic. This will reduce the apply lag to a good degree.

The first thing we needed for this feature was to decide when to start streaming the WAL content. One could ask, if we have such a technology, why not stream each change of a transaction separately as and when we retrieve it from WAL? That would actually lead to sending much more data across the network, because we need to send some additional transaction information with each change so that the apply side can recognize the transaction to which the change belongs. To address this, in PostgreSQL-13, we introduced a new GUC parameter logical_decoding_work_mem, which allows users to specify the maximum amount of memory to be used by logical decoding before some of the decoded changes are either written to local disk or streamed to the subscriber. The parameter is also used to control the memory used by logical decoding, as explained in the blog.
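
The parameter can be inspected and changed like any other setting, as sketched below. The 128MB value is only an arbitrary figure for illustration; a session-level SET affects decoding done by that session, while the ALTER SYSTEM form changes the server-wide default after a reload.

```sql
-- Show the current limit.
SHOW logical_decoding_work_mem;

-- Lower it for the current session only, e.g. to trigger streaming or
-- spilling with small test transactions.
SET logical_decoding_work_mem = '64kB';

-- Change the server-wide default (arbitrary example value) and reload.
ALTER SYSTEM SET logical_decoding_work_mem = '128MB';
SELECT pg_reload_conf();
```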

The next thing that prevented incremental decoding was the delay in finding the association of subtransaction and top-level XID. During logical decoding, we accumulate all changes along with their (sub)transaction. Now, while sending the changes to the output plugin or streaming them to the other node, we need to combine all the changes that happened in the transaction, which requires us to find the association of each top-level transaction with its subtransactions. Before PostgreSQL-14, we built this association at the XLOG_XACT_ASSIGNMENT WAL record, which we normally log after 64 subtransactions or at commit time, because these are the only two times when we get such an association in the WAL. To find this association as it happens, we now also write the assignment info into WAL immediately, as part of the first WAL record for each subtransaction. This is done only when wal_level=logical to minimize the overhead.

Yet another thing that was required for incremental decoding was to process invalidations at each command end. The basic idea of invalidations is that they bring the caches (like the relation cache) up to date so that the next command uses an up-to-date schema. This was required to correctly decode WAL incrementally, as while decoding we use the relation attributes from the caches. For this, when wal_level=logical, we write invalidations at the command end into WAL so that decoding can use this information. The invalidations are decoded and accumulated in the top-level transaction, and then executed during replay. This obviates the need to decode the invalidations as part of a commit record.

In the previous paragraphs, I explained the enhancements required in the server infrastructure to allow incremental decoding. The next step was to provide APIs (stream methods) for out-of-core logical replication to stream large in-progress transactions. We added seven callbacks to the output plugin API for this: five required ones (stream_start_cb, stream_stop_cb, stream_abort_cb, stream_commit_cb, and stream_change_cb) and two optional ones (stream_message_cb and stream_truncate_cb). For details about these APIs, refer to PostgreSQL docs.

When streaming an in-progress transaction, the changes (and messages) are streamed in blocks demarcated by stream_start_cb and stream_stop_cb callbacks. Once all the decoded changes are transmitted, the transaction can be committed using the stream_commit_cb callback (or possibly aborted using the stream_abort_cb callback). One example sequence of streaming transaction may look like the following:

/* Change logical_decoding_work_mem to 64kB in the session */
postgres=# show logical_decoding_work_mem;
 logical_decoding_work_mem
---------------------------
 64kB
(1 row)
postgres=# CREATE TABLE stream_test(data text);
CREATE TABLE
postgres=# SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot', 'test_decoding');
 ?column?
----------
 init
(1 row)
postgres=# INSERT INTO stream_test SELECT repeat('a', 6000) || g.i FROM generate_series(1, 500) g(i);
INSERT 0 500
postgres=# SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL,NULL, 'include-xids', '1', 'skip-empty-xacts', '1', 'stream-changes', '1');
                       data
--------------------------------------------------
 opening a streamed block for transaction TXN 741
 streaming change for TXN 741
 streaming change for TXN 741
 streaming change for TXN 741
...
...
 streaming change for TXN 741
 streaming change for TXN 741
 streaming change for TXN 741
 closing a streamed block for transaction TXN 741
 opening a streamed block for transaction TXN 741
 streaming change for TXN 741
 streaming change for TXN 741
 streaming change for TXN 741
...
...
 streaming change for TXN 741
 streaming change for TXN 741
 closing a streamed block for transaction TXN 741
 committing streamed transaction TXN 741
(505 rows)

The actual sequence of callback calls may be more complicated depending on the server operations. There may be blocks for multiple streamed transactions, some of the transactions may get aborted, etc.

Note that streaming is triggered when the total amount of changes decoded from the WAL (for all in-progress transactions) exceeds the limit defined by the logical_decoding_work_mem setting. At that point, the largest top-level transaction (measured by the amount of memory currently used for decoded changes) is selected and streamed. However, in some cases we still have to spill to disk even if streaming is enabled because we exceed the memory threshold but still have not decoded the complete tuple e.g., only decoded toast table insert but not the main table insert or decoded speculative insert but not the corresponding confirm record. However, as soon as we get the complete tuple we stream the transaction including the serialized changes.

While streaming in-progress transactions, concurrent aborts may cause failures when the output plugin (or decoding of WAL records) consults catalogs (both system and user-defined). Let me explain this with an example. Suppose there is one catalog tuple with (xmin: 500, xmax: 0). Now, transaction 501 updates the catalog tuple, after which we have two tuples: (xmin: 500, xmax: 501) and (xmin: 501, xmax: 0). If 501 is then aborted and some other transaction, say 502, updates the same catalog tuple, the first tuple is changed to (xmin: 500, xmax: 502). The problem is that when we try to decode a tuple inserted/updated in 501 after the catalog update, we will see the catalog tuple with (xmin: 500, xmax: 502) as visible, because it is considered deleted by xid 502, which is not visible to our snapshot. Decoding with that catalog tuple can lead to a wrong result or a crash. So, it is necessary to detect concurrent aborts to allow streaming of in-progress transactions. To detect a concurrent abort, during a catalog scan we check the status of the xid, and if it is aborted we report a specific error so that we can stop streaming the current transaction and discard the already streamed changes on such an error. We might have already streamed some of the changes for the aborted (sub)transaction, but that is fine because when we decode the abort we will stream the abort message to truncate the changes in the subscriber.

To add support for streaming of in-progress transactions into the built-in logical replication, we need to primarily do four things:

(a) Extend the logical replication protocol to identify in-progress transactions, and allow adding additional bits of information (e.g. XID of subtransactions). Refer to PostgreSQL docs for the protocol details. 

(b) Modify the output plugin (pgoutput) to implement the new stream API callbacks, by leveraging the extended replication protocol.

(c) Modify the replication apply worker to properly handle streamed in-progress transactions by spilling the data to disk and then replaying it on commit.

(d) Provide a new option for streaming while creating a subscription.

The below example demonstrates how to set up the streaming via built-in logical replication:

Publisher node:

Set logical_decoding_work_mem = '64kB';
# Set up publication with some initial data

CREATE TABLE test_tab (a int primary key, b varchar);
INSERT INTO test_tab VALUES (1, 'foo'), (2, 'bar');
CREATE PUBLICATION tap_pub FOR TABLE test_tab;

Subscriber node:

CREATE TABLE test_tab (a int primary key, b varchar);
CREATE SUBSCRIPTION tap_sub CONNECTION 'host=localhost port=5432 dbname=postgres' PUBLICATION tap_pub WITH (streaming = on);

Publisher Node:

# Ensure the corresponding replication slot is created on publisher node
select slot_name, plugin, slot_type from pg_replication_slots;
 slot_name |  plugin  | slot_type
-----------+----------+-----------
 tap_sub   | pgoutput | logical
(1 row)

# Confirm there is no streamed bytes yet
postgres=# SELECT slot_name, stream_txns, stream_count, stream_bytes FROM pg_stat_replication_slots;
 slot_name | stream_txns | stream_count | stream_bytes
-----------+-------------+--------------+--------------
 tap_sub   |           0 |            0 |            0
(1 row)

# Insert, update and delete enough rows to exceed the logical_decoding_work_mem (64kB) limit.
BEGIN;
INSERT INTO test_tab SELECT i, md5(i::text) FROM generate_series(3, 5000) s(i);
UPDATE test_tab SET b = md5(b) WHERE mod(a,2) = 0;
DELETE FROM test_tab WHERE mod(a,3) = 0;

# Confirm that streaming happened
SELECT slot_name, stream_txns, stream_count, stream_bytes FROM pg_stat_replication_slots;
 slot_name | stream_txns | stream_count | stream_bytes
-----------+-------------+--------------+--------------
 tap_sub   |           1 |           22 |      1444410
(1 row)

Subscriber Node:
# The streamed data is still not visible.
select * from test_tab;
 a |  b
---+-----
 1 | foo
 2 | bar
(2 rows)

Publisher Node:
# Commit the large transactions
Commit;

Subscriber Node:
# The data must be visible on the subscriber
select count(*) from test_tab;
 count
-------
  3334
(1 row)

This feature was proposed in 2017 and committed in 2020 as part of various commits: 0bead9af48, c55040ccd0, 45fdc9738b, 7259736a6e, and 464824323e. It took a long time to complete this feature because of the various infrastructure pieces required to achieve it. I would really like to thank all the people involved in this feature, especially Tomas Vondra, who initially proposed it, and Dilip Kumar, who along with me completed various remaining parts and made it a reality. Thanks also to other people like Neha Sharma, Mahendra Singh Thalor, Ajin Cherian, and Kuntal Ghosh, who helped throughout the project with reviews and various tests. Also, special thanks to Andres Freund and other community members who suggested solutions to some of the key problems of this feature. Last but not least, thanks to EDB and Fujitsu's management, who encouraged me and some of the other members to work on this feature.