Hi Pushkar,
I think there is a misunderstanding. If a consumer polls from a
partition, it will always poll the next event, independently of whether
the offset was committed or not. Committed offsets are used for fault
tolerance, i.e., when a consumer crashes, the consumer that takes over
the work of the crashed consumer will start polling records from the
offset the crashed consumer committed last. This is not only true for
Kafka Streams, but for all applications that use a Kafka consumer with
subscription.
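For illustration, a minimal (untested) plain-consumer sketch of that
behavior; broker address, group id, and topic name are placeholders.
Even though nothing is ever committed here, each poll() keeps returning
the next records:

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

public class PollWithoutCommit {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "demo-group");              // placeholder
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("input-topic"));     // placeholder
            while (true) {
                // Nothing is ever committed, yet each poll() returns the records
                // after the current position, i.e., the consumer keeps moving
                // forward. Only a restart or rebalance falls back to the last
                // committed offset.
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(500));
                records.forEach(r -> System.out.println(r.offset() + ": " + r.value()));
            }
        }
    }
}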
To be clear, my proposal is not a workaround. It is one approach to
solve your problem in Kafka Streams. You could have a look into
stream-stream joins if you can use a stream instead of a global table.
Another approach would be to use a plain Kafka consumer instead of Kafka
Streams, which would give you more fine-grained control over polls and
commits. In any case, be aware that blocking processing on an event
indefinitely may result in your lag and/or your state growing
indefinitely.
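For illustration, an untested sketch of what that fine-grained control
could look like with a plain consumer; all names are placeholders and
the error handling is simplified. The idea is to commit only after
successful processing and to seek back to a failed record so that the
next poll() returns the same event again:

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

public class BlockingRetryConsumer {

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "retry-group");             // placeholder
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("input-topic"));     // placeholder
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
                for (ConsumerRecord<String, String> record : records) {
                    TopicPartition tp = new TopicPartition(record.topic(), record.partition());
                    try {
                        process(record);                              // application logic (placeholder)
                        consumer.commitSync(Collections.singletonMap( // commit only after success
                                tp, new OffsetAndMetadata(record.offset() + 1)));
                    } catch (RuntimeException requiredDataNotAvailable) {
                        // Rewind to the failed record and stop handling this batch:
                        // the next poll() returns the same event again, so processing
                        // effectively blocks on it (and lag grows) until it succeeds.
                        consumer.seek(tp, record.offset());
                        // Simplification: with several input partitions you would also
                        // need to rewind the unprocessed records of the other partitions.
                        break;
                    }
                }
            }
        }
    }

    private static void process(ConsumerRecord<String, String> record) {
        // placeholder: enrich the event; throw if required data is not yet available
    }
}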
If you think there is something missing in Kafka Streams, you are very
welcome to search through the tickets in
https://issues.apache.org/jira/projects/KAFKA/issues and comment on
tickets that would solve your issue or create a new one if you cannot
find any.
Best,
Bruno
On 22.09.20 05:09, Pushkar Deole wrote:
Bruno,
So, essentially, we are just waiting on the processing of the first event
that got an error before going ahead to the next one.
Second, if the application handles storing the events in a state store for
retry, Kafka Streams would essentially commit the offsets of those events,
so the next event will be polled by the consumer, correct?
Instead of this workaround, is there any provision in Kafka Streams for
this scenario? E.g., if the application registers application-level
exceptions, Kafka Streams would take care of it and do all this
internally, i.e., not commit the offset of that event and hence keep
polling the same event again?
Since this is a common scenario, could Kafka Streams achieve this
internally through a particular configuration that users set?
On Mon, Sep 21, 2020 at 9:01 PM Bruno Cadonna <br...@confluent.io> wrote:
Hi Pushkar,
If you want to keep the order, you could still use the state store I
suggested in my previous e-mail and implement a queue on top of it. For
that, you need to put the events into the store with a key that
represents their arrival order. Each time a record is received from the
input topic, the events are read in arrival order from the state store
and the data in the global table is probed. If an event matches data in
the global table, the event is removed from the state store and emitted.
If an event does not match data in the global table, the processing
stops and nothing further is emitted.
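For illustration, an untested sketch of such a queue with the Processor
API; store names, serdes, and the lookup key are assumptions, and the
global data is assumed to be accessible as a plain KeyValueStore (a
materialized GlobalKTable may expose a timestamped store, depending on
your version):

import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.KeyValueStore;

import java.util.ArrayList;
import java.util.List;

public class OrderedBufferProcessor implements Processor<String, String> {

    private ProcessorContext context;
    private KeyValueStore<Long, String> queue;         // buffered events, keyed by arrival order
    private KeyValueStore<String, String> globalStore; // read-only view of the global data
    private long nextSeq = 0L;

    @Override
    @SuppressWarnings("unchecked")
    public void init(ProcessorContext context) {
        this.context = context;
        this.queue = (KeyValueStore<Long, String>) context.getStateStore("event-queue");
        this.globalStore = (KeyValueStore<String, String>) context.getStateStore("measure-definitions");
        // Recover the next sequence number after a restart (simplified: full scan).
        try (KeyValueIterator<Long, String> it = queue.all()) {
            while (it.hasNext()) {
                nextSeq = Math.max(nextSeq, it.next().key + 1);
            }
        }
    }

    @Override
    public void process(String key, String value) {
        // Append the new event at the tail of the queue. The value stores the
        // original key and value together (simplification; a real implementation
        // would use a proper composite type and serde).
        queue.put(nextSeq++, key + "|" + value);

        // Drain from the head in arrival order; stop at the first event whose
        // required data is still missing, so the original order is preserved.
        List<Long> emitted = new ArrayList<>();
        try (KeyValueIterator<Long, String> it = queue.all()) {
            while (it.hasNext()) {
                KeyValue<Long, String> entry = it.next();
                String[] parts = entry.value.split("\\|", 2); // [original key, original value]
                String requiredData = globalStore.get(parts[0]);
                if (requiredData == null) {
                    break; // head of the queue is blocked: emit nothing further
                }
                context.forward(parts[0], parts[1] + "|" + requiredData); // placeholder enrichment
                emitted.add(entry.key);
            }
        }
        emitted.forEach(queue::delete); // delete after the iterator is closed
    }

    @Override
    public void close() {}
}

Note that with the default Long serde the byte-wise iteration order of
the store only matches the numeric order for non-negative sequence
numbers, which is why the sequence starts at zero.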
Best,
Bruno
On 21.09.20 14:21, Pushkar Deole wrote:
Bruno,
1. The loading of the topic mapped to the GlobalKTable is done by some
other service/application. When my application starts up, it just syncs
a GlobalKTable against that topic, and if that other service/application
is still starting up, it may not have loaded that data onto the topic
yet; that is why the data is not available to my application through the
GlobalKTable.
2. I don't want out-of-order processing to happen. That is why I want my
streams application to keep trying the same event until the other
application starts up and the required data becomes available in the
GlobalKTable.
On Mon, Sep 21, 2020 at 5:42 PM Bruno Cadonna <br...@confluent.io>
wrote:
Thank you for clarifying! Now, I think I understand.
You could put events for which the required data in the global table is
not available into a state store, and each time an event from the input
topic is processed, you could look up all events in your state store and
see if the required data is now available for them.
However, be aware that this can mix up the original order of the events
in your input topic if the required data of later events becomes
available before the required data of earlier events. Furthermore, you
need to consider the case where you have a huge number of events in the
state store and suddenly all the required data in the global table is
available, because processing all those events at once might exceed
max.poll.interval.ms and get the stream thread kicked out of the
consumer group. To solve that, you may want to limit the number of
events processed at once. You also need to keep the state store from
growing indefinitely if the required data in the global table is not
available for a long time or not available at all. Maybe all these
caveats do not apply to your use case.
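For illustration, an untested sketch of this buffering approach with the
Processor API, including a cap on how many buffered events are
re-checked per call; store names, types, and the lookup key are
assumptions, and the global data is assumed to be accessible as a plain
KeyValueStore:

import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.KeyValueStore;

import java.util.ArrayList;
import java.util.List;

public class RetryingEnrichmentProcessor implements Processor<String, String> {

    private static final int MAX_RETRIES_PER_CALL = 1000; // tune to your processing cost

    private ProcessorContext context;
    private KeyValueStore<String, String> pending;     // events waiting for required data
    private KeyValueStore<String, String> globalStore; // read-only view of the global data

    @Override
    @SuppressWarnings("unchecked")
    public void init(ProcessorContext context) {
        this.context = context;
        this.pending = (KeyValueStore<String, String>) context.getStateStore("pending-events");
        this.globalStore = (KeyValueStore<String, String>) context.getStateStore("measure-definitions");
    }

    @Override
    public void process(String key, String value) {
        String requiredData = globalStore.get(key);
        if (requiredData == null) {
            pending.put(key, value);                    // park the event for a later retry
        } else {
            context.forward(key, value + "|" + requiredData); // placeholder enrichment
        }
        retryPending();
    }

    private void retryPending() {
        // Re-check at most MAX_RETRIES_PER_CALL parked events per incoming record
        // so a single process() call stays well below max.poll.interval.ms.
        List<String> done = new ArrayList<>();
        int checked = 0;
        try (KeyValueIterator<String, String> it = pending.all()) {
            while (it.hasNext() && checked++ < MAX_RETRIES_PER_CALL) {
                KeyValue<String, String> entry = it.next();
                String requiredData = globalStore.get(entry.key);
                if (requiredData != null) {
                    context.forward(entry.key, entry.value + "|" + requiredData);
                    done.add(entry.key);                // delete after the iterator is closed
                }
            }
        }
        done.forEach(pending::delete);
    }

    @Override
    public void close() {}
}

One pending event per key is kept here for simplicity; if several events
can share a key, the buffer key would need to include a sequence number
or timestamp.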
Best,
Bruno
On 21.09.20 13:45, Pushkar Deole wrote:
Say the application-level exception is named
MeasureDefinitionNotAvaialbleException.
What I am trying to achieve is: in the above case, when the event
processing fails because the required data is not available, the stream
should not proceed to the next event but should wait for the processing
of the current event to complete. If I just catch the
MeasureDefinitionNotAvaialbleException in the processor and log it, then
the stream will proceed to the next event, considering the current
event's processing successful, right?
On Mon, Sep 21, 2020 at 5:11 PM Pushkar Deole <pdeole2...@gmail.com>
wrote:
It is not a Kafka Streams error, it is an application-level error, e.g.,
some data required for processing an input event is not available in the
GlobalKTable since the table is not yet synced with the global topic.
On Mon, Sep 21, 2020 at 4:54 PM Bruno Cadonna <br...@confluent.io>
wrote:
Hi Pushkar,
Is the error you are talking about one that is thrown by Kafka Streams
or by your application? If it is thrown by Kafka Streams, could you
please post the error?
I do not completely understand what you are trying to achieve, but
maybe
max.task.idle.ms [1] is the configuration you are looking for.
I can assure you that enable.auto.commit is false in Kafka Streams.
What
you probably mean is that Kafka Streams periodically commits the
offsets. The commit interval can be controlled with
commit.interval.ms
[2].
Best,
Bruno
[1] https://kafka.apache.org/documentation/#max.task.idle.ms
[2] https://kafka.apache.org/documentation/#commit.interval.ms
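For illustration, where those two configs would be set in a Streams
application (application id, bootstrap servers, and the values are
arbitrary placeholders, not recommendations):

import org.apache.kafka.streams.StreamsConfig;
import java.util.Properties;

public class StreamsConfigExample {
    public static Properties buildConfig() {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-streams-app");    // placeholder
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder
        // Let a task idle up to 30s when not all of its input partitions have
        // buffered data, to reduce out-of-order processing across partitions:
        props.put(StreamsConfig.MAX_TASK_IDLE_MS_CONFIG, 30_000L);
        // Commit offsets (and flush state) every 10s instead of the default 30s:
        props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 10_000L);
        return props;
    }
}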
On 21.09.20 12:38, Pushkar Deole wrote:
Hi,
I would like to know how to handle the following scenario while
processing events in a Kafka Streams application:
1. The streams application needs data from a GlobalKTable, which loads
it from a topic that is populated by some other service/application. So,
if the streams application starts getting events from the input source
topic but does not find the required data in the GlobalKTable, because
that other application/service has not yet loaded it, then the Kafka
Streams application gets an error while processing the event; the
application handles the exception by logging an error and goes on to
processing other events. Since auto.commit is true, polling will go on
fetching the next batch and will probably commit the offsets of the
previous batch, causing loss of the events that hit an exception during
processing.
I want to halt the processing here if an error occurs while processing
an event: instead of going on to the next event, the processing should
keep retrying the previous event until the application-level error is
resolved. How can I achieve this?