Prateek Maheshwari <prateek...@gmail.com> writes:

Hi Tom,

I'm assuming that the two sub-DAGs you're talking about are the two Map ->
Send To chains acting on the "audit-report-requests" input and sending
their results to the "audit-report-status" output.


Yes, that's correct.


Although processing within each Task is in-order, the framework does not
guarantee the order in which multiple operators chained to the same
upstream operator are evaluated. Specifically, in the current implementation
<https://github.com/apache/samza/blob/master/samza-core/src/main/java/org/apache/samza/operators/impl/OperatorImpl.java#L106>,
an Operator's registeredOperators are maintained as a HashSet of
OperatorImpls. Since a HashSet does not preserve insertion order when
iterating, this would explain the out-of-order appearance of the two
messages. I'm not sure what's changed in 1.0 that makes this show up now.
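Here is a minimal plain-Java sketch of why this makes sibling order unpredictable. A parent hands each message to its children by iterating over a collection, and a HashSet iterates in hash order rather than registration order. `Node`, `registerNext` and `run` are hypothetical names for illustration only and are not Samza's classes. The sketch assumes only the fan-out loop described above.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashSet;
import java.util.LinkedHashSet;
import java.util.List;

public class FanOutOrderDemo {

    // Hypothetical operator node for illustration; NOT Samza's OperatorImpl.
    // Node does not override hashCode(), so a HashSet places nodes by identity hash.
    static final class Node {
        private final String status;
        private final Collection<Node> next;

        Node(String status, Collection<Node> next) {
            this.status = status;
            this.next = next;
        }

        void registerNext(Node child) {
            next.add(child);
        }

        // Hands the message to each registered child in the collection's
        // iteration order; each child "sends" its status to a shared output list.
        void onMessage(String msg, List<String> output) {
            for (Node child : next) {
                output.add(msg + " -> " + child.status);
            }
        }
    }

    // Registers the "pending" branch first and the "done" branch second,
    // mirroring the order of the two MessageStream call chains.
    static List<String> run(Collection<Node> registry) {
        Node input = new Node("input", registry);
        input.registerNext(new Node("pending", new ArrayList<>()));
        input.registerNext(new Node("done", new ArrayList<>()));
        List<String> output = new ArrayList<>();
        input.onMessage("req-1", output);
        return output;
    }

    public static void main(String[] args) {
        // HashSet: iteration follows hash codes, not registration order.
        System.out.println("HashSet:       " + run(new HashSet<>()));
        // LinkedHashSet: iteration follows insertion order, so it is stable.
        System.out.println("LinkedHashSet: " + run(new LinkedHashSet<>()));
    }
}
```

With a HashSet, the two statuses may come out in either order, and because `Node` uses identity hash codes the order may differ between runs. With a LinkedHashSet (or a List), they always follow registration order, with "pending" first.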


Ah! I thought this was the case, but I couldn't find the part of the code
to prove it. This makes far more sense than Kafka routinely not
committing messages in order (though it is still technically a
possibility).

Upon further investigation, I'm not convinced it's a 1.0 issue; I think
we just started using multiple chained operators more heavily.


Since both sendTo and sink are terminal operators (void return type), I
don't think you'll be able to easily get around this. Let me discuss this
with the team and get back to you with a workaround / fix.


Thanks a lot! <3


Thanks,
Prateek


On Tue, Feb 26, 2019 at 7:08 PM Tom Davis <t...@recursivedream.com> wrote:

Hey folks!

We have noticed some inconsistencies in message ordering when running a
StreamApplication that calls two separate `map` functions over an input
and sends the results to the same output. I have attached my Execution Plan,
but the gist is that the first `map` function marks a request as "pending"
by sending a message to a status topic, and the second `map` function
does some work and then sends its own status of "done".

We have a test set up to read the resulting status topic with a normal
Kafka consumer to ensure that two status messages were produced by Samza
and consumed in the proper order (first "pending", then "done", per the
order of the MessageStream call chains). This test has been flapping fairly
routinely since we upgraded to Samza 1.0; we never noticed this in the
past. Sometimes it times out waiting for any messages at all, though that is
considerably rarer than the ordering issue. My understanding is that, by
default, all processing for a given Task is done serially. Is that no
longer true? Is the guarantee *only* for the order in which messages are
consumed, not the order in which they are produced?

For test simplicity, there's a single Kafka partition for each topic, and
I attempted to create a configuration file that would eliminate as many
sources of coordination and concurrency as I knew how to:

  processor.id=0
  job.coordinator.factory=org.apache.samza.standalone.PassthroughJobCoordinatorFactory
  job.container.single.thread.mode=true

(We normally use the ZkJobCoordinatorFactory, but both produce the bug.)

I realize the KafkaProducer does not *technically* guarantee delivery
order except when using transactions, which KafkaSystemProducer doesn't
appear to do by default. I have checked the actual message envelopes, and
when the ordering is wrong, the offsets agree with the order in which I
received them -- so "done" really was recorded by Kafka before "pending".
This seems to rule out Samza, but I'm not entirely confident in that
conclusion. Any thoughts?

Thanks,

Tom
