[ https://issues.apache.org/jira/browse/FLINK-22551?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17341114#comment-17341114 ]
buom edited comment on FLINK-22551 at 5/8/21, 3:34 AM: ------------------------------------------------------- [~HunterHunter] The logic is wrong in cases 2 & 3 - there is no text _Flink_ was printed. Maybe the issue related to the implementation of _org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction_ was (Author: buom): [~HunterHunter] The logic is wrong in cases 2 & 3 - there is no text _Flink_ was printed. May be the issue related to the implementation of _org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction_ > checkpoints: strange behaviour > ------------------------------- > > Key: FLINK-22551 > URL: https://issues.apache.org/jira/browse/FLINK-22551 > Project: Flink > Issue Type: Bug > Affects Versions: 1.13.0 > Environment: {code:java} > java -version > openjdk version "11.0.2" 2019-01-15 > OpenJDK Runtime Environment 18.9 (build 11.0.2+9) > OpenJDK 64-Bit Server VM 18.9 (build 11.0.2+9, mixed mode) > {code} > Reporter: buom > Priority: Critical > > * +*Case 1*:+ Work as expected > {code:java} > public class Example { > public static class ExampleSource extends RichSourceFunction<String> > implements CheckpointedFunction { > private volatile boolean isRunning = true; > @Override > public void open(Configuration parameters) throws Exception { > System.out.println("[source] invoke open()"); > } > @Override > public void close() throws Exception { > isRunning = false; > System.out.println("[source] invoke close()"); > } > @Override > public void run(SourceContext<String> ctx) throws Exception { > System.out.println("[source] invoke run()"); > while (isRunning) { > ctx.collect("Flink"); > Thread.sleep(500); > } > } > @Override > public void cancel() { > isRunning = false; > System.out.println("[source] invoke cancel()"); > } > @Override > public void snapshotState(FunctionSnapshotContext context) throws > Exception { > System.out.println("[source] invoke snapshotState()"); > } > @Override > public void initializeState(FunctionInitializationContext context) > throws Exception { > System.out.println("[source] invoke initializeState()"); > } > } > public static class ExampleSink extends PrintSinkFunction<String> > implements CheckpointedFunction { > @Override > public void snapshotState(FunctionSnapshotContext context) throws > Exception { > System.out.println("[sink] invoke snapshotState()"); > } > @Override > public void initializeState(FunctionInitializationContext context) > throws Exception { > System.out.println("[sink] invoke initializeState()"); > } > } > public static void main(String[] args) throws Exception { > final StreamExecutionEnvironment env = > > StreamExecutionEnvironment.getExecutionEnvironment().enableCheckpointing(1000); > DataStream<String> stream = env.addSource(new ExampleSource()); > stream.addSink(new ExampleSink()).setParallelism(1); > env.execute(); > } > } > {code} > {code:java} > $ java -jar ./example.jar > [sink] invoke initializeState() > [source] invoke initializeState() > [source] invoke open() > [source] invoke run() > Flink > [sink] invoke snapshotState() > [source] invoke snapshotState() > Flink > Flink > [sink] invoke snapshotState() > [source] invoke snapshotState() > Flink > Flink > [sink] invoke snapshotState() > [source] invoke snapshotState() > ^C > {code} > * *+Case 2:+* Run as unexpected (w/ _parallelism = 1_) > {code:java} > public class Example { > public static class ExampleSource extends RichSourceFunction<String> > implements CheckpointedFunction { > private volatile boolean isRunning = true; > @Override > public void open(Configuration parameters) throws Exception { > System.out.println("[source] invoke open()"); > } > @Override > public void close() throws Exception { > isRunning = false; > System.out.println("[source] invoke close()"); > } > @Override > public void run(SourceContext<String> ctx) throws Exception { > System.out.println("[source] invoke run()"); > while (isRunning) { > ctx.collect("Flink"); > Thread.sleep(500); > } > } > @Override > public void cancel() { > isRunning = false; > System.out.println("[source] invoke cancel()"); > } > @Override > public void snapshotState(FunctionSnapshotContext context) throws > Exception { > System.out.println("[source] invoke snapshotState()"); > } > @Override > public void initializeState(FunctionInitializationContext context) > throws Exception { > System.out.println("[source] invoke initializeState()"); > } > } > public static void main(String[] args) throws Exception { > final StreamExecutionEnvironment env = > > StreamExecutionEnvironment.getExecutionEnvironment().enableCheckpointing(1000); > DataStream<String> stream = env.addSource(new ExampleSource()); > Properties properties = new Properties(); > properties.setProperty("bootstrap.servers", "localhost:9092"); > String topic = "my-topic"; > FlinkKafkaProducer<String> kafkaProducer = new FlinkKafkaProducer<>( > topic, > (element, timestamp) -> { > byte[] value = element.getBytes(StandardCharsets.UTF_8); > return new ProducerRecord<>(topic, null, timestamp, null, > value, null); > }, > properties, > FlinkKafkaProducer.Semantic.EXACTLY_ONCE); > stream.addSink(kafkaProducer).setParallelism(1); > env.execute(); > } > } > {code} > {code:java} > $ java -jar ./example.jar > [source] invoke cancel() > [source] invoke cancel() > [source] invoke cancel() > [source] invoke cancel() > ^C% > {code} > +*Case 3*+: Run as unexpected (wo/ _parallelism_) > {code:java} > public class Example { > public static class ExampleSource extends RichSourceFunction<String> > implements CheckpointedFunction { > private volatile boolean isRunning = true; > @Override > public void open(Configuration parameters) throws Exception { > System.out.println("[source] invoke open()"); > } > @Override > public void close() throws Exception { > isRunning = false; > System.out.println("[source] invoke close()"); > } > @Override > public void run(SourceContext<String> ctx) throws Exception { > System.out.println("[source] invoke run()"); > while (isRunning) { > ctx.collect("Flink"); > Thread.sleep(500); > } > } > @Override > public void cancel() { > isRunning = false; > System.out.println("[source] invoke cancel()"); > } > @Override > public void snapshotState(FunctionSnapshotContext context) throws > Exception { > System.out.println("[source] invoke snapshotState()"); > } > @Override > public void initializeState(FunctionInitializationContext context) > throws Exception { > System.out.println("[source] invoke initializeState()"); > } > } > public static class ExampleSink extends PrintSinkFunction<String> > implements CheckpointedFunction { > @Override > public void snapshotState(FunctionSnapshotContext context) throws > Exception { > System.out.println("[sink] invoke snapshotState()"); > } > @Override > public void initializeState(FunctionInitializationContext context) > throws Exception { > System.out.println("[sink] invoke initializeState()"); > } > } > public static void main(String[] args) throws Exception { > final StreamExecutionEnvironment env = > > StreamExecutionEnvironment.getExecutionEnvironment().enableCheckpointing(1000); > DataStream<String> stream = env.addSource(new ExampleSource()); > Properties properties = new Properties(); > properties.setProperty("bootstrap.servers", "localhost:9092"); > String topic = "my-topic"; > FlinkKafkaProducer<String> kafkaProducer = new FlinkKafkaProducer<>( > topic, > (element, timestamp) -> { > byte[] value = element.getBytes(StandardCharsets.UTF_8); > return new ProducerRecord<>(topic, null, timestamp, null, > value, null); > }, > properties, > FlinkKafkaProducer.Semantic.EXACTLY_ONCE); > stream.addSink(kafkaProducer); > env.execute(); > } > }{code} > {code:java} > $ java -jar ./example.jar > [source] invoke initializeState() > [source] invoke open() > [source] invoke run() > [source] invoke cancel() > [source] invoke close() > [source] invoke initializeState() > [source] invoke open() > [source] invoke run() > [source] invoke cancel() > [source] invoke close() > [source] invoke initializeState() > [source] invoke open() > [source] invoke run() > [source] invoke snapshotState() > [source] invoke cancel() > [source] invoke close() > ^C% > {code} -- This message was sent by Atlassian Jira (v8.3.4#803005)