Hello, I'm working on a PyFlink application (1.15, deployed to AWS). I have built a few applications and feel like I'm making good progress, but I have a new problem that I think requires multiple stages, each with its own window. I'm not sure how to properly pass time into my second stage. I'll share some pseudocode that shows what I'm trying to do.

# t_env, first_process and second_process are defined elsewhere in my application
from pyflink.table.expressions import col, lit
from pyflink.table.window import Slide, Tumble

# First I have a preformat stage that does some manipulation of the Kinesis stream
formatted_source = t_env.sql_query(
    """SELECT
        JSON_VALUE(`data`, '$.a') AS `a`,
        JSON_VALUE(`data`, '$.b') AS `b`,
        JSON_VALUE(`data`, '$.response.filtered[0]' RETURNING DOUBLE) AS `x`,
        JSON_VALUE(`data`, '$.response.filtered[1]' RETURNING DOUBLE) AS `y`,
        JSON_VALUE(`data`, '$.response.filtered[2]' RETURNING DOUBLE) AS `z`,
        JSON_VALUE(`data`, '$.response.c' RETURNING INT) AS `c`,
        TO_TIMESTAMP_LTZ(CAST(JSON_VALUE(`data`, '$.request.tracking.time') AS BIGINT), 3) AS `time`,
        CAST(JSON_VALUE(`data`, '$.response.tracking.time') AS BIGINT) AS `v`,
        `proc_time`
    FROM source"""
)

# Next I have a stage that combines records by (a, b and time)
first_stage = (
    formatted_source.window(
        Tumble.over(lit(2).seconds).on(col("proc_time")).alias("tumble_window")
    )
    .group_by(col("a"), col("b"), col("time"), col("tumble_window"))
    .select(
        col("a"),
        col("b"),
        col("time"),
        first_process(
            col("a"),
            col("b"),
            col("x"),
            col("y"),
            col("z"),
            col("c"),
            col("time"),
            col("v"),
        ).alias("first_out"),
    )
    .where(col("first_out").is_not_null)
)

# I want to create a second stage here, but I have no idea what to pass into the window ".on()"
second_stage = (
    first_stage.window(
        Slide.over(lit(1).hour).every(lit(2).seconds).on(col("proc_time")).alias("slide_window")
    )
    .group_by(col("a"), col("b"), col("slide_window"))
    .select(
        col("a"),
        col("b"),
        second_process(
            col("a"),
            col("b"),
            col("first_out"),
        ).alias("second_out"),
    )
)

# print() blocks while the streaming job runs, so no separate wait() call is needed
second_stage.execute().print()

My first stage and SQL parts are working fine. I'm not sure how to pass a valid time value into my second stage, because I cannot add it to my first stage's group.

Any advice would be appreciated!

Thank you!
Nick Hecht