Great to hear it works! `setStartFromGroupOffsets()` [1] starts reading partitions from the consumer group's committed offsets in the Kafka brokers (the 'group.id' setting in the consumer properties). If no committed offsets can be found for a partition, the 'auto.offset.reset' setting from the properties is used instead, and the default value of 'auto.offset.reset' is latest [2]. I think that's why `setStartFromGroupOffsets()` doesn't consume all the events in your test.
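
For a fresh test environment like yours (full container teardown, so no committed offsets yet), here is a minimal sketch of two possible fixes, reusing the `properties` and `kafkaConsumer` names from your `initOrderConsumer()` quoted below:

// Option 1: ignore committed offsets and always read from the earliest offset
// (this is what you ended up doing, and it worked)
kafkaConsumer.setStartFromEarliest()

// Option 2: keep starting from group offsets, but fall back to the earliest
// offset when the group has no committed offsets yet
properties.setProperty("auto.offset.reset", "earliest")
kafkaConsumer.setStartFromGroupOffsets()

Either way, the first run should read the topic from the beginning instead of falling back to 'latest'.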
[1]: https://ci.apache.org/projects/flink/flink-docs-master/dev/connectors/kafka.html#kafka-consumers-start-position-configuration
[2]: https://kafka.apache.org/documentation/#auto.offset.reset

On Fri, 6 Nov 2020 at 07:04, Kevin Kwon <fsw0...@gmail.com> wrote:

> Hi Jark, setStartFromEarliest actually worked. It's strange, since my test
> is stateless (complete teardown of all Docker containers) and the consumer
> creates the topic once it starts consuming it. I was assuming that
> setStartFromGroupOffsets would let the consumer consume from the beginning
> anyway. I'll share the code if I have any further problems, since I can't
> just copy-paste code created inside my company.
>
> Thanks though! I appreciate your help.
>
> On Thu, Nov 5, 2020 at 4:55 AM Jark Wu <imj...@gmail.com> wrote:
>
>> Hi Kevin,
>>
>> Could you share the code of how you register the FlinkKafkaConsumer as a
>> table?
>>
>> Regarding your initialization of FlinkKafkaConsumer, I would recommend
>> setStartFromEarliest() to guarantee it consumes all the records in the
>> partitions.
>>
>> Regarding flush(), it seems it is inside the foreach loop? So it is not
>> flushing after publishing ALL events?
>> I'm not experienced with the flush() API; could this method block and
>> prevent the following events from being published to Kafka?
>>
>> Best,
>> Jark
>>
>> On Wed, 4 Nov 2020 at 04:04, Robert Metzger <rmetz...@apache.org> wrote:
>>
>>> Hi Kevin,
>>> thanks a lot for posting this problem.
>>> I'm adding Jark to the thread; he or another committer working on Flink
>>> SQL can maybe provide some insights.
>>>
>>> On Tue, Nov 3, 2020 at 4:58 PM Kevin Kwon <fsw0...@gmail.com> wrote:
>>>
>>>> It looks like the event time that I've specified in the consumer is not
>>>> being respected. Does the timestamp assigner actually work in Kafka
>>>> consumers?
>>>>
>>>>   .withTimestampAssigner(new SerializableTimestampAssigner[Order] {
>>>>     override def extractTimestamp(order: Order, recordTimestamp: Long): Long = {
>>>>       order.getTimestamp
>>>>     }
>>>>   })
>>>>
>>>> On Tue, Nov 3, 2020 at 12:01 AM Kevin Kwon <fsw0...@gmail.com> wrote:
>>>>
>>>>> Hi guys, I've recently been experimenting with an end-to-end testing
>>>>> environment with Kafka and Flink (1.11).
>>>>>
>>>>> I've set up an infrastructure with Docker Compose composed of a single
>>>>> Kafka broker / Flink (1.11) / MinIO for checkpoint saves.
>>>>>
>>>>> Here's the test scenario:
>>>>>
>>>>> 1. Send 1000 messages with a manual timestamp assigned to each event,
>>>>> increased by 100 milliseconds per loop (the first and last messages
>>>>> differ by 100 seconds). There are 3 partitions for the topic I'm
>>>>> writing to. The code below is the test message producer using
>>>>> Confluent's Python SDK:
>>>>>
>>>>> order_producer = get_order_producer()
>>>>> current_timestamp = int(round(time() * 1000))
>>>>> for i in range(0, 1000):
>>>>>     order_producer.produce(
>>>>>         topic="order",
>>>>>         key={"key": i % 100},
>>>>>         value={
>>>>>             "id": 1000,
>>>>>             "customerId": i % 10,
>>>>>             "timestamp": current_timestamp + i * 100
>>>>>         }
>>>>>     )
>>>>>     order_producer.flush()
>>>>>
>>>>> 2. Flink performs an SQL query on this stream and publishes it back to
>>>>> a Kafka topic that has 3 partitions.
>>>>> Below is the SQL code:
>>>>>
>>>>> | SELECT
>>>>> |   o.id,
>>>>> |   COUNT(*),
>>>>> |   TUMBLE_END(o.ts, INTERVAL '5' SECONDS)
>>>>> | FROM
>>>>> |   order o
>>>>> | GROUP BY
>>>>> |   o.id,
>>>>> |   TUMBLE(o.ts, INTERVAL '5' SECONDS)
>>>>>
>>>>> So I expect the sum of all the counts of the result to be equal to
>>>>> 1000, but it seems that a lot of messages are missing (797, as below).
>>>>> I can't seem to figure out why, though. I'm using event time for the
>>>>> environment.
>>>>>
>>>>> [image: Screenshot 2020-11-02 at 23.35.23.png]
>>>>>
>>>>> *Below is the configuration code*
>>>>> Here's the code for the consumer settings for Kafka:
>>>>>
>>>>> private def initOrderConsumer(): FlinkKafkaConsumer[Order] = {
>>>>>   val properties = new Properties()
>>>>>   properties.setProperty("bootstrap.servers", kafkaBrokers)
>>>>>   properties.setProperty("group.id", "awesome_order")
>>>>>
>>>>>   val kafkaConsumer = new FlinkKafkaConsumer[Order](
>>>>>     "order",
>>>>>     ConfluentRegistryAvroDeserializationSchema.forSpecific[Order](
>>>>>       classOf[Order],
>>>>>       kafkaSchemaRegistry
>>>>>     ),
>>>>>     properties
>>>>>   )
>>>>>   kafkaConsumer.setCommitOffsetsOnCheckpoints(true)
>>>>>   kafkaConsumer.setStartFromGroupOffsets()
>>>>>   kafkaConsumer.assignTimestampsAndWatermarks {
>>>>>     WatermarkStrategy
>>>>>       .forBoundedOutOfOrderness[Order](Duration.ofMillis(100))
>>>>>       .withTimestampAssigner(new SerializableTimestampAssigner[Order] {
>>>>>         override def extractTimestamp(order: Order, recordTimestamp: Long): Long = {
>>>>>           order.getTimestamp
>>>>>         }
>>>>>       })
>>>>>   }
>>>>>   kafkaConsumer
>>>>> }
>>>>>
>>>>> Afterwards,
>>>>> 1. I create a temp view from this source data stream
>>>>> 2. perform SQL queries on it
>>>>> 3. append the result to a processed data stream
>>>>> 4. attach the stream to the Kafka sink
>>>>>
>>>>> Here's the code for the producer settings for Kafka:
>>>>>
>>>>> private def initProcessedModelProducer(): FlinkKafkaProducer[ProcessedModel] = {
>>>>>   val properties: Properties = new Properties()
>>>>>   properties.put("bootstrap.servers", kafkaBrokers)
>>>>>   properties.put("transaction.timeout.ms", "60000")
>>>>>
>>>>>   val kafkaProducer = new FlinkKafkaProducer[ProcessedModel](
>>>>>     "processed_model",
>>>>>     ConfluentRegistryAvroSerializationSchema.forSpecific(
>>>>>       classOf[ProcessedModel],
>>>>>       "procssed_model-value",
>>>>>       kafkaSchemaRegistry
>>>>>     ),
>>>>>     properties,
>>>>>     null,
>>>>>     Semantic.EXACTLY_ONCE,
>>>>>     5
>>>>>   )
>>>>>   kafkaProducer
>>>>> }
>>>>>
>>>>> *Side Note*
>>>>> Another interesting part is that if I flush "after" publishing all
>>>>> events, the processed events don't even seem to arrive at the sink at
>>>>> all. The source is still populated normally in Flink. It's as if there
>>>>> is no progress after the messages arrive at the source.
>>>>>
>>>>> order_producer = get_order_producer()
>>>>> current_timestamp = int(round(time() * 1000))
>>>>> for i in range(0, 1000):
>>>>>     order_producer.produce(
>>>>>         topic="order",
>>>>>         key={"key": i % 100},
>>>>>         value={
>>>>>             "id": 1000,
>>>>>             "customerId": i % 10,
>>>>>             "timestamp": current_timestamp + i * 100
>>>>>         }
>>>>>     )
>>>>> # if I flush "AFTER" the loop, there is no processed data in the sink of
>>>>> # Flink. The events themselves arrive without any problem in the source
>>>>> # in Flink though.
>>>>> order_producer.flush()