Looks like the event time that I've specified in the consumer is not being respected. Does the timestamp assigner actually work in Kafka consumers?
.withTimestampAssigner(new SerializableTimestampAssigner[Order] {
  override def extractTimestamp(order: Order, recordTimestamp: Long): Long = {
    order.getTimestamp
  }
})

On Tue, Nov 3, 2020 at 12:01 AM Kevin Kwon <fsw0...@gmail.com> wrote:

> Hi guys, I've recently been experimenting with an end-to-end testing
> environment with Kafka and Flink (1.11).
>
> I've set up an infrastructure with Docker Compose composed of a single Kafka
> broker / Flink (1.11) / MinIO for checkpoint saves.
>
> Here's the test scenario:
>
> 1. Send 1000 messages with a manual timestamp assigned to each event,
> increased by 100 milliseconds per loop (the first and last messages are
> 100 seconds apart). There are 3 partitions for the topic I'm writing to.
> Below is the test message producer using Confluent's Python SDK:
>
> order_producer = get_order_producer()
> current_timestamp = int(round(time() * 1000))
> for i in range(0, 1000):
>     order_producer.produce(
>         topic="order",
>         key={"key": i % 100},
>         value={
>             "id": 1000,
>             "customerId": i % 10,
>             "timestamp": current_timestamp + i * 100
>         }
>     )
>     order_producer.flush()
>
> 2. Flink performs an SQL query on this stream and publishes it back to a
> Kafka topic that also has 3 partitions. Below is the SQL code:
>
> | SELECT
> |   o.id,
> |   COUNT(*),
> |   TUMBLE_END(o.ts, INTERVAL '5' SECONDS)
> | FROM
> |   order o
> | GROUP BY
> |   o.id,
> |   TUMBLE(o.ts, INTERVAL '5' SECONDS)
>
> So I expect the sum of all the counts in the result to equal 1000, but it
> seems that a lot of messages are missing (797 as below). I can't figure out
> why. I'm using event time for the environment.
>
> [image: Screenshot 2020-11-02 at 23.35.23.png]
>
> *Below is the configuration code*
> Here's the code for the consumer settings for Kafka:
>
> private def initOrderConsumer(): FlinkKafkaConsumer[Order] = {
>   val properties = new Properties()
>   properties.setProperty("bootstrap.servers", kafkaBrokers)
>   properties.setProperty("group.id", "awesome_order")
>
>   val kafkaConsumer = new FlinkKafkaConsumer[Order](
>     "order",
>     ConfluentRegistryAvroDeserializationSchema.forSpecific[Order](
>       classOf[Order],
>       kafkaSchemaRegistry
>     ),
>     properties
>   )
>   kafkaConsumer.setCommitOffsetsOnCheckpoints(true)
>   kafkaConsumer.setStartFromGroupOffsets()
>   kafkaConsumer.assignTimestampsAndWatermarks {
>     WatermarkStrategy
>       .forBoundedOutOfOrderness[Order](Duration.ofMillis(100))
>       .withTimestampAssigner(new SerializableTimestampAssigner[Order] {
>         override def extractTimestamp(order: Order, recordTimestamp: Long): Long = {
>           order.getTimestamp
>         }
>       })
>   }
>   kafkaConsumer
> }
>
> Afterwards, I:
> 1. create a temp view from this source data stream
> 2. perform SQL queries on it
> 3. append it back to a processed data stream
> 4. attach the stream to the Kafka sink
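[Editor's note: a minimal sketch, assuming Flink 1.11's Scala Table API, of how those four steps might be wired together. `env`, `tEnv`, the view name `orders` (avoiding the reserved word `order`), the hypothetical helper `toProcessedModel`, and the job name are assumptions, not the original code.]

import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.api._
import org.apache.flink.table.api.bridge.scala._
import org.apache.flink.types.Row

val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) // 1.11 still defaults to processing time
val tEnv = StreamTableEnvironment.create(env)

// 1. register the Kafka source as a view; 'ts.rowtime appends an event-time attribute
//    backed by the record timestamps assigned in the consumer's WatermarkStrategy
val ordersStream: DataStream[Order] = env.addSource(initOrderConsumer())
tEnv.createTemporaryView("orders", ordersStream, 'id, 'customerId, 'ts.rowtime)

// 2. the tumbling-window aggregation from the thread (table name adjusted to `orders`)
val result = tEnv.sqlQuery(
  """
    |SELECT
    |  o.id,
    |  COUNT(*) AS cnt,
    |  TUMBLE_END(o.ts, INTERVAL '5' SECONDS) AS window_end
    |FROM
    |  orders o
    |GROUP BY
    |  o.id,
    |  TUMBLE(o.ts, INTERVAL '5' SECONDS)
    |""".stripMargin)

// 3. a windowed GROUP BY emits append-only results, so toAppendStream is sufficient;
//    toProcessedModel is a hypothetical helper building the Avro record from the Row
val processed: DataStream[ProcessedModel] =
  tEnv.toAppendStream[Row](result).map(row => toProcessedModel(row))

// 4. attach the exactly-once Kafka sink
processed.addSink(initProcessedModelProducer())

env.execute("order-window-e2e-test")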
>
> Here's the code for the producer settings for Kafka:
>
> private def initProcessedModelProducer(): FlinkKafkaProducer[ProcessedModel] = {
>   val properties: Properties = new Properties()
>   properties.put("bootstrap.servers", kafkaBrokers)
>   properties.put("transaction.timeout.ms", "60000")
>
>   val kafkaProducer = new FlinkKafkaProducer[ProcessedModel](
>     "processed_model",
>     ConfluentRegistryAvroSerializationSchema.forSpecific(
>       classOf[ProcessedModel],
>       "procssed_model-value",
>       kafkaSchemaRegistry
>     ),
>     properties,
>     null,
>     Semantic.EXACTLY_ONCE,
>     5
>   )
>   kafkaProducer
> }
>
> *Side Note*
> Another interesting part is that if I flush "after" publishing all events,
> the processed events don't seem to arrive at the sink at all. The source is
> still populated normally in Flink. It's as if there is no progress after the
> messages arrive at the source.
>
> order_producer = get_order_producer()
> current_timestamp = int(round(time() * 1000))
> for i in range(0, 1000):
>     order_producer.produce(
>         topic="order",
>         key={"key": i % 100},
>         value={
>             "id": 1000,
>             "customerId": i % 10,
>             "timestamp": current_timestamp + i * 100
>         }
>     )
> order_producer.flush()  # if I flush "AFTER" the loop, there is no processed
>                         # data in the sink of Flink. The events themselves arrive
>                         # without any problem in the source in Flink though
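[Editor's note: one detail worth spelling out next to the producer config above, as a hedged sketch rather than a diagnosis: with Semantic.EXACTLY_ONCE the sink writes inside Kafka transactions that are only committed when a checkpoint completes, so the job needs checkpointing enabled against the MinIO backend mentioned in the thread. The interval and bucket path below are assumptions.]

import org.apache.flink.runtime.state.filesystem.FsStateBackend

// records written by the EXACTLY_ONCE producer only become visible to
// read_committed consumers after the checkpoint-triggered transaction commit
env.enableCheckpointing(10000L) // every 10 s; interval is an assumption
env.setStateBackend(new FsStateBackend("s3://flink-checkpoints/e2e")) // MinIO via the flink-s3-fs plugin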