Hi Zhijiang, do you already have working code or a design doc for the second approach?
Best,
Aljoscha

> On 18. Oct 2018, at 08:42, Zhijiang(wangzhijiang999)
> <wangzhijiang...@aliyun.com.INVALID> wrote:
>
> Just noticed this discussion from @Till Rohrmann's weekly community update
> and I want to share some thoughts from our experience.
>
> We also encountered the source consumption skew issue before, and we have
> focused on improving it in two possible ways.
>
> 1. Control the read strategy on the downstream side. In detail, every input
> channel in a downstream task corresponds to the consumption of one upstream
> source task, and we tag each input channel with its watermark so that the
> channel with the lowest watermark is read with high priority. In essence, we
> rely on the backpressure mechanism. If the channel with the highest
> timestamp is not read by the downstream task for a while, the corresponding
> source task is blocked from reading once the buffers are exhausted. This way
> there is no need to change the source interface, but there are two major
> concerns. First, it affects barrier alignment, resulting in delayed or
> expired checkpoints. Second, it cannot guarantee precise alignment of source
> consumption; it is only best effort. So we finally gave up on this approach.
>
> 2. Add a new SourceCoordinator component to coordinate source consumption
> across the distributed tasks. For example, we can start this component in
> the JobManager, similar to the current role of the CheckpointCoordinator.
> Every source task would then communicate with the JobManager via the current
> RPC mechanism; maybe we can attach the consumption progress as a payload to
> the heartbeat message. The JobManager would accumulate or store all the
> reported progress and then respond to the individual source tasks. We can
> define a protocol that, for example, tells a fast source task to sleep for a
> specific time. This way the coordinator has the global information needed to
> make the proper decision for each task, so it seems more precise.
> It also does not affect barrier alignment, because a sleeping fast source
> can release the lock to emit barriers as normal. The only concern is the
> change to the source interface, which may affect all related source
> implementations.
>
> Currently we prefer to implement the second approach and will take the other
> good points above into account. :)
>
> Best,
> Zhijiang
> ------------------------------------------------------------------
> From: Jamie Grier <jgr...@lyft.com.INVALID>
> Sent: Wednesday, October 17, 2018, 03:28
> To: dev <dev@flink.apache.org>
> Subject: Re: Sharing state between subtasks
>
> Here's a doc I started describing some changes we would like to make
> starting with the Kinesis Source. It describes a refactoring of that code
> specifically and also hopefully a pattern and some reusable code we can use
> in the other sources as well. The end goal would be best-effort event-time
> synchronization across all Flink sources but we are going to start with the
> Kinesis Source first.
>
> Please take a look and please provide thoughts and opinions about the best
> state sharing mechanism to use -- that section is left blank and we're
> especially looking for input there.
>
> https://docs.google.com/document/d/13HxfsucoN7aUls1FX202KFAVZ4IMLXEZWyRfZNd6WFM/edit?usp=sharing
>
> -Jamie
>
>
> On Wed, Oct 10, 2018 at 11:57 PM Till Rohrmann <trohrm...@apache.org> wrote:
>
>> But on the Kafka source level it should be perfectly fine to do what Elias
>> proposed. This is of course not the perfect solution but could bring us
>> forward quite a bit. The changes required for this should also be minimal.
>> This would become obsolete once we have something like shared state. But
>> until then, I think it would be worth a try.
>>
>> Cheers,
>> Till
>>
>> On Thu, Oct 11, 2018 at 8:03 AM Aljoscha Krettek <aljos...@apache.org>
>> wrote:
>>
>>> The reason this selective reading doesn't work well in Flink at the
>>> moment is checkpointing.
>>> For checkpointing, checkpoint barriers travel within the streams. If we
>>> selectively read from inputs based on timestamps, this is akin to
>>> blocking an input if that input is very far ahead in event time, which
>>> can happen when you have a very fast source and a slow source (in event
>>> time), maybe because you're in a catch-up phase. In those cases it's
>>> better to simply not read the data at the sources, as Thomas said. This
>>> is also because, with Kafka Streams, each operator is basically its own
>>> job: it reads from Kafka and writes to Kafka, and there is no complex
>>> graph of different operations with network shuffles in between, as you
>>> have with Flink.
>>>
>>> This different nature of Flink is also why I think that readers need
>>> awareness of other readers to do the event-time alignment, and this is
>>> where shared state comes in.
>>>
>>>> On 10. Oct 2018, at 20:47, Elias Levy <fearsome.lucid...@gmail.com>
>>>> wrote:
>>>>
>>>> On Wed, Oct 10, 2018 at 9:33 AM Fabian Hueske <fhue...@gmail.com>
>>>> wrote:
>>>>
>>>>> I think the new source interface would be designed to be able to
>>>>> leverage shared state to achieve time alignment.
>>>>> I don't think this would be possible without some kind of shared
>>>>> state.
>>>>>
>>>>> The problem of tasks that are far ahead in time cannot be solved with
>>>>> back-pressure.
>>>>> That's because a task cannot choose from which source task it accepts
>>>>> events and from which it doesn't.
>>>>> If it blocks an input, all downstream tasks that are connected to the
>>>>> operator are affected. This can easily lead to deadlocks.
>>>>> Therefore, all operators need to be able to handle events when they
>>>>> arrive.
>>>>> If they cannot process them yet because they are too far ahead in
>>>>> time, they are put in state.
>>>>>
>>>>
>>>> The idea I was suggesting is not for operators to block an input.
>>>> Rather, it is that they selectively choose which input to process the
>>>> next message from, based on the message timestamps, so long as there
>>>> are buffered messages waiting to be processed. That is a best-effort
>>>> alignment strategy. It seems to work relatively well in practice, at
>>>> least within Kafka Streams.
>>>>
>>>> E.g. at the moment StreamTwoInputProcessor uses a UnionInputGate for
>>>> both its inputs. Instead, it could keep them separate and selectively
>>>> consume from the one that has a buffer available, and if both have
>>>> buffers available, from the buffer whose messages have the lower
>>>> timestamp.
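
The best-effort strategy Elias describes (read from whichever input has a
buffered message, and from the one with the lower timestamp when both do) can
be sketched in a few lines of Java. This is only a minimal illustration under
stated assumptions: SelectiveTwoInputReader and its methods are hypothetical
names, the two deques stand in for buffered input and hold only event
timestamps, and nothing here uses or reflects the actual code of Flink's
StreamTwoInputProcessor or UnionInputGate. It also ignores checkpoint
barriers, which is exactly the concern Aljoscha raises in the thread.

```java
import java.util.ArrayDeque;
import java.util.Deque;

/**
 * Hypothetical sketch of best-effort selective reading from two inputs.
 * Not Flink code: each deque stands in for the buffered records of one
 * input, and each element is just the event timestamp of a record.
 */
final class SelectiveTwoInputReader {
    private final Deque<Long> first = new ArrayDeque<>();
    private final Deque<Long> second = new ArrayDeque<>();

    /** Simulates a record with the given timestamp arriving on input 1. */
    void bufferFirst(long timestamp) {
        first.addLast(timestamp);
    }

    /** Simulates a record with the given timestamp arriving on input 2. */
    void bufferSecond(long timestamp) {
        second.addLast(timestamp);
    }

    /** Returns 1 or 2 for the input to read next, or 0 if nothing is buffered. */
    int selectInput() {
        if (first.isEmpty() && second.isEmpty()) {
            return 0;
        }
        if (second.isEmpty()) {
            return 1; // only input 1 has data, so never block waiting for input 2
        }
        if (first.isEmpty()) {
            return 2; // only input 2 has data
        }
        // Both inputs have data: prefer the head record with the lower timestamp.
        return first.peekFirst() <= second.peekFirst() ? 1 : 2;
    }

    /** Removes and returns the next timestamp to process, or null if none is buffered. */
    Long poll() {
        switch (selectInput()) {
            case 1:
                return first.pollFirst();
            case 2:
                return second.pollFirst();
            default:
                return null;
        }
    }
}
```

Because the reader never waits for an empty input, it cannot stall the
pipeline, but it also only aligns the inputs as far as the currently buffered
data allows, which is why the strategy is best effort.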