Hi, Mason

I think the problem is related to 
https://github.com/apache/flink/blob/release-1.14.3-rc1/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/hybrid/HybridSourceSplitEnumerator.java#L358

if (readerSourceIndex.get(e.getKey()) == (Integer) sourceIndex) {

Integer is a Java object, so the comparison should use 'equals()', not '=='.
It is Java, not Scala 😉
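
A minimal standalone sketch of why this would show up exactly at index 128. The class and method names are made up for illustration and are not Flink code. The Java Language Specification only guarantees that boxed values from -128 to 127 are shared, so on a default JVM two separately boxed 128s are normally different objects: '==' then reports them as unequal, while 'equals()' compares the values.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical demo class, not part of Flink.
class IntegerCompareDemo {

    // Same shape as the quoted check: a boxed map value compared by
    // reference with a freshly boxed int.
    static boolean sameByReference(Map<Integer, Integer> readerSourceIndex,
                                   int readerId, int sourceIndex) {
        return readerSourceIndex.get(readerId) == (Integer) sourceIndex;
    }

    // Value comparison; also safe if the map has no entry (get returns null).
    static boolean sameByEquals(Map<Integer, Integer> readerSourceIndex,
                                int readerId, int sourceIndex) {
        return Integer.valueOf(sourceIndex).equals(readerSourceIndex.get(readerId));
    }

    public static void main(String[] args) {
        Map<Integer, Integer> readerSourceIndex = new HashMap<>();
        readerSourceIndex.put(0, 127); // inside the guaranteed Integer cache
        readerSourceIndex.put(1, 128); // outside the guaranteed Integer cache

        // 127: both boxed values come from the cache, so '==' happens to work.
        System.out.println("127 ==     : " + sameByReference(readerSourceIndex, 0, 127));
        System.out.println("127 equals : " + sameByEquals(readerSourceIndex, 0, 127));

        // 128: '==' compares two distinct objects on a default JVM (typically false).
        System.out.println("128 ==     : " + sameByReference(readerSourceIndex, 1, 128));
        System.out.println("128 equals : " + sameByEquals(readerSourceIndex, 1, 128));
    }
}
```

If this is the cause, it would match the symptom: everything works up to sourceIndex=127 and stalls at sourceIndex=128.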


Could you please fix this issue in your fix for 
https://issues.apache.org/jira/browse/FLINK-27479?



And thanks for the original fix.

From: Mason Chen <mas.chen6...@gmail.com>
Date: Tuesday, July 26, 2022 at 9:57 PM
To: Benenson, Michael <mikhail_benen...@intuit.com>
Cc: user@flink.apache.org <user@flink.apache.org>, Deshpande, Omkar 
<omkar_deshpa...@intuit.com>, Rosensweig, JD <jd_rosensw...@intuit.com>, Sana, 
Harish <harish_s...@intuit.com>
Subject: Re: Hybrid Source stop processing files after processing 128 
SourceFactories

Hi Michael,

I'm glad the CPU fix works for you!

Regarding the behavior, HybridSource should only consume from Kafka after it 
finishes the bounded read of the files. At that time, files will not be read 
anymore. In addition, there is no limitation where there can only be 128 source 
factories (the upper limit should be integer max).

Can you give more details on how the HybridSource is configured? Are all 
sources unbounded? When you say it stopped processing files, does this mean it 
stops reading from Kafka too? How do you know the program is stalling? Is the 
metric numRecordsInPerSecond from the source operator 0?

Best,
Mason

On Mon, Jul 25, 2022 at 7:52 PM Benenson, Michael 
<mikhail_benen...@intuit.com> wrote:
Hi, folks

I have tried the fix for FLINK-27479<https://issues.apache.org/jira/browse/FLINK-27479> 
for Hybrid Source from https://github.com/apache/flink/pull/20215 in Flink 1.14.3.

It works fine, but Flink stops processing files after processing 128 
SourceFactories. I have run this program a few times, starting without a 
savepoint, and each time the program hangs after processing 128 
SourceFactories. The program does not crash or terminate, but it stops 
processing files.

My program is like the Hybrid source example: reading multiple files, and then 
reading from Kafka

In my case the program reads a few hundred directories from S3 that contain 
snappy files, so for each directory it creates a separate 
HybridSource.SourceFactory, and the last one is the SourceFactory for reading 
from Kafka.

Any idea what could be wrong? Is it a known restriction that there should be 
no more than 128 Source Factories?
I have the program running now, so I could collect any additional info to 
clarify the cause of the problem.

Here are the last few lines from the JobManager log before the program stopped 
processing files.

2022/07/26 01:02:35.248 INFO  o.a.f.c.f.s.i.StaticFileSplitEnumerator - No more splits available for subtask 0
2022/07/26 01:02:36.249 INFO  c.i.strmprocess.hybrid.ReadS3Hybrid1 - Reading input data from path s3://idl-kafka-connect-ued-raw-uw2-data-lake-e2e/data/topics/sbseg-qbo-clickstream/d_20220715-0800 for 2022-07-15T08:00:00Z
2022/07/26 01:02:36.618 INFO  o.a.f.c.b.s.h.HybridSourceSplitEnumerator - Starting enumerator for sourceIndex=128
2022/07/26 01:02:36.619 INFO  o.a.f.r.s.c.SourceCoordinator - Source Source: hybrid-source received split request from parallel task 1
2022/07/26 01:02:36.619 INFO  o.a.f.r.s.c.SourceCoordinator - Source Source: hybrid-source received split request from parallel task 2
2022/07/26 01:02:36.619 INFO  o.a.f.r.s.c.SourceCoordinator - Source Source: hybrid-source received split request from parallel task 1

