Hi Dian, thank you very much for this valuable response. I had already read about the UDAF, but I wasn't aware that it is possible to return an array and UNNEST it afterwards. I will definitely give it a try and hopefully this will solve my issue.
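Just to double-check that I understood the suggestion correctly, below is a rough sketch of what I plan to try (completely untested on my side and based only on the vectorized UDF documentation you linked; the table and column names are the placeholders from my condensed example further down, and the UDAF body is just a stand-in for the actual model training):

#############################################################

from pyflink.table import BatchTableEnvironment, EnvironmentSettings, DataTypes
from pyflink.table.udf import udaf

# Vectorized (Pandas) aggregate function: receives all rows of one group as
# pandas.Series and returns the whole forecast for that group as one ARRAY.
@udaf(result_type=DataTypes.ARRAY(DataTypes.FLOAT()), func_type="pandas")
def forecast_agg(ds, y):
    # ... train the model on the complete series of this device ...
    return [float(v) for v in y]  # placeholder for the m forecast values

env_settings = EnvironmentSettings.new_instance().in_batch_mode().use_blink_planner().build()
t_env = BatchTableEnvironment.create(environment_settings=env_settings)
t_env.create_temporary_function("forecast_agg", forecast_agg)

# mySource / mySink DDL as in the condensed example below ...

# One aggregation per device key, then UNNEST flattens the array back into rows
# (assuming mySink has the columns riid and yhat).
t_env.execute_sql("""
    INSERT INTO mySink
    SELECT g.riid, t.yhat
    FROM (
        SELECT riid, forecast_agg(ds, y) AS yhats
        FROM mySource
        GROUP BY riid
    ) AS g
    CROSS JOIN UNNEST(g.yhats) AS t (yhat)
""")

#############################################################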
Another question that came to my mind is whether PyFlink supports any other APIs besides Table and SQL, like the Streaming and Batch APIs. The documentation only covers the Table API, but I'm not sure about that. Can you maybe tell me whether the Table and SQL API is the only one supported by PyFlink?

Kind Regards,
Niklas

> On 11. Nov 2020, at 15:32, Dian Fu <dian0511...@gmail.com> wrote:
>
> Hi Niklas,
>
> You are correct that the input and output of a Pandas UDF must have the same length and that Flink will split the input data into multiple bundles for a Pandas UDF, with a non-deterministic bundle size. Both of these limitations are by design, so I guess a Pandas UDF cannot meet your requirements.
>
> However, you could take a look at whether the Pandas UDAF [1], which is supported in 1.12, could meet your requirements:
> - As group_by only generates one record per group key, just as you said, you could declare the output type of the Pandas UDAF as an array type.
> - You then need to flatten the aggregation results, e.g. using UNNEST.
>
> NOTE: Flink 1.12 is not released yet. You could try the PyFlink package of RC1 [2] for 1.12.0 or build it yourself according to [3].
>
> [1] https://ci.apache.org/projects/flink/flink-docs-master/dev/python/table-api-users-guide/udfs/vectorized_python_udfs.html#vectorized-aggregate-functions
> [2] https://dist.apache.org/repos/dist/dev/flink/flink-1.12.0-rc1/python/
> [3] https://ci.apache.org/projects/flink/flink-docs-master/flinkDev/building.html#build-pyflink
>
> Regards,
> Dian
>
>> On 11. Nov 2020, at 21:03, Niklas Wilcke <niklas.wil...@uniberg.com> wrote:
>>
>> Hi Flink Community,
>>
>> I'm currently trying to implement a parallel machine learning job with Flink. The goal is to train models in parallel for independent time series in the same data stream. For that purpose I'm using a Python library, which led me to PyFlink. Let me explain the use case a bit more.
>> I want to implement a batch job that partitions/groups the data by a device identifier. After that I need to process the data for each device all at once. Unfortunately there is no way to train the model iteratively. The challenge I'm facing is to guarantee that all data belonging to a certain device is processed in one single step. I'm aware of the fact that this does not scale well, but for a reasonable amount of input data per device it should be fine from my perspective.
>> I investigated a lot and ended up using the Table API and a Pandas UDF, which roughly fulfil my requirements, but the following limitations remain, which I wanted to talk about.
>>
>> 1. A Pandas UDF takes multiple Series as input parameters, which is fine for my purpose, but as far as I can see there is no way to guarantee that the chunk of data in the Series is "complete". Flink will slice the Series and maybe call the UDF multiple times for each device. As far as I can see there are some config options like "python.fn-execution.arrow.batch.size" and "python.fn-execution.bundle.time", which might help, but I'm not sure whether this is the right path to take.
>>
>> 2. The output Series needs to have the same length as the input Series, which isn't nice for my use case. What I would like to do is to process n rows and emit m rows. There shouldn't be any dependency between the number of input rows and the number of output rows.
>>
>> 3. How do I partition the data stream? The Table API offers a group_by, but this doesn't serve my purpose, because I don't want to aggregate all the grouped lines. Instead, as stated above, I want to emit m result lines per group. Are there other options in the Table API or any other API to do this kind of grouping? I would need something like a "keyBy()" from the streaming API. Maybe this can be combined? Can I create a separate table for each key?
>>
>> I'm also open to ideas for a completely different approach not using the Table API or Pandas UDFs. Any idea is welcome.
>>
>> You can find a condensed version of the source code attached.
>>
>> Kind Regards,
>> Niklas
>>
>> #############################################################
>>
>> from pyflink.datastream import StreamExecutionEnvironment
>> from pyflink.table import StreamTableEnvironment, DataTypes
>> from pyflink.table.udf import udf
>>
>> env = StreamExecutionEnvironment.get_execution_environment()
>> env.set_parallelism(1)
>> t_env = StreamTableEnvironment.create(env)
>> t_env.get_config().get_configuration().set_boolean("python.fn-execution.memory.managed", True)
>>
>> @udf(input_types=[DataTypes.FLOAT(), DataTypes.FLOAT()],
>>      result_type=DataTypes.FLOAT(), udf_type='pandas')
>> def forcast(ds_float_series, y):
>>     # Train the model and create the forecast here (elided in this condensed
>>     # version); forecast_frame stands in for the model's forecast DataFrame.
>>     forecast_frame = ...
>>     yhat_ts = forecast_frame['yhat'].tail(input_size)
>>     return yhat_ts
>>
>> t_env.register_function("forcast", forcast)
>>
>> # Define sink and source here
>> t_env.execute_sql(my_source_ddl)
>> t_env.execute_sql(my_sink_ddl)
>>
>> # TODO: key_by instead of filter
>> t_env.from_path('mySource') \
>>     .where("riid === 'r1i1'") \
>>     .select("ds, riid, y, forcast(ds, y) as yhat_90d") \
>>     .insert_into('mySink')
>>
>> t_env.execute("pandas_udf_demo")
>>
>> #############################################################
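P.S. Regarding the config options mentioned under point 1 of my original mail above: if tuning them ever becomes relevant again, this is roughly how I would set them on the table config, analogous to the managed-memory flag in the condensed example (a sketch only; the key names are the ones from the documentation, the values are arbitrary placeholders):

#############################################################

config = t_env.get_config().get_configuration()
# Maximum number of rows per Arrow batch handed to a Pandas UDF (placeholder value).
config.set_string("python.fn-execution.arrow.batch.size", "10000")
# How long (in milliseconds) a bundle is buffered before it is processed (placeholder value).
config.set_string("python.fn-execution.bundle.time", "1000")

#############################################################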