Dear Flink Community,

I'm writing to inquire about the integration between Apache Flink's Delta Join 
and Apache Fluss, specifically regarding UPDATE_BEFORE record support.


Background


According to feedback from the Fluss community, the Fluss 0.8 source has been
adjusted to emit UPDATE_BEFORE records (and no longer emits DELETE records).
They also mentioned that Flink 2.2 would add the corresponding support for
UPDATE_BEFORE.


Fluss 0.8 documentation states:


Supported Features
- Support for optimizing a dual-stream join from CDC sources that do not 
include delete messages into a delta join.
- Disable delete on the source table to guarantee there is no delete 
message in the table, by adding the option `'table.delete.behavior' = 
'IGNORE'` or `'DISABLE'` on the table.
- Since this version, the source table is no longer required to be a
`first_row` merge engine table.


Current Issue


However, after examining the Flink 2.2 codebase (specifically 
`DeltaJoinUtil.java`), I found that the 
`onlyProduceInsertOrUpdateAfter` method still explicitly excludes 
UPDATE_BEFORE:


```java
private static boolean onlyProduceInsertOrUpdateAfter(StreamPhysicalRel node) {
    ChangelogMode changelogMode = getChangelogMode(node);
    Set<RowKind> allKinds = changelogMode.getContainedKinds();
    return !allKinds.contains(RowKind.UPDATE_BEFORE)
            && !allKinds.contains(RowKind.DELETE);
}
```


And in `areAllInputsInsertOrUpdateAfter`:


```java
private static boolean areAllInputsInsertOrUpdateAfter(StreamPhysicalJoin join) {
    for (RelNode input : join.getInputs()) {
        if (!onlyProduceInsertOrUpdateAfter(unwrapNode(input, false))) {
            return false;
        }
    }
    return true;
}
```


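This means a join is still eligible for Delta Join only if every input produces
nothing but INSERT and UPDATE_AFTER records; a single input whose changelog
contains UPDATE_BEFORE (or DELETE) disqualifies the whole join.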
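To make the consequence concrete, here is a minimal, self-contained sketch that
re-implements the same condition over a simplified model. The class name, the
local `RowKind` enum, and the two sample changelogs are hypothetical stand-ins
for illustration only; they are not Flink's planner classes, and the real check
operates on `StreamPhysicalRel` nodes and their `ChangelogMode`.

```java
import java.util.EnumSet;
import java.util.List;
import java.util.Set;

public class DeltaJoinEligibilitySketch {

    // Hypothetical, simplified stand-in for org.apache.flink.types.RowKind.
    enum RowKind { INSERT, UPDATE_BEFORE, UPDATE_AFTER, DELETE }

    // Same condition as onlyProduceInsertOrUpdateAfter: an input qualifies
    // only if its changelog contains neither UPDATE_BEFORE nor DELETE.
    static boolean onlyProducesInsertOrUpdateAfter(Set<RowKind> kinds) {
        return !kinds.contains(RowKind.UPDATE_BEFORE) && !kinds.contains(RowKind.DELETE);
    }

    // Same loop as areAllInputsInsertOrUpdateAfter: every join input must qualify.
    static boolean allInputsQualify(List<Set<RowKind>> inputs) {
        for (Set<RowKind> kinds : inputs) {
            if (!onlyProducesInsertOrUpdateAfter(kinds)) {
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        // Hypothetical changelog of an input that emits only INSERT and UPDATE_AFTER.
        Set<RowKind> upsertOnly = EnumSet.of(RowKind.INSERT, RowKind.UPDATE_AFTER);
        // Hypothetical changelog of an input that also emits UPDATE_BEFORE,
        // as described above for the Fluss 0.8 source.
        Set<RowKind> withUpdateBefore =
                EnumSet.of(RowKind.INSERT, RowKind.UPDATE_BEFORE, RowKind.UPDATE_AFTER);

        System.out.println(allInputsQualify(List.of(upsertOnly, upsertOnly)));
        System.out.println(allInputsQualify(List.of(upsertOnly, withUpdateBefore)));
    }
}
```

Running `main` prints `true` and then `false`: as soon as one input's changelog
includes UPDATE_BEFORE, the join no longer qualifies, regardless of the other
input.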


Questions


1. Integration Gap: Is there a gap between Fluss 0.8's UPDATE_BEFORE support 
and Flink's current Delta Join implementation? Or is this support planned for a 
future release?


2. Timeline: If UPDATE_BEFORE support is planned, what is the expected timeline 
for this integration?


3. Workaround: For streaming scenarios that require handling UPDATE_BEFORE 
records in multi-level Delta Join chains, what is the recommended approach 
currently?


Use Case


We're building real-time CDC data pipelines where:
- First level: Delta Join processes the CDC streams
- Second level: the intermediate results (which may also contain UPDATE_BEFORE
and UPDATE_AFTER records) need to be joined again


This is a fundamental limitation when building real-time wide tables.


Environment
- Flink Version: 2.2
- Fluss Version: 0.8
- Use Case: Real-time CDC processing with multi-level joins


Thank you for your time and guidance. I'm really excited about the potential of
Delta Join and look forward to any updates on this topic.





Best regards,


Rise