I'll add to what Thomas already said. The larger issue driving this is that when reading from a source with many parallel partitions, especially when reading lots of historical data (or recovering from downtime when there is a backlog to read), it's quite common for event-time skew to develop across those partitions.
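To make the buffering effect concrete, here's a toy sketch in plain Java (not actual Flink code -- the names and numbers are made up for illustration): an operator's event-time clock is the minimum watermark across its input partitions, so windows can only fire once the slowest partition catches up, and everything already read from the faster partitions sits in state until then.

import java.util.Map;

public class SkewDemo {

    // Flink advances an operator's event-time clock to the minimum
    // watermark across all of its inputs.
    static long minWatermark(Map<Integer, Long> partitionWatermarks) {
        return partitionWatermarks.values().stream()
                .mapToLong(Long::longValue)
                .min()
                .orElse(Long.MIN_VALUE);
    }

    public static void main(String[] args) {
        Map<Integer, Long> wm = Map.of(
                0, 1_000_000L, // partition 0: already reading recent data
                1, 1_000_000L, // partition 1: already reading recent data
                2, 10_000L);   // partition 2: still deep in the backlog
        // The operator watermark is stuck at 10,000, so no window ending
        // after t=10,000 can fire yet -- everything read past that point
        // from partitions 0 and 1 is held in window state. That's the
        // skew-driven state growth.
        System.out.println("operator watermark = " + minWatermark(wm));
    }
}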
When doing event-time windowing -- or in fact any event-time-driven processing -- the event-time skew across partitions results directly in increased buffering in Flink and, of course, the corresponding growth in state and checkpoint size. As the event-time skew and the state size grow, this can have a major effect on application performance, and in some cases it results in a "death spiral" where performance gets worse and worse as the state size grows and grows.

So, one solution to this problem, outside of core changes in Flink itself, seems to be to coordinate the sources across partitions so that they make progress through event time at roughly the same rate. If there is large skew, the idea would be to slow or even stop reading from the partitions with newer data until the partitions with older data have caught up. To do this we need to share state somehow amongst sub-tasks.

The common-sense view is this: why pull data from a perfectly good buffer (like a filesystem, Kinesis, or Kafka) into Flink state, just to manage and checkpoint it while waiting to be able to complete event-time computations? The completion of those computations is held up by the partitions with the oldest data, so there's no value in reading the newer data until you've read the old. It seems much better to leave the newer data buffered upstream.

I'd be very curious to hear others' thoughts on this. I would expect many people to have run into similar issues, and I also wonder if anybody has already been working on them. There seems to be room for some core Flink changes to address this as well, and I'm guessing people have already thought about it.

-Jamie

On Sun, Oct 7, 2018 at 12:58 PM Thomas Weise <t...@apache.org> wrote:

> I'm looking to implement a state sharing mechanism between subtasks (of
> one or multiple tasks). Our use case is to align watermarks between
> subtasks of one or multiple sources to prevent some data fetchers from
> racing ahead of others and causing massive state buffering in Flink.
>
> Each subtask would share a small state (probably just a key and a couple
> of longs). The state would be updated periodically (perhaps every 30s).
> Other subtasks should see these changes with similar latency. It is
> essentially a hash table to which every node contributes a distinct key.
>
> An initial idea was to implement this using ZooKeeper ephemeral nodes.
> But since there is no way to read all child nodes in one sweep, state
> access becomes very chatty. With, let's say, 512 subtasks we would end
> up with 512 * 512 reads per interval (1 to list children, N-1 to fetch
> data, per subtask).
>
> My next stop will be a group communication mechanism like JGroups or
> Akka (the following looks like a potentially good fit:
> https://doc.akka.io/docs/akka/2.5/distributed-data.html?language=java).
> But before that I wanted to check if others already had a similar need
> and possibly experience/implementation to share?
>
> There are probably more use cases related to discovery etc. Perhaps
> Flink could provide a state primitive, if there is broader interest in
> the community?
>
> Thanks,
> Thomas
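To make the ZooKeeper arithmetic above concrete, the per-refresh read pattern would look roughly like this (a minimal sketch using the stock ZooKeeper client; the /watermarks path and the string encoding of the longs are assumptions, only getChildren/getData are real client calls). Since ZooKeeper has no multi-get, every subtask pays one getChildren plus one getData per child on each refresh -- with 512 subtasks, that's the 512 * 512 reads per interval:

import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.zookeeper.ZooKeeper;

public class ZkWatermarkReader {

    // One refresh of the shared table: 1 read to list the children,
    // then one additional read per child to fetch its value.
    static Map<String, Long> readAll(ZooKeeper zk) throws Exception {
        Map<String, Long> result = new HashMap<>();
        List<String> children = zk.getChildren("/watermarks", false);
        for (String child : children) {
            byte[] data = zk.getData("/watermarks/" + child, false, null);
            result.put(child, Long.parseLong(new String(data, StandardCharsets.UTF_8)));
        }
        return result;
    }
}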