
You're not only grouping by the over window but also grouping by the value,
thus only the records with the same value will be in the same group. I
guess this is no intended.

Long Nguyễn 于2021年11月2日周二 上午3:05写道:

> I have set up a program that takes bits 0 and 1 from a Kafka topic and
> then uses Flink to create a sliding count window of size 5. In that window,
> I'd like to output 1 if there are 3 or more of the bit 1, otherwise, output
> 0.
> Currently, I follow the way of calculating the sum of bits in the window.
> import os
> from urllib.parse import quote
> from pyflink.common import Row
> from pyflink.table import DataTypes, TableEnvironment, EnvironmentSettings
> from pyflink.table.expressions import col, lit, row_interval
> from pyflink.table.udf import udf
> from pyflink.table.window import Slide
> settings = EnvironmentSettings.new_instance(
> ).in_streaming_mode().use_blink_planner().build()
> table_env = TableEnvironment.create(settings)
> kafka_sql_connector_jar_path = 
> quote(os.path.join(os.path.abspath(os.path.dirname(__file__)),
> 'flink-sql-connector-kafka_2.12-1.13.2.jar'))
> table_env.get_config() \
>     .get_configuration() \
>     .set_string("pipeline.jars", 
> "file://{}".format(kafka_sql_connector_jar_path))
> @udf(result_type=DataTypes.INT())
> def read_raw_data(data):
>     return int(data, base=0)
> def sliding_window_demo():
>     source_ddl = """
>             CREATE TABLE input(
>                 val BINARY,
>                 proctime AS PROCTIME()
>             ) WITH (
>               'connector' = 'kafka',
>               'topic' = 'flink-demo-input',
>               'properties.bootstrap.servers' = 'localhost:9092',
>               'properties.group.id' = 'flink-demo',
>               'scan.startup.mode' = 'earliest-offset',
>               'format' = 'raw'
>             )
>             """
>     temp_ddl = """
>             CREATE TABLE temp(
>                 res INT
>             ) WITH (
>               'connector' = 'print'
>             )
>             """
>     table_env.execute_sql(source_ddl)
>     table_env.execute_sql(temp_ddl)
>     table = table_env.from_path('input')
>     result = table.window(Slide.over(
> row_interval(WINDOW_SIZE)).every(row_interval(WINDOW_SLIDE)).on(col('proctime')).alias("w"))
>  \
>         .group_by(col('w'), col('val')) \
>         .select(read_raw_data(col('val')).sum)
>     result.execute_insert('temp').wait()
> However, when I call the sum expression on those bits in the window, every
> time I add a 0 to the stream, the result is always 0, and when I add a 1,
> it always returns 5 (which is the window size).
> Can you tell me what I'm doing wrong? Thank you so much.
