Hi Dc,

Thank you for your feedback.
1. Currently, only built-in types are supported in the Python DataStream API. As a workaround, you can use a Row type to represent a custom Python class: the field names stand for the names of the member variables, and the field types stand for the types of the member variables.

2. Could you please provide the full command line you executed and tell us which kind of cluster you are running (standalone/YARN/k8s)? The various command lines for submitting a PyFlink job are shown in https://ci.apache.org/projects/flink/flink-docs-master/deployment/cli.html#submitting-pyflink-jobs .

The attachment is an example of a Python DataStream API job, for your information.

Best,
Shuiqiang

Dc Zhao (BLOOMBERG/ 120 PARK) <dzha...@bloomberg.net> wrote on Thursday, January 14, 2021 at 1:00 PM:

> Hi Flink Community:
> We are using PyFlink to develop a POC for our project. We encountered
> some questions while using Flink.
>
> We are using Flink version 1.2, Python 3.7, and the DataStream API.
>
> 1. Do you have examples of providing a customized Python class as a
> `result type`? Based on our documentation research, we found that only
> built-in types are supported in Python. Also, what is the payload size
> limitation inside Flink, and do you have a recommendation for it?
>
> 2. Do you have examples of submitting DataStream API code to the cluster
> with `flink run --python`? We tried to do that, however the process hangs
> on a `socket read from the java gateway`, and because logs are missing,
> we are not sure what is missing when submitting the job.
>
> Regards
> Dc
>
> << {CH} {TS} Anything that can possibly go wrong, it does. >>
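To make the Row workaround from point 1 more concrete, here is a plain-Python sketch (it does not import PyFlink, so it runs anywhere) of how a custom class could be mapped onto a row: the member variable names become the row's field names, the member types become its field types, and each instance is flattened into positional values. The `Trade` class and the `PY_TO_FLINK_TYPE` table are hypothetical illustrations, not part of PyFlink. In a real job, the names and types would presumably be passed to something like `Types.ROW_NAMED(names, [Types.STRING(), Types.LONG(), Types.DOUBLE()])`; please check the typeinfo documentation of your Flink version before relying on that.

```python
from dataclasses import astuple, dataclass, fields
from typing import Any, List, Tuple, Type, TypeVar

T = TypeVar("T")


# Hypothetical user class standing in for the custom result type from question 1.
@dataclass
class Trade:
    symbol: str
    quantity: int
    price: float


# Assumed mapping from Python member types to PyFlink type-info names
# (e.g. "STRING" for Types.STRING()); adjust it to the types your job uses.
PY_TO_FLINK_TYPE = {str: "STRING", int: "LONG", float: "DOUBLE", bool: "BOOLEAN"}


def row_schema(cls: type) -> Tuple[List[str], List[str]]:
    """Field names come from the member variable names, field types from their annotations."""
    members = fields(cls)
    names = [f.name for f in members]
    types = [PY_TO_FLINK_TYPE[f.type] for f in members]
    return names, types


def to_row(obj: Any) -> Tuple[Any, ...]:
    """Flatten an instance into the positional values a Row would carry."""
    return astuple(obj)


def from_row(cls: Type[T], row: Tuple[Any, ...]) -> T:
    """Rebuild the custom object from a row, e.g. inside a downstream map function."""
    return cls(*row)


if __name__ == "__main__":
    print(row_schema(Trade))
    print(to_row(Trade("IBM", 10, 1.5)))
```

Because a Row is positional, the order of the dataclass fields has to match the order of the field types declared for the stream; keeping both derived from the same class avoids them drifting apart.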
from pyflink.common.typeinfo import Types
from pyflink.common.watermark_strategy import TimestampAssigner, WatermarkStrategy
from pyflink.datastream import StreamExecutionEnvironment, TimeCharacteristic, ProcessFunction


def datastream_processfunction_example():
    env = StreamExecutionEnvironment.get_execution_environment()
    env.set_parallelism(1)
    # Emit periodic watermarks every 2000 ms and process records in event time.
    env.get_config().set_auto_watermark_interval(2000)
    env.set_stream_time_characteristic(TimeCharacteristic.EventTime)

    # Each record is (id, event timestamp in epoch milliseconds as a string).
    data_stream = env.from_collection([(1, '1603708211000'),
                                       (2, '1603708224000'),
                                       (3, '1603708226000'),
                                       (4, '1603708289000')],
                                      type_info=Types.ROW([Types.INT(), Types.STRING()]))

    class MyTimestampAssigner(TimestampAssigner):

        def extract_timestamp(self, value, record_timestamp) -> int:
            # Use the second field of the row as the event timestamp.
            return int(value[1])

    class MyProcessFunction(ProcessFunction):

        def process_element(self, value, ctx):
            # Report the element's timestamp and the operator's current watermark.
            current_timestamp = ctx.timestamp()
            current_watermark = ctx.timer_service().current_watermark()
            yield "current timestamp: {}, current watermark: {}, current_value: {}" \
                .format(str(current_timestamp), str(current_watermark), str(value))

        def on_timer(self, timestamp, ctx, out):
            # No timers are registered in this example.
            pass

    watermark_strategy = WatermarkStrategy.for_monotonous_timestamps() \
        .with_timestamp_assigner(MyTimestampAssigner())

    data_stream.assign_timestamps_and_watermarks(watermark_strategy) \
        .process(MyProcessFunction(), output_type=Types.STRING()).print()

    env.execute("Python DataStream Example")


if __name__ == '__main__':
    datastream_processfunction_example()
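To help interpret what the example job prints, here is a small pure-Python sketch of the rule that, as far as I understand Flink's implementation, `WatermarkStrategy.for_monotonous_timestamps()` follows: on each periodic tick it emits a watermark equal to the highest timestamp seen so far minus 1 ms, and before any element arrives the watermark is Java's `Long.MIN_VALUE`. The function name `monotonous_watermarks` is hypothetical and only simulates that rule; it is not a PyFlink API. Since this job only emits watermarks every 2000 ms, the watermark printed by `MyProcessFunction` may lag behind these values, and for a small bounded input it may still be the initial value.

```python
from typing import Iterable, List, Tuple

# Watermark reported before any element has been seen (Java's Long.MIN_VALUE).
INITIAL_WATERMARK = -(2 ** 63)


def monotonous_watermarks(timestamps: Iterable[int]) -> List[Tuple[int, int]]:
    """Pair each event timestamp with the watermark a periodic tick right after it would emit.

    Simulates the assumed rule: watermark = highest timestamp seen so far - 1.
    """
    max_ts = None
    result = []
    for ts in timestamps:
        # Track the largest timestamp; a smaller (late) timestamp never moves the watermark back.
        max_ts = ts if max_ts is None else max(max_ts, ts)
        result.append((ts, max_ts - 1))
    return result


if __name__ == "__main__":
    # Event timestamps that MyTimestampAssigner extracts from the sample collection.
    sample = [1603708211000, 1603708224000, 1603708226000, 1603708289000]
    print("initial watermark:", INITIAL_WATERMARK)
    for ts, wm in monotonous_watermarks(sample):
        print("timestamp:", ts, "watermark after tick:", wm)
```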