Dear Flink Community,
I'm writing to inquire about the integration between Apache Flink's Delta Join
and Apache Fluss, specifically regarding UPDATE_BEFORE record support.
Background
According to feedback from the Fluss community, the Fluss 0.8 source was changed
to emit UPDATE_BEFORE change types (and no longer emits DELETE). The Fluss
community also mentioned that Flink 2.2 would add integration support for
UPDATE_BEFORE.
Fluss 0.8 Documentation Note
The Fluss 0.8 documentation states:
Supported Features
- Support for optimizing a dual-stream join from CDC sources that do not
include delete messages into a delta join.
- Disable delete on the source table to guarantee there is no delete
message in the table, by adding the option `'table.delete.behavior' =
'IGNORE'` or `'DISABLE'` on the table.
- The source table is no more required to be a `first_row` merge
engine table since this version.
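To make the described source behaviour concrete, here is a minimal, self-contained Java model of a primary-key table with deletes turned off. It is a hypothetical sketch, not the Fluss API: the class `FlussSourceModel`, its methods, and the assumed semantics of the two option values (`IGNORE` silently drops delete requests, `DISABLE` rejects them with an error) are illustrations only. Under those assumptions, an update to an existing key appears as an UPDATE_BEFORE/UPDATE_AFTER pair and no DELETE ever reaches the changelog.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical model (NOT the Fluss API) of a primary-key table whose
// source emits UPDATE_BEFORE/UPDATE_AFTER pairs and never emits DELETE.
class FlussSourceModel {

    // Local enum mirroring Flink's four row kinds; DELETE is never produced here.
    enum RowKind { INSERT, UPDATE_BEFORE, UPDATE_AFTER, DELETE }

    // Assumed semantics of the two 'table.delete.behavior' values named in the docs.
    enum DeleteBehavior { IGNORE, DISABLE }

    record Change(RowKind kind, String key, String value) {}

    private final Map<String, String> table = new HashMap<>();
    private final DeleteBehavior deleteBehavior;
    final List<Change> changelog = new ArrayList<>();

    FlussSourceModel(DeleteBehavior deleteBehavior) {
        this.deleteBehavior = deleteBehavior;
    }

    // New key -> INSERT; existing key -> UPDATE_BEFORE(old) followed by UPDATE_AFTER(new).
    void upsert(String key, String value) {
        String old = table.put(key, value);
        if (old == null) {
            changelog.add(new Change(RowKind.INSERT, key, value));
        } else {
            changelog.add(new Change(RowKind.UPDATE_BEFORE, key, old));
            changelog.add(new Change(RowKind.UPDATE_AFTER, key, value));
        }
    }

    // With deletes turned off, a delete request never produces a DELETE record.
    void delete(String key) {
        if (deleteBehavior == DeleteBehavior.DISABLE) {
            throw new UnsupportedOperationException("delete is disabled for this table");
        }
        // IGNORE: the request is dropped; table state and changelog are unchanged.
    }

    public static void main(String[] args) {
        FlussSourceModel t = new FlussSourceModel(DeleteBehavior.IGNORE);
        t.upsert("k1", "v1");
        t.upsert("k1", "v2");
        t.delete("k1");
        t.changelog.forEach(System.out::println);
    }
}
```

Running `main` prints an INSERT for `k1` followed by an UPDATE_BEFORE/UPDATE_AFTER pair; the delete request leaves no trace in the changelog.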
Current Issue
However, after examining the Flink 2.2 codebase (specifically
`DeltaJoinUtil.java`), I found that the
`onlyProduceInsertOrUpdateAfter` method still explicitly rejects any input
whose changelog contains UPDATE_BEFORE:
```java
// Returns true only if the node's changelog contains neither UPDATE_BEFORE nor DELETE.
private static boolean onlyProduceInsertOrUpdateAfter(StreamPhysicalRel node) {
    ChangelogMode changelogMode = getChangelogMode(node);
    Set<RowKind> allKinds = changelogMode.getContainedKinds();
    return !allKinds.contains(RowKind.UPDATE_BEFORE)
            && !allKinds.contains(RowKind.DELETE);
}
```
And in `areAllInputsInsertOrUpdateAfter`:
```java
// Every join input must pass onlyProduceInsertOrUpdateAfter.
private static boolean areAllInputsInsertOrUpdateAfter(StreamPhysicalJoin join) {
    for (RelNode input : join.getInputs()) {
        if (!onlyProduceInsertOrUpdateAfter(unwrapNode(input, false))) {
            return false;
        }
    }
    return true;
}
```
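This means Delta Join still only accepts inputs that produce INSERT and
UPDATE_AFTER records: if any input of the join emits UPDATE_BEFORE (or DELETE),
the join is not eligible to become a Delta Join.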
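The effect of the quoted check can be reproduced outside the planner with a small stand-alone model. This sketch assumes a local `RowKind` enum that mirrors `org.apache.flink.types.RowKind` and represents each join input only by the set of row kinds it produces; the class name `DeltaJoinEligibility` is hypothetical, and the real planner derives these sets from each input's `ChangelogMode` rather than taking them as arguments.

```java
import java.util.EnumSet;
import java.util.List;
import java.util.Set;

// Stand-alone model of the eligibility check quoted above (hypothetical class,
// not the Flink planner code). Each input is reduced to the set of row kinds it emits.
class DeltaJoinEligibility {

    // Local enum mirroring org.apache.flink.types.RowKind.
    enum RowKind { INSERT, UPDATE_BEFORE, UPDATE_AFTER, DELETE }

    // Same predicate as onlyProduceInsertOrUpdateAfter, applied to a plain set of kinds.
    static boolean onlyProduceInsertOrUpdateAfter(Set<RowKind> kinds) {
        return !kinds.contains(RowKind.UPDATE_BEFORE) && !kinds.contains(RowKind.DELETE);
    }

    // Same loop as areAllInputsInsertOrUpdateAfter: every input must pass.
    static boolean areAllInputsInsertOrUpdateAfter(List<? extends Set<RowKind>> inputs) {
        for (Set<RowKind> kinds : inputs) {
            if (!onlyProduceInsertOrUpdateAfter(kinds)) {
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        // Input without UPDATE_BEFORE or DELETE: accepted by the current check.
        Set<RowKind> upsertNoDelete = EnumSet.of(RowKind.INSERT, RowKind.UPDATE_AFTER);
        // Assumed changelog of a Fluss 0.8 source with deletes ignored or disabled.
        Set<RowKind> fluss08 =
                EnumSet.of(RowKind.INSERT, RowKind.UPDATE_BEFORE, RowKind.UPDATE_AFTER);

        System.out.println(areAllInputsInsertOrUpdateAfter(List.of(upsertNoDelete, upsertNoDelete))); // true
        System.out.println(areAllInputsInsertOrUpdateAfter(List.of(upsertNoDelete, fluss08))); // false
    }
}
```

With the assumed Fluss 0.8 changelog on either side, the check fails. The same applies to a second-level join whose input is a first-level result that carries UPDATE_BEFORE.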
Questions
1. Integration Gap: Is there a gap between Fluss 0.8's UPDATE_BEFORE support
and Flink's current Delta Join implementation? Or is this support planned for a
future release?
2. Timeline: If UPDATE_BEFORE support is planned, what is the expected timeline
for this integration?
3. Workaround: For streaming scenarios that require handling UPDATE_BEFORE
records in multi-level Delta Join chains, what is the currently recommended
approach?
Use Case
We're building real-time CDC data pipelines where:
- First level: Delta Join processes CDC streams
- Second level: Needs to join intermediate results (which may also contain
UPDATE_BEFORE and UPDATE_AFTER)
Because a join whose inputs contain UPDATE_BEFORE cannot currently become a
Delta Join, this is a fundamental limitation for building real-time wide tables.
Environment
- Flink Version: 2.2
- Fluss Version: 0.8
- Use Case: Real-time CDC processing with multi-level joins
Thank you for your time and guidance on this matter. I'm really excited about
the potential of Delta Join and look forward to further updates.
Best regards,
Rise