Calico Shop created FLINK-39192:
-----------------------------------
Summary: SinkWriterOperator in post-commit topology throws
IllegalStateException with bounded source in streaming mode
Key: FLINK-39192
URL: https://issues.apache.org/jira/browse/FLINK-39192
Project: Flink
Issue Type: Bug
Components: Runtime / Task
Affects Versions: 2.2.0, 2.1.0
Reporter: Calico Shop
When a *Sink* implements *SupportsPostCommitTopology* and the job uses a
*bounded* source in streaming mode ({*}RuntimeExecutionMode.STREAMING{*}), the
*SinkWriterOperator* wrapping the post-commit sink throws:
{code:java}
java.lang.IllegalStateException: Received element after endOfInput{code}
h3. Root cause
With a bounded source in streaming mode, the following sequence occurs:
1. Source emits all records and finishes
2. *endOfInput* propagates through the operator chain; the post-commit
{*}SinkWriterOperator{*}'s *endInput()* sets *endOfInput = true*
3. A final checkpoint is triggered
4. During checkpoint completion, the upstream *CommitterOperator* emits
*CommittableMessage* instances downstream into the post-commit topology
5. The post-commit *SinkWriterOperator.processElement()* rejects these elements
because *endOfInput* is already *true*
The *endOfInput* guard assumes no elements arrive after the flag is set, but
*CommitterOperator* legitimately emits committables during the final
checkpoint's completion callback, which happens _after_ *endOfInput* has
propagated.
This only affects the *SinkWriterOperator* instance _inside the post-commit
topology_ (the one wrapping the post-commit sink). The primary
*SinkWriterOperator* (the one receiving user data) is not affected.
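The failing interplay can be illustrated with a minimal, Flink-free sketch. The class and member names below are illustrative stand-ins, not Flink's actual internals; the point is only the guard pattern: once the *endOfInput* flag is set, any further element is rejected, exactly as the post-commit *SinkWriterOperator* rejects the committables emitted during the final checkpoint:
{code:java}
import java.util.ArrayList;
import java.util.List;

/** Illustrative stand-in for the endOfInput guard; not Flink's actual code. */
public class EndOfInputGuardSketch {

    static class GuardedWriter {
        private final List<Integer> received = new ArrayList<>();
        private boolean endOfInput = false;

        void processElement(int committable) {
            // The guard that fires in SinkWriterOperator.processElement
            if (endOfInput) {
                throw new IllegalStateException("Received element after endOfInput");
            }
            received.add(committable);
        }

        void endInput() {
            // Step 2 of the sequence above: endOfInput propagates
            endOfInput = true;
        }
    }

    public static void main(String[] args) {
        GuardedWriter writer = new GuardedWriter();

        writer.processElement(1); // normal data flow: accepted
        writer.endInput();        // bounded source finished; flag is set

        try {
            // Emulates the CommitterOperator emitting a committable during
            // the final checkpoint's completion callback (steps 4-5 above)
            writer.processElement(2);
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage());
        }
    }
}
{code}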
h3. How to reproduce
The following self-contained program reproduces the bug. It uses a
*DataGeneratorSource* that emits 100 integers into a sink with a post-commit
topology that simply discards the committables:
{code:java}
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.connector.sink2.Committer;
import org.apache.flink.api.connector.sink2.CommitterInitContext;
import org.apache.flink.api.connector.sink2.CommittingSinkWriter;
import org.apache.flink.api.connector.sink2.Sink;
import org.apache.flink.api.connector.sink2.SupportsCommitter;
import org.apache.flink.api.connector.sink2.WriterInitContext;
import org.apache.flink.connector.datagen.source.DataGeneratorSource;
import org.apache.flink.core.io.SimpleVersionedSerializer;
import org.apache.flink.streaming.api.connector.sink2.CommittableMessage;
import org.apache.flink.streaming.api.connector.sink2.SupportsPostCommitTopology;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.v2.DiscardingSink;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Collection;
import java.util.Collections;

public class PostCommitBoundedSourceBug {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();
        // Streaming mode is the default, but be explicit:
        env.setRuntimeMode(RuntimeExecutionMode.STREAMING);
        env.enableCheckpointing(1000);

        // Bounded source: emits 100 integers, then finishes
        DataGeneratorSource<Integer> source =
                new DataGeneratorSource<>(index -> index.intValue(), 100, Types.INT);

        env.fromSource(source, WatermarkStrategy.noWatermarks(), "bounded-source")
                .sinkTo(new PostCommitSink());

        // This will throw:
        // java.lang.IllegalStateException: Received element after endOfInput
        env.execute("post-commit-bounded-source-bug");
    }

    /**
     * Minimal sink with a post-commit topology. The writer produces an Integer
     * committable per element, the committer is a no-op, and the post-commit
     * topology discards everything.
     */
    static class PostCommitSink
            implements Sink<Integer>, SupportsCommitter<Integer>,
                    SupportsPostCommitTopology<Integer> {

        @Override
        public CommittingSinkWriter<Integer, Integer> createWriter(
                WriterInitContext context) {
            return new CommittingSinkWriter<>() {
                private int pending = 0;

                @Override
                public void write(Integer element, Context context) {
                    pending++;
                }

                @Override
                public void flush(boolean endOfInput) {}

                @Override
                public Collection<Integer> prepareCommit() {
                    int count = pending;
                    pending = 0;
                    return count > 0
                            ? Collections.singletonList(count)
                            : Collections.emptyList();
                }

                @Override
                public void close() {}
            };
        }

        @Override
        public Committer<Integer> createCommitter(CommitterInitContext context) {
            return new Committer<Integer>() {
                @Override
                public void commit(Collection<CommitRequest<Integer>> committables)
                        throws IOException, InterruptedException {}

                @Override
                public void close() throws Exception {}
            };
        }

        @Override
        public SimpleVersionedSerializer<Integer> getCommittableSerializer() {
            return new SimpleVersionedSerializer<>() {
                @Override
                public int getVersion() {
                    return 0;
                }

                @Override
                public byte[] serialize(Integer value) {
                    return ByteBuffer.allocate(4).putInt(value).array();
                }

                @Override
                public Integer deserialize(int version, byte[] serialized) {
                    return ByteBuffer.wrap(serialized).getInt();
                }
            };
        }

        @Override
        public void addPostCommitTopology(
                DataStream<CommittableMessage<Integer>> committables) {
            // Any .sinkTo() here creates a SinkWriterOperator that will fail
            committables.sinkTo(new DiscardingSink<>());
        }
    }
}
{code}
h3. Expected behavior
The post-commit topology should successfully process committables emitted
during the final checkpoint. The job should complete with status {*}FINISHED{*}.
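One possible direction, sketched below in plain Java to illustrate the expected behavior (the class and method names are hypothetical, not a proposed Flink API): the post-commit writer could keep accepting committables after *endOfInput* and only reject elements once the operator has actually closed, since the upstream committer legitimately emits committables in the final checkpoint's completion callback:
{code:java}
/** Illustrative sketch of a relaxed guard; names are hypothetical, not Flink API. */
public class RelaxedGuardSketch {

    static class PostCommitWriter {
        private boolean endOfInput = false;
        private boolean closed = false;
        private int accepted = 0;

        void endInput() {
            endOfInput = true;
        }

        void close() {
            closed = true;
        }

        /**
         * Accepts committables even after endOfInput, because the upstream
         * committer legitimately emits them during the final checkpoint's
         * completion callback; only elements after close() are rejected.
         */
        void processCommittable(int committable) {
            if (closed) {
                throw new IllegalStateException("Received element after close");
            }
            accepted++;
        }

        int acceptedCount() {
            return accepted;
        }
    }

    public static void main(String[] args) {
        PostCommitWriter writer = new PostCommitWriter();
        writer.processCommittable(1); // before endOfInput: accepted
        writer.endInput();
        writer.processCommittable(2); // final-checkpoint committable: also accepted
        writer.close();
        System.out.println(writer.acceptedCount()); // prints 2
    }
}
{code}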
h3. Actual behavior
The job fails with:
{code:java}
java.lang.IllegalStateException: Received element after endOfInput
at
org.apache.flink.streaming.runtime.operators.sink.SinkWriterOperator.processElement(SinkWriterOperator.java:182)
{code}
h3. Notes
This bug does _not_ manifest when:
- Running in *RuntimeExecutionMode.STREAMING* with an unbounded source, since *endOfInput* is never called
- Running in *RuntimeExecutionMode.BATCH*