Hi, Eduardo.
Maybe I should describe the experiment design more precisely:
1) I run Flink on YARN (YARN session mode).
2) I do not stop/cancel the application, I just kill the TaskManager process.
3) After that, YARN creates another TaskManager process and an automatic
checkpoint restore from HDFS happens.

That's why I expect to see a correct restore.
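
(For reference, a minimal sketch of the checkpointing/restart configuration that this
kind of automatic recovery relies on; the restart-strategy values below are illustrative,
not taken from my actual job:)

    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    // Exactly-once checkpoints every 20 seconds, as in my job.
    env.enableCheckpointing(20000, CheckpointingMode.EXACTLY_ONCE);
    // Retain checkpoints so they are also usable after a full stop/cancel.
    env.getCheckpointConfig().enableExternalizedCheckpoints(
            CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
    // Restart the job automatically when a TaskManager dies (illustrative values).
    env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 10000));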

Best regards,
Vasily Melnik

GlowByte Consulting <http://www.gbconsulting.ru/>

===================

Mobile: +7 (903) 101-43-71
vasily.mel...@glowbyteconsulting.com


On Fri, 2 Aug 2019 at 13:04, Eduardo Winpenny Tejedor <
eduardo.winpe...@gmail.com> wrote:

> Hi Vasily,
>
> You're probably executing this from your IDE or from a local Flink cluster
> without starting your job from a checkpoint.
>
> When you start your Flink job for the second time you need to specify the
> path to the latest checkpoint as an argument, otherwise Flink will start
> from scratch.
>
> You're probably thinking that's not great; ideally Flink should be able to
> automatically continue from the last produced checkpoint, and actually
> that's what the docs say! Well, that's only when you're running in a proper
> cluster environment. Flink is able to recover using checkpoints when only
> part of the cluster fails, not when the whole job is stopped. For full
> stops you need to specify the checkpoint manually.
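>
> (To illustrate, resuming a fully stopped job from a retained checkpoint looks roughly
> like the command below; the checkpoint path and job jar are placeholders:)
>
>     flink run -s hdfs:///checkpoints-data/<job-id>/chk-<n> my-job.jar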
>
> Hope that helps!
>
>
> On Fri, 2 Aug 2019, 10:05 Vasily Melnik, <
> vasily.mel...@glowbyteconsulting.com> wrote:
>
>> I'm trying to test Flink 1.8.0 exactly-once semantics with a Kafka source
>> and sink:
>>
>>    1. Run the Flink app, simply transferring messages from one topic to
>>    another with parallelism=1 and a checkpoint interval of 20 seconds.
>>    2. Generate messages with incrementing integer numbers using a Python
>>    script, one every 2 seconds (a Java sketch of the generator is shown
>>    after this list).
>>    3. Read the output topic with a console consumer at the read_committed
>>    isolation level.
>>    4. Manually kill the TaskManager.
>>
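>> (Step 2 actually uses a Python script; a rough Java equivalent of what it does is
>> sketched below. The broker address is a placeholder; the topic name matches the
>> input topic in the job code further down.)
>>
>>         public static void main(String[] args) throws Exception {
>>             Properties props = new Properties();
>>             props.setProperty("bootstrap.servers", "<broker:9092>");
>>             KafkaProducer<String, String> gen =
>>                     new KafkaProducer<>(props, new StringSerializer(), new StringSerializer());
>>             for (int i = 0; ; i++) {
>>                 // One incrementing integer every 2 seconds.
>>                 gen.send(new ProducerRecord<>("stringTopic1", Integer.toString(i)));
>>                 Thread.sleep(2000);
>>             }
>>         }
>>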
>> I expect to see monotonically increasing integers in the output topic
>> regardless of the TaskManager being killed and recovered.
>>
>> But actually I see something unexpected in the console-consumer output:
>>
>> 32
>> 33
>> 34
>> 35
>> 36
>> 37
>> 38
>> 39
>> 40
>> -- TaskManager Killed
>> 32
>> 34
>> 35
>> 36
>> 40
>> 41
>> 46
>> 31
>> 33
>> 37
>> 38
>> 39
>> 42
>> 43
>> 44
>> 45
>>
>> Looks like all messages between checkpoints were replayed in the output
>> topic. I also expected to see results in the output topic only after
>> checkpointing, i.e. every 20 seconds, but messages appeared in the output
>> immediately as they were sent to the input.
>> Is this the correct behaviour, or am I doing something wrong?
>>
>> Kafka version is 1.0.1 from Cloudera parcels. I tested a Kafka transactional
>> producer and a read-committed console consumer with custom code and it worked
>> perfectly well, reading messages only after commitTransaction on the producer.
>>
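>> (Roughly the kind of transactional-producer test I mean; the broker address and
>> transactional id below are placeholders:)
>>
>>         Properties props = new Properties();
>>         props.setProperty("bootstrap.servers", "<broker:9092>");
>>         props.setProperty("transactional.id", "console-test-txn");
>>         KafkaProducer<String, String> p =
>>                 new KafkaProducer<>(props, new StringSerializer(), new StringSerializer());
>>         p.initTransactions();
>>         p.beginTransaction();
>>         p.send(new ProducerRecord<>("test", "42"));
>>         // A read_committed consumer only sees the record after this commit.
>>         p.commitTransaction();
>>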
>> My Flink code:
>>
>>         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
>>         env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
>>         env.getConfig().setAutoWatermarkInterval(1000);
>>         env.enableCheckpointing(20000, CheckpointingMode.EXACTLY_ONCE);
>>         env.setStateBackend(new RocksDBStateBackend("hdfs:///checkpoints-data"));
>>
>>         Properties producerProperty = new Properties();
>>         producerProperty.setProperty("bootstrap.servers", ...);
>>         producerProperty.setProperty("zookeeper.connect", ...);
>>         producerProperty.setProperty(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, "10000");
>>         producerProperty.setProperty(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "my-transaction");
>>         producerProperty.setProperty(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true");
>>
>>         Properties consumerProperty = new Properties();
>>         consumerProperty.setProperty("bootstrap.servers", ...);
>>         consumerProperty.setProperty("zookeeper.connect", ...);
>>         consumerProperty.setProperty("group.id", "test2");
>>
>>         FlinkKafkaConsumer<String> consumer1 = new FlinkKafkaConsumer<String>("stringTopic1",
>>                 new ComplexStringSchema(), consumerProperty);
>>         consumer1.assignTimestampsAndWatermarks(new PeriodicAssigner());
>>
>>         FlinkKafkaProducer<String> producer1 = new FlinkKafkaProducer<String>("test",
>>                 new KeyedSerializationSchemaWrapper(new SimpleStringSchema()), producerProperty,
>>                 FlinkKafkaProducer.Semantic.EXACTLY_ONCE);
>>         producer1.ignoreFailuresAfterTransactionTimeout();
>>
>>         DataStreamSource<String> s1 = env.addSource(consumer1);
>>         s1.addSink(producer1);
>>         env.execute("Test");
>>
>>
>>
