I have set up a program that takes bits 0 and 1 from a Kafka topic and then
uses Flink to create a sliding count window of size 5. In that window, I'd
like to output 1 if there are 3 or more of the bit 1, otherwise, output 0.
Currently, I follow the way of calculating the sum of bits in the window.

import os
from urllib.parse import quote

from pyflink.common import Row
from pyflink.table import DataTypes, TableEnvironment, EnvironmentSettings
from pyflink.table.expressions import col, lit, row_interval
from pyflink.table.udf import udf
from pyflink.table.window import Slide


settings = EnvironmentSettings.new_instance(
).in_streaming_mode().use_blink_planner().build()
table_env = TableEnvironment.create(settings)

kafka_sql_connector_jar_path =
quote(os.path.join(os.path.abspath(os.path.dirname(__file__)),

'flink-sql-connector-kafka_2.12-1.13.2.jar'))

table_env.get_config() \
    .get_configuration() \
    .set_string("pipeline.jars",
"file://{}".format(kafka_sql_connector_jar_path))

WINDOW_SIZE = 5
WINDOW_SLIDE = 1
THRESHOLD = 3


@udf(result_type=DataTypes.INT())
def read_raw_data(data):
    return int(data, base=0)


def sliding_window_demo():
    source_ddl = """
            CREATE TABLE input(
                val BINARY,
                proctime AS PROCTIME()
            ) WITH (
              'connector' = 'kafka',
              'topic' = 'flink-demo-input',
              'properties.bootstrap.servers' = 'localhost:9092',
              'properties.group.id' = 'flink-demo',
              'scan.startup.mode' = 'earliest-offset',
              'format' = 'raw'
            )
            """

    temp_ddl = """
            CREATE TABLE temp(
                res INT
            ) WITH (
              'connector' = 'print'
            )
            """

    table_env.execute_sql(source_ddl)
    table_env.execute_sql(temp_ddl)

    table = table_env.from_path('input')

    result = table.window(Slide.over(
        
row_interval(WINDOW_SIZE)).every(row_interval(WINDOW_SLIDE)).on(col('proctime')).alias("w"))
\
        .group_by(col('w'), col('val')) \
        .select(read_raw_data(col('val')).sum)

    result.execute_insert('temp').wait()


However, when I call the sum expression on those bits in the window, every
time I add a 0 to the stream, the result is always 0, and when I add a 1,
it always returns 5 (which is the window size).
Can you tell me what I'm doing wrong? Thank you so much.

-- 
------------------------------------------------------------
--------------------------------------------------
Nguyen Dich Long,
School of Information and Communication Technology (SoICT -
https://www.soict.hust.edu.vn)
Hanoi University of Science and Technology (https://www.hust.edu.vn)
601, B1 Building - No 1, Dai Co Viet Street, Hai Ba Trung District, Ha Noi,
Vietnam
Tel: +84 (0)3.54.41.76.76
Email: long.nd162...@sis.hust.edu.vn; longnguyen25111...@gmail.com

Reply via email to