diff --git a/doc/src/sgml/monitoring.sgml b/doc/src/sgml/monitoring.sgml
index 44a5985..8d3bc7d 100644
--- a/doc/src/sgml/monitoring.sgml
+++ b/doc/src/sgml/monitoring.sgml
@@ -2588,6 +2588,39 @@ SELECT pid, wait_event_type, wait_event FROM pg_stat_activity WHERE wait_event i
          Amount of decoded transaction data spilled to disk.
        </para></entry>
       </row>
+
+      <row>
+       <entry role="catalog_table_entry"><para role="column_definition">
+         <structfield>stream_txns</structfield> <type>bigint</type>
+        </para>
+        <para>
+         Number of in-progress transactions streamed to subscriber after
+         memory used by logical decoding exceeds <literal>logical_decoding_work_mem</literal>.
+         Streaming only works with toplevel transactions (subtransactions can't
+         be streamed independently), so the counter does not get incremented for
+         subtransactions.
+       </para></entry>
+      </row>
+
+      <row>
+       <entry role="catalog_table_entry"><para role="column_definition">
+         <structfield>stream_count</structfield> <type>bigint</type>
+        </para>
+        <para>
+         Number of times in-progress transactions were streamed to subscriber.
+         Transactions may get streamed repeatedly, and this counter gets incremented
+         on every such invocation.
+       </para></entry>
+      </row>
+
+      <row>
+       <entry role="catalog_table_entry"><para role="column_definition">
+         <structfield>stream_bytes</structfield> <type>bigint</type>
+        </para>
+        <para>
+         Amount of decoded in-progress transaction data streamed to subscriber.
+       </para></entry>
+      </row>
      </tbody>
     </tgroup>
    </table>
diff --git a/src/backend/catalog/system_views.sql b/src/backend/catalog/system_views.sql
index 4ab14eb..042278e 100644
--- a/src/backend/catalog/system_views.sql
+++ b/src/backend/catalog/system_views.sql
@@ -795,7 +795,10 @@ CREATE VIEW pg_stat_replication_slots AS
             s.name,
             s.spill_txns,
             s.spill_count,
-            s.spill_bytes
+            s.spill_bytes,
+            s.stream_txns,
+            s.stream_count,
+            s.stream_bytes
     FROM pg_stat_get_replication_slots() AS s;
 
 CREATE VIEW pg_stat_slru AS
diff --git a/src/backend/postmaster/pgstat.c b/src/backend/postmaster/pgstat.c
index 12d6c59..0a6c452 100644
--- a/src/backend/postmaster/pgstat.c
+++ b/src/backend/postmaster/pgstat.c
@@ -1643,7 +1643,7 @@ pgstat_report_tempfile(size_t filesize)
  */
 void
 pgstat_report_replslot(const char *slotname, int spilltxns, int spillcount,
-					   int spillbytes)
+					   int spillbytes, int streamtxns, int streamcount, int  streambytes)
 {
 	PgStat_MsgReplSlot msg;
 
@@ -1656,6 +1656,9 @@ pgstat_report_replslot(const char *slotname, int spilltxns, int spillcount,
 	msg.m_spill_txns = spilltxns;
 	msg.m_spill_count = spillcount;
 	msg.m_spill_bytes = spillbytes;
+	msg.m_stream_txns = streamtxns;
+	msg.m_stream_count = streamcount;
+	msg.m_stream_bytes = streambytes;
 	pgstat_send(&msg, sizeof(PgStat_MsgReplSlot));
 }
 
@@ -6674,6 +6677,9 @@ pgstat_recv_replslot(PgStat_MsgReplSlot *msg, int len)
 		replSlotStats[idx].spill_txns += msg->m_spill_txns;
 		replSlotStats[idx].spill_count += msg->m_spill_count;
 		replSlotStats[idx].spill_bytes += msg->m_spill_bytes;
+		replSlotStats[idx].stream_txns += msg->m_stream_txns;
+		replSlotStats[idx].stream_count += msg->m_stream_count;
+		replSlotStats[idx].stream_bytes += msg->m_stream_bytes;
 	}
 }
 
diff --git a/src/backend/replication/logical/logical.c b/src/backend/replication/logical/logical.c
index 2b216a3..d510068 100644
--- a/src/backend/replication/logical/logical.c
+++ b/src/backend/replication/logical/logical.c
@@ -1469,12 +1469,19 @@ UpdateSpillStats(LogicalDecodingContext *ctx)
         rb,
         (long long) rb->spillTxns,
         (long long) rb->spillCount,
-        (long long) rb->spillBytes);
+        (long long) rb->spillBytes,
+        (long long) rb->streamTxns,
+        (long long) rb->streamCount,
+        (long long) rb->streamBytes);
 
    pgstat_report_replslot(NameStr(ctx->slot->data.name),
-                          rb->spillTxns, rb->spillCount, rb->spillBytes);
+                          rb->spillTxns, rb->spillCount, rb->spillBytes,
+						   rb->streamTxns, rb->streamCount, rb->streamBytes);
    rb->spillTxns = 0;
    rb->spillCount = 0;
    rb->spillBytes = 0;
+   rb->streamTxns = 0;
+   rb->streamCount = 0;
+   rb->streamBytes = 0;
 }
 
diff --git a/src/backend/replication/logical/reorderbuffer.c b/src/backend/replication/logical/reorderbuffer.c
index 4ea0356..ac4422b 100644
--- a/src/backend/replication/logical/reorderbuffer.c
+++ b/src/backend/replication/logical/reorderbuffer.c
@@ -347,6 +347,9 @@ ReorderBufferAllocate(void)
 	buffer->spillCount = 0;
 	buffer->spillTxns = 0;
 	buffer->spillBytes = 0;
+	buffer->streamCount = 0;
+	buffer->streamTxns = 0;
+	buffer->streamBytes = 0;
 
 	buffer->current_restart_decoding_lsn = InvalidXLogRecPtr;
 
@@ -3496,10 +3499,18 @@ ReorderBufferStreamTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
 		txn->snapshot_now = NULL;
 	}
 
+
+	rb->streamCount += 1;
+	rb->streamBytes += txn->total_size;
+
+	/* Don't consider already streamed transaction. */
+	rb->streamTxns += (rbtxn_is_streamed(txn)) ? 0 : 1;
+
 	/* Process and send the changes to output plugin. */
 	ReorderBufferProcessTXN(rb, txn, InvalidXLogRecPtr, snapshot_now,
 							command_id, true);
 
+
 	Assert(dlist_is_empty(&txn->changes));
 	Assert(txn->nentries == 0);
 	Assert(txn->nentries_mem == 0);
diff --git a/src/backend/replication/slot.c b/src/backend/replication/slot.c
index a677365..0d50e35 100644
--- a/src/backend/replication/slot.c
+++ b/src/backend/replication/slot.c
@@ -324,7 +324,7 @@ ReplicationSlotCreate(const char *name, bool db_specific,
 	ConditionVariableBroadcast(&slot->active_cv);
 
 	/* Create statistics entry for the new slot */
-	pgstat_report_replslot(NameStr(slot->data.name), 0, 0, 0);
+	pgstat_report_replslot(NameStr(slot->data.name), 0, 0, 0, 0, 0, 0);
 }
 
 /*
diff --git a/src/backend/utils/adt/pgstatfuncs.c b/src/backend/utils/adt/pgstatfuncs.c
index 7cb186e..a26a503 100644
--- a/src/backend/utils/adt/pgstatfuncs.c
+++ b/src/backend/utils/adt/pgstatfuncs.c
@@ -2096,7 +2096,7 @@ pg_stat_get_archiver(PG_FUNCTION_ARGS)
 Datum
 pg_stat_get_replication_slots(PG_FUNCTION_ARGS)
 {
-#define PG_STAT_GET_REPLICATION_SLOT_CLOS 4
+#define PG_STAT_GET_REPLICATION_SLOT_CLOS 7
 	ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
 	TupleDesc	tupdesc;
 	Tuplestorestate *tupstore;
@@ -2144,6 +2144,9 @@ pg_stat_get_replication_slots(PG_FUNCTION_ARGS)
 		values[1] = Int64GetDatum(stat.spill_txns);
 		values[2] = Int64GetDatum(stat.spill_count);
 		values[3] = Int64GetDatum(stat.spill_bytes);
+		values[4] = Int64GetDatum(stat.stream_txns);
+		values[5] = Int64GetDatum(stat.stream_count);
+		values[6] = Int64GetDatum(stat.stream_bytes);
 
 		tuplestore_putvalues(tupstore, tupdesc, values, nulls);
 	}
diff --git a/src/include/catalog/pg_proc.dat b/src/include/catalog/pg_proc.dat
index a3d94af..b538962 100644
--- a/src/include/catalog/pg_proc.dat
+++ b/src/include/catalog/pg_proc.dat
@@ -5255,9 +5255,9 @@
   proname => 'pg_stat_get_replication_slots', prorows => '10', proisstrict => 'f',
   proretset => 't', provolatile => 's', proparallel => 'r',
   prorettype => 'record', proargtypes => '',
-  proallargtypes => '{text,int8,int8,int8}',
-  proargmodes => '{o,o,o,o}',
-  proargnames => '{name,spill_txns,spill_count,spill_bytes}',
+  proallargtypes => '{text,int8,int8,int8,int8,int8,int8}',
+  proargmodes => '{o,o,o,o,o,o,o}',
+  proargnames => '{name,spill_txns,spill_count,spill_bytes,stream_txns,stream_count,stream_bytes}',
   prosrc => 'pg_stat_get_replication_slots' },
 { oid => '6118', descr => 'statistics: information about subscription',
   proname => 'pg_stat_get_subscription', proisstrict => 'f', provolatile => 's',
diff --git a/src/include/pgstat.h b/src/include/pgstat.h
index cdb9a65..c07cdd6 100644
--- a/src/include/pgstat.h
+++ b/src/include/pgstat.h
@@ -467,6 +467,9 @@ typedef struct PgStat_MsgReplSlot
 	PgStat_Counter	m_spill_txns;
 	PgStat_Counter	m_spill_count;
 	PgStat_Counter	m_spill_bytes;
+	PgStat_Counter	m_stream_txns;
+	PgStat_Counter	m_stream_count;
+	PgStat_Counter	m_stream_bytes;
 } PgStat_MsgReplSlot;
 
 
@@ -787,6 +790,9 @@ typedef struct PgStat_ReplSlotStats
 	PgStat_Counter	spill_txns;
 	PgStat_Counter	spill_count;
 	PgStat_Counter	spill_bytes;
+	PgStat_Counter	stream_txns;
+	PgStat_Counter	stream_count;
+	PgStat_Counter	stream_bytes;
 } PgStat_ReplSlotStats;
 
 /* ----------
@@ -1344,7 +1350,7 @@ extern void pgstat_report_deadlock(void);
 extern void pgstat_report_checksum_failures_in_db(Oid dboid, int failurecount);
 extern void pgstat_report_checksum_failure(void);
 extern void pgstat_report_replslot(const char *slotname, int spilltxns, int spillcount,
-								   int spillbytes);
+								   int spillbytes, int streamtxns, int streamcount, int streambytes);
 extern void pgstat_report_replslot_drop(const char *slotname);
 
 extern void pgstat_initialize(void);
diff --git a/src/include/replication/reorderbuffer.h b/src/include/replication/reorderbuffer.h
index fba950c..edc51b1 100644
--- a/src/include/replication/reorderbuffer.h
+++ b/src/include/replication/reorderbuffer.h
@@ -536,6 +536,9 @@ struct ReorderBuffer
 	int64		spillCount;		/* spill-to-disk invocation counter */
 	int64		spillTxns;		/* number of transactions spilled to disk  */
 	int64		spillBytes;		/* amount of data spilled to disk */
+	int64		streamCount;	/* streaming invocation counter */
+	int64		streamTxns;		/* number of transactions spilled to disk */
+	int64		streamBytes;	/* amount of data streamed to subscriber */
 };
 
 
diff --git a/src/test/regress/expected/rules.out b/src/test/regress/expected/rules.out
index 5353f24..197a86c 100644
--- a/src/test/regress/expected/rules.out
+++ b/src/test/regress/expected/rules.out
@@ -2011,8 +2011,11 @@ pg_stat_replication| SELECT s.pid,
 pg_stat_replication_slots| SELECT s.name,
     s.spill_txns,
     s.spill_count,
-    s.spill_bytes
-   FROM pg_stat_get_replication_slots() s(name, spill_txns, spill_count, spill_bytes);
+    s.spill_bytes,
+    s.stream_txns,
+    s.stream_count,
+    s.stream_bytes
+   FROM pg_stat_get_replication_slots() s(name, spill_txns, spill_count, spill_bytes, stream_txns, stream_count, stream_bytes);
 pg_stat_slru| SELECT s.name,
     s.blks_zeroed,
     s.blks_hit,
