Hi. Thank you for the clarification.
I updated my code as below and got the desired result.

from pyflink.table.expressions import call

result = table.window(Slide.over(row_interval(WINDOW_SIZE))
                      .every(row_interval(WINDOW_SLIDE))
                      .on(col('proctime'))
                      .alias("w")) \
    .group_by(col('w')) \
    .select(call(read_raw_data, col('val')).sum)
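For anyone reading this thread later, here is a plain-Python model (not PyFlink code) of why grouping by both w and val gave only 0 or 5, and what the corrected query computes. It assumes that a row-count window is kept separately for each grouping key and emits only once it holds WINDOW_SIZE rows. That assumption is consistent with the 0/5 results described in the original question quoted below, but it is a simplification of Flink's behavior. The function names are hypothetical.

```python
from collections import defaultdict, deque

WINDOW_SIZE = 5   # rows per window, as in the thread
THRESHOLD = 3     # minimum number of 1 bits for an output of 1


def global_sliding_sums(bits, size=WINDOW_SIZE):
    """One window over the whole stream (group_by(col('w')) only).

    Emits the sum of the last `size` bits for every new bit, once the
    window is full (assumed firing behavior, see lead-in).
    """
    window = deque(maxlen=size)  # the oldest bit is evicted automatically
    sums = []
    for bit in bits:
        window.append(bit)
        if len(window) == size:
            sums.append(sum(window))
    return sums


def keyed_sliding_sums(bits, size=WINDOW_SIZE):
    """One window per distinct bit value (group_by(col('w'), col('val'))).

    Each window only ever sees identical bits, so a full window
    sums to either 0 or `size`.
    """
    windows = defaultdict(lambda: deque(maxlen=size))
    sums = []
    for bit in bits:
        window = windows[bit]  # each key counts its own rows
        window.append(bit)
        if len(window) == size:
            sums.append(sum(window))
    return sums


def threshold_flags(bits, size=WINDOW_SIZE, threshold=THRESHOLD):
    """1 if a full window holds at least `threshold` ones, else 0."""
    return [1 if s >= threshold else 0 for s in global_sliding_sums(bits, size)]


if __name__ == "__main__":
    demo_bits = [1, 0, 1, 1, 0, 0, 1, 1, 0, 0, 1]
    print(keyed_sliding_sums(demo_bits))   # [5, 0, 5]
    print(global_sliding_sums(demo_bits))  # [3, 2, 3, 3, 2, 2, 3]
    print(threshold_flags(demo_bits))      # [1, 0, 1, 1, 0, 0, 1]
```

In the keyed model each bit value fills its own window, which reproduces the "always 0 or always 5" symptom. The global model is what the corrected group_by(col('w')) is meant to compute, with the threshold applied on top.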


However, I can only compute an aggregate over those bits; I cannot
select the val column on its own. If I remove the .sum, like this:

.select(call(read_raw_data, col('val')))

I get this error:

py4j.protocol.Py4JJavaError: An error occurred while calling o90.select.
: org.apache.flink.table.api.ValidationException: Cannot resolve field [val], input field list:[].

Do you know why?
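A likely explanation: after window(...).group_by(col('w')), each output row stands for a whole window of five input rows, so the select may only reference grouping keys, window properties, and aggregates. A bare col('val') has no single value per group. If the goal is to keep each val next to a windowed aggregate, Flink's over windows (Table.over_window) are the usual tool. The plain-Python model below only illustrates the difference in output shape; it is not PyFlink code, and the function and field names are hypothetical.

```python
from collections import deque


def group_window_rows(bits, size=5):
    """Group-window aggregation: one output row per full window.

    The row carries only window information and aggregates; there is no
    single 'val' for the group, which is why a bare col('val') is rejected.
    """
    rows = []
    window = deque(maxlen=size)
    for index, bit in enumerate(bits):
        window.append(bit)
        if len(window) == size:
            rows.append({"window_last_row": index, "sum_val": sum(window)})
    return rows


def over_window_rows(bits, preceding=4):
    """Over-window style aggregation: one output row per input row.

    'val' is kept next to the sum over the current row and up to
    `preceding` earlier rows (fewer at the start of the stream).
    """
    rows = []
    window = deque(maxlen=preceding + 1)
    for bit in bits:
        window.append(bit)
        rows.append({"val": bit, "sum_val": sum(window)})
    return rows


if __name__ == "__main__":
    demo_bits = [1, 0, 1, 1, 0, 0]
    print(group_window_rows(demo_bits))
    print(over_window_rows(demo_bits))
```

The group-window rows never contain a per-row val, while the over-window rows keep every input val alongside a trailing five-row sum.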

On Tue, Nov 2, 2021 at 9:25 AM Caizhi Weng <tsreape...@gmail.com> wrote:

> Hi!
>
> You're grouping not only by the sliding window but also by the
> value, so only records with the same value end up in the same group.
> I guess this is not intended.
>
> On Tue, Nov 2, 2021 at 3:05 AM Long Nguyễn <longnguyen25111...@gmail.com> wrote:
>
>> I have set up a program that reads bits (0 and 1) from a Kafka topic and
>> then uses Flink to create a sliding count window of size 5. For each window,
>> I'd like to output 1 if it contains three or more 1 bits, and 0 otherwise.
>> Currently, I do this by calculating the sum of the bits in the window.
>>
>> import os
>> from urllib.parse import quote
>>
>> from pyflink.common import Row
>> from pyflink.table import DataTypes, TableEnvironment, EnvironmentSettings
>> from pyflink.table.expressions import col, lit, row_interval
>> from pyflink.table.udf import udf
>> from pyflink.table.window import Slide
>>
>>
>> settings = EnvironmentSettings.new_instance(
>> ).in_streaming_mode().use_blink_planner().build()
>> table_env = TableEnvironment.create(settings)
>>
>> kafka_sql_connector_jar_path = quote(os.path.join(
>>     os.path.abspath(os.path.dirname(__file__)),
>>     'flink-sql-connector-kafka_2.12-1.13.2.jar'))
>>
>> table_env.get_config() \
>>     .get_configuration() \
>>     .set_string("pipeline.jars",
>>                 "file://{}".format(kafka_sql_connector_jar_path))
>>
>> WINDOW_SIZE = 5
>> WINDOW_SLIDE = 1
>> THRESHOLD = 3
>>
>>
>> @udf(result_type=DataTypes.INT())
>> def read_raw_data(data):
>>     return int(data, base=0)
>>
>>
>> def sliding_window_demo():
>>     source_ddl = """
>>             CREATE TABLE input(
>>                 val BINARY,
>>                 proctime AS PROCTIME()
>>             ) WITH (
>>               'connector' = 'kafka',
>>               'topic' = 'flink-demo-input',
>>               'properties.bootstrap.servers' = 'localhost:9092',
>>               'properties.group.id' = 'flink-demo',
>>               'scan.startup.mode' = 'earliest-offset',
>>               'format' = 'raw'
>>             )
>>             """
>>
>>     temp_ddl = """
>>             CREATE TABLE temp(
>>                 res INT
>>             ) WITH (
>>               'connector' = 'print'
>>             )
>>             """
>>
>>     table_env.execute_sql(source_ddl)
>>     table_env.execute_sql(temp_ddl)
>>
>>     table = table_env.from_path('input')
>>
>>     result = table.window(Slide.over(row_interval(WINDOW_SIZE))
>>                           .every(row_interval(WINDOW_SLIDE))
>>                           .on(col('proctime'))
>>                           .alias("w")) \
>>         .group_by(col('w'), col('val')) \
>>         .select(read_raw_data(col('val')).sum)
>>
>>     result.execute_insert('temp').wait()
>>
>>
>> However, when I sum the bits in the window, the result is always 0
>> whenever I add a 0 to the stream, and always 5 (the window size) whenever
>> I add a 1.
>> Can you tell me what I'm doing wrong? Thank you so much.
>>
>> --
>> ------------------------------------------------------------
>> --------------------------------------------------
>> Nguyen Dich Long,
>> School of Information and Communication Technology (SoICT -
>> https://www.soict.hust.edu.vn)
>> Hanoi University of Science and Technology (https://www.hust.edu.vn)
>> 601, B1 Building - No 1, Dai Co Viet Street, Hai Ba Trung District, Ha
>> Noi, Vietnam
>> Tel: +84 (0)3.54.41.76.76
>> Email: long.nd162...@sis.hust.edu.vn; longnguyen25111...@gmail.com
>>
>

