raminqaf commented on code in PR #28166:
URL: https://github.com/apache/flink/pull/28166#discussion_r3252511343
##########
docs/content/docs/sql/reference/queries/changelog.md:
##########
@@ -149,6 +149,47 @@ Prefer row semantics, when possible. `PARTITION BY` is
only necessary when downs
If you are producing an upsert table — that is, you are emitting
`UPDATE_AFTER` but no `UPDATE_BEFORE` from your CDC input stream — the
partition key you select here will be considered both the primary key and the
upsert key by the engine. Make sure the `PARTITION BY` key matches your primary
key exactly.
+#### Ordering CDC events with ORDER BY
+
+CDC streams can deliver events out of order. For example, a key's `UPDATE_AFTER` may arrive before its matching `UPDATE_BEFORE` when events are partitioned across upstream brokers. If the source itself does not guarantee ordering, applying such a changelog directly can produce incorrect state.
+
+`FROM_CHANGELOG` accepts an [ORDER BY clause]({{< ref "docs/dev/table/functions/ptfs" >}}#ordering) that sorts events within each partition before they are processed. The framework buffers events per partition and, once the watermark advances, flushes the buffered events to the function in sorted order. Late events, whose timestamp is already below the current watermark when they arrive, are dropped.
+
+Requirements:
+
+* The input table must declare a `WATERMARK` on the time attribute used in `ORDER BY`.
+* The first `ORDER BY` column must be that time attribute in `ASC` order.
+* `ORDER BY` requires `PARTITION BY` (set semantics). It cannot be combined with row semantics.
+
+```sql
+-- Source declares a watermarked event time
+CREATE TABLE cdc_stream (
+  id INT,
+  op STRING,
+  name STRING,
+  event_time TIMESTAMP_LTZ(3),
+  WATERMARK FOR event_time AS event_time - INTERVAL '5' SECOND
+) WITH (...);
+
+SELECT * FROM FROM_CHANGELOG(
+  input => TABLE cdc_stream
+  PARTITION BY id
+  ORDER BY event_time
+);
+```
+
+**How buffering and watermarks interact**
+
+Assume the watermark strategy keeps the watermark `5` minutes behind the largest observed `event_time` (unlike the `5`-second delay in the example above), and the current watermark is `10:00`:
+
+| Incoming row | Current watermark | Outcome |
+|---|---|---|
+| `+I[id: 6, op: 'INSERT', event_time: '10:05']` | `10:00` | Buffered. Emitted later when the watermark passes `10:05`. |
+| `+I[id: 5, op: 'INSERT', event_time: '09:55']` | `10:00` | Dropped. Timestamp is below the current watermark. |
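
The two outcomes in the table above can be reproduced with a small simulation. The Python sketch below is a conceptual model only, not Flink's implementation. The `OrderedBuffer` class, its method names, and the boundary handling (an event exactly at the watermark is buffered rather than dropped) are assumptions made for illustration.

```python
from collections import defaultdict
from datetime import time


class OrderedBuffer:
    """Toy model of per-partition buffering. Not Flink's implementation."""

    def __init__(self, watermark):
        self.watermark = watermark
        self.buffers = defaultdict(list)  # partition key -> [(event_time, op)]

    def on_event(self, key, event_time, op):
        # Assumption: "late" means strictly below the current watermark.
        if event_time < self.watermark:
            return "dropped"
        self.buffers[key].append((event_time, op))
        return "buffered"

    def advance_watermark(self, watermark):
        # Watermarks never move backwards.
        self.watermark = max(self.watermark, watermark)
        emitted = []
        for key in sorted(self.buffers):
            # Sort each partition by event time (stable for equal timestamps).
            rows = sorted(self.buffers[key], key=lambda r: r[0])
            # Rows at or below the watermark form a prefix of the sorted list.
            ready = [r for r in rows if r[0] <= self.watermark]
            self.buffers[key] = rows[len(ready):]
            emitted.extend((key, t, op) for t, op in ready)
        return emitted


buf = OrderedBuffer(watermark=time(10, 0))
print(buf.on_event(6, time(10, 5), "INSERT"))  # buffered
print(buf.on_event(5, time(9, 55), "INSERT"))  # dropped
print(buf.advance_watermark(time(10, 4)))      # nothing is ready yet
print(buf.advance_watermark(time(10, 6)))      # emits the buffered row for id 6
```

In this model, events for the same key that arrive out of order (for example an `UPDATE_AFTER` stamped `10:02` arriving before an `UPDATE_BEFORE` stamped `10:01`) are emitted in timestamp order once the watermark passes both.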
Review Comment:
Good idea!