WeiZhong94 commented on a change in pull request #15212: URL: https://github.com/apache/flink/pull/15212#discussion_r598400948
########## File path: flink-python/pyflink/common/types.py ########## @@ -141,6 +141,12 @@ def set_row_kind(self, row_kind: RowKind): def set_field_names(self, field_names: List): self._fields = field_names + def is_retract_msg(self): Review comment: Add a "_" prefix? These methods should not be part of the public API. ########## File path: flink-python/pyflink/fn_execution/coder_impl_fast.pyx ########## @@ -107,8 +118,11 @@ cdef class AggregateFunctionRowCoderImpl(FlattenRowCoderImpl): self._encode_internal_row(value, output_stream) else: for result in results: - for value in result: - self._encode_internal_row(value, output_stream) + if isinstance(result[0], InternalRow): Review comment: If the input is not an InternalRow, we can just use the FlattenRowCoder directly. ########## File path: flink-python/pyflink/fn_execution/operations.py ########## @@ -406,6 +420,121 @@ def create_process_function(self, user_defined_aggs, input_extractors, filter_ar self.index_of_count_star) +class StreamGroupWindowAggregateOperation(AbstractStreamGroupAggregateOperation): + def __init__(self, spec, keyed_state_backend): + self._window = spec.serialized_fn.group_window + self._named_property_extractor = self._create_named_property_function() + self._is_time_window = None + super(StreamGroupWindowAggregateOperation, self).__init__(spec, keyed_state_backend) + + def create_process_function(self, user_defined_aggs, input_extractors, filter_args, + distinct_indexes, distinct_view_descriptors, key_selector, + state_value_coder): + self._is_time_window = self._window.is_time_window + if self._window.window_type == flink_fn_execution_pb2.GroupWindow.TUMBLING_GROUP_WINDOW: + if self._is_time_window: + window_assigner = TumblingWindowAssigner( + self._window.window_size, 0, self._window.is_row_time) + else: + window_assigner = CountTumblingWindowAssigner(self._window.window_size) + elif self._window.window_type == flink_fn_execution_pb2.GroupWindow.SLIDING_GROUP_WINDOW: + raise Exception("General Python UDAF
in Sliding window will be implemented in " + "FLINK-21629") + else: + raise Exception("General Python UDAF in Sessiong window will be implemented in " + "FLINK-21630") + if self._is_time_window: + if self._window.is_row_time: + trigger = EventTimeTrigger() + else: + trigger = ProcessingTimeTrigger() + else: + trigger = CountTrigger(self._window.window_size) + + window_aggregator = SimpleNamespaceAggsHandleFunction( + user_defined_aggs, + input_extractors, + self.index_of_count_star, + self.count_star_inserted, + self._named_property_extractor, + self.data_view_specs, + filter_args, + distinct_indexes, + distinct_view_descriptors) + return GroupWindowAggFunction( + self._window.allowedLateness, + key_selector, + self.keyed_state_backend, + state_value_coder, + window_assigner, + window_aggregator, + trigger, + self._window.time_field_index) + + def process_element_or_timer(self, input_datas: List[Tuple[int, Row, int, Row, int, int]]): + results = [] + for input_data in input_datas: + if input_data[0] == NORMAL_RECORD: + self.group_agg_function.process_watermark(input_data[2]) + result_datas = self.group_agg_function.process_element(input_data[1]) + for result_data in result_datas: + result = [NORMAL_RECORD, result_data, None] + results.append(result) + timers = self.group_agg_function.get_timers() + for timer in timers: + timer_operand_type = timer[0] # type: TimerOperandType + internal_timer = timer[1] # type: InternalTimer + window = internal_timer.get_namespace() + key = internal_timer.get_key() + timestamp = internal_timer.get_timestamp() + if self._is_time_window: + timer_data = \ + [TRIGGER_TIMER, None, + [timer_operand_type.value, key, timestamp, window.start, window.end]] Review comment: Maybe we can use a bytes field to store the serialized window, so that the DataStream window implementation can reuse this part.
########## File path: flink-python/pyflink/fn_execution/window_process_function.py ########## @@ -0,0 +1,154 @@ +################################################################################ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +################################################################################ +import sys +from abc import abstractmethod +from typing import Generic, List + +from pyflink.common import Row +from pyflink.fn_execution.window_assigner import WindowAssigner +from pyflink.fn_execution.window_context import Context, K, W + +MAX_LONG_VALUE = sys.maxsize + + +def join_row(left: List, right: List): + return Row(*(left + right)) + + +class InternalWindowProcessFunction(Generic[K, W]): Review comment: It seems that this is an abstract class, so it should be a subclass of ABC.
########## File path: flink-python/src/main/java/org/apache/flink/streaming/api/runners/python/beam/BeamPythonFunctionRunner.java ########## @@ -198,6 +198,7 @@ public BeamPythonFunctionRunner( FlinkMetricContainer flinkMetricContainer, @Nullable KeyedStateBackend keyedStateBackend, @Nullable TypeSerializer keySerializer, + TypeSerializer namespaceSerializer, Review comment: add Nullable ########## File path: flink-python/pyflink/fn_execution/operations.py ########## @@ -406,6 +420,121 @@ def create_process_function(self, user_defined_aggs, input_extractors, filter_ar self.index_of_count_star) +class StreamGroupWindowAggregateOperation(AbstractStreamGroupAggregateOperation): + def __init__(self, spec, keyed_state_backend): + self._window = spec.serialized_fn.group_window + self._named_property_extractor = self._create_named_property_function() + self._is_time_window = None + super(StreamGroupWindowAggregateOperation, self).__init__(spec, keyed_state_backend) + + def create_process_function(self, user_defined_aggs, input_extractors, filter_args, + distinct_indexes, distinct_view_descriptors, key_selector, + state_value_coder): + self._is_time_window = self._window.is_time_window + if self._window.window_type == flink_fn_execution_pb2.GroupWindow.TUMBLING_GROUP_WINDOW: + if self._is_time_window: + window_assigner = TumblingWindowAssigner( + self._window.window_size, 0, self._window.is_row_time) + else: + window_assigner = CountTumblingWindowAssigner(self._window.window_size) + elif self._window.window_type == flink_fn_execution_pb2.GroupWindow.SLIDING_GROUP_WINDOW: + raise Exception("General Python UDAF in Sliding window will be implemented in " + "FLINK-21629") + else: + raise Exception("General Python UDAF in Sessiong window will be implemented in " + "FLINK-21630") + if self._is_time_window: + if self._window.is_row_time: + trigger = EventTimeTrigger() + else: + trigger = ProcessingTimeTrigger() + else: + trigger = CountTrigger(self._window.window_size) + + 
window_aggregator = SimpleNamespaceAggsHandleFunction( + user_defined_aggs, + input_extractors, + self.index_of_count_star, + self.count_star_inserted, + self._named_property_extractor, + self.data_view_specs, + filter_args, + distinct_indexes, + distinct_view_descriptors) + return GroupWindowAggFunction( + self._window.allowedLateness, + key_selector, + self.keyed_state_backend, + state_value_coder, + window_assigner, + window_aggregator, + trigger, + self._window.time_field_index) + + def process_element_or_timer(self, input_datas: List[Tuple[int, Row, int, Row, int, int]]): + results = [] + for input_data in input_datas: + if input_data[0] == NORMAL_RECORD: + self.group_agg_function.process_watermark(input_data[2]) + result_datas = self.group_agg_function.process_element(input_data[1]) + for result_data in result_datas: + result = [NORMAL_RECORD, result_data, None] + results.append(result) + timers = self.group_agg_function.get_timers() + for timer in timers: + timer_operand_type = timer[0] # type: TimerOperandType + internal_timer = timer[1] # type: InternalTimer + window = internal_timer.get_namespace() + key = internal_timer.get_key() + timestamp = internal_timer.get_timestamp() + if self._is_time_window: + timer_data = \ + [TRIGGER_TIMER, None, + [timer_operand_type.value, key, timestamp, window.start, window.end]] + else: + timer_data = \ + [TRIGGER_TIMER, None, + [timer_operand_type.value, key, timestamp, window.id]] + results.append(timer_data) + else: + timestamp = input_data[2] + timer_data = input_data[3] + key = list(timer_data[1]) + timer_type = timer_data[0] + if self._is_time_window: Review comment: ditto ########## File path: flink-python/pyflink/fn_execution/window_trigger.py ########## @@ -0,0 +1,212 @@ +################################################################################ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. 
See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +################################################################################ +from abc import abstractmethod +from typing import Generic + +from pyflink.common.typeinfo import Types +from pyflink.datastream.state import ValueStateDescriptor +from pyflink.fn_execution.window import TimeWindow, CountWindow +from pyflink.fn_execution.window_context import TriggerContext, W + + +class Trigger(Generic[W]): Review comment: ditto -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org