Hi, Eduardo. Maybe I should describe the experiment design more precisely:

1) I run Flink on YARN (YARN session mode).
2) I do not stop/cancel the application, I just kill the TaskManager process.
3) After that, YARN creates another TaskManager process and an automatic checkpoint restore from HDFS happens.

That's why I expect to see a correct restore.
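For what it's worth, the recovery in step 3 relies only on checkpointing plus Flink's restart strategy; the job itself does not set one, so the default is used. Spelled out explicitly it would look roughly like the sketch below (the 3 attempts and 10-second delay are illustrative values, not what the job actually configures):

    // With checkpointing enabled, Flink restarts the failed job from the latest
    // completed checkpoint; an explicit fixed-delay restart strategy would be:
    // (uses org.apache.flink.api.common.restartstrategy.RestartStrategies and
    //  org.apache.flink.api.common.time.Time)
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.enableCheckpointing(20000, CheckpointingMode.EXACTLY_ONCE);
    env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, Time.seconds(10)));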
Best regards,
Vasily Melnik
GlowByte Consulting <http://www.gbconsulting.ru/>
===================
Mobile: +7 (903) 101-43-71
vasily.mel...@glowbyteconsulting.com


On Fri, 2 Aug 2019 at 13:04, Eduardo Winpenny Tejedor <eduardo.winpe...@gmail.com> wrote:

> Hi Vasily,
>
> You're probably executing this from your IDE or from a local Flink cluster
> without starting your job from a checkpoint.
>
> When you start your Flink job for the second time, you need to specify the
> path to the latest checkpoint as an argument, otherwise Flink will start
> from scratch.
>
> You're probably thinking that's not great, that ideally Flink should be able
> to continue automatically from the last produced checkpoint, and actually
> that's what the docs say! Well, that only holds when you're running in a
> proper cluster environment. Flink is able to recover using checkpoints when
> only part of the cluster fails, not when the whole job is stopped. For full
> stops you need to specify the checkpoint manually.
>
> Hope that helps!
>
>
> On Fri, 2 Aug 2019, 10:05 Vasily Melnik <vasily.mel...@glowbyteconsulting.com> wrote:
>
>> I'm trying to test Flink 1.8.0 exactly-once semantics with a Kafka source
>> and sink:
>>
>> 1. Run the Flink app, simply transferring messages from one topic to
>>    another with parallelism=1 and a checkpoint interval of 20 seconds.
>> 2. Generate messages with incrementing integer numbers using a Python
>>    script, one every 2 seconds.
>> 3. Read the output topic with a console consumer in read_committed
>>    isolation level.
>> 4. Manually kill the TaskManager.
>>
>> I expect to see monotonically increasing integers in the output topic
>> regardless of the TaskManager being killed and recovered.
>>
>> But actually I see something unexpected in the console-consumer output:
>>
>> 32
>> 33
>> 34
>> 35
>> 36
>> 37
>> 38
>> 39
>> 40
>> -- TaskManager Killed
>> 32
>> 34
>> 35
>> 36
>> 40
>> 41
>> 46
>> 31
>> 33
>> 37
>> 38
>> 39
>> 42
>> 43
>> 44
>> 45
>>
>> It looks like all messages between checkpoints were replayed into the output
>> topic. I also expected to see results in the output topic only after
>> checkpointing, i.e. every 20 seconds, but messages appeared in the output
>> immediately as they were sent to the input.
>> Is this the expected behaviour, or am I doing something wrong?
>>
>> Kafka version is 1.0.1 from Cloudera parcels. I tested a Kafka transactional
>> producer and a read_committed console consumer with custom code, and it
>> worked perfectly well, reading messages only after commitTransaction on the
>> producer.
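>>
>> Roughly, the producer side of that standalone check was along these lines
>> (a minimal sketch rather than the exact code; the broker address,
>> transactional id and topic name are placeholders):
>>
>> import java.util.Properties;
>> import org.apache.kafka.clients.producer.KafkaProducer;
>> import org.apache.kafka.clients.producer.ProducerRecord;
>> import org.apache.kafka.common.serialization.StringSerializer;
>>
>> public class TxnProducerCheck {
>>     public static void main(String[] args) {
>>         Properties props = new Properties();
>>         // Placeholder broker address and transactional id.
>>         props.setProperty("bootstrap.servers", "broker:9092");
>>         props.setProperty("transactional.id", "manual-check-txn");
>>         props.setProperty("key.serializer", StringSerializer.class.getName());
>>         props.setProperty("value.serializer", StringSerializer.class.getName());
>>
>>         KafkaProducer<String, String> producer = new KafkaProducer<>(props);
>>         producer.initTransactions();
>>         producer.beginTransaction();
>>         producer.send(new ProducerRecord<>("test", "42"));
>>         // A console consumer with isolation.level=read_committed only sees
>>         // the record after this commit.
>>         producer.commitTransaction();
>>         producer.close();
>>     }
>> }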
>>
>> My Flink code:
>>
>> StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
>> env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
>> env.getConfig().setAutoWatermarkInterval(1000);
>>
>> // Checkpoint every 20 seconds in exactly-once mode, storing state in RocksDB on HDFS.
>> env.enableCheckpointing(20000, CheckpointingMode.EXACTLY_ONCE);
>> env.setStateBackend(new RocksDBStateBackend("hdfs:///checkpoints-data"));
>>
>> // Producer (sink) configuration.
>> Properties producerProperty = new Properties();
>> producerProperty.setProperty("bootstrap.servers", ...);
>> producerProperty.setProperty("zookeeper.connect", ...);
>> producerProperty.setProperty(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, "10000");
>> producerProperty.setProperty(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "my-transaction");
>> producerProperty.setProperty(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true");
>>
>> // Consumer (source) configuration.
>> Properties consumerProperty = new Properties();
>> consumerProperty.setProperty("bootstrap.servers", ...);
>> consumerProperty.setProperty("zookeeper.connect", ...);
>> consumerProperty.setProperty("group.id", "test2");
>>
>> FlinkKafkaConsumer<String> consumer1 =
>>     new FlinkKafkaConsumer<String>("stringTopic1", new ComplexStringSchema(), consumerProperty);
>> consumer1.assignTimestampsAndWatermarks(new PeriodicAssigner());
>>
>> FlinkKafkaProducer<String> producer1 =
>>     new FlinkKafkaProducer<String>("test",
>>         new KeyedSerializationSchemaWrapper<>(new SimpleStringSchema()),
>>         producerProperty, FlinkKafkaProducer.Semantic.EXACTLY_ONCE);
>> producer1.ignoreFailuresAfterTransactionTimeout();
>>
>> DataStreamSource<String> s1 = env.addSource(consumer1);
>> s1.addSink(producer1);
>> env.execute("Test");