Hi Mason,

I have uploaded the code and resource files to AwesomeArthurLi/quickstart (github.com) <https://github.com/AwesomeArthurLi/quickstart>. Hopefully they will help you reproduce the issue.
BR,
Arthur Li

> On May 3, 2022, at 15:48, Mason Chen <mas.chen6...@gmail.com> wrote:
>
> Hi Arthur,
>
> Coincidentally, I also ran into a similar issue recently. In my case, I
> noticed that the source implementation always marks itself as having data
> available. This causes the Flink runtime to loop repeatedly in quick
> succession, which drives CPU utilization up. More details here:
> https://issues.apache.org/jira/browse/FLINK-27479
>
> Can you provide a minimal working example that reproduces this issue? I
> presume you noticed high CPU utilization both before the switch from
> FileSource and after the switch to KafkaSource?
>
> Best,
> Mason
>
> On Sun, May 1, 2022 at 6:24 AM Arthur Li <lianyou1...@126.com> wrote:
>
> The following snapshot is the Java process's flame graph.
>
> <Pasted Graphic-1.png>
>
>> On May 1, 2022, at 09:14, Arthur Li <lianyou1...@126.com> wrote:
>>
>> Hi all,
>>
>> Hybrid Source
>> <https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/connectors/datastream/hybridsource/#hybrid-source>
>> is one of the new features in Flink 1.14.x, but one problem is that it
>> takes over 700% CPU, almost 5 times as much as the two underlying sources
>> (file and Kafka) use when run separately.
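>>
>> My environment:
>> JDK: 11.0.12 (x86_64) "Oracle Corporation" - "Java SE 11.0.12"
>> Scala: Scala code runner version 2.12.14
>> OS: macOS Monterey
>>
>> Hybrid Source code:
>>
>> import java.io.File
>> import java.time.Duration
>>
>> import org.apache.flink.api.common.eventtime.WatermarkStrategy
>> import org.apache.flink.api.common.serialization.SimpleStringSchema
>> import org.apache.flink.connector.base.source.hybrid.HybridSource
>> import org.apache.flink.connector.file.src.FileSource
>> import org.apache.flink.connector.file.src.reader.TextLineFormat
>> import org.apache.flink.connector.kafka.source.KafkaSource
>> import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer
>> import org.apache.flink.core.fs.Path
>> import org.apache.flink.streaming.api.scala._
>>
>> // Assumed shape of SensorReading (its definition is not shown in this message).
>> case class SensorReading(id: String, timestamp: Long, temperature: Double)
>>
>> object HelloHybrid {
>>   def main(args: Array[String]): Unit = {
>>     val env = StreamExecutionEnvironment.getExecutionEnvironment
>>
>>     // Unbounded source: the Kafka topic, read from the earliest offset.
>>     val kafka =
>>       KafkaSource.builder[String]()
>>         .setBootstrapServers("localhost:9092")
>>         .setTopics("lab-flink-sensor-iot")
>>         .setGroupId("sensor-iot-group")
>>         .setStartingOffsets(OffsetsInitializer.earliest())
>>         .setValueOnlyDeserializer(new SimpleStringSchema())
>>         .build()
>>
>>     // Bounded source: a local CSV file, read line by line.
>>     val sensorDataFile =
>>       "/Users/arthur/Workspace/flink-summer/src/main/resources/sensor.csv"
>>     val fileData = FileSource.forRecordStreamFormat(
>>         new TextLineFormat(),
>>         Path.fromLocalFile(new File(sensorDataFile)))
>>       .build()
>>
>>     // Read the file first, then switch over to Kafka.
>>     val hybridSrc = HybridSource.builder(fileData).addSource(kafka).build()
>>
>>     env.fromSource(hybridSrc,
>>         WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(3)),
>>         "kafka & file hybrid source")
>>       .map(data => {
>>         val arr = data.split(",").map(_.trim)
>>         SensorReading(arr(0), arr(1).toLong, arr(2).toDouble)
>>       })
>>       .print("hybrid")
>>
>>     env.execute("Hello kafka & file hybrid source")
>>   }
>> }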
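Mason's description of FLINK-27479 (a source that always reports data as available, so the runtime keeps polling it in a tight loop) can be illustrated with a small, self-contained Scala sketch. It does not use Flink at all: `ToyReader`, `InputStatus`, `drain` and `run` are hypothetical names for a simplified model loosely shaped like `SourceReader#pollNext` / `#isAvailable`, and the 100 ms producer delay is arbitrary. The "dishonest" reader returns an already-completed future, so the consumer loop spins; the "honest" reader returns a future that completes only when a record or end-of-input arrives, so the loop blocks instead of burning CPU.

```scala
import java.util.concurrent.{CompletableFuture, ConcurrentLinkedQueue}

// Toy model of a poll-based reader plus an availability future, loosely shaped
// like Flink's SourceReader#pollNext / #isAvailable. Hypothetical, not Flink code.
object AvailabilityDemo {
  sealed trait InputStatus
  case object MoreAvailable extends InputStatus
  case object NothingAvailable extends InputStatus
  case object EndOfInput extends InputStatus

  final class ToyReader(honest: Boolean) {
    private val queue = new ConcurrentLinkedQueue[String]()
    @volatile private var done = false
    @volatile private var available = new CompletableFuture[Void]()

    // Producer side: publish a record or end-of-input, then wake the consumer.
    def push(record: String): Unit = { queue.add(record); available.complete(null) }
    def finish(): Unit = { done = true; available.complete(null) }

    def pollNext(): InputStatus =
      if (queue.poll() != null) MoreAvailable
      else if (done) EndOfInput
      else NothingAvailable

    def isAvailable(): CompletableFuture[Void] = {
      // The problematic behaviour: always claim that data is ready.
      if (!honest) return CompletableFuture.completedFuture[Void](null)
      if (available.isDone && queue.isEmpty && !done) {
        available = new CompletableFuture[Void]()
        // Re-check so a push that raced with the reset is not missed.
        if (!queue.isEmpty || done) available.complete(null)
      }
      available
    }
  }

  // Minimal consumer loop: poll, and when nothing is available, wait on the
  // availability future. Returns how many polls came back empty.
  def drain(reader: ToyReader): Long = {
    var idlePolls = 0L
    var status = reader.pollNext()
    while (status != EndOfInput) {
      if (status == NothingAvailable) {
        idlePolls += 1
        reader.isAvailable().join() // returns immediately for the dishonest reader
      }
      status = reader.pollNext()
    }
    idlePolls
  }

  // One record arrives after roughly 100 ms, then the input ends.
  def run(honest: Boolean): Long = {
    val reader = new ToyReader(honest)
    val producer = new Thread(() => {
      Thread.sleep(100)
      reader.push("sensor_1,1547718199,35.8")
      reader.finish()
    })
    producer.start()
    val idle = drain(reader)
    producer.join()
    idle
  }

  def main(args: Array[String]): Unit = {
    println(s"dishonest reader: ${run(honest = false)} empty polls")
    println(s"honest reader:    ${run(honest = true)} empty polls")
  }
}
```

Running `main` should show a large, machine-dependent number of empty polls for the dishonest reader, and at most 2 for the honest one (one wait for the record, one for end-of-input). The re-check after swapping in a fresh future is a design choice to avoid a lost wake-up when the producer publishes between the emptiness check and the swap.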