raminqaf commented on code in PR #28166:
URL: https://github.com/apache/flink/pull/28166#discussion_r3252511817


##########
docs/content/docs/sql/reference/queries/changelog.md:
##########
@@ -149,6 +149,47 @@ Prefer row semantics, when possible. `PARTITION BY` is 
only necessary when downs
 
 If you are producing an upsert table — that is, you are emitting 
`UPDATE_AFTER` but no `UPDATE_BEFORE` from your CDC input stream — the 
partition key you select here will be considered both the primary key and the 
upsert key by the engine. Make sure the `PARTITION BY` key matches your primary 
key exactly.
 
+#### Ordering CDC events with ORDER BY
+
+CDC streams can deliver events out of order. For example, a key's 
`UPDATE_AFTER` may arrive before its matching `UPDATE_BEFORE` when events are 
partitioned across upstream brokers. If the source itself does not guarantee 
ordering, applying such a changelog directly produces incorrect state.
+
+`FROM_CHANGELOG` accepts an [ORDER BY clause]({{< ref 
"docs/dev/table/functions/ptfs" >}}#ordering) that sorts events within each 
partition before they are processed. The framework buffers events per partition 
and flushes them to the function in sorted order once the watermark advances. 
Late events (arriving after the watermark) are dropped.
+
+Requirements:
+
+* The input table must declare a `WATERMARK` on the time attribute used in 
`ORDER BY`.
+* The first `ORDER BY` column must be that time attribute in `ASC` order.
+* `ORDER BY` requires `PARTITION BY` (set semantics). It cannot be combined 
with row semantics.
+
+```sql
+-- Source declares a watermarked event time
+CREATE TABLE cdc_stream (
+  id INT,
+  op STRING,
+  name STRING,
+  event_time TIMESTAMP_LTZ(3),
+  WATERMARK FOR event_time AS event_time - INTERVAL '5' SECOND
+) WITH (...);
+
+SELECT * FROM FROM_CHANGELOG(
+  input => TABLE cdc_stream
+    PARTITION BY id
+    ORDER BY event_time
+)
+```
+
+**How buffering and watermarks interact**
+
+Assume the watermark strategy advances the watermark `5` minutes behind the 
largest observed `event_time`, and the current watermark is `10:00`:
+
+| Incoming row | Current watermark | Outcome |
+|---|---|---|
+| `+I[id: 6, op: 'INSERT', event_time: '10:05']` | `10:00` | Buffered. Emitted 
later when the watermark passes `10:05`. |
+| `+I[id: 5, op: 'INSERT', event_time: '09:55']` | `10:00` | Dropped. 
Timestamp is below the current watermark. |
+| `+I[id: 7, op: 'INSERT', event_time: '10:11']` | `10:06` | Record `id=6` is 
emitted; this row is buffered until the watermark passes `10:11`. |

Review Comment:
   Good one! I updated the table to make it more precise and clearer. I hope 
it reads well now.
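
   For anyone following the thread, the three rows of the table can be replayed with a small Python sketch. This is only an illustration of the rule as the table states it, not Flink's implementation: the `OrderedBuffer` class, the `hhmm` helper, and the boundary handling (drop when the timestamp is strictly below the watermark, flush when it is at or below) are assumptions made for the example, and a single buffer stands in for the per-partition buffers the docs describe.

```python
import heapq


def hhmm(text):
    """Convert 'HH:MM' to minutes since midnight (hypothetical helper)."""
    hours, minutes = text.split(":")
    return int(hours) * 60 + int(minutes)


class OrderedBuffer:
    """Toy model: buffer rows, flush them in event-time order as the watermark advances."""

    def __init__(self, initial_watermark, delay_minutes=5):
        self.watermark = initial_watermark
        self.delay = delay_minutes
        self.heap = []  # min-heap of (event_time, row_id)

    def on_row(self, row_id, event_time):
        # Assumption: a row is late when its timestamp is below the current watermark.
        if event_time < self.watermark:
            return "dropped", []
        heapq.heappush(self.heap, (event_time, row_id))
        # The watermark trails the largest observed event time by `delay` minutes.
        self.watermark = max(self.watermark, event_time - self.delay)
        # Flush every buffered row the watermark has reached, oldest first.
        emitted = []
        while self.heap and self.heap[0][0] <= self.watermark:
            emitted.append(heapq.heappop(self.heap)[1])
        return "buffered", emitted


buf = OrderedBuffer(initial_watermark=hhmm("10:00"))
results = [
    buf.on_row(6, hhmm("10:05")),  # buffered, nothing emitted yet
    buf.on_row(5, hhmm("09:55")),  # dropped, below the 10:00 watermark
    buf.on_row(7, hhmm("10:11")),  # watermark moves to 10:06, id 6 is emitted
]
```

   After the third row the sketch's watermark is `10:06` and only `id=7` remains buffered, which matches the last row of the table.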


