buom created FLINK-22551:
----------------------------
Summary: checkpoints: strange behaviour
Key: FLINK-22551
URL: https://issues.apache.org/jira/browse/FLINK-22551
Project: Flink
Issue Type: Bug
Affects Versions: 1.13.0
Environment: {code:java}
java -version
openjdk version "11.0.2" 2019-01-15
OpenJDK Runtime Environment 18.9 (build 11.0.2+9)
OpenJDK 64-Bit Server VM 18.9 (build 11.0.2+9, mixed mode)
{code}
Reporter: buom
* +*Case 1*:+ Works as expected
{code:java}
/**
 * Reproduction program: a custom source feeding a printing sink, with checkpointing turned on
 * through {@code enableCheckpointing(1000)}. Every lifecycle and checkpoint callback prints a
 * tagged line to stdout so that the order of invocations can be observed.
 */
public class Example {
/**
 * Source that repeatedly emits the string "Flink" while {@code isRunning} is true.
 * Each callback only prints a "[source] invoke ..." line; no state is stored or restored.
 */
public static class ExampleSource extends RichSourceFunction<String>
implements CheckpointedFunction {
// Loop flag polled by run(); cleared by both close() and cancel().
private volatile boolean isRunning = true;
/** Logs the invocation; performs no setup. */
@Override
public void open(Configuration parameters) throws Exception {
System.out.println("[source] invoke open()");
}
/** Clears the loop flag and logs the invocation. */
@Override
public void close() throws Exception {
isRunning = false;
System.out.println("[source] invoke close()");
}
/** Emits "Flink" and then sleeps 500 ms, repeating until the loop flag is cleared. */
@Override
public void run(SourceContext<String> ctx) throws Exception {
System.out.println("[source] invoke run()");
while (isRunning) {
ctx.collect("Flink");
Thread.sleep(500);
}
}
/** Clears the loop flag so that run() leaves its loop, and logs the invocation. */
@Override
public void cancel() {
isRunning = false;
System.out.println("[source] invoke cancel()");
}
/** Logs the invocation only; nothing is written to the snapshot context. */
@Override
public void snapshotState(FunctionSnapshotContext context) throws
Exception {
System.out.println("[source] invoke snapshotState()");
}
/** Logs the invocation only; nothing is read from the initialization context. */
@Override
public void initializeState(FunctionInitializationContext context)
throws Exception {
System.out.println("[source] invoke initializeState()");
}
}
/**
 * Print sink that additionally logs "[sink] invoke ..." lines from its checkpoint callbacks.
 * Neither callback stores or restores any state.
 */
public static class ExampleSink extends PrintSinkFunction<String>
implements CheckpointedFunction {
/** Logs the invocation only. */
@Override
public void snapshotState(FunctionSnapshotContext context) throws
Exception {
System.out.println("[sink] invoke snapshotState()");
}
/** Logs the invocation only. */
@Override
public void initializeState(FunctionInitializationContext context)
throws Exception {
System.out.println("[sink] invoke initializeState()");
}
}
/** Wires ExampleSource to ExampleSink (sink parallelism set to 1) and runs the job. */
public static void main(String[] args) throws Exception {
final StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment().enableCheckpointing(1000);
DataStream<String> stream = env.addSource(new ExampleSource());
// The sink's parallelism is pinned to 1 explicitly.
stream.addSink(new ExampleSink()).setParallelism(1);
env.execute();
}
}
{code}
{code:java}
$ java -jar ./example.jar
[sink] invoke initializeState()
[source] invoke initializeState()
[source] invoke open()
[source] invoke run()
Flink
[sink] invoke snapshotState()
[source] invoke snapshotState()
Flink
Flink
[sink] invoke snapshotState()
[source] invoke snapshotState()
Flink
Flink
[sink] invoke snapshotState()
[source] invoke snapshotState()
^C
{code}
* *+Case 2:+* Does not work as expected (w/ _parallelism = 1_)
{code:java}
/**
 * Reproduction program: a custom source feeding a {@code FlinkKafkaProducer} configured with
 * {@code Semantic.EXACTLY_ONCE}, with checkpointing turned on through
 * {@code enableCheckpointing(1000)} and the sink's parallelism set to 1. The source prints a
 * tagged line to stdout from every lifecycle and checkpoint callback.
 */
public class Example {
/**
 * Source that repeatedly emits the string "Flink" while {@code isRunning} is true.
 * Each callback only prints a "[source] invoke ..." line; no state is stored or restored.
 */
public static class ExampleSource extends RichSourceFunction<String>
implements CheckpointedFunction {
// Loop flag polled by run(); cleared by both close() and cancel().
private volatile boolean isRunning = true;
/** Logs the invocation; performs no setup. */
@Override
public void open(Configuration parameters) throws Exception {
System.out.println("[source] invoke open()");
}
/** Clears the loop flag and logs the invocation. */
@Override
public void close() throws Exception {
isRunning = false;
System.out.println("[source] invoke close()");
}
/** Emits "Flink" and then sleeps 500 ms, repeating until the loop flag is cleared. */
@Override
public void run(SourceContext<String> ctx) throws Exception {
System.out.println("[source] invoke run()");
while (isRunning) {
ctx.collect("Flink");
Thread.sleep(500);
}
}
/** Clears the loop flag so that run() leaves its loop, and logs the invocation. */
@Override
public void cancel() {
isRunning = false;
System.out.println("[source] invoke cancel()");
}
/** Logs the invocation only; nothing is written to the snapshot context. */
@Override
public void snapshotState(FunctionSnapshotContext context) throws
Exception {
System.out.println("[source] invoke snapshotState()");
}
/** Logs the invocation only; nothing is read from the initialization context. */
@Override
public void initializeState(FunctionInitializationContext context)
throws Exception {
System.out.println("[source] invoke initializeState()");
}
}
/** Wires ExampleSource to a Kafka producer sink (sink parallelism set to 1) and runs the job. */
public static void main(String[] args) throws Exception {
final StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment().enableCheckpointing(1000);
DataStream<String> stream = env.addSource(new ExampleSource());
// The only Kafka client property set is the broker address.
Properties properties = new Properties();
properties.setProperty("bootstrap.servers", "localhost:9092");
String topic = "my-topic";
FlinkKafkaProducer<String> kafkaProducer = new FlinkKafkaProducer<>(
topic,
// Serializer: UTF-8 encodes the element as the record value. Only topic, timestamp and
// value are supplied to ProducerRecord; the other three arguments are null.
(element, timestamp) -> {
byte[] value = element.getBytes(StandardCharsets.UTF_8);
return new ProducerRecord<>(topic, null, timestamp, null,
value, null);
},
properties,
FlinkKafkaProducer.Semantic.EXACTLY_ONCE);
// The sink's parallelism is pinned to 1 explicitly.
stream.addSink(kafkaProducer).setParallelism(1);
env.execute();
}
}
{code}
{code:java}
$ java -jar ./example.jar
[source] invoke cancel()
[source] invoke cancel()
[source] invoke cancel()
[source] invoke cancel()
^C%
{code}
* +*Case 3*:+ Does not work as expected (w/ _parallelism = default_)
{code:java}
/**
 * Reproduction program: a custom source feeding a {@code FlinkKafkaProducer} configured with
 * {@code Semantic.EXACTLY_ONCE}, with checkpointing turned on through
 * {@code enableCheckpointing(1000)}. No parallelism is set on the sink, so whatever default the
 * environment provides applies. The source prints a tagged line to stdout from every lifecycle
 * and checkpoint callback.
 */
public class Example {
/**
 * Source that repeatedly emits the string "Flink" while {@code isRunning} is true.
 * Each callback only prints a "[source] invoke ..." line; no state is stored or restored.
 */
public static class ExampleSource extends RichSourceFunction<String>
implements CheckpointedFunction {
// Loop flag polled by run(); cleared by both close() and cancel().
private volatile boolean isRunning = true;
/** Logs the invocation; performs no setup. */
@Override
public void open(Configuration parameters) throws Exception {
System.out.println("[source] invoke open()");
}
/** Clears the loop flag and logs the invocation. */
@Override
public void close() throws Exception {
isRunning = false;
System.out.println("[source] invoke close()");
}
/** Emits "Flink" and then sleeps 500 ms, repeating until the loop flag is cleared. */
@Override
public void run(SourceContext<String> ctx) throws Exception {
System.out.println("[source] invoke run()");
while (isRunning) {
ctx.collect("Flink");
Thread.sleep(500);
}
}
/** Clears the loop flag so that run() leaves its loop, and logs the invocation. */
@Override
public void cancel() {
isRunning = false;
System.out.println("[source] invoke cancel()");
}
/** Logs the invocation only; nothing is written to the snapshot context. */
@Override
public void snapshotState(FunctionSnapshotContext context) throws
Exception {
System.out.println("[source] invoke snapshotState()");
}
/** Logs the invocation only; nothing is read from the initialization context. */
@Override
public void initializeState(FunctionInitializationContext context)
throws Exception {
System.out.println("[source] invoke initializeState()");
}
}
/**
 * Print sink that logs "[sink] invoke ..." lines from its checkpoint callbacks.
 * NOTE(review): this class is declared but not used by main() below, which attaches the Kafka
 * producer instead.
 */
public static class ExampleSink extends PrintSinkFunction<String>
implements CheckpointedFunction {
/** Logs the invocation only. */
@Override
public void snapshotState(FunctionSnapshotContext context) throws
Exception {
System.out.println("[sink] invoke snapshotState()");
}
/** Logs the invocation only. */
@Override
public void initializeState(FunctionInitializationContext context)
throws Exception {
System.out.println("[sink] invoke initializeState()");
}
}
/** Wires ExampleSource to a Kafka producer sink (no explicit sink parallelism) and runs the job. */
public static void main(String[] args) throws Exception {
final StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment().enableCheckpointing(1000);
DataStream<String> stream = env.addSource(new ExampleSource());
// The only Kafka client property set is the broker address.
Properties properties = new Properties();
properties.setProperty("bootstrap.servers", "localhost:9092");
String topic = "my-topic";
FlinkKafkaProducer<String> kafkaProducer = new FlinkKafkaProducer<>(
topic,
// Serializer: UTF-8 encodes the element as the record value. Only topic, timestamp and
// value are supplied to ProducerRecord; the other three arguments are null.
(element, timestamp) -> {
byte[] value = element.getBytes(StandardCharsets.UTF_8);
return new ProducerRecord<>(topic, null, timestamp, null,
value, null);
},
properties,
FlinkKafkaProducer.Semantic.EXACTLY_ONCE);
// No setParallelism(...) call here: the sink runs with the default parallelism.
stream.addSink(kafkaProducer);
env.execute();
}
}{code}
{code:java}
$ java -jar ./example.jar
[source] invoke initializeState()
[source] invoke open()
[source] invoke run()
[source] invoke cancel()
[source] invoke close()
[source] invoke initializeState()
[source] invoke open()
[source] invoke run()
[source] invoke cancel()
[source] invoke close()
[source] invoke initializeState()
[source] invoke open()
[source] invoke run()
[source] invoke snapshotState()
[source] invoke cancel()
[source] invoke close()
^C%
{code}
--
This message was sent by Atlassian Jira
(v8.3.4#803005)