Hi Kevin,

Could you share the code showing how you register the FlinkKafkaConsumer as a
table?

Regarding your initialization of the FlinkKafkaConsumer, I would recommend
calling setStartFromEarliest() to guarantee that it consumes all the records
in all partitions.
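
For example (just a sketch on top of your initOrderConsumer below, with
everything else unchanged):

  // start reading the "order" topic from the earliest offset instead of the
  // committed group offsets, so a fresh test run sees all 1000 records
  kafkaConsumer.setStartFromEarliest()  // instead of setStartFromGroupOffsets()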

Regarding the flush(), it seems to be inside the for loop, so it is not
flushing after publishing ALL events?
I'm not very experienced with the flush() API: could this method block, so
that the following events can't be published to Kafka?

Best,
Jark

On Wed, 4 Nov 2020 at 04:04, Robert Metzger <rmetz...@apache.org> wrote:

> Hi Kevin,
> thanks a lot for posting this problem.
> I'm adding Jark to the thread, he or another committer working on Flink
> SQL can maybe provide some insights.
>
> On Tue, Nov 3, 2020 at 4:58 PM Kevin Kwon <fsw0...@gmail.com> wrote:
>
>> Looks like the event time that I've specified in the consumer is not
>> being respected. Does the timestamp assigner actually work in Kafka
>> consumers?
>>
>>       .withTimestampAssigner(new SerializableTimestampAssigner[Order] {
>>         override def extractTimestamp(order: Order, recordTimestamp: Long): Long = {
>>           order.getTimestamp
>>         }
>>       })
>>
>>
>> On Tue, Nov 3, 2020 at 12:01 AM Kevin Kwon <fsw0...@gmail.com> wrote:
>>
>>> Hi guys, I've recently been experimenting with an end-to-end testing
>>> environment with Kafka and Flink (1.11).
>>>
>>> I've set up the infrastructure with Docker Compose, consisting of a single
>>> Kafka broker / Flink (1.11) / MinIO for checkpoint storage.
>>>
>>> Here's the test scenario
>>>
>>> 1. Send 1000 messages, with a manually assigned timestamp on each event that
>>> increases by 100 milliseconds per loop iteration (the first and last messages
>>> are about 100 seconds apart). The topic I'm writing to has 3 partitions. The
>>> code below is the test message producer, using Confluent's Python SDK.
>>>
>>> from time import time
>>>
>>> order_producer = get_order_producer()  # helper that builds the Confluent producer
>>> current_timestamp = int(round(time() * 1000))
>>> for i in range(0, 1000):
>>>     order_producer.produce(
>>>         topic="order",
>>>         key={"key": i % 100},
>>>         value={
>>>             "id": 1000,
>>>             "customerId": i % 10,
>>>             "timestamp": current_timestamp + i * 100  # 100 ms apart per event
>>>         }
>>>     )
>>>     order_producer.flush()  # flush after every single message
>>>
>>> 2. Flink performs an SQL query on this stream and publishes the result back
>>> to a Kafka topic that also has 3 partitions. Below is the SQL code.
>>>
>>> | SELECT
>>> |   o.id,
>>> |   COUNT(*),
>>> |   TUMBLE_END(o.ts, INTERVAL '5' SECONDS)
>>> | FROM
>>> |   order o
>>> | GROUP BY
>>> |   o.id,
>>> |   TUMBLE(o.ts, INTERVAL '5' SECONDS)
>>>
>>> So I expect the sum of all the counts in the result to equal 1000 (the 1000
>>> events span roughly 100 seconds, i.e. about 20 five-second tumbling windows),
>>> but a lot of messages seem to be missing (797, as shown below). I can't
>>> figure out why. I'm using event time for the environment.
>>>
>>> [image: Screenshot 2020-11-02 at 23.35.23.png]
>>>
>>> *Below is the configuration code*
>>> Here's the code for the consumer settings for Kafka
>>>
>>> private def initOrderConsumer(): FlinkKafkaConsumer[Order] = {
>>>   val properties = new Properties()
>>>   properties.setProperty("bootstrap.servers", kafkaBrokers)
>>>   properties.setProperty("group.id", "awesome_order")
>>>
>>>   val kafkaConsumer = new FlinkKafkaConsumer[Order](
>>>     "order",
>>>     ConfluentRegistryAvroDeserializationSchema.forSpecific[Order](
>>>       classOf[Order],
>>>       kafkaSchemaRegistry
>>>     ),
>>>     properties
>>>   )
>>>   kafkaConsumer.setCommitOffsetsOnCheckpoints(true)
>>>   kafkaConsumer.setStartFromGroupOffsets()
>>>   kafkaConsumer.assignTimestampsAndWatermarks {
>>>     WatermarkStrategy
>>>       .forBoundedOutOfOrderness[Order](Duration.ofMillis(100))
>>>       .withTimestampAssigner(new SerializableTimestampAssigner[Order] {
>>>         override def extractTimestamp(order: Order, recordTimestamp: Long): Long = {
>>>           order.getTimestamp
>>>         }
>>>       })
>>>   }
>>>   kafkaConsumer
>>> }
>>>
>>> Afterwards, roughly as sketched below, I
>>> 1. create a temp view from this source data stream
>>> 2. perform the SQL query on it
>>> 3. append the result back to a processed data stream
>>> 4. attach that stream to the Kafka sink
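>>>
>>> The wiring looks approximately like this (a simplified sketch, not my exact
>>> code; tableEnv, orderStream and query are placeholder names):
>>>
>>> import org.apache.flink.streaming.api.scala._
>>> import org.apache.flink.table.api._
>>> import org.apache.flink.table.api.bridge.scala._
>>>
>>> // 1. register the consumer's DataStream[Order] as a temporary view; 'ts is
>>> //    declared as the event-time (rowtime) attribute that the query windows on
>>> tableEnv.createTemporaryView("order", orderStream, 'id, 'customerId, 'ts.rowtime)
>>> // 2. run the tumbling-window SQL query from above
>>> val resultTable = tableEnv.sqlQuery(query)
>>> // 3. convert the append-only window aggregation result back to a DataStream
>>> val processedStream = tableEnv.toAppendStream[ProcessedModel](resultTable)
>>> // 4. attach the exactly-once Kafka producer as the sink
>>> processedStream.addSink(initProcessedModelProducer())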
>>>
>>> Here's the code for the producer settings for Kafka
>>>
>>> private def initProcessedModelProducer(): FlinkKafkaProducer[ProcessedModel] = {
>>>   val properties: Properties = new Properties()
>>>   properties.put("bootstrap.servers", kafkaBrokers)
>>>   properties.put("transaction.timeout.ms", "60000")
>>>
>>>   val kafkaProducer = new FlinkKafkaProducer[ProcessedModel](
>>>     "processed_model",
>>>     ConfluentRegistryAvroSerializationSchema.forSpecific(
>>>       classOf[ProcessedModel],
>>>       "processed_model-value",
>>>       kafkaSchemaRegistry
>>>     ),
>>>     properties,
>>>     null,
>>>     Semantic.EXACTLY_ONCE,
>>>     5
>>>   )
>>>   kafkaProducer
>>> }
>>>
>>>
>>>
>>> *Side Note*
>>> Another interesting part is that if I flush "after" publishing all events,
>>> the processed events don't seem to arrive at the sink at all. The source is
>>> still populated normally in Flink; it's as if there is no progress after the
>>> messages arrive at the source.
>>>
>>> order_producer = get_order_producer()
>>> current_timestamp = int(round(time() * 1000))
>>> for i in range(0, 1000):
>>>     order_producer.produce(
>>>         topic="order",
>>>         key={"key": i % 100},
>>>         value={
>>>             "id": 1000,
>>>             "customerId": i % 10,
>>>             "timestamp": current_timestamp + i * 100
>>>         }
>>>     )
>>>     order_producer.flush()  # if I flush "AFTER" the loop instead, there is no
>>>                             # processed data in the Flink sink, although the
>>>                             # events still arrive at the source without any problem
>>>
>>>
