Hi all, the Hybrid Source (Apache Flink documentation: <https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/connectors/datastream/hybridsource/#hybrid-source>) is one of the new features of Flink 1.14.x. One problem is that it uses over 700% CPU, almost five times as much as the two underlying sources (file and Kafka) use when run separately.
/*
 * My environment:
 *   JDK:   11.0.12 (x86_64) "Oracle Corporation" - "Java SE 11.0.12"
 *   Scala: Scala code runner version 2.12.14
 *   OS:    macOS Monterey
 *
 * Hybrid source code:
 */

/**
 * Demo job that reads sensor readings through a Flink `HybridSource`.
 *
 * The hybrid source is built from a bounded `FileSource` (a local CSV file)
 * followed by a `KafkaSource` on topic "lab-flink-sensor-iot". Each record is
 * parsed into a [[SensorReading]] and printed with the prefix "hybrid".
 */
object HelloHybrid {

  /** CSV file read when no path is passed on the command line. */
  private val DefaultSensorDataFile =
    "/Users/arthur/Workspace/flink-summer/src/main/resources/sensor.csv"

  /**
   * Builds and executes the job.
   *
   * @param args optional; `args(0)` overrides the sensor CSV path
   *             (defaults to [[DefaultSensorDataFile]])
   */
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // Kafka part: topic read from the earliest offset, values as plain strings.
    val kafka = KafkaSource.builder[String]()
      .setBootstrapServers("localhost:9092")
      .setTopics("lab-flink-sensor-iot")
      .setGroupId("sensor-iot-group")
      .setStartingOffsets(OffsetsInitializer.earliest())
      .setValueOnlyDeserializer(new SimpleStringSchema())
      .build()

    val sensorDataFile = args.headOption.getOrElse(DefaultSensorDataFile)

    // File part: one record per text line of the CSV file.
    val fileData = FileSource
      .forRecordStreamFormat(new TextLineFormat(), Path.fromLocalFile(new File(sensorDataFile)))
      .build()

    // The file source is registered first, the Kafka source second.
    val hybridSrc = HybridSource.builder(fileData).addSource(kafka).build()

    env
      .fromSource(
        hybridSrc,
        WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(3)),
        "kafka & file hybrid source")
      // Unparseable lines are dropped rather than throwing and failing the job.
      .flatMap(line => parseSensorReading(line).toList)
      .print("hybrid")

    env.execute("Hello kafka & file hybrid source")
  }

  /**
   * Parses one comma-separated line of the form `id,timestamp,value`.
   *
   * Fields are trimmed; any columns after the third are ignored.
   *
   * @param line raw text record from the file or from Kafka
   * @return the parsed reading, or `None` when the line has fewer than three
   *         fields or its second/third field is not a valid Long/Double
   *         (for example a header row or a malformed message)
   */
  private def parseSensorReading(line: String): Option[SensorReading] =
    line.split(",").map(_.trim) match {
      case Array(id, timestamp, value, _*) =>
        scala.util.Try(SensorReading(id, timestamp.toLong, value.toDouble)).toOption
      case _ =>
        None
    }
}