Hi Flink community, I came across a problem I don't understand: I can't use a PyFlink AggregateFunction properly in a window TVF. The following do work: a Java AggregateFunction,
# (continued from the question above) ... a Flink system aggregate function,
# and a window (not a window TVF). I want to know whether this is a bug or
# whether I'm using it the wrong way. Thanks!

from datetime import datetime, timedelta

from pyflink.table import AggregateFunction
from pyflink.common.typeinfo import Types
from pyflink.common import Row
from pyflink.table import Schema, StreamTableEnvironment
from pyflink.datastream import StreamExecutionEnvironment


class Sum0(AggregateFunction):
    """Python aggregate function that sums its first argument, skipping None.

    The accumulator is a one-field ``Row`` whose field 0 holds the running
    total, starting at 0.
    """

    def get_value(self, accumulator):
        """Return the running total stored in field 0 of the accumulator."""
        return accumulator[0]

    def create_accumulator(self):
        """Return a fresh accumulator: a ``Row`` holding the total 0."""
        return Row(0)

    def accumulate(self, accumulator, *args):
        """Add ``args[0]`` to the running total unless it is None."""
        if args[0] is not None:
            accumulator[0] += args[0]

    def retract(self, accumulator, *args):
        """Subtract ``args[0]`` from the running total unless it is None."""
        if args[0] is not None:
            accumulator[0] -= args[0]

    def merge(self, accumulator, accumulators):
        """Fold the totals of ``accumulators`` into ``accumulator``."""
        for acc in accumulators:
            accumulator[0] += acc[0]

    def get_result_type(self):
        """Return the SQL type of the aggregate result."""
        return "BIGINT"

    def get_accumulator_type(self):
        """Return the SQL type of the accumulator."""
        # Declare the single field that create_accumulator() puts in the Row;
        # a bare 'ROW' names no fields at all.
        return 'ROW<f0 BIGINT>'


def test_py_udf_kafka():
    """Run the Python UDAF ``sum_udf`` over a 1-hour TUMBLE window TVF.

    Builds a five-row in-memory stream, registers it as the view ``source``,
    registers ``Sum0`` as ``sum_udf``, prints the plan and the query result,
    and finally inserts the result into the Kafka table ``kafka_test``.
    """
    # stream
    env = StreamExecutionEnvironment.get_execution_environment()
    # NOTE: machine-specific path to the Kafka SQL connector jar.
    env.add_jars("file:///Users/faron/Downloads/flink-sql-connector-kafka-1.17.1.jar")
    env.set_parallelism(1)
    table_env = StreamTableEnvironment.create(stream_execution_environment=env)

    ds = env.from_collection(
        collection=[
            (1, 2, "Lee", datetime.now() - timedelta(hours=4)),
            (2, 3, "Lee", datetime.now() - timedelta(hours=4)),
            (3, 4, "Jay", datetime.now() - timedelta(hours=4)),
            (5, 6, "Jay", datetime.now() - timedelta(hours=2)),
            (7, 8, "Lee", datetime.now()),
        ],
        type_info=Types.ROW(
            [Types.INT(), Types.INT(), Types.STRING(), Types.SQL_TIMESTAMP()]
        ),
    )

    # rowtime is computed from f3 and carries a 1-second watermark delay.
    table_schema = (
        Schema.new_builder()
        .column("f0", "INT")
        .column("f1", "INT")
        .column("f2", "STRING")
        .column_by_expression("rowtime", "CAST(f3 AS TIMESTAMP(3))")
        .watermark("rowtime", "rowtime - INTERVAL '1' SECOND")
        .build()
    )
    ts = table_env.from_data_stream(ds, table_schema).alias(
        "value", "count", "name", "rowtime"
    )
    # The pasted original opened this literal with a typographic quote
    # (U+201C), which is a syntax error in Python.
    print("schema desc")
    ts.print_schema()

    sql_sink_dll_1 = (
        "CREATE TABLE kafka_test( `name` string, `agg_data` bigint) with ( \n"
        "'connector' = 'kafka', 'topic'='test_java2', "
        "'properties.bootstrap.servers'='agent3:9092', 'value.format' = 'json' );"
    )
    table_env.execute_sql(sql_sink_dll_1)

    table_env.create_temporary_view("source", ts)
    table_env.create_temporary_function("sum_udf", Sum0())

    # Same query using the built-in sum(); not executed below, kept as the
    # comparison case for the UDAF query.
    sql_query_system = (
        " select name,sum(`value`) as agg_data "
        "from TABLE(TUMBLE(TABLE source, DESCRIPTOR(rowtime),INTERVAL '1' HOURS)) "
        "group by window_start, window_end, name "
    )
    # The query under test: identical, but aggregating with the Python UDAF.
    sql_query = (
        " select name,sum_udf(`value`) as agg_data "
        "from TABLE(TUMBLE(TABLE source, DESCRIPTOR(rowtime),INTERVAL '1' HOURS)) "
        "group by window_start, window_end, name "
    )

    print(table_env.explain_sql(sql_query))
    table_env.sql_query(sql_query).execute().print()
    table_env.sql_query(sql_query).execute_insert("kafka_test").wait()


if __name__ == "__main__":
    test_py_udf_kafka()