[ 
https://issues.apache.org/jira/browse/FLINK-22551?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17341114#comment-17341114
 ] 

buom edited comment on FLINK-22551 at 5/8/21, 3:34 AM:
-------------------------------------------------------

[~HunterHunter] The logic is wrong in cases 2 & 3 - the text _Flink_ 
was never printed.
 Maybe the issue is related to the implementation of 
_org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction_


was (Author: buom):
[~HunterHunter] The logic is wrong in cases 2 & 3 - there is no text _Flink_ 
was printed.
May be the issue related to the implementation of 
_org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction_

> checkpoints: strange behaviour 
> -------------------------------
>
>                 Key: FLINK-22551
>                 URL: https://issues.apache.org/jira/browse/FLINK-22551
>             Project: Flink
>          Issue Type: Bug
>    Affects Versions: 1.13.0
>         Environment: {code:java}
>  java -version
> openjdk version "11.0.2" 2019-01-15
> OpenJDK Runtime Environment 18.9 (build 11.0.2+9)
> OpenJDK 64-Bit Server VM 18.9 (build 11.0.2+9, mixed mode)
> {code}
>            Reporter: buom
>            Priority: Critical
>
> * +*Case 1*:+ Works as expected
> {code:java}
> public class Example {
>     public static class ExampleSource extends RichSourceFunction<String>
>             implements CheckpointedFunction {
>         private volatile boolean isRunning = true;
>         @Override
>         public void open(Configuration parameters) throws Exception {
>             System.out.println("[source] invoke open()");
>         }
>         @Override
>         public void close() throws Exception {
>             isRunning = false;
>             System.out.println("[source] invoke close()");
>         }
>         @Override
>         public void run(SourceContext<String> ctx) throws Exception {
>             System.out.println("[source] invoke run()");
>             while (isRunning) {
>                 ctx.collect("Flink");
>                 Thread.sleep(500);
>             }
>         }
>         @Override
>         public void cancel() {
>             isRunning = false;
>             System.out.println("[source] invoke cancel()");
>         }
>         @Override
>         public void snapshotState(FunctionSnapshotContext context) throws 
> Exception {
>             System.out.println("[source] invoke snapshotState()");
>         }
>         @Override
>         public void initializeState(FunctionInitializationContext context) 
> throws Exception {
>             System.out.println("[source] invoke initializeState()");
>         }
>     }
>     public static class ExampleSink extends PrintSinkFunction<String>
>             implements CheckpointedFunction {
>         @Override
>         public void snapshotState(FunctionSnapshotContext context) throws 
> Exception {
>             System.out.println("[sink] invoke snapshotState()");
>         }
>         @Override
>         public void initializeState(FunctionInitializationContext context) 
> throws Exception {
>             System.out.println("[sink] invoke initializeState()");
>         }
>     }
>     public static void main(String[] args) throws Exception {
>         final StreamExecutionEnvironment env =
>                 
> StreamExecutionEnvironment.getExecutionEnvironment().enableCheckpointing(1000);
>         DataStream<String> stream = env.addSource(new ExampleSource());
>         stream.addSink(new ExampleSink()).setParallelism(1);
>         env.execute();
>     }
> }
> {code}
> {code:java}
> $ java -jar ./example.jar
> [sink] invoke initializeState()
> [source] invoke initializeState()
> [source] invoke open()
> [source] invoke run()
> Flink
> [sink] invoke snapshotState()
> [source] invoke snapshotState()
> Flink
> Flink
> [sink] invoke snapshotState()
> [source] invoke snapshotState()
> Flink
> Flink
> [sink] invoke snapshotState()
> [source] invoke snapshotState()
> ^C
> {code}
>  * *+Case 2:+* Does not work as expected (with _parallelism = 1_)
> {code:java}
> public class Example {
>     public static class ExampleSource extends RichSourceFunction<String>
>             implements CheckpointedFunction {
>         private volatile boolean isRunning = true;
>         @Override
>         public void open(Configuration parameters) throws Exception {
>             System.out.println("[source] invoke open()");
>         }
>         @Override
>         public void close() throws Exception {
>             isRunning = false;
>             System.out.println("[source] invoke close()");
>         }
>         @Override
>         public void run(SourceContext<String> ctx) throws Exception {
>             System.out.println("[source] invoke run()");
>             while (isRunning) {
>                 ctx.collect("Flink");
>                 Thread.sleep(500);
>             }
>         }
>         @Override
>         public void cancel() {
>             isRunning = false;
>             System.out.println("[source] invoke cancel()");
>         }
>         @Override
>         public void snapshotState(FunctionSnapshotContext context) throws 
> Exception {
>             System.out.println("[source] invoke snapshotState()");
>         }
>         @Override
>         public void initializeState(FunctionInitializationContext context) 
> throws Exception {
>             System.out.println("[source] invoke initializeState()");
>         }
>     }
>     public static void main(String[] args) throws Exception {
>         final StreamExecutionEnvironment env =
>                 
> StreamExecutionEnvironment.getExecutionEnvironment().enableCheckpointing(1000);
>         DataStream<String> stream = env.addSource(new ExampleSource());
>         Properties properties = new Properties();
>         properties.setProperty("bootstrap.servers", "localhost:9092");
>         String topic = "my-topic";
>         FlinkKafkaProducer<String> kafkaProducer = new FlinkKafkaProducer<>(
>                 topic,
>                 (element, timestamp) -> {
>                     byte[] value = element.getBytes(StandardCharsets.UTF_8);
>                     return new ProducerRecord<>(topic, null, timestamp, null, 
> value, null);
>                 },
>                 properties,
>                 FlinkKafkaProducer.Semantic.EXACTLY_ONCE);
>         stream.addSink(kafkaProducer).setParallelism(1);
>         env.execute();
>     }
> }
> {code}
> {code:java}
>  $ java -jar ./example.jar
> [source] invoke cancel() 
> [source] invoke cancel() 
> [source] invoke cancel() 
> [source] invoke cancel() 
> ^C%
> {code}
> +*Case 3*+: Does not work as expected (without setting _parallelism_)
> {code:java}
> public class Example {
>     public static class ExampleSource extends RichSourceFunction<String>
>             implements CheckpointedFunction {
>         private volatile boolean isRunning = true;
>         @Override
>         public void open(Configuration parameters) throws Exception {
>             System.out.println("[source] invoke open()");
>         }
>         @Override
>         public void close() throws Exception {
>             isRunning = false;
>             System.out.println("[source] invoke close()");
>         }
>         @Override
>         public void run(SourceContext<String> ctx) throws Exception {
>             System.out.println("[source] invoke run()");
>             while (isRunning) {
>                 ctx.collect("Flink");
>                 Thread.sleep(500);
>             }
>         }
>         @Override
>         public void cancel() {
>             isRunning = false;
>             System.out.println("[source] invoke cancel()");
>         }
>         @Override
>         public void snapshotState(FunctionSnapshotContext context) throws 
> Exception {
>             System.out.println("[source] invoke snapshotState()");
>         }
>         @Override
>         public void initializeState(FunctionInitializationContext context) 
> throws Exception {
>             System.out.println("[source] invoke initializeState()");
>         }
>     }
>     public static class ExampleSink extends PrintSinkFunction<String>
>             implements CheckpointedFunction {
>         @Override
>         public void snapshotState(FunctionSnapshotContext context) throws 
> Exception {
>             System.out.println("[sink] invoke snapshotState()");
>         }
>         @Override
>         public void initializeState(FunctionInitializationContext context) 
> throws Exception {
>             System.out.println("[sink] invoke initializeState()");
>         }
>     }
>     public static void main(String[] args) throws Exception {
>         final StreamExecutionEnvironment env =
>                 
> StreamExecutionEnvironment.getExecutionEnvironment().enableCheckpointing(1000);
>         DataStream<String> stream = env.addSource(new ExampleSource());
>         Properties properties = new Properties();
>         properties.setProperty("bootstrap.servers", "localhost:9092");
>         String topic = "my-topic";
>         FlinkKafkaProducer<String> kafkaProducer = new FlinkKafkaProducer<>(
>                 topic,
>                 (element, timestamp) -> {
>                     byte[] value = element.getBytes(StandardCharsets.UTF_8);
>                     return new ProducerRecord<>(topic, null, timestamp, null, 
> value, null);
>                 },
>                 properties,
>                 FlinkKafkaProducer.Semantic.EXACTLY_ONCE);
>         stream.addSink(kafkaProducer);
>         env.execute();
>     }
> }{code}
> {code:java}
> $ java -jar ./example.jar
> [source] invoke initializeState()
> [source] invoke open()
> [source] invoke run()
> [source] invoke cancel()
> [source] invoke close()
> [source] invoke initializeState()
> [source] invoke open()
> [source] invoke run()
> [source] invoke cancel()
> [source] invoke close()
> [source] invoke initializeState()
> [source] invoke open()
> [source] invoke run()
> [source] invoke snapshotState()
> [source] invoke cancel()
> [source] invoke close()
> ^C%
> {code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

Reply via email to