Hi Team,

I have a Flink job that consumes from a Kafka topic and tries to create
tumbling windows (using TUMBLE) grouped on a few columns such as eventId
and eventName. The Kafka topic contains comma-separated values like the
following:

event1,Util1,1647614467000,0.12
event1,Util1,1647614527000,0.26
event1,Util1,1647614587000,0.71
event2,Util2,1647614647000,0.08
event2,Util2,1647614707000,0.32
event2,Util2,1647614767000,0.23
event2,Util2,1647614827000,0.85
event1,Util1,1647614887000,0.08
event1,Util1,1647614947000,0.32
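For reference, since Flink aligns tumbling windows to the epoch, each of the sample records above (they are 60 s apart) should land in its own 1-minute window. A quick sketch of the bucket math I'm assuming (plain Java, and the class/method names here are just for illustration, not Flink API):

```java
// Hypothetical helper, not part of Flink: computes which epoch-aligned
// tumbling window a millisecond timestamp falls into.
public class WindowMath {
    static long windowEnd(long tsMillis, long sizeMillis) {
        // Tumbling windows cover [start, start + size), aligned to the epoch.
        return tsMillis - (tsMillis % sizeMillis) + sizeMillis;
    }

    public static void main(String[] args) {
        long[] samples = {1647614467000L, 1647614527000L, 1647614587000L};
        for (long ts : samples) {
            System.out.println(ts + " -> window ends at " + windowEnd(ts, 60_000L));
        }
        // 1647614467000 -> window ends at 1647614520000
        // 1647614527000 -> window ends at 1647614580000
        // 1647614587000 -> window ends at 1647614640000
    }
}
```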


Here is the Flink code I’m using for this:

import java.time.Duration
import org.apache.flink.api.common.eventtime.WatermarkStrategy
import org.apache.flink.api.common.functions.MapFunction
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.api.java.tuple.Tuple4
import org.apache.flink.connector.kafka.source.KafkaSource
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
import org.apache.flink.table.api.Expressions.`$`
import org.apache.flink.table.api.Schema
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment

fun main() {
    val env = StreamExecutionEnvironment.getExecutionEnvironment()
    val tableEnv = StreamTableEnvironment.create(env)

    val kafkaSource = KafkaSource.builder<String>()
        .setBootstrapServers("localhost:9092")
        .setTopics("an-topic")
        .setGroupId("testGroup")
        .setStartingOffsets(OffsetsInitializer.earliest())
        .setValueOnlyDeserializer(SimpleStringSchema())
        .build()

    val kafkaStream = env.fromSource(
        kafkaSource,
        WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(20)),
        "KafkaSource"
    )

    val kafkaRowMapper = kafkaStream.map(RowMapper())

    val finalTable = tableEnv.fromDataStream(kafkaRowMapper,
        Schema.newBuilder()
            .columnByExpression("proc_time", "PROCTIME()")
            .columnByExpression("event_time", "TO_TIMESTAMP_LTZ(f2, 3)")
            .watermark("event_time", "event_time - INTERVAL '20' SECOND")
            .build()
    ).renameColumns(
        `$`("f0").`as`("eventId"),
        `$`("f1").`as`("eventName"),
        `$`("f3").`as`("eventValue")
    )
    tableEnv.createTemporaryView("finalTable", finalTable)

    val sqlQuery = """
        SELECT eventId, eventName,
               TUMBLE_END(event_time, INTERVAL '1' MINUTE) AS event_time_new,
               LAST_VALUE(eventValue) AS eventValue
        FROM finalTable
        GROUP BY eventId, eventName, TUMBLE(event_time, INTERVAL '1' MINUTE)
    """.trimIndent()
    val resultTable = tableEnv.sqlQuery(sqlQuery)
    tableEnv.toDataStream(resultTable).print()

    env.execute("TestJob")
}

class RowMapper: MapFunction<String, Tuple4<String, String, Long, Float>> {
    override fun map(value: String): Tuple4<String, String, Long, Float> {
        val lineArray = value.split(",")

        return Tuple4(lineArray[0], lineArray[1], lineArray[2].toLong(), lineArray[3].toFloat())
    }
}


When I use proc_time instead of event_time, the windows are created and I
can see results.
I suspect the code itself is correct and that the way I'm testing it is
wrong. How should I test this with event time?
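For what it's worth, my rough understanding (sketched in plain Java with made-up helper names, assuming the watermark simply trails the highest seen timestamp by the 20 s out-of-orderness bound, and ignoring the extra 1 ms Flink subtracts) is that on a finite sample like the one above, windows ending after last-timestamp minus 20 s can never fire, because no later record arrives to push the watermark past them:

```java
// Hypothetical sketch, not Flink API: models when an event-time
// tumbling window can fire under a bounded-out-of-orderness watermark.
public class WatermarkCheck {
    // Watermark trails the highest timestamp seen so far by 20 s.
    static long watermark(long maxSeenTs) { return maxSeenTs - 20_000L; }

    // An event-time window fires only once the watermark reaches its end.
    static boolean fires(long windowEnd, long wm) { return wm >= windowEnd; }

    public static void main(String[] args) {
        long lastRecordTs = 1647614947000L;            // last sample record
        long wm = watermark(lastRecordTs);             // 1647614927000
        System.out.println(fires(1647614880000L, wm)); // true: this window closes
        System.out.println(fires(1647614940000L, wm)); // false: stalls until a later record arrives
    }
}
```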

Can someone please point out what mistake I'm making here?

Thanks in advance.
