If you don't have hot users, you can use the user id as the hash key when
publishing into Kafka.
That will put all events for a given user in the same partition per batch.
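
For example, here is a minimal sketch of keying producer records by user id.
The Event type, topic name, broker address and serializers are placeholders,
not from this thread:

  import java.util.Properties
  import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}

  // hypothetical event type; the real events presumably carry a user id
  case class Event(user: String, payload: String)

  val props = new Properties()
  props.put("bootstrap.servers", "localhost:9092")
  props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
  props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")
  val producer = new KafkaProducer[String, String](props)

  def publish(event: Event): Unit = {
    // using the user id as the record key makes the default partitioner hash it,
    // so all of a given user's events land in the same partition
    producer.send(new ProducerRecord[String, String]("events", event.user, event.payload))
  }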
Then you can do foreachPartition with a local map to store just a single
event per user, e.g.

rdd.foreachPartition { p =>
  val m = new scala.collection.mutable.HashMap[String, Event]
  p.foreach { event =>
    // later puts overwrite earlier ones, so only the last event per user is kept
    m.put(event.user, event)
  }
  m.foreach { case (user, event) =>
    // ... do your computation on the latest event for that user
  }
}
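
One assumption worth noting: with the direct stream, each Kafka partition maps
to one Spark partition and is iterated in offset order, so the last put for a
user is also that user's most recent event in the batch.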

On Wed, Jan 6, 2016 at 10:09 AM, Julien Naour <julna...@gmail.com> wrote:

> Thanks for your answer,
>
> As I understand it, a consumer that stays caught up will read every
> message even with compaction, so for a pure Kafka Spark Streaming job it
> will not be a solution.
>
> Perhaps I could reconnect to the Kafka topic after each process to get the
> last state of events and then compare it to the current Kafka Spark
> Streaming events, but it seems a little tricky. For each event it would
> connect to Kafka, get the current state by key (possibly a lot of data) and
> then compare it to the current event. Latency could then be an issue.
>
> To be more specific with my issue:
>
> My events have specific keys corresponding to some kind of user id. I want
> to process only the last event for each user id, i.e. skip intermediate
> events per user id.
> I have only one Kafka topic with all these events.
>
> Regards,
>
> Julien Naour
>
> On Wed, Jan 6, 2016 at 4:13 PM, Cody Koeninger <c...@koeninger.org> wrote:
>
>> Have you read
>>
>> http://kafka.apache.org/documentation.html#compaction
>>
>>
>>
>> On Wed, Jan 6, 2016 at 8:52 AM, Julien Naour <julna...@gmail.com> wrote:
>>
>>> Context: Process data coming from Kafka and send back results to Kafka.
>>>
>>> Issue: Each event could take several seconds to process (work in
>>> progress to improve that). During that time, events (and RDDs)
>>> accumulate. Intermediate events (by key) do not have to be processed, only
>>> the last ones. So when one process finishes, it would be ideal if Spark
>>> Streaming skipped all events that are not the current last ones (by key).
>>>
>>> I'm not sure this can be done using only the Spark Streaming API. As I
>>> understand Spark Streaming, DStream RDDs will accumulate and be processed
>>> one by one, without considering whether there are others afterwards.
>>>
>>> Possible solutions:
>>>
>>> Use only the Spark Streaming API, but I'm not sure how. updateStateByKey
>>> seems to be a solution, but I'm not sure it will work properly when
>>> DStream RDDs accumulate and you only have to process the last events by
>>> key (see the sketch after this list of solutions).
>>>
>>> Have two Spark Streaming pipelines: one to get the last updated event by
>>> key and store it in a map or a database; the second processes events only
>>> if they are the last ones, as indicated by the other pipeline.
>>>
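>>> A rough sketch of the updateStateByKey idea, assuming a hypothetical Event
>>> type with user and timestamp fields, a DStream[Event] called stream, and a
>>> checkpointed StreamingContext (updateStateByKey needs ssc.checkpoint):
>>>
>>>   val latestByUser = stream
>>>     .map(event => (event.user, event))
>>>     .updateStateByKey[Event] { (newEvents: Seq[Event], current: Option[Event]) =>
>>>       // keep whichever event has the newest timestamp; the ordering of
>>>       // newEvents after the shuffle is not guaranteed, so compare explicitly
>>>       (current.toSeq ++ newEvents)
>>>         .reduceOption((a, b) => if (b.timestamp >= a.timestamp) b else a)
>>>     }
>>>
>>> Note that this emits the stored latest event for every user on every batch
>>> and never drops keys, which may or may not be what I need here.
>>>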
>>>
>>> Sub questions for the second solution:
>>>
>>> Could two pipelines share the same StreamingContext and process the same
>>> DStream at different speeds (slow processing vs fast)?
>>>
>>> Is it easily possible to share values (a map, for example) between
>>> pipelines without using an external database? I think an accumulator or a
>>> broadcast variable could work, but between two pipelines I'm not sure.
>>>
>>> Regards,
>>>
>>> Julien Naour
>>>
>>
>>
