Great to hear it works!

`setStartFromGroupOffsets` [1] starts reading each partition from the
consumer group's committed offsets in the Kafka brokers (the group is
identified by the group.id setting in the consumer properties). If no
committed offset can be found for a partition, the 'auto.offset.reset'
setting in the properties is used instead, and the default value of
'auto.offset.reset' is 'latest' [2].

I think that's why `setStartFromGroupOffsets()` doesn't consume all the
events: on the very first run of your stateless test there are no
committed offsets yet, so the consumer falls back to 'latest' and skips
everything produced before it started.
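
If you want the end-to-end test to always read the topic from the
beginning, a minimal sketch (reusing the `properties` and `kafkaConsumer`
names from your initOrderConsumer()) would be to either make the fallback
explicit or skip the committed offsets entirely:

// make the fallback explicit in the consumer properties ...
properties.setProperty("auto.offset.reset", "earliest")
kafkaConsumer.setStartFromGroupOffsets()

// ... or ignore the committed group offsets altogether for the test
kafkaConsumer.setStartFromEarliest()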

[1]:
https://ci.apache.org/projects/flink/flink-docs-master/dev/connectors/kafka.html#kafka-consumers-start-position-configuration
[2]: https://kafka.apache.org/documentation/#auto.offset.reset

On Fri, 6 Nov 2020 at 07:04, Kevin Kwon <fsw0...@gmail.com> wrote:

> Hi Jark, setStartFromEarliest actually worked. It's strange since my test
> is stateless (complete teardown of all Docker containers) and the consumer
> creates the topic once it starts consuming it. I was assuming that
> setStartFromGroupOffsets would let the consumer consume from the beginning
> anyway. I'll share the code if I have any further problems, since I can't
> just copy-paste code created inside my company.
>
> Thanks though! I appreciate your help
>
> On Thu, Nov 5, 2020 at 4:55 AM Jark Wu <imj...@gmail.com> wrote:
>
>> Hi Kevin,
>>
>> Could you share the code of how you register the FlinkKafkaConsumer as a
>> table?
>>
>> Regarding your initialization of FlinkKafkaConsumer, I would recommend
>> using setStartFromEarliest() to guarantee it consumes all the records in
>> the partitions.
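>>
>> For example (a minimal sketch, reusing the kafkaConsumer from your
>> initOrderConsumer()):
>>
>> kafkaConsumer.setStartFromEarliest()  // ignore committed group offsets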
>>
>> Regarding flush(), it seems it is inside the for loop, so it is not only
>> flushing after publishing ALL events?
>> I'm not experienced with the flush() API; could this method block so that
>> the following events can't be published to Kafka?
>>
>> Best,
>> Jark
>>
>> On Wed, 4 Nov 2020 at 04:04, Robert Metzger <rmetz...@apache.org> wrote:
>>
>>> Hi Kevin,
>>> thanks a lot for posting this problem.
>>> I'm adding Jark to the thread; he or another committer working on Flink
>>> SQL can maybe provide some insights.
>>>
>>> On Tue, Nov 3, 2020 at 4:58 PM Kevin Kwon <fsw0...@gmail.com> wrote:
>>>
>>>> Looks like the event time that I've specified in the consumer is not
>>>> being respected. Does the timestamp assigner actually work in Kafka
>>>> consumers?
>>>>
>>>>       .withTimestampAssigner(new SerializableTimestampAssigner[Order] {
>>>>         override def extractTimestamp(order: Order, recordTimestamp: Long): Long = {
>>>>           order.getTimestamp
>>>>         }
>>>>       })
>>>>
>>>>
>>>> On Tue, Nov 3, 2020 at 12:01 AM Kevin Kwon <fsw0...@gmail.com> wrote:
>>>>
>>>>> Hi guys, I've recently been experimenting with an end-to-end testing
>>>>> environment with Kafka and Flink (1.11)
>>>>>
>>>>> I've set up an infrastructure with Docker Compose composed of a single
>>>>> Kafka broker / Flink (1.11) / MinIO for checkpoint saves
>>>>>
>>>>> Here's the test scenario
>>>>>
>>>>> 1. Send 1000 messages with a manual timestamp assigned to each event,
>>>>> increased by 100 milliseconds per loop (the first and last messages
>>>>> differ by about 100 seconds). There are 3 partitions for the topic I'm
>>>>> writing to. Below is the test message producer using Confluent's
>>>>> Python SDK
>>>>>
>>>>> order_producer = get_order_producer()
>>>>> current_timestamp = int(round(time() * 1000))
>>>>> for i in range(0, 1000):
>>>>>     order_producer.produce(
>>>>>         topic="order",
>>>>>         key={"key": i % 100},
>>>>>         value={
>>>>>             "id": 1000,
>>>>>             "customerId": i % 10,
>>>>>             "timestamp": current_timestamp + i * 100
>>>>>         }
>>>>>     )
>>>>>     order_producer.flush()
>>>>>
>>>>>
>>>>> 2. Flink performs an SQL query on this stream and publishes the result
>>>>> back to a Kafka topic that has 3 partitions. Below is the SQL code
>>>>>
>>>>> | SELECT
>>>>> |   o.id,
>>>>> |   COUNT(*),
>>>>> |   TUMBLE_END(o.ts, INTERVAL '5' SECONDS)
>>>>> | FROM
>>>>> |   order o
>>>>> | GROUP BY
>>>>> |   o.id,
>>>>> |   TUMBLE(o.ts, INTERVAL '5' SECONDS)
>>>>>
>>>>> So I expect the sum of all the counts in the result to equal 1000, but
>>>>> it seems that a lot of messages are missing (797, as shown below). I
>>>>> can't figure out why though. I'm using event time for the environment
>>>>>
>>>>> [image: Screenshot 2020-11-02 at 23.35.23.png]
>>>>>
>>>>> *Below is the configuration code*
>>>>> Here's the code for the consumer settings for Kafka
>>>>>
>>>>> private def initOrderConsumer(): FlinkKafkaConsumer[Order] = {
>>>>>   val properties = new Properties()
>>>>>   properties.setProperty("bootstrap.servers", kafkaBrokers)
>>>>>   properties.setProperty("group.id", "awesome_order")
>>>>>
>>>>>   val kafkaConsumer = new FlinkKafkaConsumer[Order](
>>>>>     "order",
>>>>>     ConfluentRegistryAvroDeserializationSchema.forSpecific[Order](
>>>>>       classOf[Order],
>>>>>       kafkaSchemaRegistry
>>>>>     ),
>>>>>     properties
>>>>>   )
>>>>>   kafkaConsumer.setCommitOffsetsOnCheckpoints(true)
>>>>>   kafkaConsumer.setStartFromGroupOffsets()
>>>>>   kafkaConsumer.assignTimestampsAndWatermarks {
>>>>>     WatermarkStrategy
>>>>>       .forBoundedOutOfOrderness[Order](Duration.ofMillis(100))
>>>>>       .withTimestampAssigner(new SerializableTimestampAssigner[Order] {
>>>>>         override def extractTimestamp(order: Order, recordTimestamp: Long): Long = {
>>>>>           order.getTimestamp
>>>>>         }
>>>>>       })
>>>>>   }
>>>>>   kafkaConsumer
>>>>> }
>>>>>
>>>>> Afterwards,
>>>>> 1. I create a temp view from this source data stream
>>>>> 2. perform the SQL query on it
>>>>> 3. append the result back to a processed data stream
>>>>> 4. attach that stream to the Kafka sink (roughly sketched below)
>>>>>
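>>>>> Roughly, the wiring for those four steps looks like this (a simplified
>>>>> sketch with the names changed, not the exact code; env and tableEnv are
>>>>> the stream / table environments, and the view and column names are just
>>>>> for illustration):
>>>>>
>>>>> import org.apache.flink.streaming.api.scala._
>>>>> import org.apache.flink.table.api._
>>>>> import org.apache.flink.table.api.bridge.scala._
>>>>>
>>>>> val orderStream: DataStream[Order] = env.addSource(initOrderConsumer())
>>>>>
>>>>> // 'ts.rowtime exposes the timestamps assigned in the consumer as the
>>>>> // event-time attribute used by TUMBLE below
>>>>> tableEnv.createTemporaryView(
>>>>>   "orders", orderStream, 'id, 'customerId, 'ts.rowtime)
>>>>>
>>>>> val result = tableEnv.sqlQuery(
>>>>>   """
>>>>>     | SELECT id, COUNT(*) AS cnt, TUMBLE_END(ts, INTERVAL '5' SECOND) AS w_end
>>>>>     | FROM orders
>>>>>     | GROUP BY id, TUMBLE(ts, INTERVAL '5' SECOND)
>>>>>   """.stripMargin)
>>>>>
>>>>> // windowed aggregates are append-only, so toAppendStream works here
>>>>> // (assuming ProcessedModel's fields line up with the selected columns)
>>>>> tableEnv
>>>>>   .toAppendStream[ProcessedModel](result)
>>>>>   .addSink(initProcessedModelProducer())
>>>>>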
>>>>> Here's the code for the producer settings for Kafka
>>>>>
>>>>> private def initProcessedModelProducer(): FlinkKafkaProducer[ProcessedModel] = {
>>>>>   val properties: Properties = new Properties()
>>>>>   properties.put("bootstrap.servers", kafkaBrokers)
>>>>>   properties.put("transaction.timeout.ms", "60000")
>>>>>
>>>>>   val kafkaProducer = new FlinkKafkaProducer[ProcessedModel](
>>>>>     "processed_model",
>>>>>     ConfluentRegistryAvroSerializationSchema.forSpecific(
>>>>>       classOf[ProcessedModel],
>>>>>       "processed_model-value",
>>>>>       kafkaSchemaRegistry
>>>>>     ),
>>>>>     properties,
>>>>>     null,
>>>>>     Semantic.EXACTLY_ONCE,
>>>>>     5
>>>>>   )
>>>>>   kafkaProducer
>>>>> }
>>>>>
>>>>>
>>>>>
>>>>> *Side Note*
>>>>> Another interesting part is that, if I flush "after" publishing all
>>>>> events, the processed events don't even seem to arrive at the sink at
>>>>> all. The source is still populated normally in Flink. It's as if there
>>>>> is no progress after the messages arrive at the source.
>>>>>
>>>>> order_producer = get_order_producer()
>>>>> current_timestamp = int(round(time() * 1000))
>>>>> for i in range(0, 1000):
>>>>>     order_producer.produce(
>>>>>         topic="order",
>>>>>         key={"key": i % 100},
>>>>>         value={
>>>>>             "id": 1000,
>>>>>             "customerId": i % 10,
>>>>>             "timestamp": current_timestamp + i * 100
>>>>>         }
>>>>>     )
>>>>>
>>>>> # if I flush "AFTER" the loop like this, there is no processed data in
>>>>> # the sink of Flink. The events themselves arrive without any problem
>>>>> # in the source in Flink though
>>>>> order_producer.flush()
>>>>>
>>>>>
