Hi John,

Thanks for your message. Indeed, there's a potential feature here. Should I log it on Jira?
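In the meantime, here is roughly what I have in mind for the custom partitioner, in case it helps anyone else hitting this. It's only a rough, untested sketch: DeviceMetricKey and repartitionByDevice are made-up names standing in for our composite key and wiring code, and it assumes the deviceId-keyed source topic was written with the default partitioner (murmur2 of the serialized String deviceId), so partitioning the repartition topic on deviceId alone should reproduce the source partitioning.

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Repartitioned;
import org.apache.kafka.streams.processor.StreamPartitioner;

public class DeviceIdRepartitioning {

    // Made-up composite key produced by our selectKey((device, metric)) step.
    public static class DeviceMetricKey {
        public final String deviceId;
        public final String metric;

        public DeviceMetricKey(String deviceId, String metric) {
            this.deviceId = deviceId;
            this.metric = metric;
        }
    }

    // Partitions the repartition topic on deviceId only, mirroring the default
    // murmur2-of-serialized-key partitioning of the deviceId-keyed source topic,
    // so each record should land in the same partition it came from.
    public static class DeviceIdPartitioner<V> implements StreamPartitioner<DeviceMetricKey, V> {
        @Override
        public Integer partition(String topic, DeviceMetricKey key, V value, int numPartitions) {
            byte[] deviceIdBytes = Serdes.String().serializer().serialize(topic, key.deviceId);
            return Utils.toPositive(Utils.murmur2(deviceIdBytes)) % numPartitions;
        }
    }

    // Wire it in between selectKey and the per-metric aggregation. The partition
    // count must match the source topic; key/value serdes for the repartition
    // topic would also be set via withKeySerde / withValueSerde as needed.
    public static <V> KStream<DeviceMetricKey, V> repartitionByDevice(
            KStream<DeviceMetricKey, V> rekeyed, int sourcePartitionCount) {
        return rekeyed.repartition(
            Repartitioned.<DeviceMetricKey, V>as("by-device-and-metric")
                .withStreamPartitioner(new DeviceIdPartitioner<>())
                .withNumberOfPartitions(sourcePartitionCount));
    }
}

As you said, the repartition topic is still there, but as long as its partition count matches the source topic, records should stay in the partition they came from, so the stream-time skew between partitions is no longer mixed across devices.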
Mathieu

On Fri, Nov 20, 2020 at 21:53, John Roesler <vvcep...@apache.org> wrote:

> Hi Mathieu,
>
> Ah, that is unfortunate. I believe your analysis is correct. In general,
> we have no good solution to the problem of upstream tasks moving ahead of
> each other and causing disorder in the repartition topics. Guozhang has
> done a substantial amount of thinking on this subject, though, and has
> some ideas for how we can improve the behavior.
>
> However, your situation is a special case, since the data actually doesn’t
> need to be shuffled. Ideally, there would be a way to say “I assert that
> the partitioning doesn’t need to change despite this key change”. That
> actually seems like a good feature for the repartition operator.
>
> In the absence of that feature, I think you’re on the right track. If you
> specify a partitioner that produces exactly the same partitioning as the
> source, you _should_ be able to avoid any shuffling, although there will
> still be a repartition topic there.
>
> I hope this helps,
> John
>
> On Fri, Nov 20, 2020, at 04:14, Mathieu D wrote:
> > Hello there,
> >
> > We're processing IoT device data, each device sending several metrics.
> >
> > When we upgrade our streams app, we set a brand new 'application.id' to
> > reprocess a bit of past data, to warm up the state stores and
> > aggregations and make sure all outputs will be valid. Downstream is
> > designed for "at least once", so this bit of reprocessing is not a
> > problem.
> >
> > When this restart+reprocessing occurs, we observe a peak of "Skipping
> > record for expired window" / "Skipping record for expired segment"
> > warnings in the logs.
> >
> > My understanding so far is this:
> > - part of our topology is keyed by deviceId;
> > - during the reprocessing, some tasks move faster for some partitions,
> > which means there is a substantial difference between the stream-times
> > of the various tasks;
> > - at some point in the topology, we re-key the data by (deviceId, metric)
> > for the "group by metric" aggregations;
> > - this shuffles the data: deviceId1 was in partition 1 with eventTime1,
> > deviceId2 was in partition 2 with eventTime2, and now, by the magic of
> > hashing a (device, metric) key, they are pushed together into the same
> > partitionX. If eventTime2 is far ahead of eventTime1, then all windows
> > will expire at once.
> >
> > Is this analysis correct?
> >
> > Then, what's the proper way to avoid this? Manually do a .repartition()
> > with a custom partitioner after each .selectKey((device, metric)), and
> > before going through the aggregations?
> >
> > Any other advice?
> >
> > Thanks for your insights,
> >
> > Mathieu