(This is the application I had in mind yesterday, when erroneously thinking
I could commit specific offsets.)

I have a Kafka-consuming app which subscribes to two related topics. Each
message from one topic will eventually match up with a message from the
other. I need to commit in such a way that if my app is restarted it's not
left with the two message streams out of sync. Let me give a concrete,
though hypothetical, example.

Producers A and B both publish a stream of ever-increasing integers, though
with random intervals between production. To throw a wrench in the works,
every now and then one or the other will hiccup and simply fail to publish
a number altogether. Neither will ever publish the same number twice. For
example:

prod A: 1 3    4     5 6 7    8 9
prod B: 1    2 3 4  5 7 8 9
time  :     x          y

The consumer needs to commit in such a way that it will restart properly,
neither reprocessing old messages, nor failing to back up and reprocess
messages from one producer which would eventually have been matched with
messages from the other producer. If restarted at time x, 3 should be
grabbed from producer A. If restarted at time y, it would be nice if we
didn't reprocess 3, 4 and 5, despite the fact that A is never going to
produce a "2". (In my real-world application, message numbers won't always
arrive in monotonically increasing fashion, so in the timeline above,
matching up a pair of "3"s doesn't mean I will be able to discard numbers <
3.)

Here's a simple, incorrect, example (apologies, typing this into Gmail on a
computer without Kafka, so it's tough to check my work):

import kafka

pending = set()  # values seen on one topic but not yet matched on the other

consumer = kafka.KafkaConsumer(...)
consumer.subscribe(["A", "B"])

for message in consumer:
    if message.value in pending:
        print("We have a match!", message.value)
        pending.remove(message.value)
    else:
        pending.add(message.value)

Now, I didn't disable auto-commit, so every now and then the application
will synchronize with the broker. That's a problem, because if the app is
restarted, whatever was in the pending set won't be properly reprocessed.
Let's assume I did turn off auto-commit so we can do our own committing:
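In kafka-python, if I have the parameters right, that means constructing the
consumer with enable_auto_commit=False (the server address and group id
below are just placeholders):

```python
import kafka

# Disable auto-commit so offsets are committed only when we say so.
consumer = kafka.KafkaConsumer(
    bootstrap_servers="localhost:9092",  # placeholder
    group_id="matcher",                  # hypothetical group id
    enable_auto_commit=False,
)
consumer.subscribe(["A", "B"])
```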

for message in consumer:
    if message.value in pending:
        print("We have a match!", message.value)
        pending.remove(message.value)
        consumer.commit()
    else:
        pending.add(message.value)

That's no better, as pending might still contain unmatched integers, so a
restart will fail to reprocess them. Let's only commit when there are no
unmatched values:

for message in consumer:
    if message.value in pending:
        print("We have a match!", message.value)
        pending.remove(message.value)
        if not pending:
            # Everything seen so far has been matched; safe to commit.
            consumer.commit()
    else:
        pending.add(message.value)

That's better in the common case, but recall that I said every now and then
one of the producers would hiccup and simply drop its next value on the
floor. After such an event occurs, pending will never again be empty, so we
will never commit. Upon restart, you might reprocess bazillions of messages.
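You can see the failure with a quick Kafka-free simulation of the timeline
above (the interleaving is approximate): A never produces a 2, B never
produces a 6, so pending never drains.

```python
# Streams from the timeline: A skipped 2, B skipped 6.
a = [1, 3, 4, 5, 6, 7, 8, 9]
b = [1, 2, 3, 4, 5, 7, 8, 9]

# Rough interleaving of the two topics into one consumed stream.
merged = [v for pair in zip(a, b) for v in pair]

pending = set()
for value in merged:
    if value in pending:
        pending.remove(value)   # matched
    else:
        pending.add(value)      # waiting for its partner

print(sorted(pending))  # the hiccup leftovers: [2, 6]
```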

The best "solution" I can come up with is only a heuristic. Remember when
each number arrived, and after some delta t delete it, or at least move it
to a "stale" set so the pending-is-empty test can succeed again. Whenever a
new integer arrives, you have to check both the pending and stale sets. In
the worst case you might still have some leftovers, and if you restart at
just the right (wrong?) time, you run the risk of discarding a pending -
but stale - value which would eventually have been matched.
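A sketch of that heuristic, Kafka-free so I can at least reason about it
(MAX_AGE is a made-up tuning knob):

```python
import time

MAX_AGE = 30.0  # seconds before a pending value is considered stale

pending = {}    # value -> arrival timestamp
stale = set()   # old unmatched values, no longer blocking commits

def handle(value, now=None):
    """Process one message value; return True if it matched."""
    now = time.monotonic() if now is None else now
    # Expire old pending entries into the stale set.
    for v, t in list(pending.items()):
        if now - t > MAX_AGE:
            del pending[v]
            stale.add(v)
    # A new arrival has to be checked against both sets.
    if value in pending:
        del pending[value]
        return True
    if value in stale:
        stale.discard(value)
        return True
    pending[value] = now
    return False

# The consumer loop would then commit whenever pending is empty,
# accepting that stale values may be lost on restart.
```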

Is there a better solution than my discard-after-a-while heuristic?

Thx,

Skip Montanaro
