[ 
https://issues.apache.org/jira/browse/FLINK-39192?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Calico Shop updated FLINK-39192:
--------------------------------
    Description: 
When a *Sink* implements *SupportsPostCommitTopology* and the job uses a 
*bounded* source in streaming mode ({*}RuntimeExecutionMode.STREAMING{*}), the 
*SinkWriterOperator* wrapping the post-commit sink throws:
{code:java}
java.lang.IllegalStateException: Received element after endOfInput{code}
h3. Root cause

With a bounded source in streaming mode, the following sequence occurs:

1. Source emits all records and finishes
2. *endOfInput* propagates through the operator chain; the post-commit 
{*}SinkWriterOperator{*}'s *endInput()* sets *endOfInput = true*
3. A final checkpoint is triggered
4. During checkpoint completion, the upstream *CommitterOperator* emits 
*CommittableMessage* instances downstream into the post-commit topology
5. The post-commit *SinkWriterOperator.processElement()* rejects these elements 
because *endOfInput* is already *true*

The *endOfInput* guard assumes no elements arrive after the flag is set, but 
*CommitterOperator* legitimately emits committables during the final 
checkpoint's completion callback, which happens _after_ *endOfInput* has 
propagated.

This only affects the *SinkWriterOperator* instance _inside the post-commit 
topology_ (the one wrapping the post-commit sink). The primary 
*SinkWriterOperator* (the one receiving user data) is not affected.
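The ordering above can be illustrated with a standalone simulation. This is *not* Flink code: *PostCommitWriter* below only mirrors, conceptually, the *endOfInput* guard in the post-commit *SinkWriterOperator*; all names are illustrative.
{code:java}
// Standalone simulation of the failure ordering (illustrative, not Flink code).
class PostCommitWriter {
    private boolean endOfInput = false;

    // Step 2: endOfInput propagates once the bounded source finishes.
    void endInput() {
        endOfInput = true;
    }

    // Step 5: the guard rejects anything arriving after the flag is set.
    void processElement(int committable) {
        if (endOfInput) {
            throw new IllegalStateException("Received element after endOfInput");
        }
        // ... a real writer would buffer the committable here ...
    }
}

public class EndOfInputOrderingDemo {
    public static void main(String[] args) {
        PostCommitWriter writer = new PostCommitWriter();
        writer.endInput(); // the operator chain is drained first
        // Steps 3-4: the final checkpoint completes and the upstream
        // committer emits a committable only afterwards.
        writer.processElement(100); // throws IllegalStateException
    }
}
{code}
Because the committable is emitted from the checkpoint-completion callback, no reordering at the user level can avoid it arriving after the flag is set.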
h3. How to reproduce

The following self-contained program reproduces the bug. It uses a 
*DataGeneratorSource* that emits 100 integers into a sink with a post-commit 
topology that simply discards the committables:
{code:java}
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.connector.sink2.Committer;
import org.apache.flink.api.connector.sink2.CommitterInitContext;
import org.apache.flink.api.connector.sink2.CommittingSinkWriter;
import org.apache.flink.api.connector.sink2.Sink;
import org.apache.flink.api.connector.sink2.SupportsCommitter;
import org.apache.flink.api.connector.sink2.WriterInitContext;
import org.apache.flink.connector.datagen.source.DataGeneratorSource;
import org.apache.flink.core.io.SimpleVersionedSerializer;
import org.apache.flink.streaming.api.connector.sink2.CommittableMessage;
import org.apache.flink.streaming.api.connector.sink2.SupportsPostCommitTopology;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.v2.DiscardingSink;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Collection;
import java.util.Collections;

public class PostCommitBoundedSourceBug {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Streaming mode is the default, but being explicit:
        env.setRuntimeMode(RuntimeExecutionMode.STREAMING);
        env.enableCheckpointing(1000);

        // Bounded source: emits 100 integers, then finishes
        DataGeneratorSource<Integer> source =
                new DataGeneratorSource<>(index -> index.intValue(), 100, Types.INT);

        env.fromSource(source, WatermarkStrategy.noWatermarks(), "bounded-source")
                .sinkTo(new PostCommitSink());

        // This will throw:
        // java.lang.IllegalStateException: Received element after endOfInput
        env.execute("post-commit-bounded-source-bug");
    }

    /**
     * Minimal sink with a post-commit topology.
     * The writer produces an Integer committable per element.
     * The committer is a no-op. The post-commit topology discards everything.
     */
    static class PostCommitSink
            implements Sink<Integer>, SupportsCommitter<Integer>, SupportsPostCommitTopology<Integer> {

        @Override
        public CommittingSinkWriter<Integer, Integer> createWriter(WriterInitContext context) {
            return new CommittingSinkWriter<>() {
                private int pending = 0;

                @Override
                public void write(Integer element, Context context) {
                    pending++;
                }

                @Override
                public void flush(boolean endOfInput) {
                }

                @Override
                public Collection<Integer> prepareCommit() {
                    int count = pending;
                    pending = 0;
                    return count > 0 ? Collections.singletonList(count) : Collections.emptyList();
                }

                @Override
                public void close() {
                }
            };
        }

        @Override
        public Committer<Integer> createCommitter(CommitterInitContext context) {
            return new Committer<Integer>() {
                @Override
                public void commit(Collection<CommitRequest<Integer>> committables)
                        throws IOException, InterruptedException {
                }

                @Override
                public void close() throws Exception {
                }
            };
        }

        @Override
        public SimpleVersionedSerializer<Integer> getCommittableSerializer() {
            return new SimpleVersionedSerializer<>() {
                @Override
                public int getVersion() {
                    return 0;
                }

                @Override
                public byte[] serialize(Integer value) {
                    return ByteBuffer.allocate(4).putInt(value).array();
                }

                @Override
                public Integer deserialize(int version, byte[] serialized) {
                    return ByteBuffer.wrap(serialized).getInt();
                }
            };
        }

        @Override
        public void addPostCommitTopology(DataStream<CommittableMessage<Integer>> committables) {
            // Any .sinkTo() here creates a SinkWriterOperator that will fail
            committables.sinkTo(new DiscardingSink<>());
        }
    }
}
{code}
h3. Expected behavior

The post-commit topology should successfully process committables emitted 
during the final checkpoint. The job should complete with status {*}FINISHED{*}.
h3. Actual behavior

The job fails with:
{code:java}
java.lang.IllegalStateException: Received element after endOfInput
    at org.apache.flink.streaming.runtime.operators.sink.SinkWriterOperator.processElement(SinkWriterOperator.java:182)
{code}
h3. Notes

This bug does _not_ manifest when:
 - Running in *RuntimeExecutionMode.STREAMING* with an unbounded source
 -- (*endInput()* is never called, so *endOfInput* is never set)
 - Running in *RuntimeExecutionMode.BATCH*

> SinkWriterOperator in post-commit topology throws IllegalStateException with 
> bounded source in streaming mode
> -------------------------------------------------------------------------------------------------------------
>
>                 Key: FLINK-39192
>                 URL: https://issues.apache.org/jira/browse/FLINK-39192
>             Project: Flink
>          Issue Type: Bug
>          Components: Runtime / Task
>    Affects Versions: 2.1.0, 2.2.0
>            Reporter: Calico Shop
>            Priority: Major
>



--
This message was sent by Atlassian Jira
(v8.20.10#820010)