Hi Dan,

> I've tried playing around with parallelism and resources. It does help.

Glad to hear your problem is solved 😀.
> Does Flink have special logic with the built-in interval join code that impacts how Kafka data sources are read?

No. If you are referring to the approach I mentioned in my last email, I meant controlling the consumption order of each source in a custom Kafka connector.

Dan Hill <quietgol...@gmail.com> wrote on Wed, Jul 21, 2021 at 2:10 PM:

> Thanks JING and Caizhi!
>
> Yea, I've tried playing around with parallelism and resources. It does help.
>
> We have our own join operator that acts like an interval join (with fuzzy matching). We wrote our own KeyedCoProcessFunction and modeled it closely after the internal interval join code. Does Flink have special logic with the built-in interval join code that impacts how Kafka data sources are read?
>
> On Tue, Jul 20, 2021 at 8:31 PM JING ZHANG <beyond1...@gmail.com> wrote:
>
>> Hi Dan,
>> You are right. In an interval join, if one input stream is far ahead of the other, its data is buffered in state until the watermark of the other input stream catches up.
>> This is a known issue of interval join, and the situation is even worse in your example for the following reasons:
>> 1. It is running as a backfill.
>> 2. There are cascading interval joins in the topology.
>>
>> There is a hacky way to work around it; hope it helps. Control the consumption order of each source based on the following sequence:
>> 1. Consume the larger data source in a join only after the smaller source has been consumed completely.
>> 2. Consume the sources of a downstream join only after the previous join has finished.
>>
>> BTW: Please double check that you use an interval join instead of a regular join. A regular join can result from comparing two fields with a plain timestamp type in the join condition instead of a time attribute.
>>
>> Best,
>> JING ZHANG
>>
>> Dan Hill <quietgol...@gmail.com> wrote on Wed, Jul 21, 2021 at 4:25 AM:
>>
>>> Hi. My team's Flink job has cascading interval joins. The problem I'm outlining below is fine when streaming normally; it's an issue with backfills. We've been running a bunch of backfills to evaluate the job over older data.
>>>
>>> When running as a backfill, I've noticed that sometimes one of the downstream Kafka inputs will read a lot of data from its Kafka source before the upstream Kafka sources make much progress. The downstream Kafka source gets far ahead of the interval join window constrained by the upstream sources. This appears to cause the state to grow unnecessarily and has caused checkpoint failures. I assumed there was built-in Flink code to keep a single downstream Kafka source from getting too far ahead. Looking through the code, I don't think this exists.
>>>
>>> Is this a known issue with trying to use Flink to backfill? Am I misunderstanding something?
>>>
>>> Here's an example flow chart for a cascading join job. One of the right Kafka data sources has 10x-100x more records than the left data sources and causes state to grow.
>>> [image: Screen Shot 2021-07-20 at 1.02.27 PM.png]
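For readers who want to see the buffering behavior Dan and JING ZHANG describe, here is a minimal, self-contained sketch of Flink's built-in DataStream interval join (not Dan's custom KeyedCoProcessFunction). The Event POJO, the joinKey field, and the +/- 5 minute bounds are illustrative assumptions; in the real job the two inputs would be Kafka sources rather than fromElements.

    import org.apache.flink.api.common.eventtime.WatermarkStrategy;
    import org.apache.flink.streaming.api.datastream.KeyedStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.api.functions.co.ProcessJoinFunction;
    import org.apache.flink.streaming.api.windowing.time.Time;
    import org.apache.flink.util.Collector;

    import java.time.Duration;

    public class IntervalJoinSketch {

        // Minimal POJO standing in for the real records; field names are assumptions.
        public static class Event {
            public String joinKey;
            public long eventTimeMillis;

            public Event() {}

            public Event(String joinKey, long eventTimeMillis) {
                this.joinKey = joinKey;
                this.eventTimeMillis = eventTimeMillis;
            }
        }

        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

            WatermarkStrategy<Event> wm = WatermarkStrategy
                    .<Event>forBoundedOutOfOrderness(Duration.ofSeconds(5))
                    .withTimestampAssigner((e, ts) -> e.eventTimeMillis);

            KeyedStream<Event, String> left = env
                    .fromElements(new Event("k", 1_000L), new Event("k", 61_000L))
                    .assignTimestampsAndWatermarks(wm)
                    .keyBy(e -> e.joinKey);

            KeyedStream<Event, String> right = env
                    .fromElements(new Event("k", 2_000L), new Event("k", 300_000L))
                    .assignTimestampsAndWatermarks(wm)
                    .keyBy(e -> e.joinKey);

            // Right-side rows are buffered in keyed state until the left watermark
            // closes the +/- 5 minute window. A right source that races far ahead
            // during a backfill is exactly what makes that buffered state grow.
            left.intervalJoin(right)
                    .between(Time.minutes(-5), Time.minutes(5))
                    .process(new ProcessJoinFunction<Event, Event, String>() {
                        @Override
                        public void processElement(Event l, Event r, Context ctx, Collector<String> out) {
                            out.collect(l.joinKey + " joined at " + l.eventTimeMillis + "/" + r.eventTimeMillis);
                        }
                    })
                    .print();

            env.execute("interval-join-sketch");
        }
    }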
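The "control the consumption order of each source" workaround is not spelled out in the thread. One hacky way to approximate it with the legacy FlinkKafkaConsumer is to wrap the larger source's DeserializationSchema so it blocks until an external signal says the smaller source has caught up. The sketch below is a rough illustration under that assumption; the marker-file mechanism and all names are hypothetical.

    import org.apache.flink.api.common.serialization.DeserializationSchema;
    import org.apache.flink.api.common.typeinfo.TypeInformation;

    import java.io.IOException;
    import java.nio.file.Files;
    import java.nio.file.Path;
    import java.nio.file.Paths;

    // Delegates to an inner schema, but holds back deserialization (and therefore
    // consumption of this source) until a marker file appears.
    public class GatedDeserializationSchema<T> implements DeserializationSchema<T> {

        private final DeserializationSchema<T> inner;
        private final String markerFile; // written externally once the smaller source is drained
        private transient volatile boolean gateOpen;

        public GatedDeserializationSchema(DeserializationSchema<T> inner, String markerFile) {
            this.inner = inner;
            this.markerFile = markerFile;
        }

        @Override
        public void open(InitializationContext context) throws Exception {
            inner.open(context);
        }

        @Override
        public T deserialize(byte[] message) throws IOException {
            if (!gateOpen) {
                Path marker = Paths.get(markerFile);
                while (!Files.exists(marker)) {
                    try {
                        Thread.sleep(1_000L); // poll until the gate opens
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                        throw new IOException("Interrupted while waiting for the gate", e);
                    }
                }
                gateOpen = true;
            }
            return inner.deserialize(message);
        }

        @Override
        public boolean isEndOfStream(T nextElement) {
            return inner.isEndOfStream(nextElement);
        }

        @Override
        public TypeInformation<T> getProducedType() {
            return inner.getProducedType();
        }
    }

It could then wrap the schema of the larger topic, e.g. new FlinkKafkaConsumer<>(largeTopic, new GatedDeserializationSchema<>(innerSchema, "/tmp/small-source-done"), props), with the marker file written by something external (for example a script watching the smaller topic's consumer lag) once the smaller source has been consumed.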
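On JING ZHANG's last point: in a SQL/Table job, whether the planner produces an interval join (bounded state) or a regular join (state kept indefinitely unless TTL is set) depends on the join condition referencing time attributes rather than plain TIMESTAMP columns. A small, assumed Table API example with datagen tables shows how to check the plan:

    import org.apache.flink.table.api.EnvironmentSettings;
    import org.apache.flink.table.api.TableEnvironment;

    public class IntervalJoinSqlCheck {

        public static void main(String[] args) {
            TableEnvironment tEnv =
                    TableEnvironment.create(EnvironmentSettings.newInstance().inStreamingMode().build());

            // Hypothetical tables. The WATERMARK clause is what turns event_time into a
            // time attribute; without it the BETWEEN predicate below compares plain
            // TIMESTAMP columns and the planner falls back to a regular join.
            tEnv.executeSql(
                    "CREATE TABLE impressions (" +
                    "  id STRING," +
                    "  event_time TIMESTAMP(3)," +
                    "  WATERMARK FOR event_time AS event_time - INTERVAL '5' SECOND" +
                    ") WITH ('connector' = 'datagen')");

            tEnv.executeSql(
                    "CREATE TABLE actions (" +
                    "  impression_id STRING," +
                    "  event_time TIMESTAMP(3)," +
                    "  WATERMARK FOR event_time AS event_time - INTERVAL '5' SECOND" +
                    ") WITH ('connector' = 'datagen')");

            // The plan should contain an IntervalJoin operator rather than a plain Join.
            System.out.println(tEnv.explainSql(
                    "SELECT i.id, a.impression_id " +
                    "FROM impressions i JOIN actions a " +
                    "ON i.id = a.impression_id " +
                    "AND a.event_time BETWEEN i.event_time AND i.event_time + INTERVAL '1' HOUR"));
        }
    }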