I was just looking to resolve this by using KafkaProducer's transaction API, assuming the underlying issue is just the async, unordered nature of actual producer sends. I turned on idempotent mode via producer config, which (as expected) reduced the failure rate but didn't eliminate it.
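For reference, the producer settings that bear on ordering can be sketched roughly as follows. This is a minimal sketch, not the configuration actually used in this thread: the class and method names (`OrderedProducerConfig`, `orderedProducerProps`) and the transactional id value are hypothetical, and the keys are standard Kafka producer config names written as plain strings so the block needs no Kafka dependency. In Samza these would presumably be set under the `systems.<system-name>.producer.` prefix, which is an assumption about the setup here.

```java
import java.util.Properties;

public class OrderedProducerConfig {

    // Hypothetical helper: builds producer settings aimed at keeping
    // per-partition send order stable across retries.
    public static Properties orderedProducerProps(String bootstrapServers,
                                                  String transactionalId) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", bootstrapServers);
        // Idempotence de-duplicates retried batches on the broker side.
        props.setProperty("enable.idempotence", "true");
        // Idempotence requires acknowledgement from all in-sync replicas.
        props.setProperty("acks", "all");
        // Conservative choice: with one in-flight request per connection,
        // a retried batch cannot be overtaken by a later batch.
        props.setProperty("max.in.flight.requests.per.connection", "1");
        // Only needed when the transaction API is used (assumed optional here).
        if (transactionalId != null) {
            props.setProperty("transactional.id", transactionalId);
        }
        return props;
    }

    public static void main(String[] args) {
        // "localhost:9092" and "status-writer-0" are placeholder values.
        Properties props = orderedProducerProps("localhost:9092", "status-writer-0");
        props.stringPropertyNames().stream().sorted()
             .forEach(k -> System.out.println(k + "=" + props.getProperty(k)));
    }
}
```

Note that these settings only constrain how one producer's batches land on a partition; they would not help if the two sends are handed to the producer in the wrong order to begin with.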
Actual sends are done in KafkaSystemProducer. It has a constructor parameter, `getProducer`, which I can't customize because it is hard-coded in KafkaSystemFactory. I *could* subclass this factory and use my own KafkaProducer supplier function, but SAMZA-1919 was closed as "Won't Fix" for some reason -- so I would need to rewrite (copy/paste ;) KafkaSystemDescriptor, because it defines no constructor that would let me avoid the hard-coded factory class.

What was the impetus for closing SAMZA-1919 as "Won't Fix"? Are there other plans for customizing system components?

Cheers,
Tom

Tom Davis <t...@recursivedream.com> writes:
Hey folks!

We have noticed some inconsistencies in message ordering when running a StreamApplication that calls two separate `map` functions over an input and sends results to the same output. I have attached my Execution Plan, but the gist is that the first `map` function marks a thing as "pending" by sending a message to a status topic, and the second `map` function does some work and then sends its own status of "done".

We have a test set up to read the resulting status topic with a normal Kafka consumer to ensure that two status messages were produced by Samza and consumed in the proper order (first "pending", then "done", per the order of the MessageStream call chains). This test has flapped pretty routinely since we upgraded to Samza 1.0; we never noticed this in the past. Sometimes it times out waiting for any messages, though that's considerably rarer than the ordering issue.

My understanding is that, for a given Task, all processing is done serially by default. Is that no longer true? Is the guarantee *only* for the order in which messages are consumed, not produced?

For test simplicity, there's a single Kafka partition for each topic, and I attempted to create a configuration file that would eliminate as many sources of coordination and concurrency as I knew how:

    processor.id=0
    job.coordinator.factory=org.apache.samza.standalone.PassthroughJobCoordinatorFactory
    job.container.single.thread.mode=true

(We normally use the ZkJobCoordinatorFactory, but both produce the bug.)

I realize the KafkaProducer does not *technically* guarantee delivery order except when using transactions, which KafkaSystemProducer doesn't appear to do by default. I have checked the actual message envelopes, and when the ordering is wrong, the offset order matches what the consumer saw -- that is, "done" was recorded by Kafka prior to "pending". This seems to rule out Samza, but I'm not entirely confident in that conclusion.

Any thoughts?

Thanks,
Tom
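
The ordering check described above (first "pending", then "done", compared against the offsets Kafka recorded) could be sketched along these lines. This is only an illustration under assumptions: the `Status` shape, the `id` field used to pair messages, and the `outOfOrderIds` helper are hypothetical and not taken from the actual test, and the records are assumed to be passed in the order the consumer returned them.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class StatusOrderCheck {

    // Hypothetical shape of one status message read from the status topic.
    public static final class Status {
        public final String id;     // identifies the thing being processed
        public final String state;  // "pending" or "done"
        public final long offset;   // offset assigned by Kafka

        public Status(String id, String state, long offset) {
            this.id = id;
            this.state = state;
            this.offset = offset;
        }
    }

    // Returns the ids whose "done" was consumed before any "pending",
    // or whose "pending" offset is higher than the "done" offset.
    public static List<String> outOfOrderIds(List<Status> consumed) {
        Map<String, Long> pendingOffsets = new HashMap<>();
        List<String> bad = new ArrayList<>();
        for (Status s : consumed) {
            if ("pending".equals(s.state)) {
                pendingOffsets.put(s.id, s.offset);
            } else if ("done".equals(s.state)) {
                Long pending = pendingOffsets.get(s.id);
                if (pending == null || pending > s.offset) {
                    bad.add(s.id);
                }
            }
        }
        return bad;
    }

    public static void main(String[] args) {
        // Made-up sample data: "a" is in order, "b" has "done" recorded first.
        List<Status> consumed = Arrays.asList(
            new Status("a", "pending", 0L),
            new Status("a", "done", 1L),
            new Status("b", "done", 2L),
            new Status("b", "pending", 3L));
        System.out.println("out of order: " + outOfOrderIds(consumed));
    }
}
```

Checking offsets rather than arrival order alone is what separates "the consumer reordered the messages" from "Kafka recorded them in that order".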