Thanks for the detailed response Cody. Our use case is to  do some external
lookups (cached and all) for every event, match the event against the
looked up data, decide whether to write an entry in mysql and write it in
the order in which the events arrived within a kafka partition.

We don't need global ordering. Message ordering within a batch can be
achieved either by waiting for 1.3 to be released (the behavior you
described works very well for us, within a batch) , or by using
 updateStateByKey and sorting.   speculative execution is turned off as
well (I think its off by default).

But, from what I see from the JobScheduler/JobGenerator is this. Within
each stream, jobs are generated every 'n' milliseconds (batch duration),
and submitted for execution. Since job generation in a stream is temporal,
its guaranteed that the jobs are submitted in the order of event arrival
within a stream. And since we have one stream per kafka partition, this
translates to sequentially generated batches & sequentially scheduled
batches within a kafka partition. But since the *execution* of jobs itself
is in parallel, its probable that back-to-back batches in a stream are
submitted one after the other , but are executing concurrently. If this
understanding of mine is correct, it breaks our requirement that messages
be executed in order within a partition.

Thanks!






On Fri, Feb 20, 2015 at 7:03 AM, Cody Koeninger <c...@koeninger.org> wrote:

> For a given batch, for a given partition, the messages will be processed
> in order by the executor that is running that partition.  That's because
> messages for the given offset range are pulled by the executor, not pushed
> from some other receiver.
>
> If you have speculative execution, yes, another executor may be running
> that partition.
>
> If your job is lagging behind in processing such that the next batch
> starts executing before the last batch is finished processing, yes it is
> possible for some other executor to start working on messages from that
> same kafka partition.
>
> The obvious solution here seems to be turn off speculative execution and
> adjust your batch interval / sizes such that they can comfortably finish
> processing :)
>
> If your processing time is sufficiently non-linear with regard to the
> number of messages, yes you might be able to do something with overriding
> dstream.compute.  Unfortunately the new kafka dstream implementation is
> private, so it's not straightforward to subclass it.  I'd like to get a
> solution in place for people who need to be able to tune the batch
> generation policy (I need to as well, for unrelated reasons).  Maybe you
> can say a little more about your use case.
>
> But regardless of the technology you're using to read from kafka (spark,
> storm, whatever), kafka only gives you ordering as to a particular
> partition.  So you're going to need to do some kind of downstream sorting
> if you really care about a global order.
>
> On Fri, Feb 20, 2015 at 1:43 AM, Neelesh <neele...@gmail.com> wrote:
>
>> Even with the new direct streams in 1.3,  isn't it the case that the job
>> *scheduling* follows the partition order, rather than job *execution*?
>> Or is it the case that the stream listens to job completion event (using a
>> streamlistener) before scheduling the next batch?  To compare with storm
>> from a message ordering point of view, unless a tuple is fully processed by
>> the DAG (as defined by spout+bolts), the next tuple does not enter the DAG.
>>
>>
>> On Thu, Feb 19, 2015 at 9:47 PM, Cody Koeninger <c...@koeninger.org>
>> wrote:
>>
>>> Kafka ordering is guaranteed on a per-partition basis.
>>>
>>> The high-level consumer api as used by the spark kafka streams prior to
>>> 1.3 will consume from multiple kafka partitions, thus not giving any
>>> ordering guarantees.
>>>
>>> The experimental direct stream in 1.3 uses the "simple" consumer api,
>>> and there is a 1:1 correspondence between spark partitions and kafka
>>> partitions.  So you will get deterministic ordering, but only on a
>>> per-partition basis.
>>>
>>> On Thu, Feb 19, 2015 at 11:31 PM, Neelesh <neele...@gmail.com> wrote:
>>>
>>>> I had a chance to talk to TD today at the Strata+Hadoop Conf in San
>>>> Jose. We talked a bit about this after his presentation about this - the
>>>> short answer is spark streaming does not guarantee any sort of ordering
>>>> (within batches, across batches).  One would have to use updateStateByKey
>>>> to collect the events and sort them based on some attribute of the event.
>>>> But TD said message ordering is a frequently asked feature recently and is
>>>> getting on his radar.
>>>>
>>>> I went through the source code and there does not seem to be any
>>>> architectural/design limitation to support this.  (JobScheduler,
>>>> JobGenerator are a good starting point to see how stuff works under the
>>>> hood).  Overriding DStream#compute and using streaminglistener looks like a
>>>> simple way of ensuring ordered execution of batches within a stream. But
>>>> this would be a partial solution, since ordering within a batch needs some
>>>> more work that I don't understand fully yet.
>>>>
>>>> Side note :  My custom receiver polls the metricsservlet once in a
>>>> while to decide whether jobs are getting done fast enough and
>>>> throttle/relax pushing data in to receivers based on the numbers provided
>>>> by metricsservlet. I had to do this because out-of-the-box rate limiting
>>>> right now is static and cannot adapt to the state of the cluster
>>>>
>>>> thnx
>>>> -neelesh
>>>>
>>>> On Wed, Feb 18, 2015 at 4:13 PM, jay vyas <jayunit100.apa...@gmail.com>
>>>> wrote:
>>>>
>>>>> This is a *fantastic* question.  The idea of how we identify
>>>>> individual things in multiple  DStreams is worth looking at.
>>>>>
>>>>> The reason being, that you can then fine tune your streaming job,
>>>>> based on the RDD identifiers (i.e. are the timestamps from the producer
>>>>> correlating closely to the order in which RDD elements are being produced)
>>>>> ?  If *NO* then you need to (1) dial up throughput on producer sources or
>>>>> else (2) increase cluster size so that spark is capable of evenly handling
>>>>> load.
>>>>>
>>>>> You cant decide to do (1) or (2) unless you can track  when the
>>>>> streaming elements are being  converted to RDDs by spark itself.
>>>>>
>>>>>
>>>>>
>>>>> On Wed, Feb 18, 2015 at 6:54 PM, Neelesh <neele...@gmail.com> wrote:
>>>>>
>>>>>> There does not seem to be a definitive answer on this. Every time I
>>>>>> google for message ordering,the only relevant thing that comes up is this
>>>>>>  -
>>>>>> http://samza.apache.org/learn/documentation/0.8/comparisons/spark-streaming.html
>>>>>> .
>>>>>>
>>>>>> With a kafka receiver that pulls data from a single kafka partition
>>>>>> of a kafka topic, are individual messages in the microbatch in same the
>>>>>> order as kafka partition? Are successive microbatches originating from a
>>>>>> kafka partition executed in order?
>>>>>>
>>>>>>
>>>>>> Thanks!
>>>>>>
>>>>>>
>>>>>
>>>>>
>>>>>
>>>>> --
>>>>> jay vyas
>>>>>
>>>>
>>>>
>>>
>>
>

Reply via email to