[
https://issues.apache.org/jira/browse/FLINK-39192?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Calico Shop updated FLINK-39192:
--------------------------------
Description:
When a *Sink* implements *SupportsPostCommitTopology* and the job uses a
*bounded* source in streaming mode ({*}RuntimeExecutionMode.STREAMING{*}), the
*SinkWriterOperator* wrapping the post-commit sink throws:
{code:java}
java.lang.IllegalStateException: Received element after endOfInput{code}
h3. Root cause
With a bounded source in streaming mode, the following sequence occurs:
1. Source emits all records and finishes
2. *endOfInput* propagates through the operator chain; the post-commit
{*}SinkWriterOperator{*}'s *endInput()* sets *endOfInput = true*
3. A final checkpoint is triggered
4. During checkpoint completion, the upstream *CommitterOperator* emits
*CommittableMessage* instances downstream into the post-commit topology
5. The post-commit *SinkWriterOperator.processElement()* rejects these elements
because *endOfInput* is already *true*
The *endOfInput* guard assumes no elements arrive after the flag is set, but
*CommitterOperator* legitimately emits committables during the final
checkpoint's completion callback, which happens _after_ *endOfInput* has
propagated.
This only affects the *SinkWriterOperator* instance _inside the post-commit
topology_ (the one wrapping the post-commit sink). The primary
*SinkWriterOperator* (the one receiving user data) is not affected.
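The ordering in steps 2-5 can be illustrated with a minimal plain-Java sketch. This has no Flink dependency; *SketchSinkWriterOperator* and *GuardRaceSketch* are hypothetical stand-ins that model only the flag-and-guard ordering, not Flink's actual operator internals:
{code:java}
// Plain-Java sketch of the guard race described above (no Flink dependency;
// class and method names are simplified stand-ins, not Flink's real classes).
public class GuardRaceSketch {

    static class SketchSinkWriterOperator {
        private boolean endOfInput = false;

        // Step 2: endInput() sets the flag once the bounded source finishes.
        void endInput() {
            endOfInput = true;
        }

        // Step 5: any element arriving after the flag is set is rejected.
        void processElement(Object committable) {
            if (endOfInput) {
                throw new IllegalStateException("Received element after endOfInput");
            }
            // ... otherwise the committable would be written here
        }
    }

    public static void main(String[] args) {
        SketchSinkWriterOperator postCommitWriter = new SketchSinkWriterOperator();
        postCommitWriter.endInput(); // steps 1-2: endOfInput propagates first
        try {
            // Steps 3-4: the CommitterOperator emits a committable afterwards,
            // during the final checkpoint's completion callback:
            postCommitWriter.processElement(100);
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage()); // prints "Received element after endOfInput"
        }
    }
}
{code}
The sketch makes the point that the guard is purely order-dependent: the same *processElement()* call would succeed if it arrived before *endInput()*.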
h3. How to reproduce
The following self-contained program reproduces the bug. It uses a
*DataGeneratorSource* that emits 100 integers into a sink with a post-commit
topology that simply discards the committables:
{code:java}
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.connector.sink2.Committer;
import org.apache.flink.api.connector.sink2.CommitterInitContext;
import org.apache.flink.api.connector.sink2.CommittingSinkWriter;
import org.apache.flink.api.connector.sink2.Sink;
import org.apache.flink.api.connector.sink2.SupportsCommitter;
import org.apache.flink.api.connector.sink2.WriterInitContext;
import org.apache.flink.connector.datagen.source.DataGeneratorSource;
import org.apache.flink.core.io.SimpleVersionedSerializer;
import org.apache.flink.streaming.api.connector.sink2.CommittableMessage;
import org.apache.flink.streaming.api.connector.sink2.SupportsPostCommitTopology;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.v2.DiscardingSink;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Collection;
import java.util.Collections;

public class PostCommitBoundedSourceBug {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();
        // Streaming mode is the default, but being explicit:
        env.setRuntimeMode(org.apache.flink.api.common.RuntimeExecutionMode.STREAMING);
        env.enableCheckpointing(1000);

        // Bounded source: emits 100 integers, then finishes
        DataGeneratorSource<Integer> source =
                new DataGeneratorSource<>(index -> index.intValue(), 100, Types.INT);

        env.fromSource(
                source,
                org.apache.flink.api.common.eventtime.WatermarkStrategy.noWatermarks(),
                "bounded-source")
            .sinkTo(new PostCommitSink());

        // This will throw:
        // java.lang.IllegalStateException: Received element after endOfInput
        env.execute("post-commit-bounded-source-bug");
    }

    /**
     * Minimal sink with a post-commit topology.
     * The writer produces an Integer committable per element.
     * The committer is a no-op. The post-commit topology discards everything.
     */
    static class PostCommitSink
            implements Sink<Integer>, SupportsCommitter<Integer>,
                    SupportsPostCommitTopology<Integer> {

        @Override
        public CommittingSinkWriter<Integer, Integer> createWriter(WriterInitContext context) {
            return new CommittingSinkWriter<>() {
                private int pending = 0;

                @Override
                public void write(Integer element, Context context) {
                    pending++;
                }

                @Override
                public void flush(boolean endOfInput) {}

                @Override
                public Collection<Integer> prepareCommit() {
                    int count = pending;
                    pending = 0;
                    return count > 0
                            ? Collections.singletonList(count)
                            : Collections.emptyList();
                }

                @Override
                public void close() {}
            };
        }

        @Override
        public Committer<Integer> createCommitter(CommitterInitContext context) {
            return new Committer<Integer>() {
                @Override
                public void commit(Collection<CommitRequest<Integer>> committables)
                        throws IOException, InterruptedException {}

                @Override
                public void close() throws Exception {}
            };
        }

        @Override
        public SimpleVersionedSerializer<Integer> getCommittableSerializer() {
            return new SimpleVersionedSerializer<>() {
                @Override
                public int getVersion() {
                    return 0;
                }

                @Override
                public byte[] serialize(Integer value) {
                    return ByteBuffer.allocate(4).putInt(value).array();
                }

                @Override
                public Integer deserialize(int version, byte[] serialized) {
                    return ByteBuffer.wrap(serialized).getInt();
                }
            };
        }

        @Override
        public void addPostCommitTopology(DataStream<CommittableMessage<Integer>> committables) {
            // Any .sinkTo() here creates a SinkWriterOperator that will fail
            committables.sinkTo(new DiscardingSink<>());
        }
    }
}
{code}
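As a side note on the repro's writer: it drains its pending count in *prepareCommit()*, which is why each checkpoint with data yields exactly one committable. That drain-and-return contract can be checked in isolation with a plain-Java stand-in (*PendingCountWriter* is a hypothetical name for this sketch; it is not part of the repro or of Flink):
{code:java}
import java.util.Collection;
import java.util.Collections;

// Hypothetical plain-Java stand-in mirroring the anonymous
// CommittingSinkWriter above: write() accumulates a count, and
// prepareCommit() drains it, returning it as the sole committable.
public class PendingCountWriter {
    private int pending = 0;

    public void write(Integer element) {
        pending++;
    }

    public Collection<Integer> prepareCommit() {
        int count = pending;
        pending = 0; // drain, so the next checkpoint starts fresh
        return count > 0
                ? Collections.singletonList(count)
                : Collections.emptyList();
    }
}
{code}
After 100 writes, *prepareCommit()* returns a single-element list containing 100, and an immediate second call returns an empty collection.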
h3. Expected behavior
The post-commit topology should successfully process committables emitted
during the final checkpoint. The job should complete with status {*}FINISHED{*}.
h3. Actual behavior
The job fails with:
{code:java}
java.lang.IllegalStateException: Received element after endOfInput
at
org.apache.flink.streaming.runtime.operators.sink.SinkWriterOperator.processElement(SinkWriterOperator.java:182)
{code}
h3. Notes
This bug does _not_ manifest when:
- Running in *RuntimeExecutionMode.STREAMING* with an unbounded source (*endOfInput* is never called)
- Running in *RuntimeExecutionMode.BATCH*
> SinkWriterOperator in post-commit topology throws IllegalStateException with
> bounded source in streaming mode
> -------------------------------------------------------------------------------------------------------------
>
> Key: FLINK-39192
> URL: https://issues.apache.org/jira/browse/FLINK-39192
> Project: Flink
> Issue Type: Bug
> Components: Runtime / Task
> Affects Versions: 2.1.0, 2.2.0
> Reporter: Calico Shop
> Priority: Major
>
--
This message was sent by Atlassian Jira
(v8.20.10#820010)