Any progress on synchronizing ingestion by event/ingestion time between Kafka partitions?
On Thu, Nov 15, 2018 at 1:27 AM Jamie Grier <jgr...@lyft.com.invalid> wrote:

> Hey all,
>
> I think all we need for this on the state sharing side is pretty simple. I
> opened a JIRA to track this work and submitted a PR for the state sharing
> bit.
>
> https://issues.apache.org/jira/browse/FLINK-10886
> https://github.com/apache/flink/pull/7099
>
> Please provide feedback :)
>
> -Jamie
>
> On Thu, Nov 1, 2018 at 3:33 AM Till Rohrmann <trohrm...@apache.org> wrote:
>
> > Hi Thomas,
> >
> > Using Akka directly would further entrench our dependency on Scala in
> > flink-runtime, which is something we are currently trying to get rid of. For
> > that purpose we have added the RpcService abstraction, which encapsulates
> > all Akka-specific logic. We hope that we can soon get rid of the Scala
> > dependency in flink-runtime by using a special class loader only for
> > loading the AkkaRpcService implementation.
> >
> > I think the easiest way to sync the task information is actually to go
> > through the JobMaster, because the subtasks don't know on which other TMs
> > the other subtasks run. Otherwise, we would need some TM discovery
> > mechanism between TMs. If you choose this way, then you should be able to
> > use the RpcService by extending the JobMasterGateway with additional RPCs.
> >
> > Cheers,
> > Till
> >
> > On Wed, Oct 31, 2018 at 6:52 PM Thomas Weise <t...@apache.org> wrote:
> >
> > > Hi,
> > >
> > > We are planning to work on the Kinesis consumer in the following order:
> > >
> > > 1. Add per-shard watermarking:
> > > https://issues.apache.org/jira/browse/FLINK-5697 - this will be code we
> > > already use internally; I will open a PR to add it to the Flink Kinesis
> > > consumer
> > > 2. Exchange of per-subtask watermarks between all subtasks of one or
> > > multiple sources
> > > 3. Implement the queue approach described in Jamie's document, utilizing 1.)
> > > and 2.) to align the shard consumers with respect to event time
> > >
> > > There was some discussion regarding the mechanism to share the watermarks
> > > between subtasks. If there is something that can be re-used, that would be
> > > great. Otherwise I'm going to further investigate the Akka or JGroups
> > > route. Regarding Akka, since it is used within Flink already, is there an
> > > abstraction that you would recommend using to avoid a direct dependency?
> > >
> > > Thanks,
> > > Thomas
> > >
> > > On Thu, Oct 18, 2018 at 8:34 PM Zhijiang(wangzhijiang999)
> > > <wangzhijiang...@aliyun.com.invalid> wrote:
> > >
> > > > Not yet. We only have some initial thoughts and have not worked on it
> > > > yet. We will update the progress in this discussion if we have any.
> > > >
> > > > Best,
> > > > Zhijiang
> > > > ------------------------------------------------------------------
> > > > From: Aljoscha Krettek <aljos...@apache.org>
> > > > Sent: Thursday, October 18, 2018, 17:53
> > > > To: dev <dev@flink.apache.org>; Zhijiang(wangzhijiang999) <wangzhijiang...@aliyun.com>
> > > > Cc: Till Rohrmann <trohrm...@apache.org>
> > > > Subject: Re: Sharing state between subtasks
> > > >
> > > > Hi Zhijiang,
> > > >
> > > > Do you already have working code or a design doc for the second approach?
> > > >
> > > > Best,
> > > > Aljoscha
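For illustration, a rough sketch of the kind of RPC extension Till suggests above: each source subtask reports its local watermark to the JobMaster over the existing RPC layer and asks for the global minimum. The interface and method names below are hypothetical and do not exist on Flink's actual JobMasterGateway.

    import java.util.concurrent.CompletableFuture;

    // Hypothetical gateway extension, sketched only to illustrate the idea of
    // sharing per-subtask watermarks through the JobMaster via RPC.
    public interface WatermarkSyncGateway {

        // Called periodically by each source subtask to publish its local watermark.
        CompletableFuture<Void> reportLocalWatermark(
                String sourceOperatorId, int subtaskIndex, long watermark);

        // Returns the minimum watermark across all subtasks of the given source operator,
        // which a fast subtask can use to decide whether to throttle itself.
        CompletableFuture<Long> requestGlobalMinimumWatermark(String sourceOperatorId);
    }

Routing this through the JobMaster avoids any TM-to-TM discovery, at the cost of making the JobMaster the aggregation point for this (small) piece of state.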
> > > > On 18. Oct 2018, at 08:42, Zhijiang(wangzhijiang999) <wangzhijiang...@aliyun.com.INVALID> wrote:
> > > >
> > > > > Just noticed this discussion from @Till Rohrmann's weekly community
> > > > > update and I want to share some thoughts from our experiences.
> > > > >
> > > > > We also encountered the source-consumption skew issue before, and we
> > > > > have focused on improving it in two possible ways.
> > > > >
> > > > > 1. Control the read strategy on the downstream side. In detail, every
> > > > > input channel in a downstream task corresponds to the consumption of one
> > > > > upstream source task, and we tag each input channel with a watermark to
> > > > > find the lowest channel to read with high priority. In essence, we
> > > > > rely on the backpressure mechanism: if the channel with the highest
> > > > > timestamp is not read by the downstream task for a while, it blocks the
> > > > > corresponding source task from reading once the buffers are exhausted.
> > > > > There is no need to change the source interface this way, but there are
> > > > > two major concerns: first, it affects barrier alignment, resulting in
> > > > > delayed or expired checkpoints. Second, it cannot align source
> > > > > consumption very precisely; it is just a best-effort approach. So we
> > > > > gave up on this way in the end.
> > > > >
> > > > > 2. Add a new SourceCoordinator component to coordinate the source
> > > > > consumption in a distributed fashion. For example, we can start this
> > > > > component in the JobManager, similar to the current role of the
> > > > > CheckpointCoordinator. Then every source task would communicate with the
> > > > > JobManager via the current RPC mechanism; maybe we can rely on the
> > > > > heartbeat message to carry the consumption progress as a payload. The
> > > > > JobManager would accumulate all the reported progress and then give
> > > > > responses to the different source tasks. We can define a protocol, for
> > > > > example, for telling a fast source task to sleep for a specific time.
> > > > > This way, the coordinator has the global information needed to make the
> > > > > proper decision for each individual task, so it seems more precise. And
> > > > > it does not affect barrier alignment, because a sleeping fast source can
> > > > > still release the lock to emit barriers as normal. The only concern is
> > > > > that it changes the source interface and may affect all related source
> > > > > implementations.
> > > > >
> > > > > Currently we prefer to implement the second way and will take the other
> > > > > good points above into account. :)
> > > > >
> > > > > Best,
> > > > > Zhijiang
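To make the second approach a bit more concrete, here is a rough sketch of the report/throttle protocol described above. It is not an existing Flink component; all names and the pause heuristic are assumptions for illustration only.

    import java.util.Map;
    import java.util.concurrent.ConcurrentHashMap;

    // Hypothetical coordinator: each source subtask periodically reports its
    // event-time progress (e.g. piggybacked on the heartbeat), and the coordinator
    // answers with how long the subtask should pause before reading further.
    public class SourceCoordinator {

        private final Map<Integer, Long> progressPerSubtask = new ConcurrentHashMap<>();
        private final long maxAheadMillis; // how far ahead of the slowest subtask a task may run

        public SourceCoordinator(long maxAheadMillis) {
            this.maxAheadMillis = maxAheadMillis;
        }

        /** Returns a suggested pause in milliseconds; 0 means keep reading. */
        public long reportProgress(int subtaskIndex, long eventTimeProgress) {
            progressPerSubtask.put(subtaskIndex, eventTimeProgress);

            long globalMin = progressPerSubtask.values().stream()
                    .mapToLong(Long::longValue).min().orElse(Long.MIN_VALUE);

            // Simple heuristic: the further a subtask is ahead of the slowest one,
            // the longer it is asked to sleep.
            long ahead = eventTimeProgress - globalMin;
            return ahead > maxAheadMillis ? ahead - maxAheadMillis : 0L;
        }
    }

Because the pause happens between reads, a sleeping source can still hold its checkpoint lock behaviour unchanged and emit barriers as usual, which is the key advantage Zhijiang points out over blocking via backpressure.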
> > > > > ------------------------------------------------------------------
> > > > > From: Jamie Grier <jgr...@lyft.com.INVALID>
> > > > > Sent: Wednesday, October 17, 2018, 03:28
> > > > > To: dev <dev@flink.apache.org>
> > > > > Subject: Re: Sharing state between subtasks
> > > > >
> > > > > Here's a doc I started describing some changes we would like to make,
> > > > > starting with the Kinesis source. It describes a refactoring of that code
> > > > > specifically and also, hopefully, a pattern and some reusable code we can
> > > > > use in the other sources as well. The end goal would be best-effort
> > > > > event-time synchronization across all Flink sources, but we are going to
> > > > > start with the Kinesis source first.
> > > > >
> > > > > Please take a look and please provide thoughts and opinions about the best
> > > > > state sharing mechanism to use -- that section is left blank and we're
> > > > > especially looking for input there.
> > > > >
> > > > > https://docs.google.com/document/d/13HxfsucoN7aUls1FX202KFAVZ4IMLXEZWyRfZNd6WFM/edit?usp=sharing
> > > > >
> > > > > -Jamie
> > > > >
> > > > > On Wed, Oct 10, 2018 at 11:57 PM Till Rohrmann <trohrm...@apache.org> wrote:
> > > > >
> > > > >> But on the Kafka source level it should be perfectly fine to do what Elias
> > > > >> proposed. This of course is not the perfect solution but could bring us
> > > > >> forward quite a bit. The changes required for this should also be minimal.
> > > > >> This would become obsolete once we have something like shared state. But
> > > > >> until then, I think it would be worth a try.
> > > > >>
> > > > >> Cheers,
> > > > >> Till
> > > > >>
> > > > >> On Thu, Oct 11, 2018 at 8:03 AM Aljoscha Krettek <aljos...@apache.org> wrote:
> > > > >>
> > > > >>> The reason this selective reading doesn't work well in Flink at the moment
> > > > >>> is because of checkpointing. For checkpointing, checkpoint barriers travel
> > > > >>> within the streams. If we selectively read from inputs based on timestamps,
> > > > >>> this is akin to blocking an input if that input is very far ahead in event
> > > > >>> time, which can happen when you have a very fast source and a slow source
> > > > >>> (in event time), maybe because you're in a catch-up phase. In those cases
> > > > >>> it's better to simply not read the data at the sources, as Thomas said.
> > > > >>> This is also because with Kafka Streams, each operator is basically its own
> > > > >>> job: it's reading from Kafka and writing to Kafka, and there is not a
> > > > >>> complex graph of different operations with network shuffles in between, as
> > > > >>> you have with Flink.
> > > > >>>
> > > > >>> This different nature of Flink is also why I think that readers need
> > > > >>> awareness of other readers to do the event-time alignment, and this is
> > > > >>> where shared state comes in.
> > > > >>>
> > > > >>> On 10. Oct 2018, at 20:47, Elias Levy <fearsome.lucid...@gmail.com> wrote:
> > > > >>>
> > > > >>>> On Wed, Oct 10, 2018 at 9:33 AM Fabian Hueske <fhue...@gmail.com> wrote:
> > > > >>>>
> > > > >>>>> I think the new source interface would be designed to be able to leverage
> > > > >>>>> shared state to achieve time alignment.
> > > > >>>>> I don't think this would be possible without some kind of shared state.
> > > > >>>>>
> > > > >>>>> The problem of tasks that are far ahead in time cannot be solved with
> > > > >>>>> back-pressure. That's because a task cannot choose from which source task
> > > > >>>>> it accepts events and from which it doesn't. If it blocks an input, all
> > > > >>>>> downstream tasks that are connected to the operator are affected. This can
> > > > >>>>> easily lead to deadlocks. Therefore, all operators need to be able to
> > > > >>>>> handle events when they arrive. If they cannot process them yet because
> > > > >>>>> they are too far ahead in time, they are put in state.
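The "put them in state" handling Fabian mentions is something a user function can already do today. A simplified sketch, assuming a placeholder Event POJO with a public long timestamp field and a keyed stream, of buffering early records in state until the watermark catches up:

    import java.util.ArrayList;
    import java.util.List;

    import org.apache.flink.api.common.state.ListState;
    import org.apache.flink.api.common.state.ListStateDescriptor;
    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.streaming.api.functions.ProcessFunction;
    import org.apache.flink.util.Collector;

    // Events ahead of the current watermark are kept in keyed state and re-emitted
    // from an event-time timer once the watermark has advanced far enough.
    public class BufferEarlyEvents extends ProcessFunction<Event, Event> {

        private transient ListState<Event> buffer;

        @Override
        public void open(Configuration parameters) {
            buffer = getRuntimeContext().getListState(
                    new ListStateDescriptor<>("early-events", Event.class));
        }

        @Override
        public void processElement(Event event, Context ctx, Collector<Event> out) throws Exception {
            if (event.timestamp > ctx.timerService().currentWatermark()) {
                // Too far ahead in event time: park it in state and wait for the watermark.
                buffer.add(event);
                ctx.timerService().registerEventTimeTimer(event.timestamp);
            } else {
                out.collect(event);
            }
        }

        @Override
        public void onTimer(long timestamp, OnTimerContext ctx, Collector<Event> out) throws Exception {
            List<Event> stillEarly = new ArrayList<>();
            for (Event buffered : buffer.get()) {
                if (buffered.timestamp <= timestamp) {
                    out.collect(buffered);
                } else {
                    stillEarly.add(buffered);
                }
            }
            buffer.update(stillEarly);
        }
    }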
> > > > >>>> The idea I was suggesting is not for operators to block an input. Rather,
> > > > >>>> it is that they selectively choose from which input to process the next
> > > > >>>> message based on the timestamps, so long as there are buffered messages
> > > > >>>> waiting to be processed. That is a best-effort alignment strategy. It
> > > > >>>> seems to work relatively well in practice, at least within Kafka Streams.
> > > > >>>>
> > > > >>>> E.g. at the moment StreamTwoInputProcessor uses a UnionInputGate for both
> > > > >>>> its inputs. Instead, it could keep them separate and selectively consume
> > > > >>>> from the one that has a buffer available, and if both have buffers
> > > > >>>> available, from the one whose messages have the lower timestamp.
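A simplified illustration of the selection policy Elias describes: when both inputs have buffered data, take the next record from the input that is behind in event time. This is not how Flink's StreamTwoInputProcessor is actually implemented; the BufferedInput abstraction below is hypothetical.

    import java.util.Optional;

    public final class TimestampAlignedSelector {

        // Hypothetical view of a network input with buffered, timestamped records.
        public interface BufferedInput<T> {
            boolean hasBufferedRecord();
            long peekNextTimestamp(); // only valid if hasBufferedRecord() is true
            T poll();
        }

        public static <T> Optional<T> nextRecord(BufferedInput<T> first, BufferedInput<T> second) {
            boolean a = first.hasBufferedRecord();
            boolean b = second.hasBufferedRecord();

            if (a && b) {
                // Both inputs have buffered data: prefer the one that is behind in event time.
                return Optional.of(first.peekNextTimestamp() <= second.peekNextTimestamp()
                        ? first.poll() : second.poll());
            } else if (a) {
                return Optional.of(first.poll());
            } else if (b) {
                return Optional.of(second.poll());
            }
            return Optional.empty(); // nothing buffered; caller falls back to a blocking read
        }
    }

As the rest of the thread points out, such purely local selection stays best-effort: it only helps while data is already buffered, which is why the discussion keeps coming back to sharing watermarks between subtasks at the source level.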