Hi, Mason

I think the problem is related to https://github.com/apache/flink/blob/release-1.14.3-rc1/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/hybrid/HybridSourceSplitEnumerator.java#L358

    if (readerSourceIndex.get(e.getKey()) == (Integer) sourceIndex) {

Integer is a Java object, so the comparison should use "equals()", not "==". It is Java, not Scala 🙂
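For illustration, a minimal standalone sketch (the class name is hypothetical) of why "==" on boxed Integers appears to work for small values and then breaks: with default JVM settings, autoboxing goes through Integer.valueOf(), which caches only the values -128 through 127, so reference equality happens to hold for source indices 0-127 and fails from index 128 onward, which would explain the job stalling at exactly sourceIndex=128.

// Sketch only: with default JVM settings, autoboxing uses Integer.valueOf(),
// which caches the values -128..127, so == (a reference comparison) happens to
// work inside that range and fails outside it; equals() compares values.
public class IntegerComparisonDemo {
    public static void main(String[] args) {
        Integer a = 127, b = 127;
        System.out.println(a == b);       // true  -- both references come from the cache
        Integer c = 128, d = 128;
        System.out.println(c == d);       // false -- two distinct boxed objects
        System.out.println(c.equals(d));  // true  -- value comparison is always correct
    }
}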
Could you please fix this issue in your fix for https://issues.apache.org/jira/browse/FLINK-27479? And thanks for the original fix.

From: Mason Chen <mas.chen6...@gmail.com>
Date: Tuesday, July 26, 2022 at 9:57 PM
To: Benenson, Michael <mikhail_benen...@intuit.com>
Cc: user@flink.apache.org <user@flink.apache.org>, Deshpande, Omkar <omkar_deshpa...@intuit.com>, Rosensweig, JD <jd_rosensw...@intuit.com>, Sana, Harish <harish_s...@intuit.com>
Subject: Re: Hybrid Source stop processing files after processing 128 SourceFactories

Hi Michael,

I'm glad the CPU fix works for you!

Regarding the behavior, HybridSource should only consume from Kafka after it finishes the bounded read of the files. At that time, files will not be read anymore. In addition, there is no limitation that there can only be 128 source factories (the upper limit should be Integer.MAX_VALUE).

Can you give more details on how the HybridSource is configured? Are all sources unbounded? When you say it stopped processing files, does this mean it stops reading from Kafka too? How do you know the program is stalling? Is the metric numRecordsInPerSecond from the source operator 0?

Best,
Mason

On Mon, Jul 25, 2022 at 7:52 PM Benenson, Michael <mikhail_benen...@intuit.com> wrote:

Hi, folks

I have tried the fix for FLINK-27479 <https://issues.apache.org/jira/browse/FLINK-27479> for Hybrid Source from https://github.com/apache/flink/pull/20215 in Flink 1.14.3.

It works fine, but Flink stops processing files after processing 128 SourceFactories. I have run this program a few times, starting without a savepoint, and each time the program hangs after processing 128 SourceFactories. The program does not crash or terminate, but it stops processing files.

My program is like the Hybrid source example: reading multiple files and then reading from Kafka. In my case the program reads a few hundred directories from S3 that contain snappy files, so it creates a separate HybridSource.SourceFactory for each directory, and the last one is the SourceFactory for reading from Kafka (a rough sketch of this kind of setup is included after the log lines below).

Any idea what could be wrong? Is it a known restriction that there should be no more than 128 SourceFactories? I have the program running now, so I could collect any additional info to clarify the cause of the problem.

Here are the last few lines from the JobManager before the program stopped processing files:

2022/07/26 01:02:35.248 INFO o.a.f.c.f.s.i.StaticFileSplitEnumerator - No more splits available for subtask 0
2022/07/26 01:02:36.249 INFO c.i.strmprocess.hybrid.ReadS3Hybrid1 - Reading input data from path s3://idl-kafka-connect-ued-raw-uw2-data-lake-e2e/data/topics/sbseg-qbo-clickstream/d_20220715-0800 for 2022-07-15T08:00:00Z
2022/07/26 01:02:36.618 INFO o.a.f.c.b.s.h.HybridSourceSplitEnumerator - Starting enumerator for sourceIndex=128
2022/07/26 01:02:36.619 INFO o.a.f.r.s.c.SourceCoordinator - Source Source: hybrid-source received split request from parallel task 1
2022/07/26 01:02:36.619 INFO o.a.f.r.s.c.SourceCoordinator - Source Source: hybrid-source received split request from parallel task 2
2022/07/26 01:02:36.619 INFO o.a.f.r.s.c.SourceCoordinator - Source Source: hybrid-source received split request from parallel task 1
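For reference, the sketch mentioned above: a minimal example of the kind of files-then-Kafka HybridSource setup described in this thread, assuming the files can be read with a line-oriented StreamFormat. The bucket, directories, topic, bootstrap servers, class name, and the fileSourceFor() helper are hypothetical placeholders, not the actual job.

// A minimal sketch, not the actual job: a HybridSource that reads a chain of bounded
// FileSources (one per S3 directory) and then switches to an unbounded KafkaSource.
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.base.source.hybrid.HybridSource;
import org.apache.flink.connector.file.src.FileSource;
import org.apache.flink.connector.file.src.reader.TextLineFormat;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class HybridS3ThenKafkaSketch {

    // One bounded FileSource per directory (assumes line-by-line reading;
    // a snappy-aware StreamFormat would go here instead).
    private static FileSource<String> fileSourceFor(String dir) {
        return FileSource.forRecordStreamFormat(new TextLineFormat(), new Path(dir)).build();
    }

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Unbounded Kafka source that the job switches to after all files are consumed.
        KafkaSource<String> kafkaSource = KafkaSource.<String>builder()
                .setBootstrapServers("broker:9092")
                .setTopics("sbseg-qbo-clickstream")
                .setGroupId("hybrid-sketch")
                .setStartingOffsets(OffsetsInitializer.earliest())
                .setValueOnlyDeserializer(new SimpleStringSchema())
                .build();

        // The real job adds one bounded file source per directory (a few hundred calls).
        HybridSource<String> hybridSource =
                HybridSource.builder(fileSourceFor("s3://my-bucket/data/d_20220715-0800"))
                        .addSource(fileSourceFor("s3://my-bucket/data/d_20220715-0900"))
                        .addSource(kafkaSource)
                        .build();

        env.fromSource(hybridSource, WatermarkStrategy.noWatermarks(), "hybrid-source").print();
        env.execute("hybrid-source-sketch");
    }
}

HybridSource requires every source except the last to be bounded, which is why the unbounded Kafka source is added last.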