Hi. Thank you for the clarification. I updated my code as below and got the desired result.

result = table.window(Slide.over(
        row_interval(WINDOW_SIZE)).every(row_interval(WINDOW_SLIDE)).on(col('proctime')).alias("w")) \
    .group_by(col('w')) \
    .select(call(read_raw_data, col('val')).sum)

However, I can only compute an aggregation over those bits; I cannot select the val column on its own. If I remove the .sum, like this:

    .select(call(read_raw_data, col('val')))

I get this error:

py4j.protocol.Py4JJavaError: An error occurred while calling o90.select.
: org.apache.flink.table.api.ValidationException: Cannot resolve field [val], input field list:[].

Do you know why?

On Tue, Nov 2, 2021 at 9:25 AM Caizhi Weng <tsreape...@gmail.com> wrote:

> Hi!
>
> You're not only grouping by the over window but also grouping by the
> value, thus only the records with the same value will be in the same group.
> I guess this is not intended.
>
> Long Nguyễn <longnguyen25111...@gmail.com> wrote on Tue, Nov 2, 2021 at 3:05 AM:
>
>> I have set up a program that takes bits 0 and 1 from a Kafka topic and
>> then uses Flink to create a sliding count window of size 5. In that window,
>> I'd like to output 1 if there are 3 or more of the bit 1, otherwise, output
>> 0.
>> Currently, I follow the way of calculating the sum of bits in the window.
>>
>> import os
>> from urllib.parse import quote
>>
>> from pyflink.common import Row
>> from pyflink.table import DataTypes, TableEnvironment, EnvironmentSettings
>> from pyflink.table.expressions import col, lit, row_interval
>> from pyflink.table.udf import udf
>> from pyflink.table.window import Slide
>>
>>
>> settings = EnvironmentSettings.new_instance(
>> ).in_streaming_mode().use_blink_planner().build()
>> table_env = TableEnvironment.create(settings)
>>
>> kafka_sql_connector_jar_path = quote(
>>     os.path.join(os.path.abspath(os.path.dirname(__file__)),
>>                  'flink-sql-connector-kafka_2.12-1.13.2.jar'))
>>
>> table_env.get_config() \
>>     .get_configuration() \
>>     .set_string("pipeline.jars",
>>                 "file://{}".format(kafka_sql_connector_jar_path))
>>
>> WINDOW_SIZE = 5
>> WINDOW_SLIDE = 1
>> THRESHOLD = 3
>>
>>
>> @udf(result_type=DataTypes.INT())
>> def read_raw_data(data):
>>     return int(data, base=0)
>>
>>
>> def sliding_window_demo():
>>     source_ddl = """
>>         CREATE TABLE input(
>>             val BINARY,
>>             proctime AS PROCTIME()
>>         ) WITH (
>>             'connector' = 'kafka',
>>             'topic' = 'flink-demo-input',
>>             'properties.bootstrap.servers' = 'localhost:9092',
>>             'properties.group.id' = 'flink-demo',
>>             'scan.startup.mode' = 'earliest-offset',
>>             'format' = 'raw'
>>         )
>>     """
>>
>>     temp_ddl = """
>>         CREATE TABLE temp(
>>             res INT
>>         ) WITH (
>>             'connector' = 'print'
>>         )
>>     """
>>
>>     table_env.execute_sql(source_ddl)
>>     table_env.execute_sql(temp_ddl)
>>
>>     table = table_env.from_path('input')
>>
>>     result = table.window(Slide.over(
>>             row_interval(WINDOW_SIZE)).every(row_interval(WINDOW_SLIDE)).on(col('proctime')).alias("w")) \
>>         .group_by(col('w'), col('val')) \
>>         .select(read_raw_data(col('val')).sum)
>>
>>     result.execute_insert('temp').wait()
>>
>>
>> However, when I call the sum expression on those bits in the window,
>> every time I add a 0 to the stream, the result is always 0, and when I add
>> a 1, it always returns 5 (which is the window size).
>> Can you tell me what I'm doing wrong? Thank you so much.
>>
>> --
>> --------------------------------------------------------------
>> Nguyen Dich Long,
>> School of Information and Communication Technology (SoICT -
>> https://www.soict.hust.edu.vn)
>> Hanoi University of Science and Technology (https://www.hust.edu.vn)
>> 601, B1 Building - No 1, Dai Co Viet Street, Hai Ba Trung District, Ha
>> Noi, Vietnam
>> Tel: +84 (0)3.54.41.76.76
>> Email: long.nd162...@sis.hust.edu.vn; longnguyen25111...@gmail.com
>>
>

--
--------------------------------------------------------------
Nguyen Dich Long,
School of Information and Communication Technology (SoICT - https://www.soict.hust.edu.vn)
Hanoi University of Science and Technology (https://www.hust.edu.vn)
601, B1 Building - No 1, Dai Co Viet Street, Hai Ba Trung District, Ha Noi, Vietnam
Tel: +84 (0)3.54.41.76.76
Email: long.nd162...@sis.hust.edu.vn; longnguyen25111...@gmail.com
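
P.S. For anyone following the thread, the behaviour discussed here can be reproduced without Flink. The plain-Python sketch below is only an illustration, not PyFlink code. The helper names threshold_windows and sums_grouped_by_value are hypothetical, and it assumes a result is emitted only once a window holds WINDOW_SIZE rows. The first function computes the intended output (1 when a window of 5 bits holds at least 3 ones). The second mimics grouping by both the window and val, where each value effectively gets its own window, which would explain sums of only 0 or 5.

```python
from collections import defaultdict, deque
from typing import Iterable, Iterator


def threshold_windows(bits: Iterable[int], size: int = 5, slide: int = 1,
                      threshold: int = 3) -> Iterator[int]:
    """Yield 1 for each full window holding at least `threshold` ones, else 0."""
    window = deque(maxlen=size)  # keeps only the most recent `size` bits
    for seen, bit in enumerate(bits, start=1):
        window.append(bit)
        # Assumption: emit once the window is full, then every `slide` rows.
        if seen >= size and (seen - size) % slide == 0:
            yield 1 if sum(window) >= threshold else 0


def sums_grouped_by_value(bits: Iterable[int], size: int = 5) -> Iterator[int]:
    """Mimic grouping by (window, val): one count window per distinct value."""
    windows = defaultdict(lambda: deque(maxlen=size))
    for bit in bits:
        group = windows[bit]  # rows with the same value share a window
        group.append(bit)
        if len(group) == size:
            # Every row in the group has the same value, so the sum can
            # only be 0 (all zeros) or `size` (all ones).
            yield sum(group)


if __name__ == "__main__":
    stream = [1, 0, 1, 1, 0, 0, 0, 1, 1, 1]
    print(list(threshold_windows(stream)))
    print(list(sums_grouped_by_value([1] * 5 + [0] * 5 + [1])))
```

With the sample stream 1,0,1,1,0,0,0,1,1,1 the first function yields 1,0,0,0,0,1. The second only ever yields 0 or 5, matching the symptom in the original message.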