Thank you for reporting the issue. Mason has already identified the root cause and the JIRA is now assigned to him: https://issues.apache.org/jira/browse/FLINK-27479
Thomas

On Tue, May 3, 2022 at 4:02 AM Martijn Visser <martijnvis...@apache.org> wrote:

> I'm looping in @Thomas Weise <t...@apache.org> since he has expertise on
> the HybridSource.
>
> On Tue, 3 May 2022 at 12:04, Arthur Li <lianyou1...@126.com> wrote:
>
>> Hi Mason,
>>
>> I uploaded the code and resource files to AwesomeArthurLi/quickstart
>> <https://github.com/AwesomeArthurLi/quickstart>;
>> maybe it will help you reproduce the issue.
>>
>> BR.
>> Arthur Li
>>
>>
>> On May 3, 2022, at 15:48, Mason Chen <mas.chen6...@gmail.com> wrote:
>>
>> Hi Arthur,
>>
>> Coincidentally, I also encountered a similar issue recently. In my
>> case, I noticed that the source implementation always marks itself as
>> having data available, which causes the Flink runtime to loop
>> repeatedly and leads to high CPU utilization. More details here:
>> https://issues.apache.org/jira/browse/FLINK-27479
>>
>> Can you provide a minimal working example to reproduce this issue? I
>> presume you notice high CPU utilization before switching from FileSource
>> and also after switching to KafkaSource?
>>
>> Best,
>> Mason
>>
>> On Sun, May 1, 2022 at 6:24 AM Arthur Li <lianyou1...@126.com> wrote:
>>
>>> The following snapshot is the Java process's flame graph.
>>>
>>> <Pasted Graphic-1.png>
>>>
>>>
>>> On May 1, 2022, at 09:14, Arthur Li <lianyou1...@126.com> wrote:
>>>
>>> Hi all,
>>>
>>> The Hybrid Source
>>> <https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/connectors/datastream/hybridsource/#hybrid-source>
>>> is one of the new features of Flink 1.14.x, but one problem is that it
>>> takes over *700% CPU*, which is almost 5 times as much as these two
>>> splits.
>>>
>>> My Environment:
>>> JDK: 11.0.12 (x86_64) "Oracle Corporation" - "Java SE 11.0.12"
>>> Scala: Scala code runner version 2.12.14
>>> OS: MacOS Monterey
>>>
>>> Hybrid Source Code:
>>>
>>> object HelloHybrid {
>>>
>>>   def main(args: Array[String]): Unit = {
>>>     val env = StreamExecutionEnvironment.getExecutionEnvironment
>>>     val kafka =
>>>       KafkaSource.builder[String]()
>>>         .setBootstrapServers("localhost:9092")
>>>         .setTopics("lab-flink-sensor-iot")
>>>         .setGroupId("sensor-iot-group")
>>>         .setStartingOffsets(OffsetsInitializer.earliest())
>>>         .setValueOnlyDeserializer(new SimpleStringSchema())
>>>         .build()
>>>
>>>     val sensorDataFile =
>>>       "/Users/arthur/Workspace/flink-summer/src/main/resources/sensor.csv"
>>>     val fileData = FileSource.forRecordStreamFormat(
>>>       new TextLineFormat(),
>>>       Path.fromLocalFile(new File(sensorDataFile)))
>>>       .build()
>>>
>>>     val hybridSrc = HybridSource.builder(fileData).addSource(kafka).build()
>>>
>>>     env.fromSource(hybridSrc,
>>>       WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(3)),
>>>       "kafka & file hybrid source")
>>>       .map(data => {
>>>         val arr = data.split(",").map(_.trim)
>>>         SensorReading(arr(0), arr(1).toLong, arr(2).toDouble)
>>>       })
>>>       .print("hybrid")
>>>
>>>     env.execute("Hello kafka & file hybrid source")
>>>   }
>>> }
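
For readers following the thread: the behaviour Mason describes in his quoted message (a source that always reports data as available, so the caller polls in a tight loop) can be illustrated with a small standalone sketch. It does not use Flink at all. The `Reader` trait, both reader classes, and `emptyPollsUntilFirstRecord` are hypothetical names invented for this illustration, loosely modelled on the idea of an availability future. It is not the actual HybridSource code or the fix tracked in FLINK-27479.

```scala
import java.util.concurrent.{CompletableFuture, ConcurrentLinkedQueue, Executors, TimeUnit}

object AvailabilitySketch {

  // Hypothetical, simplified reader contract (an assumption, not Flink's interface).
  trait Reader {
    def poll(): Option[String]                 // next record, if any
    def isAvailable: CompletableFuture[Void]   // completes when polling is worthwhile
    def offer(record: String): Unit            // test hook: make a record arrive
  }

  // Always claims to have data, even when the queue is empty.
  final class AlwaysAvailableReader extends Reader {
    private val queue = new ConcurrentLinkedQueue[String]()
    def poll(): Option[String] = Option(queue.poll())
    def isAvailable: CompletableFuture[Void] =
      CompletableFuture.completedFuture[Void](null)
    def offer(record: String): Unit = queue.add(record)
  }

  // Hands out a pending future while the queue is empty, so the caller can block.
  final class WaitingReader extends Reader {
    private val queue = new ConcurrentLinkedQueue[String]()
    private var pending = new CompletableFuture[Void]()
    def poll(): Option[String] = Option(queue.poll())
    def isAvailable: CompletableFuture[Void] = synchronized {
      if (queue.isEmpty) pending else CompletableFuture.completedFuture[Void](null)
    }
    def offer(record: String): Unit = synchronized {
      queue.add(record)
      pending.complete(null)                   // wake up a blocked caller
      pending = new CompletableFuture[Void]()
    }
  }

  // Drives the reader until the first record shows up (after about 50 ms)
  // and counts how many polls came back empty in the meantime.
  def emptyPollsUntilFirstRecord(reader: Reader): Long = {
    val scheduler = Executors.newSingleThreadScheduledExecutor()
    scheduler.schedule(
      new Runnable { def run(): Unit = reader.offer("record") },
      50L, TimeUnit.MILLISECONDS)
    var emptyPolls = 0L
    var result: Option[String] = None
    while (result.isEmpty) {
      reader.isAvailable.get()                 // returns at once if already complete
      result = reader.poll()
      if (result.isEmpty) emptyPolls += 1
    }
    scheduler.shutdown()
    emptyPolls
  }

  def main(args: Array[String]): Unit = {
    println(s"waiting reader, empty polls: ${emptyPollsUntilFirstRecord(new WaitingReader)}")
    println(s"always-available reader, empty polls: ${emptyPollsUntilFirstRecord(new AlwaysAvailableReader)}")
  }
}
```

In this sketch the waiting reader produces no empty polls because the driver blocks on the pending future, while the always-available reader spins for the whole 50 ms. That spinning is the kind of loop that would show up as high CPU usage in a profile.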