If you don't have hot users, you can use the user id as the hash key when publishing into Kafka. That will put all events for a given user in the same partition, so per batch you can do foreachPartition with a local map that stores just a single event per user, e.g.
foreachPartition { p =>
  val m = new HashMap
  p.foreach { event => m.put(event.user, event) }
  m.foreach { ... do your computation }
}

On Wed, Jan 6, 2016 at 10:09 AM, Julien Naour <julna...@gmail.com> wrote:

> Thanks for your answer,
>
> As I understand it, a consumer that stays caught up will read every
> message even with compaction, so for a pure Kafka Spark Streaming job it
> will not be a solution.
>
> Perhaps I could reconnect to the Kafka topic after each process to get
> the last state of events and then compare it to the current Kafka Spark
> Streaming events, but that seems a little tricky: for each event it would
> connect to Kafka, get the current state by key (possibly a lot of data),
> and then compare it to the current event. Latency could be an issue then.
>
> To be more specific about my issue:
>
> My events have specific keys corresponding to some kind of user id. I
> want to process only the last event for each user id, i.e. skip
> intermediate events by user id.
> I have only one Kafka topic with all these events.
>
> Regards,
>
> Julien Naour
>
> On Wed, Jan 6, 2016 at 4:13 PM, Cody Koeninger <c...@koeninger.org>
> wrote:
>
>> Have you read
>>
>> http://kafka.apache.org/documentation.html#compaction
>>
>> On Wed, Jan 6, 2016 at 8:52 AM, Julien Naour <julna...@gmail.com> wrote:
>>
>>> Context: Process data coming from Kafka and send the results back to
>>> Kafka.
>>>
>>> Issue: Each event can take several seconds to process (work in
>>> progress to improve that). During that time, events (and RDDs)
>>> accumulate. Intermediate events (by key) do not have to be processed,
>>> only the last ones, so when one process finishes it would be ideal if
>>> Spark Streaming could skip all events that are not the current last
>>> ones (by key).
>>>
>>> I'm not sure this can be done using only the Spark Streaming API. As I
>>> understand Spark Streaming, DStream RDDs accumulate and are processed
>>> one by one, without considering whether others come afterwards.
>>>
>>> Possible solutions:
>>>
>>> Use only the Spark Streaming API, but I'm not sure how.
>>> updateStateByKey seems to be a solution, but I'm not sure it will work
>>> properly when DStream RDDs accumulate and you have to process only the
>>> last events by key.
>>>
>>> Have two Spark Streaming pipelines. One gets the last updated event by
>>> key and stores it in a map or a database. The second pipeline
>>> processes events only if they are the last ones, as indicated by the
>>> first pipeline.
>>>
>>> Sub-questions for the second solution:
>>>
>>> Could two pipelines share the same sparkStreamingContext and process
>>> the same DStream at different speeds (slow processing vs. fast)?
>>>
>>> Is it easily possible to share values (a map, for example) between
>>> pipelines without using an external database? I think an
>>> accumulator/broadcast could work, but between two pipelines I'm not
>>> sure.
>>>
>>> Regards,
>>>
>>> Julien Naour
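
For reference, a slightly fuller sketch of the per-partition de-duplication
described at the top of this message. The Event case class, its
user/timestamp fields, and process() are illustrative placeholders, and the
stream is assumed to come from a direct Kafka stream already mapped to
Event, so each Kafka partition maps to one Spark partition:

  import scala.collection.mutable
  import org.apache.spark.streaming.dstream.DStream

  // Hypothetical record type; in practice map the Kafka (key, value)
  // pairs coming off the direct stream into something like this.
  case class Event(user: String, timestamp: Long, payload: String)

  // Placeholder for the expensive per-event computation.
  def process(event: Event): Unit = {
    // ... do your computation, then write the result back to Kafka ...
  }

  def processLatestPerUser(stream: DStream[Event]): Unit =
    stream.foreachRDD { rdd =>
      rdd.foreachPartition { partition =>
        // The producer hashed on user id, so all events for a given user
        // in this batch are in this partition; keep only the newest one.
        val latest = mutable.HashMap.empty[String, Event]
        partition.foreach { event =>
          if (latest.get(event.user).forall(_.timestamp <= event.timestamp))
            latest.put(event.user, event)
        }
        // Intermediate events are skipped; only the latest per user is processed.
        latest.values.foreach(process)
      }
    }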
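
Regarding the updateStateByKey idea raised in the quoted message: a minimal
sketch of keeping only the newest event per user id across batches, assuming
the same illustrative Event type and a stream already keyed by user id
(checkpointing must be enabled for updateStateByKey). Note that
updateStateByKey re-emits the state for every key on each batch, so
downstream code still has to decide which entries actually changed:

  import org.apache.spark.streaming.dstream.DStream

  // Same illustrative record type as in the previous sketch.
  case class Event(user: String, timestamp: Long, payload: String)

  // Merge the current batch's events with the stored state,
  // keeping only the newest event for the key.
  def updateLatest(newEvents: Seq[Event], current: Option[Event]): Option[Event] = {
    val candidates = current.toSeq ++ newEvents
    if (candidates.isEmpty) None else Some(candidates.maxBy(_.timestamp))
  }

  // keyed is assumed to be a DStream[(String, Event)] keyed by user id;
  // the streaming context must have a checkpoint directory set.
  def latestPerUser(keyed: DStream[(String, Event)]): DStream[(String, Event)] =
    keyed.updateStateByKey(updateLatest)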