Has there been any progress on synchronizing ingestion by event/ingestion
time across Kafka partitions?

On Thu, Nov 15, 2018 at 1:27 AM Jamie Grier <jgr...@lyft.com.invalid> wrote:

> Hey all,
>
> I think all we need for this on the state sharing side is pretty simple.  I
> opened a JIRA to track this work and submitted a PR for the state sharing
> bit.
>
> https://issues.apache.org/jira/browse/FLINK-10886
> https://github.com/apache/flink/pull/7099
>
> Please provide feedback :)
>
> -Jamie
>
>
> On Thu, Nov 1, 2018 at 3:33 AM Till Rohrmann <trohrm...@apache.org> wrote:
>
> > Hi Thomas,
> >
> > using Akka directly would further manifest our dependency on Scala in
> > flink-runtime. This is something we are currently trying to get rid of. For
> > that purpose we have added the RpcService abstraction which encapsulates
> > all Akka specific logic. We hope that we can soon get rid of the Scala
> > dependency in flink-runtime by using a special class loader only for
> > loading the AkkaRpcService implementation.
> >
> > I think the easiest way to sync the task information is actually going
> > through the JobMaster because the subtasks don't know on which other TMs
> > the other subtasks run. Otherwise, we would need to have some TM detection
> > mechanism between TMs. If you choose this way, then you should be able to
> > use the RpcService by extending the JobMasterGateway by additional RPCs.
> >
> > Cheers,
> > Till
> >
> > On Wed, Oct 31, 2018 at 6:52 PM Thomas Weise <t...@apache.org> wrote:
> >
> > > Hi,
> > >
> > > We are planning to work on the Kinesis consumer in the following order:
> > >
> > > 1. Add per shard watermarking:
> > > https://issues.apache.org/jira/browse/FLINK-5697 - this will be code we
> > > already use internally; I will open a PR to add it to the Flink Kinesis
> > > consumer
> > > 2. Exchange of per subtask watermarks between all subtasks of one or
> > > multiple sources
> > > 3. Implement the queue approach described in Jamie's document to utilize
> > > 1.) and 2.) to align the shard consumers WRT event time
> > >
> > > There was some discussion regarding the mechanism to share the watermarks
> > > between subtasks. If there is something that can be re-used it would be
> > > great. Otherwise I'm going to further investigate the Akka or JGroups
> > > route. Regarding Akka, since it is used within Flink already, is there an
> > > abstraction that you would recommend to avoid a direct dependency?
> > >
> > > Thanks,
> > > Thomas
> > >
> > >
> > >
> > > On Thu, Oct 18, 2018 at 8:34 PM Zhijiang(wangzhijiang999)
> > > <wangzhijiang...@aliyun.com.invalid> wrote:
> > >
> > > > Not yet. We only have some initial thoughts and have not worked on it yet.
> > > > We will post an update in this discussion if there is any progress.
> > > >
> > > > Best,
> > > > Zhijiang
> > > > ------------------------------------------------------------------
> > > > From: Aljoscha Krettek <aljos...@apache.org>
> > > > Sent: Thursday, October 18, 2018 17:53
> > > > To: dev <dev@flink.apache.org>; Zhijiang(wangzhijiang999) <
> > > > wangzhijiang...@aliyun.com>
> > > > Cc: Till Rohrmann <trohrm...@apache.org>
> > > > Subject: Re: Sharing state between subtasks
> > > >
> > > > Hi Zhijiang,
> > > >
> > > > do you already have working code or a design doc for the second approach?
> > > >
> > > > Best,
> > > > Aljoscha
> > > >
> > > > > On 18. Oct 2018, at 08:42, Zhijiang(wangzhijiang999) <
> > > > wangzhijiang...@aliyun.com.INVALID> wrote:
> > > > >
> > > > > Just noticed this discussion from @Till Rohrmann's weekly community
> > > > > update and I want to share some thoughts from our experiences.
> > > > >
> > > > > We also encountered the source consumption skew issue before, and we
> > > > > have looked at two possible ways to improve it.
> > > > >
> > > > > 1. Control the read strategy on the downstream side. In detail, every
> > > > > input channel in a downstream task corresponds to the consumption of one
> > > > > upstream source task, and we tag each input channel with its watermark to
> > > > > find the channel with the lowest watermark and read it with high priority.
> > > > > In essence, we rely on the backpressure mechanism. If the channel with the
> > > > > highest timestamp is not read by the downstream task for a while, the
> > > > > corresponding source task is blocked from reading once its buffers are
> > > > > exhausted. This approach does not require changing the source interface,
> > > > > but there are two major concerns: first, it affects barrier alignment,
> > > > > resulting in delayed or expired checkpoints. Second, it cannot guarantee
> > > > > precise source consumption alignment; it is only a best-effort approach.
> > > > > So we finally gave up on this approach.
> > > > >
> > > > > 2. Add a new SourceCoordinator component to coordinate source
> > > > > consumption in a distributed way. For example, we can start this component
> > > > > in the JobManager, like the current role of the CheckpointCoordinator. Then
> > > > > every source task would communicate with the JobManager via the current RPC
> > > > > mechanism; maybe we can rely on the heartbeat message and attach the
> > > > > consumption progress as the payload. The JobManager will accumulate or
> > > > > store all the reported progress and then respond to the different source
> > > > > tasks. We can define a protocol that tells a fast source task to sleep for
> > > > > a specific time, for example. This way, the coordinator has the global
> > > > > information to make the proper decision for each individual task, so it
> > > > > seems more precise. And it will not affect barrier alignment, because the
> > > > > sleeping fast source can release the lock to emit barriers as normal. The
> > > > > only concern is the change to the source interface, which may affect all
> > > > > related source implementations.
> > > > >
> > > > > Currently we prefer to implement the second approach and will take the
> > > > > other good points above into account. :)
> > > > >
> > > > > Best,
> > > > > Zhijiang
> > > > > ------------------------------------------------------------------
> > > > > From: Jamie Grier <jgr...@lyft.com.INVALID>
> > > > > Sent: Wednesday, October 17, 2018 03:28
> > > > > To: dev <dev@flink.apache.org>
> > > > > Subject: Re: Sharing state between subtasks
> > > > >
> > > > > Here's a doc I started describing some changes we would like to make
> > > > > starting with the Kinesis Source. It describes a refactoring of that code
> > > > > specifically and also hopefully a pattern and some reusable code we can use
> > > > > in the other sources as well.  The end goal would be best-effort event-time
> > > > > synchronization across all Flink sources but we are going to start with the
> > > > > Kinesis Source first.
> > > > >
> > > > > Please take a look and please provide thoughts and opinions about the best
> > > > > state sharing mechanism to use -- that section is left blank and we're
> > > > > especially looking for input there.
> > > > >
> > > > > https://docs.google.com/document/d/13HxfsucoN7aUls1FX202KFAVZ4IMLXEZWyRfZNd6WFM/edit?usp=sharing
> > > > >
> > > > > -Jamie
> > > > >
> > > > >
> > > > > On Wed, Oct 10, 2018 at 11:57 PM Till Rohrmann <trohrm...@apache.org>
> > > > > wrote:
> > > > >
> > > > >> But on the Kafka source level it should be perfectly fine to do what Elias
> > > > >> proposed. This is of course not the perfect solution but could bring us
> > > > >> forward quite a bit. The changes required for this should also be minimal.
> > > > >> This would become obsolete once we have something like shared state. But
> > > > >> until then, I think it would be worth a try.
> > > > >>
> > > > >> Cheers,
> > > > >> Till
> > > > >>
> > > > >> On Thu, Oct 11, 2018 at 8:03 AM Aljoscha Krettek <aljos...@apache.org>
> > > > >> wrote:
> > > > >>
> > > > >>> The reason this selective reading doesn't work well in Flink at the moment
> > > > >>> is checkpointing. For checkpointing, checkpoint barriers travel
> > > > >>> within the streams. If we selectively read from inputs based on timestamps
> > > > >>> this is akin to blocking an input if that input is very far ahead in event
> > > > >>> time, which can happen when you have a very fast source and a slow source
> > > > >>> (in event time), maybe because you're in a catchup phase. In those cases
> > > > >>> it's better to simply not read the data at the sources, as Thomas said.
> > > > >>> This is also because with Kafka Streams, each operator is basically its own
> > > > >>> job: it's reading from Kafka and writing to Kafka and there is not a
> > > > >>> complex graph of different operations with network shuffles in between, as
> > > > >>> you have with Flink.
> > > > >>>
> > > > >>> This different nature of Flink is also why I think that readers need
> > > > >>> awareness of other readers to do the event-time alignment, and this is
> > > > >>> where shared state comes in.
> > > > >>>
> > > > >>>> On 10. Oct 2018, at 20:47, Elias Levy <fearsome.lucid...@gmail.com>
> > > > >>>> wrote:
> > > > >>>>
> > > > >>>> On Wed, Oct 10, 2018 at 9:33 AM Fabian Hueske <fhue...@gmail.com> wrote:
> > > > >>>>
> > > > >>>>> I think the new source interface would be designed to be able to leverage
> > > > >>>>> shared state to achieve time alignment.
> > > > >>>>> I don't think this would be possible without some kind of shared state.
> > > > >>>>>
> > > > >>>>> The problem of tasks that are far ahead in time cannot be solved with
> > > > >>>>> back-pressure.
> > > > >>>>> That's because a task cannot choose from which source task it accepts
> > > > >>>>> events and from which it doesn't.
> > > > >>>>> If it blocks an input, all downstream tasks that are connected to the
> > > > >>>>> operator are affected. This can easily lead to deadlocks.
> > > > >>>>> Therefore, all operators need to be able to handle events when they arrive.
> > > > >>>>> If they cannot process them yet because they are too far ahead in time,
> > > > >>>>> they are put in state.
> > > > >>>>>
> > > > >>>>
> > > > >>>> The idea I was suggesting is not for operators to block an input. Rather,
> > > > >>>> it is that they selectively choose which input to process the next
> > > > >>>> message from based on its timestamp, so long as there are buffered
> > > > >>>> messages waiting to be processed.  That is a best-effort alignment
> > > > >>>> strategy.  Seems to work relatively well in practice, at least within Kafka
> > > > >>>> Streams.
> > > > >>>>
> > > > >>>> E.g. at the moment StreamTwoInputProcessor uses a UnionInputGate for both
> > > > >>>> its inputs.  Instead, it could keep them separate and selectively consume
> > > > >>>> from the one that had a buffer available, and if both have buffers
> > > > >>>> available, from the buffer with the messages with a lower timestamp.
> > > > >>>
> > > > >>>
> > > > >>
> > > > >
> > > >
> > > >
> > > >
> > >
> >
>
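
For illustration, the selective-read idea Elias describes in the quoted
thread (consume from whichever input has buffered data, preferring the lower
timestamp) could be sketched roughly as below. This is a minimal sketch and
not Flink code: LowestTimestampSelector and selectInput are hypothetical
names, and each input is modelled as a plain queue of buffered record
timestamps instead of a real input gate.

```java
import java.util.Deque;
import java.util.List;

/** Hypothetical helper for illustration; not Flink's StreamTwoInputProcessor. */
final class LowestTimestampSelector {

    /**
     * Returns the index of the non-empty input whose head record has the
     * lowest timestamp, or -1 if no input has buffered records. An input is
     * never blocked: an empty input is simply skipped.
     */
    static int selectInput(List<Deque<Long>> bufferedTimestamps) {
        int best = -1;
        long bestTimestamp = Long.MAX_VALUE;
        for (int i = 0; i < bufferedTimestamps.size(); i++) {
            Long head = bufferedTimestamps.get(i).peekFirst();
            if (head != null && (best == -1 || head < bestTimestamp)) {
                best = i;
                bestTimestamp = head;
            }
        }
        return best;
    }
}
```

As noted in the thread, this is only best-effort alignment: it reorders
consumption among records that are already buffered and does not by itself
slow down a source that is far ahead.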

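
The SourceCoordinator approach from Zhijiang's quoted message (source tasks
report progress, the coordinator tells fast tasks to pause) could look
roughly like the following. This is a sketch under stated assumptions, not
an existing Flink API: SourceCoordinatorSketch, report and maxAheadMillis are
hypothetical names, progress is assumed to be a per-subtask watermark in
milliseconds, and the RPC or heartbeat transport is left out entirely.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical coordinator for illustration; not a Flink class. */
final class SourceCoordinatorSketch {
    // Latest watermark reported by each source subtask (assumed: epoch millis).
    private final Map<Integer, Long> watermarks = new HashMap<>();
    // How far (in event time) a subtask may run ahead of the slowest one.
    private final long maxAheadMillis;

    SourceCoordinatorSketch(long maxAheadMillis) {
        this.maxAheadMillis = maxAheadMillis;
    }

    /**
     * Records the progress of one subtask and returns by how many
     * milliseconds of event time it exceeds the allowed lead over the
     * slowest reporting subtask. 0 means "keep reading"; a positive value
     * could be used by the caller to decide how long to pause.
     * Subtasks that have not reported yet are ignored.
     */
    synchronized long report(int subtaskIndex, long watermark) {
        watermarks.put(subtaskIndex, watermark);
        long min = Long.MAX_VALUE;
        for (long w : watermarks.values()) {
            min = Math.min(min, w);
        }
        long lead = watermark - min;
        return Math.max(0L, lead - maxAheadMillis);
    }
}
```

With a 1000 ms allowed lead, a subtask reporting 7000 while the slowest
subtask is at 5000 would get back 1000 and could pause, while the slow
subtask always gets 0. How the excess maps to an actual sleep time is a
policy choice the thread leaves open.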