Hi Dan,

Unfortunately, Flink currently provides no source-level synchronization,
except for Kinesis [1], so it is easy to run into large state when
processing historical data.

There is an ongoing effort to provide a generic watermark-based alignment
of FLIP-27 sources [2], which will most likely help to mitigate the issue.
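
For illustration, the API proposed in the FLIP looks roughly like the sketch
below. This is only a sketch of the current proposal, so names and signatures
may still change before release; "backfill-group" is a made-up group name:

    // Sources that share an alignment group pause reading once their
    // watermark drifts too far ahead of the group's minimum watermark.
    // (Uses org.apache.flink.api.common.eventtime.WatermarkStrategy
    // and java.time.Duration.)
    WatermarkStrategy<Long> strategy = WatermarkStrategy
            .<Long>forBoundedOutOfOrderness(Duration.ofSeconds(20))
            .withWatermarkAlignment("backfill-group", Duration.ofMinutes(1));
    // The strategy would then be attached to each FLIP-27 source via
    // env.fromSource(source, strategy, "source-name").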

[1] https://issues.apache.org/jira/browse/FLINK-10886
[2] https://cwiki.apache.org/confluence/display/FLINK/FLIP-182%3A+Support+watermark+alignment+of+FLIP-27+Sources

Best,
D.

On Wed, Jul 21, 2021 at 8:43 AM JING ZHANG <beyond1...@gmail.com> wrote:

> Hi Dan,
> > I've tried playing around with parallelism and resources.  It does help.
> Glad to hear your problem is solved 😀.
>
> > Does Flink have special logic in the built-in interval join code that
> > impacts how Kafka data sources are read?
> No. If you are referring to the approach I mentioned in the last email, I
> meant adding control over the consumption order of each source in a custom
> Kafka connector.
>
> On Wed, Jul 21, 2021 at 2:10 PM Dan Hill <quietgol...@gmail.com> wrote:
>
>> Thanks JING and Caizhi!
>>
>> Yeah, I've tried playing around with parallelism and resources.  It does
>> help.
>>
>> We have our own join operator that acts like an interval join (with fuzzy
>> matching).  We wrote our own KeyedCoProcessFunction and modeled it closely
>> after the internal interval join code.  Does Flink have special logic in
>> the built-in interval join code that impacts how Kafka data sources are
>> read?
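>>
>> For context, our operator is shaped roughly like the sketch below.  This
>> is simplified stand-in code, not our actual implementation; the class
>> name, the (key, timestamp) tuple events, and the fixed +/- 1 minute window
>> are made up for illustration:
>>
>>     import java.util.ArrayList;
>>     import java.util.List;
>>     import org.apache.flink.api.common.state.MapState;
>>     import org.apache.flink.api.common.state.MapStateDescriptor;
>>     import org.apache.flink.api.common.typeinfo.Types;
>>     import org.apache.flink.api.java.tuple.Tuple2;
>>     import org.apache.flink.configuration.Configuration;
>>     import org.apache.flink.streaming.api.functions.co.KeyedCoProcessFunction;
>>     import org.apache.flink.util.Collector;
>>
>>     // Events are (key, timestamp) tuples. Each side is buffered in keyed
>>     // MapState (bucketed by timestamp) until the watermark passes the
>>     // join window, mirroring the internal IntervalJoinOperator's approach.
>>     public class FuzzyIntervalJoin extends KeyedCoProcessFunction<
>>             String, Tuple2<String, Long>, Tuple2<String, Long>, String> {
>>
>>         private static final long WINDOW_MS = 60_000L; // +/- 1 minute
>>
>>         private transient MapState<Long, List<Tuple2<String, Long>>> leftBuffer;
>>         private transient MapState<Long, List<Tuple2<String, Long>>> rightBuffer;
>>
>>         @Override
>>         public void open(Configuration parameters) {
>>             leftBuffer = getRuntimeContext().getMapState(new MapStateDescriptor<>(
>>                     "left", Types.LONG, Types.LIST(Types.TUPLE(Types.STRING, Types.LONG))));
>>             rightBuffer = getRuntimeContext().getMapState(new MapStateDescriptor<>(
>>                     "right", Types.LONG, Types.LIST(Types.TUPLE(Types.STRING, Types.LONG))));
>>         }
>>
>>         @Override
>>         public void processElement1(Tuple2<String, Long> left, Context ctx,
>>                 Collector<String> out) throws Exception {
>>             joinAgainst(left, rightBuffer, out);
>>             buffer(left, leftBuffer);
>>             // Evict once no future event on the other side can match.
>>             ctx.timerService().registerEventTimeTimer(left.f1 + WINDOW_MS);
>>         }
>>
>>         @Override
>>         public void processElement2(Tuple2<String, Long> right, Context ctx,
>>                 Collector<String> out) throws Exception {
>>             joinAgainst(right, leftBuffer, out);
>>             buffer(right, rightBuffer);
>>             ctx.timerService().registerEventTimeTimer(right.f1 + WINDOW_MS);
>>         }
>>
>>         @Override
>>         public void onTimer(long ts, OnTimerContext ctx, Collector<String> out)
>>                 throws Exception {
>>             leftBuffer.remove(ts - WINDOW_MS);
>>             rightBuffer.remove(ts - WINDOW_MS);
>>         }
>>
>>         // The real "fuzzy" match condition would replace the simple
>>         // time-window check here; linear scan kept for brevity.
>>         private void joinAgainst(Tuple2<String, Long> event,
>>                 MapState<Long, List<Tuple2<String, Long>>> otherSide,
>>                 Collector<String> out) throws Exception {
>>             for (List<Tuple2<String, Long>> bucket : otherSide.values()) {
>>                 for (Tuple2<String, Long> other : bucket) {
>>                     if (Math.abs(other.f1 - event.f1) <= WINDOW_MS) {
>>                         out.collect(event.f0 + "@" + event.f1 + " matched @" + other.f1);
>>                     }
>>                 }
>>             }
>>         }
>>
>>         private void buffer(Tuple2<String, Long> event,
>>                 MapState<Long, List<Tuple2<String, Long>>> side) throws Exception {
>>             List<Tuple2<String, Long>> bucket = side.get(event.f1);
>>             if (bucket == null) {
>>                 bucket = new ArrayList<>();
>>             }
>>             bucket.add(event);
>>             side.put(event.f1, bucket);
>>         }
>>     }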
>>
>>
>>
>> On Tue, Jul 20, 2021 at 8:31 PM JING ZHANG <beyond1...@gmail.com> wrote:
>>
>>> Hi Dan,
>>> You are right. In an interval join, if one input stream is far ahead of
>>> the other, its data is buffered in state until the watermark of the other
>>> input stream catches up.
>>> This is a known issue with interval joins, and the situation is even
>>> worse in your case for the following reasons:
>>> 1. The job runs as a backfill.
>>> 2. There are cascading interval joins in the topology.
>>>
>>> There is a hacky workaround; I hope it helps. Control the consumption
>>> order of the sources in the following sequence:
>>> 1. Within a join, consume the larger data source only after the smaller
>>> source has been fully consumed.
>>> 2. Consume the sources of a downstream join only after the previous join
>>> has finished.
>>>
>>> BTW: Please double-check that you are using an interval join rather than
>>> a regular join; you end up with a regular join if the join condition
>>> compares two fields with a regular timestamp type instead of time
>>> attributes.
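>>>
>>> If you are on the DataStream API, writing the join as an explicit
>>> interval join avoids this mistake entirely. A rough sketch (assuming
>>> 'left' and 'right' are DataStream<Tuple2<String, Long>> of (key,
>>> timestamp) events with timestamps and watermarks already assigned):
>>>
>>>     // Uses org.apache.flink.streaming.api.windowing.time.Time and
>>>     // org.apache.flink.streaming.api.functions.co.ProcessJoinFunction.
>>>     DataStream<String> joined = left
>>>             .keyBy(e -> e.f0)
>>>             .intervalJoin(right.keyBy(e -> e.f0))
>>>             .between(Time.minutes(-5), Time.minutes(5))
>>>             .process(new ProcessJoinFunction<
>>>                     Tuple2<String, Long>, Tuple2<String, Long>, String>() {
>>>                 @Override
>>>                 public void processElement(Tuple2<String, Long> l,
>>>                         Tuple2<String, Long> r, Context ctx,
>>>                         Collector<String> out) {
>>>                     out.collect(l.f0 + "@" + l.f1 + " matched @" + r.f1);
>>>                 }
>>>             });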
>>>
>>> Best,
>>> JING ZHANG
>>>
>>> On Wed, Jul 21, 2021 at 4:25 AM Dan Hill <quietgol...@gmail.com> wrote:
>>>
>>>> Hi.  My team's Flink job has cascading interval joins.  The problem I'm
>>>> outlining below doesn't occur when streaming normally; it's an issue
>>>> with backfills.  We've been running a bunch of backfills to evaluate the
>>>> job over older data.
>>>>
>>>> When running as a backfill, I've noticed that sometimes one of the
>>>> downstream Kafka inputs will read in a lot of data from its Kafka source
>>>> before the upstream Kafka sources make much progress.  The downstream
>>>> Kafka source gets far ahead of the interval join window constrained by
>>>> the upstream sources.  This appears to cause the state to grow
>>>> unnecessarily and has caused checkpoint failures.  I assumed there was
>>>> built-in Flink code to keep a single downstream Kafka source from
>>>> getting too far ahead.  Looking through the code, I don't think this
>>>> exists.
>>>>
>>>> Is this a known issue with trying to use Flink to backfill?  Am I
>>>> misunderstanding something?
>>>>
>>>> Here's an example flow chart for a cascading join job.  One of the
>>>> right Kafka data sources has 10x-100x more records than the left data
>>>> sources and causes state to grow.
>>>> [image: Screen Shot 2021-07-20 at 1.02.27 PM.png]
>>>>
>>>
