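Hi! You're grouping not only by the window but also by the value, so only records with the same value end up in the same group. That is why the sum is always 0 (five 0s) or 5 (five 1s). I guess this is not intended. Grouping by the window alone, i.e. group_by(col('w')), should give you the sum over all records in the window.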
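
To illustrate the difference, here is a minimal plain-Python sketch (not PyFlink) that simulates a sliding count window of size 5 under the two groupings. The helper sliding_sums, the sample bits, and the rule that a result is emitted only once a group holds WINDOW_SIZE rows are assumptions made for illustration; they are not taken from Flink's implementation.

```python
from collections import defaultdict, deque

WINDOW_SIZE = 5
THRESHOLD = 3


def sliding_sums(bits, key_fn):
    """Sum the last WINDOW_SIZE bits per group (hypothetical helper).

    Assumption: a result is emitted only once a group's window is full.
    """
    windows = defaultdict(lambda: deque(maxlen=WINDOW_SIZE))
    results = []
    for bit in bits:
        window = windows[key_fn(bit)]
        window.append(bit)  # the oldest row in this group drops out automatically
        if len(window) == WINDOW_SIZE:
            results.append(sum(window))
    return results


# Made-up sample stream of bits.
bits = [1, 0, 1, 1, 0, 0, 1, 0, 1, 1, 1, 0, 1, 0, 0]

# Grouping by window AND value: each value gets its own window,
# so every window holds five identical bits.
by_window_and_value = sliding_sums(bits, key_fn=lambda bit: bit)

# Grouping by window only: all rows share a single window.
by_window_only = sliding_sums(bits, key_fn=lambda bit: None)

# Threshold check from the question: 1 if the window holds 3 or more 1s.
flags = [1 if total >= THRESHOLD else 0 for total in by_window_only]

print(by_window_and_value)
print(by_window_only)
print(flags)
```

With the value in the grouping key, the sums can only ever be 0 or 5, which matches what you observed. With a single shared window, the sums vary and the threshold check becomes meaningful.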

Long Nguyễn <longnguyen25111...@gmail.com> wrote on Tue, Nov 2, 2021 at 3:05 AM:

> I have set up a program that takes bits 0 and 1 from a Kafka topic and
> then uses Flink to create a sliding count window of size 5. In that window,
> I'd like to output 1 if there are 3 or more of the bit 1, otherwise, output
> 0.
> Currently, I follow the way of calculating the sum of bits in the window.
>
> import os
> from urllib.parse import quote
>
> from pyflink.common import Row
> from pyflink.table import DataTypes, TableEnvironment, EnvironmentSettings
> from pyflink.table.expressions import col, lit, row_interval
> from pyflink.table.udf import udf
> from pyflink.table.window import Slide
>
>
> settings = EnvironmentSettings.new_instance(
> ).in_streaming_mode().use_blink_planner().build()
> table_env = TableEnvironment.create(settings)
>
> kafka_sql_connector_jar_path = quote(
>     os.path.join(os.path.abspath(os.path.dirname(__file__)),
>                  'flink-sql-connector-kafka_2.12-1.13.2.jar'))
>
> table_env.get_config() \
>     .get_configuration() \
>     .set_string("pipeline.jars",
>                 "file://{}".format(kafka_sql_connector_jar_path))
>
> WINDOW_SIZE = 5
> WINDOW_SLIDE = 1
> THRESHOLD = 3
>
>
> @udf(result_type=DataTypes.INT())
> def read_raw_data(data):
>     return int(data, base=0)
>
>
> def sliding_window_demo():
>     source_ddl = """
>         CREATE TABLE input(
>             val BINARY,
>             proctime AS PROCTIME()
>         ) WITH (
>             'connector' = 'kafka',
>             'topic' = 'flink-demo-input',
>             'properties.bootstrap.servers' = 'localhost:9092',
>             'properties.group.id' = 'flink-demo',
>             'scan.startup.mode' = 'earliest-offset',
>             'format' = 'raw'
>         )
>     """
>
>     temp_ddl = """
>         CREATE TABLE temp(
>             res INT
>         ) WITH (
>             'connector' = 'print'
>         )
>     """
>
>     table_env.execute_sql(source_ddl)
>     table_env.execute_sql(temp_ddl)
>
>     table = table_env.from_path('input')
>
>     result = table.window(Slide.over(
>         row_interval(WINDOW_SIZE)).every(row_interval(WINDOW_SLIDE)).on(col('proctime')).alias("w")) \
>         .group_by(col('w'), col('val')) \
>         .select(read_raw_data(col('val')).sum)
>
>     result.execute_insert('temp').wait()
>
>
> However, when I call the sum expression on those bits in the window, every
> time I add a 0 to the stream, the result is always 0, and when I add a 1,
> it always returns 5 (which is the window size).
> Can you tell me what I'm doing wrong? Thank you so much.
>
> --
> Nguyen Dich Long,
> School of Information and Communication Technology (SoICT -
> https://www.soict.hust.edu.vn)
> Hanoi University of Science and Technology (https://www.hust.edu.vn)
> 601, B1 Building - No 1, Dai Co Viet Street, Hai Ba Trung District, Ha
> Noi, Vietnam
> Tel: +84 (0)3.54.41.76.76
> Email: long.nd162...@sis.hust.edu.vn; longnguyen25111...@gmail.com