Hi all,

Hybrid Source
<https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/connectors/datastream/hybridsource/#hybrid-source>
is one of the new features in Flink 1.14.x. One problem I see is that the job
below takes over 700% CPU, which is almost 5 times what the two underlying
sources (file and Kafka) use on their own.


My Environment:
JDK:  11.0.12 (x86_64) "Oracle Corporation" - "Java SE 11.0.12"
Scala: Scala code runner version 2.12.14
OS: macOS Monterey


Hybrid Source Code:

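import java.io.File
import java.time.Duration

import org.apache.flink.api.common.eventtime.WatermarkStrategy
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.connector.base.source.hybrid.HybridSource
import org.apache.flink.connector.file.src.FileSource
import org.apache.flink.connector.file.src.reader.TextLineFormat
import org.apache.flink.connector.kafka.source.KafkaSource
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer
import org.apache.flink.core.fs.Path
import org.apache.flink.streaming.api.scala._

// SensorReading is a case class defined elsewhere in the project (not shown here).
object HelloHybrid {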
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val kafka =
      KafkaSource.builder[String]()
        .setBootstrapServers("localhost:9092")
        .setTopics("lab-flink-sensor-iot")
        .setGroupId("sensor-iot-group")
        .setStartingOffsets(OffsetsInitializer.earliest())
        .setValueOnlyDeserializer(new SimpleStringSchema())
        .build()

    val sensorDataFile = "/Users/arthur/Workspace/flink-summer/src/main/resources/sensor.csv"
    val fileData = FileSource.forRecordStreamFormat(
      new TextLineFormat(),
      Path.fromLocalFile(new File(sensorDataFile)))
      .build()

    val hybridSrc = HybridSource.builder(fileData).addSource(kafka).build()

    env.fromSource(hybridSrc,
      WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(3)),
      "kafka & file hybrid source")
      .map(data => {
        val arr = data.split(",").map(_.trim)
        SensorReading(arr(0), arr(1).toLong, arr(2).toDouble)
      })
      .print("hybrid")

    env.execute("Hello kafka & file hybrid source")
  }
}
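
The parsing step inside the map above can be checked on its own, outside Flink. Below is a minimal sketch, not the code from the job: the post does not show SensorReading, so the field names (id, timestamp, temperature) and the sample line are assumptions for illustration, and the SensorParser helper is hypothetical. Unlike the inline map, it returns None for malformed lines instead of throwing.

```scala
import scala.util.Try

// Assumed shape: the original post does not show this class, so the
// field names here are placeholders chosen for illustration.
case class SensorReading(id: String, timestamp: Long, temperature: Double)

// Hypothetical helper, not part of the original job.
object SensorParser {
  // Parses a line of the form "id,timestamp,temperature".
  // Returns None when the field count is wrong or a number fails to parse.
  def parse(line: String): Option[SensorReading] = {
    val arr = line.split(",").map(_.trim)
    if (arr.length != 3) None
    else Try(SensorReading(arr(0), arr(1).toLong, arr(2).toDouble)).toOption
  }
}
```

In the job, such a helper could be used with flatMap (for example `.flatMap(line => SensorParser.parse(line))`, given the Scala API import) so that bad records are dropped rather than failing the task.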

