Hi John

Thanks for your message.
Indeed, there's a potential feature here. Should I log it in Jira?

Mathieu

On Fri, Nov 20, 2020 at 21:53, John Roesler <vvcep...@apache.org> wrote:

> Hi Mathieu,
>
> Ah, that is unfortunate. I believe your analysis is correct. In general,
> we have no good solution to the problem of upstream tasks moving ahead of
> each other and causing disorder in the repartition topics. Guozhang has
> done a substantial amount of thinking on this subject, though, and has some
> ideas for how we can improve the behavior.
>
> However, your situation is a special case, since the data actually doesn’t
> need to be shuffled. Ideally, there would be a way to say “I assert that
> the partitioning doesn’t need to change despite this key change”. That
> actually seems like a good feature for the repartition operator.
>
> In the absence of that feature, I think you’re on the right track. If you
> specify a partitioner that produces exactly the same partitioning as the
> source, you _should_ be able to avoid any shuffling, although there will
> still be a repartition topic there.
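>
> For illustration, here is a minimal sketch of that workaround (assumptions,
> not something from your message: the new key is the String "deviceId|metric",
> the source topic was keyed by deviceId as a String, and the upstream producer
> used the default murmur2 partitioner):
>
>   import org.apache.kafka.common.serialization.Serdes;
>   import org.apache.kafka.common.utils.Utils;
>   import org.apache.kafka.streams.kstream.KStream;
>   import org.apache.kafka.streams.kstream.Repartitioned;
>   import org.apache.kafka.streams.processor.StreamPartitioner;
>
>   static KStream<String, String> repartitionByDevice(KStream<String, String> rekeyed) {
>     StreamPartitioner<String, String> byDeviceId =
>         (topic, key, value, numPartitions) -> {
>           // Hash only the deviceId prefix of the new key, with the same
>           // murmur2 hash the default producer partitioner uses, so each
>           // record stays in the partition it had in the source topic.
>           String deviceId = key.substring(0, key.indexOf('|'));
>           byte[] bytes = Serdes.String().serializer().serialize(topic, deviceId);
>           return Utils.toPositive(Utils.murmur2(bytes)) % numPartitions;
>         };
>     return rekeyed.repartition(
>         Repartitioned.<String, String>as("by-device-and-metric")
>             .withStreamPartitioner(byDeviceId));
>   }
>
> Note that this preserves the source partitioning only if the repartition
> topic ends up with the same number of partitions as the source topic, which
> you may need to set explicitly via Repartitioned#withNumberOfPartitions.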
>
> I hope this helps,
> John
>
> On Fri, Nov 20, 2020, at 04:14, Mathieu D wrote:
> > Hello there,
> >
> > We're processing IOT device data, each device sending several metrics.
> >
> > When we upgrade our streams app, we set a brand-new 'application.id' to
> > reprocess a bit of past data, warming up the state stores and aggregations
> > so that all outputs will be valid. Downstream is designed for "at least
> > once", so this bit of reprocessing is not a problem.
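> >
> > For reference, the relevant part of the config looks roughly like this (a
> > sketch; the concrete application.id value and the auto.offset.reset
> > setting are illustrative assumptions, and how far back the reprocessing
> > goes depends on topic retention):
> >
> >   import java.util.Properties;
> >   import org.apache.kafka.clients.consumer.ConsumerConfig;
> >   import org.apache.kafka.streams.StreamsConfig;
> >
> >   Properties props = new Properties();
> >   // A fresh application.id creates a new consumer group with empty state
> >   // stores, so windows and aggregations are rebuilt from scratch.
> >   props.put(StreamsConfig.APPLICATION_ID_CONFIG, "iot-metrics-v42");
> >   props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "broker:9092");
> >   // Start the new group from the earliest retained offsets so past data
> >   // is reprocessed and the stores are warmed up before output is trusted.
> >   props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");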
> >
> > When this restart+reprocessing occurs, we observe a peak of "Skipping
> > record for expired window" / "Skipping record for expired segment"
> > warnings in the logs.
> >
> > My understanding so far is this:
> > - a part of our topology is keyed by deviceId.
> > - during the reprocessing, some tasks move faster than others on their
> > partitions, which means there's a substantial difference between the
> > stream-times of the various tasks
> > - at some point in the topology, we re-key the data by (deviceId, metric)
> > for "group by metric" aggregations
> > - this shuffles the data: deviceId1 was in partition 1 with eventTime1,
> > deviceId2 was in partition 2 with eventTime2, and now, by the magic of
> > hashing a (deviceId, metric) key, they are pushed together into the same
> > partition X (sketched below). If eventTime2 is far ahead of eventTime1,
> > then all windows will expire at once.
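> >
> > The shape of that part of the topology is roughly the following (a sketch
> > with illustrative names; extractMetricName() is a hypothetical helper and
> > the real serdes and window sizes differ):
> >
> >   import java.time.Duration;
> >   import org.apache.kafka.common.serialization.Serdes;
> >   import org.apache.kafka.streams.StreamsBuilder;
> >   import org.apache.kafka.streams.kstream.*;
> >
> >   StreamsBuilder builder = new StreamsBuilder();
> >   // Source keyed by deviceId: each partition advances its own stream-time.
> >   KStream<String, String> byDevice =
> >       builder.stream("device-metrics", Consumed.with(Serdes.String(), Serdes.String()));
> >   byDevice
> >       // Re-keying marks the stream for repartitioning: hashing the new
> >       // (deviceId, metric) key mixes records from different source
> >       // partitions, and hence different stream-times, in one partition.
> >       .selectKey((deviceId, payload) -> deviceId + "|" + extractMetricName(payload))
> >       .groupByKey(Grouped.with(Serdes.String(), Serdes.String()))
> >       .windowedBy(TimeWindows.of(Duration.ofMinutes(5)).grace(Duration.ofMinutes(5)))
> >       // If one source partition runs far ahead, it pushes stream-time
> >       // forward here and windows fed by slower partitions expire at once.
> >       .count();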
> >
> > Is this analysis correct?
> >
> > Then, what's the proper way to avoid this? Manually doing a .repartition()
> > with a custom partitioner after each .selectKey((deviceId, metric)), and
> > before going through the aggregations?
> >
> > Any other advice?
> >
> > Thanks for your insights
> >
> > Mathieu
> >
>
