Hi Zhijiang,

do you already have working code or a design doc for the second approach?

Best,
Aljoscha

> On 18. Oct 2018, at 08:42, Zhijiang(wangzhijiang999) 
> <wangzhijiang...@aliyun.com.INVALID> wrote:
> 
> I just noticed this discussion in @Till Rohrmann's weekly community update
> and would like to share some thoughts from our experience.
> 
> We also encountered the source consumption skew issue before, and we have
> been looking at two possible ways to improve it.
> 
> 1. Control the read strategy on the downstream side. Every input channel of
> a downstream task corresponds to the output of one upstream source task, and
> we tag each input channel with its watermark so that the channel with the
> lowest watermark is read with high priority. In essence, this relies on the
> backpressure mechanism: if the channel with the highest timestamp is not read
> by the downstream task for a while, its buffers are eventually exhausted and
> the corresponding source task is blocked from reading. This way needs no
> change to the source interface, but there are two major concerns. First, it
> affects barrier alignment, which can delay checkpoints or make them expire.
> Second, it cannot align source consumption very precisely; it is only a
> best-effort approach. So we finally gave up on this way.
> 
> 2. Add a new component, a SourceCoordinator, to coordinate source
> consumption in a distributed way. For example, we can start this component
> in the JobManager, similar to the current role of the CheckpointCoordinator.
> Every source task would then communicate with the JobManager via the current
> RPC mechanism; maybe we can attach the consumption progress as a payload to
> the heartbeat messages. The JobManager would accumulate or store all the
> reported progress and then send responses to the individual source tasks.
> For example, we can define a protocol that tells a fast source task to sleep
> for a specific time. Because the coordinator has global information, it can
> make the proper decision for each individual source, so this seems more
> precise. It also does not affect barrier alignment, because a sleeping fast
> source can release the checkpoint lock and emit barriers as normal. The only
> concern is that it requires changes to the source interface and may affect
> all related source implementations.
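A minimal Java sketch of the selection rule in approach 1, assuming the task tracks the latest watermark of each input channel in a plain array. The class name LowestWatermarkChannelSelector and its methods are hypothetical and not part of Flink's input gate API. The task always reads the channel with the lowest watermark and waits on it even when it is empty; that waiting is what lets backpressure stall the faster sources, and it is also why this variant can interfere with barrier alignment and stalls completely if the lowest channel goes idle.

```java
import java.util.Arrays;

/** Hypothetical helper: picks which input channel a task should read next (approach 1). */
final class LowestWatermarkChannelSelector {
    // Latest watermark seen on each input channel; starts at "no watermark yet".
    private final long[] watermarks;

    LowestWatermarkChannelSelector(int numChannels) {
        watermarks = new long[numChannels];
        Arrays.fill(watermarks, Long.MIN_VALUE);
    }

    /** Records a watermark from a channel; watermarks never move backwards. */
    void onWatermark(int channel, long watermark) {
        watermarks[channel] = Math.max(watermarks[channel], watermark);
    }

    /**
     * Returns the channel with the lowest watermark (ties go to the lower index).
     * The caller reads only from this channel, even if it has to wait for data,
     * so channels that are ahead fill their buffers and backpressure upstream.
     */
    int channelToRead() {
        int lowest = 0;
        for (int i = 1; i < watermarks.length; i++) {
            if (watermarks[i] < watermarks[lowest]) {
                lowest = i;
            }
        }
        return lowest;
    }
}
```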
> 
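The decision logic of the SourceCoordinator in approach 2 could look roughly like the following Java sketch. It assumes each source subtask periodically reports its current event time (for example piggybacked on a heartbeat) and gets back a pause duration. The class name SourceCoordinatorSketch, the parallelism check, and the fixed maxAheadMillis and pauseMillis parameters are illustrative assumptions, not an existing Flink API. No subtask is throttled until every subtask has reported at least once, since the global minimum is unknown before that.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical coordinator; in the proposal it would run next to the JobManager. */
final class SourceCoordinatorSketch {
    private final int parallelism;       // number of source subtasks expected to report
    private final long maxAheadMillis;   // how far a subtask may run ahead of the slowest one
    private final long pauseMillis;      // how long a too-fast subtask is asked to sleep
    private final Map<String, Long> progress = new HashMap<>();

    SourceCoordinatorSketch(int parallelism, long maxAheadMillis, long pauseMillis) {
        this.parallelism = parallelism;
        this.maxAheadMillis = maxAheadMillis;
        this.pauseMillis = pauseMillis;
    }

    /**
     * Records the event time reported by one source subtask and returns how long
     * that subtask should sleep before reading again (0 means keep reading).
     */
    synchronized long report(String subtaskId, long eventTime) {
        progress.put(subtaskId, eventTime);
        if (progress.size() < parallelism) {
            return 0L; // not every subtask has reported yet, so don't throttle anyone
        }
        long slowest = Long.MAX_VALUE;
        for (long t : progress.values()) {
            slowest = Math.min(slowest, t);
        }
        return eventTime - slowest > maxAheadMillis ? pauseMillis : 0L;
    }
}
```

A source receiving a non-zero value would sleep without holding the checkpoint lock, so barriers can still be emitted while it waits.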
> Currently we prefer to implement the second way, and we will also take the
> other good points above into account. :)
> 
> Best,
> Zhijiang
> ------------------------------------------------------------------
> From: Jamie Grier <jgr...@lyft.com.INVALID>
> Sent: Wednesday, October 17, 2018 03:28
> To: dev <dev@flink.apache.org>
> Subject: Re: Sharing state between subtasks
> 
> Here's a doc I started describing some changes we would like to make,
> starting with the Kinesis Source. It describes a refactoring of that code
> specifically, and hopefully also a pattern and some reusable code we can
> use in the other sources as well. The end goal would be best-effort
> event-time synchronization across all Flink sources, but we are going to
> start with the Kinesis Source first.
> 
> Please take a look and share your thoughts and opinions on the best
> state-sharing mechanism to use. That section is left blank, and we're
> especially looking for input there.
> 
> https://docs.google.com/document/d/13HxfsucoN7aUls1FX202KFAVZ4IMLXEZWyRfZNd6WFM/edit?usp=sharing
> 
> -Jamie
> 
> 
> On Wed, Oct 10, 2018 at 11:57 PM Till Rohrmann <trohrm...@apache.org> wrote:
> 
>> But on the Kafka source level it should be perfectly fine to do what Elias
>> proposed. This is of course not the perfect solution, but it could bring
>> us forward quite a bit. The changes required for this should also be
>> minimal. This would become obsolete once we have something like shared
>> state, but until then I think it would be worth a try.
>> 
>> Cheers,
>> Till
>> 
>> On Thu, Oct 11, 2018 at 8:03 AM Aljoscha Krettek <aljos...@apache.org>
>> wrote:
>> 
>>> The reason this selective reading doesn't work well in Flink at the
>>> moment is checkpointing. For checkpointing, checkpoint barriers travel
>>> within the streams. If we selectively read from inputs based on
>>> timestamps, this is akin to blocking an input if that input is very far
>>> ahead in event time, which can happen when you have a very fast source
>>> and a slow source (in event time), maybe because you're in a catch-up
>>> phase. In those cases it's better to simply not read the data at the
>>> sources, as Thomas said. It also works better in Kafka Streams because
>>> there each operator is basically its own job: it reads from Kafka and
>>> writes to Kafka, and there is no complex graph of different operations
>>> with network shuffles in between, as you have with Flink.
>>> 
>>> This different nature of Flink is also why I think that readers need
>>> awareness of other readers to do the event-time alignment, and this is
>>> where shared state comes in.
>>> 
>>>> On 10. Oct 2018, at 20:47, Elias Levy <fearsome.lucid...@gmail.com>
>>>> wrote:
>>>>
>>>> On Wed, Oct 10, 2018 at 9:33 AM Fabian Hueske <fhue...@gmail.com>
>>>> wrote:
>>>> 
>>>>> I think the new source interface would be designed to be able to
>>>>> leverage shared state to achieve time alignment.
>>>>> I don't think this would be possible without some kind of shared state.
>>>>>
>>>>> The problem of tasks that are far ahead in time cannot be solved with
>>>>> back-pressure.
>>>>> That's because a task cannot choose from which source task it accepts
>>>>> events and from which it doesn't.
>>>>> If it blocks an input, all downstream tasks that are connected to the
>>>>> operator are affected. This can easily lead to deadlocks.
>>>>> Therefore, all operators need to be able to handle events when they
>>>>> arrive.
>>>>> If they cannot process them yet because they are too far ahead in time,
>>>>> they are put in state.
>>>>> 
>>>> 
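Fabian's point that operators must accept events as they arrive and park the ones that are too far ahead in time can be sketched in Java as follows. This is a minimal sketch under stated assumptions: an in-memory PriorityQueue stands in for Flink state, "too far ahead" is taken to mean "not yet covered by the current watermark", and the class name EarlyEventBuffer is hypothetical. A real operator would keep these events in checkpointed state rather than on the heap.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.PriorityQueue;

/** Hypothetical buffer: accepts every event immediately, releases it once the watermark covers it. */
final class EarlyEventBuffer<T> {
    /** One parked event together with its event timestamp. */
    private static final class Timestamped<V> {
        final long timestamp;
        final V value;

        Timestamped(long timestamp, V value) {
            this.timestamp = timestamp;
            this.value = value;
        }
    }

    // Parked events ordered by timestamp (stand-in for operator state).
    private final PriorityQueue<Timestamped<T>> pending =
            new PriorityQueue<>(Comparator.comparingLong((Timestamped<T> e) -> e.timestamp));
    private long currentWatermark = Long.MIN_VALUE;

    /** Never rejects or blocks; returns the events that can be processed right now. */
    List<T> onEvent(long timestamp, T value) {
        pending.add(new Timestamped<>(timestamp, value));
        return drainReady();
    }

    /** Advances the watermark and returns the newly covered events in timestamp order. */
    List<T> onWatermark(long watermark) {
        currentWatermark = Math.max(currentWatermark, watermark);
        return drainReady();
    }

    int bufferedCount() {
        return pending.size();
    }

    private List<T> drainReady() {
        List<T> ready = new ArrayList<>();
        while (!pending.isEmpty() && pending.peek().timestamp <= currentWatermark) {
            ready.add(pending.poll().value);
        }
        return ready;
    }
}
```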
>>>> The idea I was suggesting is not for operators to block an input.
>>>> Rather, it is that they selectively choose which input to process the
>>>> next message from, based on the messages' timestamps, so long as there
>>>> are buffered messages waiting to be processed. That is a best-effort
>>>> alignment strategy. It seems to work relatively well in practice, at
>>>> least within Kafka Streams.
>>>>
>>>> E.g. at the moment StreamTwoInputProcessor uses a UnionInputGate for
>>>> both its inputs. Instead, it could keep them separate and selectively
>>>> consume from the one that has a buffer available, and if both have
>>>> buffers available, from the buffer with the messages with a lower
>>>> timestamp.
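The selection rule Elias describes, applied to two inputs kept separate instead of merged, could be sketched as follows. Two ArrayDeques stand in for each input's buffered records, and TimestampAwareTwoInputSelector is a hypothetical name; this is not Flink's StreamTwoInputProcessor or UnionInputGate code. The key property is that next() never waits on an empty input, so it only reorders what is already buffered instead of blocking an input.

```java
import java.util.ArrayDeque;
import java.util.Deque;

/** Hypothetical two-input selector: prefer the lower timestamp, never wait on an empty input. */
final class TimestampAwareTwoInputSelector<T> {
    /** A buffered record together with its event timestamp. */
    private static final class Buffered<V> {
        final long timestamp;
        final V value;

        Buffered(long timestamp, V value) {
            this.timestamp = timestamp;
            this.value = value;
        }
    }

    private final Deque<Buffered<T>> first = new ArrayDeque<>();
    private final Deque<Buffered<T>> second = new ArrayDeque<>();

    void bufferFirst(long timestamp, T value) {
        first.addLast(new Buffered<>(timestamp, value));
    }

    void bufferSecond(long timestamp, T value) {
        second.addLast(new Buffered<>(timestamp, value));
    }

    /** Returns the next value to process, or null if neither input has anything buffered. */
    T next() {
        Buffered<T> chosen;
        if (first.isEmpty() && second.isEmpty()) {
            return null;
        } else if (second.isEmpty()) {
            chosen = first.pollFirst();          // only the first input has data
        } else if (first.isEmpty()) {
            chosen = second.pollFirst();         // only the second input has data
        } else {
            // Both have data: take the lower timestamp, ties go to the first input.
            chosen = first.peekFirst().timestamp <= second.peekFirst().timestamp
                    ? first.pollFirst()
                    : second.pollFirst();
        }
        return chosen.value;
    }
}
```

Because the preference only applies among records that are already available, alignment stays best effort: if the slower input has nothing buffered, the faster one keeps being consumed.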
>>> 
>>> 
>> 
> 
