Hi Dan,

unfortunately Flink currently provides no source-level synchronization, except for Kinesis [1], so it's easy to run into large state when processing historical data. There is an ongoing effort to provide generic watermark-based alignment for FLIP-27 sources [2], which will most likely help mitigate the issue.
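To give you a rough idea of what this could look like, here is a minimal sketch based on the API proposed in the FLIP (names and signatures may still change before it is released): every source that shares an alignment group pauses consumption whenever its local watermark drifts too far ahead of the group's minimum watermark. The broker address and topic name below are just placeholders.

    import java.time.Duration;

    import org.apache.flink.api.common.eventtime.WatermarkStrategy;
    import org.apache.flink.api.common.serialization.SimpleStringSchema;
    import org.apache.flink.connector.kafka.source.KafkaSource;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    public class AlignedBackfill {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env =
                    StreamExecutionEnvironment.getExecutionEnvironment();

            KafkaSource<String> leftSource = KafkaSource.<String>builder()
                    .setBootstrapServers("broker:9092")   // placeholder address
                    .setTopics("left-events")             // placeholder topic
                    .setValueOnlyDeserializer(new SimpleStringSchema())
                    .build();

            // Proposed FLIP-182 API: sources in the "backfill" group pause
            // reading once their local watermark is more than 30s ahead of
            // the group's minimum watermark.
            WatermarkStrategy<String> aligned = WatermarkStrategy
                    .<String>forBoundedOutOfOrderness(Duration.ofSeconds(5))
                    .withWatermarkAlignment("backfill", Duration.ofSeconds(30));

            DataStream<String> left = env.fromSource(leftSource, aligned, "left-source");
            // Build the other sources with the same alignment group,
            // then join as usual.

            env.execute("aligned-backfill");
        }
    }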
[1] https://issues.apache.org/jira/browse/FLINK-10886
[2] https://cwiki.apache.org/confluence/display/FLINK/FLIP-182%3A+Support+watermark+alignment+of+FLIP-27+Sources

Best,
D.

On Wed, Jul 21, 2021 at 8:43 AM JING ZHANG <beyond1...@gmail.com> wrote:

> Hi Dan,
>
> I've tried playing around with parallelism and resources. It does help.
> Glad to hear your problem is solved 😀.
>
> > Does Flink have special logic with the built-in interval join code that
> > impacts how Kafka data sources are read?
> No. If you mean the approach I mentioned in the last email, I meant adding
> control over the consumption order of each source in a custom Kafka
> connector.
>
> On Wed, Jul 21, 2021 at 2:10 PM Dan Hill <quietgol...@gmail.com> wrote:
>
>> Thanks JING and Caizhi!
>>
>> Yea, I've tried playing around with parallelism and resources. It does
>> help.
>>
>> We have our own join operator that acts like an interval join (with
>> fuzzy matching). We wrote our own KeyedCoProcessFunction and modeled it
>> closely after the internal interval join code. Does Flink have special
>> logic with the built-in interval join code that impacts how Kafka data
>> sources are read?
>>
>> On Tue, Jul 20, 2021 at 8:31 PM JING ZHANG <beyond1...@gmail.com> wrote:
>>
>>> Hi Dan,
>>> You are right. In an interval join, if one input stream is far ahead
>>> of the other, its data is buffered in state until the watermark of the
>>> other input stream catches up.
>>> This is a known issue with interval joins, and the situation is even
>>> worse in your example for two reasons:
>>> 1. The job runs as a backfill.
>>> 2. There are cascading interval joins in the topology.
>>>
>>> There is a hacky workaround; hope it helps. Control the consumption of
>>> each source in the following order:
>>> 1. Within the same join, consume the larger data source only after the
>>> smaller source has been fully consumed.
>>> 2. Consume the sources of a downstream join only after the previous
>>> join has finished.
>>>
>>> BTW: Please double-check that you use an interval join instead of a
>>> regular join; the latter happens if the join condition compares two
>>> fields of a regular timestamp type instead of a time attribute.
>>>
>>> Best,
>>> JING ZHANG
>>>
>>> On Wed, Jul 21, 2021 at 4:25 AM Dan Hill <quietgol...@gmail.com> wrote:
>>>
>>>> Hi. My team's Flink job has cascading interval joins. The problem I'm
>>>> outlining below is fine when streaming normally; it's an issue with
>>>> backfills. We've been running a bunch of backfills to evaluate the job
>>>> over older data.
>>>>
>>>> When running backfills, I've noticed that sometimes one of the
>>>> downstream Kafka inputs will read a lot of data from its Kafka source
>>>> before the upstream Kafka sources make much progress. The downstream
>>>> Kafka source gets far ahead of the interval join window constrained by
>>>> the upstream sources. This appears to cause the state to grow
>>>> unnecessarily and has caused checkpoint failures. I assumed Flink had
>>>> built-in logic to keep a single downstream Kafka source from getting
>>>> too far ahead. Looking through the code, I don't think it exists.
>>>>
>>>> Is this a known issue with trying to use Flink to backfill? Am I
>>>> misunderstanding something?
>>>>
>>>> Here's an example flow chart for a cascading join job.
>>>> One of the right Kafka data sources has 10x-100x more records than
>>>> the left data sources and causes state to grow.
>>>>
>>>> [image: Screen Shot 2021-07-20 at 1.02.27 PM.png]
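For reference, JING ZHANG's description of the built-in interval join corresponds roughly to the DataStream sketch below. Event, Joined, userId, the two input streams, and the +/-5 minute window are all hypothetical; the point is that each side buffers records in keyed state until the other side's watermark passes the window, which is why a source that races ahead inflates state.

    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.functions.co.ProcessJoinFunction;
    import org.apache.flink.streaming.api.windowing.time.Time;
    import org.apache.flink.util.Collector;

    // Hypothetical POJOs, just for illustration.
    class Event  { String userId; long ts; }
    class Joined { Event left, right; Joined(Event l, Event r) { left = l; right = r; } }

    // leftStream, rightStream: DataStream<Event> inputs, assumed to already
    // have event-time timestamps and watermarks assigned.
    DataStream<Joined> joined = leftStream
            .keyBy((Event e) -> e.userId)
            .intervalJoin(rightStream.keyBy((Event e) -> e.userId))
            // Match right events within +/-5 minutes of each left event.
            // Each side is held in keyed state until the other side's
            // watermark passes this window.
            .between(Time.minutes(-5), Time.minutes(5))
            .process(new ProcessJoinFunction<Event, Event, Joined>() {
                @Override
                public void processElement(Event l, Event r, Context ctx,
                                           Collector<Joined> out) {
                    out.collect(new Joined(l, r));
                }
            });

If a query silently falls back to a regular join instead (e.g., by comparing plain TIMESTAMP columns in SQL rather than time attributes, as JING ZHANG notes above), both sides are retained in state indefinitely, which can look very similar from the outside.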