Hello,

I'm working on a PyFlink application (1.15, deployed to AWS). I have built a
few applications and feel like I'm making good progress, but I now have a
different problem that I think requires multiple stages, each with its own
window.
I'm not sure how to properly pass time into my second stage. I'll share some
pseudocode that shows what I'm trying to do.


    # first I have a preformat stage that does some manipulation of the Kinesis stream
    formatted_source = t_env.sql_query(
        """SELECT
            JSON_VALUE(`data`, '$.a') AS `a`,
            JSON_VALUE(`data`, '$.b') AS `b`,
            JSON_VALUE(`data`, '$.response.filtered[0]' RETURNING DOUBLE) AS `x`,
            JSON_VALUE(`data`, '$.response.filtered[1]' RETURNING DOUBLE) AS `y`,
            JSON_VALUE(`data`, '$.response.filtered[2]' RETURNING DOUBLE) AS `z`,
            JSON_VALUE(`data`, '$.response.c' RETURNING INT) AS `c`,
            TO_TIMESTAMP_LTZ(CAST(JSON_VALUE(`data`, '$.request.tracking.time') AS BIGINT), 3) AS `time`,
            CAST(JSON_VALUE(`data`, '$.response.tracking.time') AS BIGINT) AS `v`,
            `proc_time`
        FROM source"""
    )

    # next I have a stage that combines records by (a, b and time)
    first_stage = (
        formatted_source.window(
            Tumble.over(lit(2).seconds).on(col("proc_time")).alias("tumble_window")
        )
        .group_by(col("a"), col("b"), col("time"), col("tumble_window"))
        .select(
            col("a"),
            col("b"),
            col("time"),
            first_process(
                col("a"),
                col("b"),
                col("x"),
                col("y"),
                col("z"),
                col("c"),
                col("time"),
                col("v"),
            ).alias("first_out"),
        )
        .where(col("first_out").is_not_null)
    )

    # I want to create a second stage here, but I have no idea what to pass into the window ".on()"
    second_stage = (
        first_stage.window(
            Slide.over(lit(1).hour).every(lit(2).seconds).on(col("proc_time")).alias("slide_window")
        )
        .group_by(col("a"),col("b"),col("slide_window")).select(
            col("a"),
            col("b"),
            second_process(
                col("a"),
                col("b"),
                col("first_out"),
            ).alias("second_out"),
        )
    ).execute()

    # execute() returns a TableResult; print() returns None, so call it separately
    second_stage.print()
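
To make the intent concrete, here is a minimal plain-Python sketch (no Flink
involved) of the two-stage windowing I'm after. The names tumble, slide and
ts are made up for illustration, and sum() is only a stand-in for
first_process and second_process. The idea is that each first-stage row
carries a timestamp (here, the end of its tumbling window) that the second
stage can window on; how to express that in the Table API is what I'm missing.

```python
from collections import defaultdict


def tumble(events, size):
    """Stage 1: group by (a, b, time) inside fixed windows of `size` seconds."""
    groups = defaultdict(list)
    for e in events:
        window_start = e["proc_time"] - e["proc_time"] % size
        groups[(e["a"], e["b"], e["time"], window_start)].append(e["x"])
    # Each output row carries the window end as `ts` for the next stage.
    return [
        {"a": a, "b": b, "time": t, "first_out": sum(xs), "ts": start + size}
        for (a, b, t, start), xs in sorted(groups.items())
    ]


def slide(rows, size, every):
    """Stage 2: group by (a, b) inside overlapping windows of `size` seconds,
    with a new window starting every `every` seconds."""
    groups = defaultdict(list)
    for r in rows:
        # Walk back over every window [start, start + size) that contains ts.
        start = r["ts"] - r["ts"] % every
        while start > r["ts"] - size:
            groups[(r["a"], r["b"], start)].append(r["first_out"])
            start -= every
    return {key: sum(vals) for key, vals in sorted(groups.items())}


# Hypothetical sample data; window sizes are shrunk to keep the example small.
events = [
    {"a": "k", "b": "m", "time": 100, "proc_time": 0, "x": 1.0},
    {"a": "k", "b": "m", "time": 100, "proc_time": 1, "x": 2.0},
    {"a": "k", "b": "m", "time": 100, "proc_time": 2, "x": 4.0},
]
stage1 = tumble(events, size=2)
result = slide(stage1, size=4, every=2)
```

In the real job the first window is 2 seconds and the second is 1 hour,
sliding every 2 seconds.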

My first stage and the SQL part are working fine. I'm not sure how to pass a
valid time value into my second stage, because I cannot add it to my first
stage group.
Any advice would be appreciated!

Thank you!

Nick Hecht
