I'm trying to test Flink 1.8.0 exactly-once semantics with a Kafka source and
sink:

   1. Run a Flink app that simply transfers messages from one topic to
   another, with parallelism=1 and a checkpoint interval of 20 seconds.
   2. Generate messages with incrementing integer numbers using a Python
   script, one message every 2 seconds (see the sketch after this list).
   3. Read the output topic with a console consumer in read_committed
   isolation level.
   4. Manually kill the TaskManager.
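
For reference, the generator logic is roughly the following. The actual
script is Python; this is only a sketch, written in Java with plain
kafka-clients for consistency with the Flink code below, and the broker
address is a placeholder:

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

public class IntegerGenerator {
    public static void main(String[] args) throws InterruptedException {
        Properties props = new Properties();
        props.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "broker:9092"); // placeholder
        props.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            for (int i = 0; ; i++) {
                // one incrementing integer every 2 seconds into the input topic
                producer.send(new ProducerRecord<>("stringTopic1", Integer.toString(i)));
                Thread.sleep(2000);
            }
        }
    }
}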

I expect to see monotonically increasing integers in the output topic
regardless of TaskManager kills and recovery.

But I actually see something unexpected in the console-consumer output:

32
33
34
35
36
37
38
39
40
-- TaskManager Killed
32
34
35
36
40
41
46
31
33
37
38
39
42
43
44
45

It looks like all messages between checkpoints were replayed into the output
topic. I also expected to see results in the output topic only after a
checkpoint completes, i.e. every 20 seconds, but messages appeared in the
output immediately as they were sent to the input.
Is this the correct behaviour, or am I doing something wrong?

Kafka version is 1.0.1 from Cloudera parcels. I tested a Kafka transactional
producer and a read_committed console consumer with custom code, and it
worked perfectly well, reading messages only after commitTransaction on the
producer.
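
That standalone check looked roughly like this (a simplified sketch; the
broker address and transactional id are placeholders, and the consumer was
started with isolation.level=read_committed):

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

public class TransactionCheck {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "broker:9092"); // placeholder
        props.setProperty(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "standalone-test"); // placeholder
        props.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            producer.initTransactions();
            producer.beginTransaction();
            producer.send(new ProducerRecord<>("test", "hello"));
            // a read_committed consumer sees nothing at this point
            producer.commitTransaction();
            // only after the commit do the records become visible to it
        }
    }
}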

My Flink code:

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
env.getConfig().setAutoWatermarkInterval(1000);
// checkpoint every 20 seconds in exactly-once mode
env.enableCheckpointing(20000, CheckpointingMode.EXACTLY_ONCE);
env.setStateBackend(new RocksDBStateBackend("hdfs:///checkpoints-data"));

Properties producerProperty = new Properties();
producerProperty.setProperty("bootstrap.servers", ...);
producerProperty.setProperty("zookeeper.connect", ...);
producerProperty.setProperty(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, "10000");
producerProperty.setProperty(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "my-transaction");
producerProperty.setProperty(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true");

Properties consumerProperty = new Properties();
consumerProperty.setProperty("bootstrap.servers", ...);
consumerProperty.setProperty("zookeeper.connect", ...);
consumerProperty.setProperty("group.id", "test2");

FlinkKafkaConsumer<String> consumer1 =
        new FlinkKafkaConsumer<String>("stringTopic1", new ComplexStringSchema(), consumerProperty);
consumer1.assignTimestampsAndWatermarks(new PeriodicAssigner());

// transactional sink: EXACTLY_ONCE writes to Kafka inside a transaction
FlinkKafkaProducer<String> producer1 = new FlinkKafkaProducer<String>(
        "test",
        new KeyedSerializationSchemaWrapper(new SimpleStringSchema()),
        producerProperty,
        FlinkKafkaProducer.Semantic.EXACTLY_ONCE);
producer1.ignoreFailuresAfterTransactionTimeout();

DataStreamSource<String> s1 = env.addSource(consumer1);
s1.addSink(producer1);
env.execute("Test");
