I have set up a program that takes bits 0 and 1 from a Kafka topic and then uses Flink to create a sliding count window of size 5. In that window, I'd like to output 1 if there are 3 or more of the bit 1, otherwise, output 0. Currently, I follow the way of calculating the sum of bits in the window.
import os from urllib.parse import quote from pyflink.common import Row from pyflink.table import DataTypes, TableEnvironment, EnvironmentSettings from pyflink.table.expressions import col, lit, row_interval from pyflink.table.udf import udf from pyflink.table.window import Slide settings = EnvironmentSettings.new_instance( ).in_streaming_mode().use_blink_planner().build() table_env = TableEnvironment.create(settings) kafka_sql_connector_jar_path = quote(os.path.join(os.path.abspath(os.path.dirname(__file__)), 'flink-sql-connector-kafka_2.12-1.13.2.jar')) table_env.get_config() \ .get_configuration() \ .set_string("pipeline.jars", "file://{}".format(kafka_sql_connector_jar_path)) WINDOW_SIZE = 5 WINDOW_SLIDE = 1 THRESHOLD = 3 @udf(result_type=DataTypes.INT()) def read_raw_data(data): return int(data, base=0) def sliding_window_demo(): source_ddl = """ CREATE TABLE input( val BINARY, proctime AS PROCTIME() ) WITH ( 'connector' = 'kafka', 'topic' = 'flink-demo-input', 'properties.bootstrap.servers' = 'localhost:9092', 'properties.group.id' = 'flink-demo', 'scan.startup.mode' = 'earliest-offset', 'format' = 'raw' ) """ temp_ddl = """ CREATE TABLE temp( res INT ) WITH ( 'connector' = 'print' ) """ table_env.execute_sql(source_ddl) table_env.execute_sql(temp_ddl) table = table_env.from_path('input') result = table.window(Slide.over( row_interval(WINDOW_SIZE)).every(row_interval(WINDOW_SLIDE)).on(col('proctime')).alias("w")) \ .group_by(col('w'), col('val')) \ .select(read_raw_data(col('val')).sum) result.execute_insert('temp').wait() However, when I call the sum expression on those bits in the window, every time I add a 0 to the stream, the result is always 0, and when I add a 1, it always returns 5 (which is the window size). Can you tell me what I'm doing wrong? Thank you so much. -- ------------------------------------------------------------ -------------------------------------------------- Nguyen Dich Long, School of Information and Communication Technology (SoICT - https://www.soict.hust.edu.vn) Hanoi University of Science and Technology (https://www.hust.edu.vn) 601, B1 Building - No 1, Dai Co Viet Street, Hai Ba Trung District, Ha Noi, Vietnam Tel: +84 (0)3.54.41.76.76 Email: long.nd162...@sis.hust.edu.vn; longnguyen25111...@gmail.com