Hi Niklas,

Good to know that this solution may work for you. Regarding the questions you 
raised, please find my replies inline.

Regards,
Dian

> On 13 Nov 2020, at 20:48, Niklas Wilcke <niklas.wil...@uniberg.com> wrote:
> 
> Hi Dian,
> 
> thanks again for your response. In the meantime I tried out your proposal 
> using the UDAF feature of PyFlink 1.12.0-rc1 and it is roughly working, but I 
> am facing some issues, which I would like to address. If this goes too far, 
> please let me know and I will open a new thread for each of the questions. 
> Let me share some more information about my current environment, which will 
> maybe help to answer the questions. I'm currently using my dev machine with 
> Docker and one jobmanager container and one taskmanager container. If needed 
> I can share the whole docker environment, but this would involve some more 
> effort on my side. Here are my five questions.
> 
> 1. Where can I find connector libraries for 1.12.0-rc1 or some kind of 
> instruction how to build them? I can't find them in the 1.12.0-rc1 release 
> and when I build flink from source, I can't find the connector libraries in 
> the build target. I need flink-sql-connector-elasticsearch7 and 
> flink-sql-connector-kafka.

You could download the connector jars of 1.12.0-rc1 from here: 
https://repository.apache.org/content/repositories/orgapacheflink-1402/org/apache/flink/
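If it helps, here is a minimal sketch of how to make the downloaded jars visible to 
a PyFlink job via the "pipeline.jars" option. The jar file names and the 
/opt/flink/lib path below are placeholders; adjust them to the artifacts you 
actually download, or simply drop the jars into the lib/ directory of your Flink 
distribution instead:

    from pyflink.datastream import StreamExecutionEnvironment
    from pyflink.table import StreamTableEnvironment

    env = StreamExecutionEnvironment.get_execution_environment()
    t_env = StreamTableEnvironment.create(env)

    # Semicolon-separated list of file:// URLs pointing at the connector jars.
    # The file names and paths here are placeholders.
    t_env.get_config().get_configuration().set_string(
        "pipeline.jars",
        "file:///opt/flink/lib/flink-sql-connector-kafka_2.12-1.12.0.jar;"
        "file:///opt/flink/lib/flink-sql-connector-elasticsearch7_2.12-1.12.0.jar")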

> 2. Which steps are needed to properly set up PyFlink? I followed the 
> instructions, but I always get some ClassNotFoundExceptions for some Beam 
> related classes in the taskmanager. The job still works fine, but this 
> doesn't look good to me. It happens in 1.11.2 and in 1.12.0-rc1. I tried to 
> resolve this by adding certain jars, but I wasn't able to fix it. Maybe you 
> have an idea. You can find the Dockerfile attached, which outlines the steps 
> I'm currently using. The exception signature looks like this:
>    Exception in thread "grpc-nio-worker-ELG-3-2" java.lang.NoClassDefFoundError: org/apache/beam/vendor/grpc/v1p26p0/io/netty/buffer/PoolArena$1

Usually there is nothing special you need to do to set up PyFlink. I have 
manually checked that this class is present (inside flink-python_2.11-1.12.0.jar), 
so my guess is that your environment isn't clean.

I guess you could check the following things:
1) Is it because you installed PyFlink 1.11.2 before, so the environment is not 
clean? Could you uninstall PyFlink 1.11.2 manually and reinstall PyFlink 
1.12.0-rc1? You could also check that there is only one flink-python*.jar under 
the directory xxx/site-packages/pyflink/opt/ (see the small sketch below this list).
2) Verify that the class is actually in the jar with the following command 
(flink-python_2.11-1.12.0.jar is under the directory xxx/site-packages/pyflink/opt/):
        jar tf flink-python_2.11-1.12.0.jar | grep "org/apache/beam/vendor/grpc/v1p26p0/io/netty/buffer/PoolArena"
3) If the exception still happens after that, could you share the full exception stack?
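For 1), a small Python sketch to locate the installed PyFlink distribution and 
list the flink-python jars (plain standard library, nothing PyFlink-specific 
beyond importing the package):

    import glob
    import os

    import pyflink

    # Directory of the installed pyflink package, e.g. xxx/site-packages/pyflink
    pyflink_dir = os.path.dirname(os.path.abspath(pyflink.__file__))
    print("PyFlink is installed at:", pyflink_dir)

    # There should be exactly one flink-python*.jar under the opt/ directory
    for jar in glob.glob(os.path.join(pyflink_dir, "opt", "flink-python*.jar")):
        print(jar)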

> 3. When increasing the size of the input data set I get the following 
> Exception and the job is canceled. I tried to increase the resources assigned 
> to flink, but it didn't help. Do you have an idea why this is happening? You 
> can find a more detailed stack trace in the appendix.

Could you check if there are any other exceptions in the log when this 
exception happens?

> 4. I can't manage to get the SQL UNNEST operation to work. It is quite hard 
> for me to debug it and I can't really find any valuable examples or 
> documentation on the internet. Currently instead of creating an ARRAY I'm 
> just returning a VARCHAR containing a string representation of the array. The 
> relevant code you can find in the appendix.

There are some examples here: 
https://github.com/apache/flink/blob/c601cfd662c2839f8ebc81b80879ecce55a8cbaf/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/UnnestITCase.scala
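As a rough sketch against your appendix code: assuming the UDAF is redeclared with 
result_type=DataTypes.ARRAY(DataTypes.FLOAT()) and mySink gets a FLOAT column for 
the forecast values (both are assumptions on my side, and the yhats/yhat aliases 
are just illustrative), the flattening could look like this:

    # Aggregate per riid into an ARRAY and flatten it with UNNEST.
    # Untested against your schema; it is only meant to show the shape of the query.
    t_env.execute_sql("""
        INSERT INTO mySink
        SELECT r.riid, t.yhat
        FROM (
            SELECT riid, forcast(ds, y) AS yhats
            FROM mySource
            GROUP BY riid
        ) AS r
        CROSS JOIN UNNEST(r.yhats) AS t (yhat)
    """)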

> 5. How can I obtain the output of the Python interpreter executing the UDF. 
> If I put a print statement in the UDF I can't see the output in the log of 
> the taskmanager. Is there a way to access it?

You can use the standard Python logging module in the UDF instead of print. The 
log output can then be found in the TaskManager logs.
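For example (the function body here is just an illustrative placeholder, not your 
actual model code):

    import logging

    from pyflink.table import DataTypes
    from pyflink.table.udf import udaf

    @udaf(input_types=[DataTypes.FLOAT(), DataTypes.FLOAT()],
          result_type=DataTypes.VARCHAR(10000), func_type='pandas')
    def forcast(ds_float_series, y):
        # Messages written via the logging module show up in the TaskManager logs.
        logging.info("forcast called with %d rows", len(y))
        # ... train the model and create the forecast here ...
        return str(y.mean())  # placeholder return value matching the VARCHAR result type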

> I hope these aren't too many questions for this thread. If this is the case I 
> can still split some of them out. Please let me know, if this is the case.
> Thank you very much. I really appreciate your help.

It's fine to reuse this thread. :)

> Kind Regards,
> Niklas
> 
> 
> Dockerfile for question 2.
> ####################################################################
> # This image has been built based on the Dockerfile used for the flink image on Docker Hub.
> # The only change I applied is that I switched to flink 1.12.0-rc1.
> FROM flink:1.12.0-rc1-scala_2.12
> 
> # Install python
> # TODO: Minimize dependencies
> RUN apt-get update && apt-get install -y \
>     python3 \
>     python3-pip \
>     python3-dev \
>     zip \
>   && rm -rf /var/lib/apt/lists/* \
>   && ln -s /usr/bin/python3 /usr/bin/python \
>   && ln -s /usr/bin/pip3 /usr/bin/pip
> 
> # Install pyflink
> RUN wget --no-verbose https://dist.apache.org/repos/dist/dev/flink/flink-1.12.0-rc1/python/apache_flink-1.12.0-cp37-cp37m-manylinux1_x86_64.whl \
>   && pip install apache_flink-1.12.0-cp37-cp37m-manylinux1_x86_64.whl \
>   && rm apache_flink-1.12.0-cp37-cp37m-manylinux1_x86_64.whl
> ####################################################################
> Stack Trace for question 3.
> ####################################################################
> Caused by: java.lang.RuntimeException: Failed to close remote bundle
>         at 
> org.apache.flink.streaming.api.runners.python.beam.BeamPythonFunctionRunner.finishBundle(BeamPythonFunctionRunner.java:368)
>         at 
> org.apache.flink.streaming.api.runners.python.beam.BeamPythonFunctionRunner.flush(BeamPythonFunctionRunner.java:322)
>         at 
> org.apache.flink.streaming.api.operators.python.AbstractPythonFunctionOperator.invokeFinishBundle(AbstractPythonFunctionOperator.java:283)
>         at 
> org.apache.flink.streaming.api.operators.python.AbstractPythonFunctionOperator.checkInvokeFinishBundleByCount(AbstractPythonFunctionOperator.java:267)
>         at 
> org.apache.flink.table.runtime.operators.python.aggregate.arrow.batch.BatchArrowPythonGroupAggregateFunctionOperator.invokeCurrentBatch(BatchArrowPythonGroupAggregateFunctionOperator.java:64)
>         at 
> org.apache.flink.table.runtime.operators.python.aggregate.arrow.batch.AbstractBatchArrowPythonAggregateFunctionOperator.endInput(AbstractBatchArrowPythonAggregateFunctionOperator.java:94)
>         at 
> org.apache.flink.table.runtime.operators.python.aggregate.arrow.batch.BatchArrowPythonGroupAggregateFunctionOperator.endInput(BatchArrowPythonGroupAggregateFunctionOperator.java:33)
>         at 
> org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.endOperatorInput(StreamOperatorWrapper.java:91)
>         at 
> org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.lambda$close$0(StreamOperatorWrapper.java:127)
>         at 
> org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:47)
>         at 
> org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.close(StreamOperatorWrapper.java:127)
>         at 
> org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.close(StreamOperatorWrapper.java:134)
>         at 
> org.apache.flink.streaming.runtime.tasks.OperatorChain.closeOperators(OperatorChain.java:412)
>         at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.afterInvoke(StreamTask.java:587)
>         at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:549)
>         at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:722)
>         at org.apache.flink.runtime.taskmanager.Task.run(Task.java:547)
>         at java.lang.Thread.run(Thread.java:748)
> Caused by: java.util.concurrent.ExecutionException: 
> org.apache.beam.vendor.grpc.v1p26p0.io.grpc.StatusRuntimeException: 
> CANCELLED: cancelled before receiving half close
>         at 
> java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
>         at 
> java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1908)
>         at org.apache.beam.sdk.util.MoreFutures.get(MoreFutures.java:57)
>         at 
> org.apache.beam.runners.fnexecution.control.SdkHarnessClient$BundleProcessor$ActiveBundle.close(SdkHarnessClient.java:458)
>         at 
> org.apache.beam.runners.fnexecution.control.DefaultJobBundleFactory$SimpleStageBundleFactory$1.close(DefaultJobBundleFactory.java:547)
>         at 
> org.apache.flink.streaming.api.runners.python.beam.BeamPythonFunctionRunner.finishBundle(BeamPythonFunctionRunner.java:366)
>         ... 17 more
> Caused by: 
> org.apache.beam.vendor.grpc.v1p26p0.io.grpc.StatusRuntimeException: 
> CANCELLED: cancelled before receiving half close
>         at 
> org.apache.beam.vendor.grpc.v1p26p0.io.grpc.Status.asRuntimeException(Status.java:524)
>         at 
> org.apache.beam.vendor.grpc.v1p26p0.io.grpc.stub.ServerCalls$StreamingServerCallHandler$StreamingServerCallListener.onCancel(ServerCalls.java:275)
>         at 
> org.apache.beam.vendor.grpc.v1p26p0.io.grpc.PartialForwardingServerCallListener.onCancel(PartialForwardingServerCallListener.java:40)
>         at 
> org.apache.beam.vendor.grpc.v1p26p0.io.grpc.ForwardingServerCallListener.onCancel(ForwardingServerCallListener.java:23)
>         at 
> org.apache.beam.vendor.grpc.v1p26p0.io.grpc.ForwardingServerCallListener$SimpleForwardingServerCallListener.onCancel(ForwardingServerCallListener.java:40)
>         at 
> org.apache.beam.vendor.grpc.v1p26p0.io.grpc.Contexts$ContextualizedServerCallListener.onCancel(Contexts.java:96)
>         at 
> org.apache.beam.vendor.grpc.v1p26p0.io.grpc.internal.ServerCallImpl$ServerStreamListenerImpl.closedInternal(ServerCallImpl.java:353)
>         at 
> org.apache.beam.vendor.grpc.v1p26p0.io.grpc.internal.ServerCallImpl$ServerStreamListenerImpl.closed(ServerCallImpl.java:341)
>         at 
> org.apache.beam.vendor.grpc.v1p26p0.io.grpc.internal.ServerImpl$JumpToApplicationThreadServerStreamListener$1Closed.runInContext(ServerImpl.java:867)
>         at 
> org.apache.beam.vendor.grpc.v1p26p0.io.grpc.internal.ContextRunnable.run(ContextRunnable.java:37)
>         at 
> org.apache.beam.vendor.grpc.v1p26p0.io.grpc.internal.SerializingExecutor.run(SerializingExecutor.java:123)
>         at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>         at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>         ... 1 more
> ################################################################
> Code for question 4.
> ################################################################
> # UDAF signature
> @udaf(input_types=[DataTypes.FLOAT(), DataTypes.FLOAT()],
>      result_type=DataTypes.VARCHAR(10000), func_type='pandas')
> def forcast(ds_float_series, y):
> 
> # SQL DDL
> "create table mySource (ds FLOAT, riid VARCHAR(100), y FLOAT ) with ( ... )"
> "create table mySink (riid VARCHAR(100), yhatd VARCHAR(10000)) with ( ... )"
> 
> # SQL INSERT
> "INSERT INTO mySink SELECT riid, forcast(ds, y) AS yhat FROM mySource GROUP 
> BY riid"
> ################################################################
> 
>> On 12 Nov 2020, at 12:53, Dian Fu <dian0511...@gmail.com> wrote:
>> 
>> Hi Niklas,
>> 
>> Python DataStream API will also be supported in coming release of 1.12.0 
>> [1]. However, the functionalities are still limited for the time being 
>> compared to the Java DataStream API, e.g. it will only support the stateless 
>> operations, such as map, flat_map, etc.
>> 
>> [1] https://ci.apache.org/projects/flink/flink-docs-master/dev/python/datastream_tutorial.html
>>> On 12 Nov 2020, at 19:46, Niklas Wilcke <niklas.wil...@uniberg.com> wrote:
>>> 
>>> Hi Dian,
>>> 
>>> thank you very much for this valuable response. I already read about the 
>>> UDAF, but I wasn't aware of the fact that it is possible to return and 
>>> UNNEST an array. I will definitely have a try and hopefully this will solve 
>>> my issue.
>>> 
>>> Another question that came up to my mind is whether PyFlink supports any 
>>> other API except Table and SQL, like the Streaming and Batch API. The 
>>> documentation is only covering the Table API, but I'm not sure about that. 
>>> Can you maybe tell me whether the Table and SQL API is the only one 
>>> supported by PyFlink?
>>> 
>>> Kind Regards,
>>> Niklas
>>> 
>>>  
>>> 
>>>> On 11 Nov 2020, at 15:32, Dian Fu <dian0511...@gmail.com> wrote:
>>>> 
>>>> Hi Niklas,
>>>> 
>>>> You are correct that the input/output length of Pandas UDF must be of the 
>>>> same size and that Flink will split the input data into multiple bundles 
>>>> for Pandas UDF and the bundle size is non-deterministic. Both of the above 
>>>> two limitations are by design and so I guess Pandas UDF could not meet 
>>>> your requirements.
>>>> 
>>>> However, you could take a look at whether the Pandas UDAF [1] supported in 
>>>> 1.12 could meet your requirements:
>>>> - As group_by only generates one record per group key, just as you said, you 
>>>> could declare the output type of the Pandas UDAF as an array type
>>>> - You then need to flatten the aggregation results, e.g. using UNNEST
>>>> 
>>>> NOTE: Flink 1.12 is still not released. You could try the PyFlink package 
>>>> of RC1[2] for 1.12.0 or build it yourself according to [3].
>>>> 
>>>> [1] https://ci.apache.org/projects/flink/flink-docs-master/dev/python/table-api-users-guide/udfs/vectorized_python_udfs.html#vectorized-aggregate-functions
>>>> [2] https://dist.apache.org/repos/dist/dev/flink/flink-1.12.0-rc1/python/
>>>> [3] https://ci.apache.org/projects/flink/flink-docs-master/flinkDev/building.html#build-pyflink
>>>> 
>>>> Regards,
>>>> Dian
>>>> 
>>>>> On 11 Nov 2020, at 21:03, Niklas Wilcke <niklas.wil...@uniberg.com> wrote:
>>>>> 
>>>>> Hi Flink Community,
>>>>> 
>>>>> I'm currently trying to implement a parallel machine learning job with 
>>>>> Flink. The goal is to train models in parallel for independent time 
>>>>> series in the same data stream. For that purpose I'm using a Python 
>>>>> library, which led me to PyFlink. Let me explain the use case a bit more.
>>>>> I want to implement a batch job, which partitions/groups the data by a 
>>>>> device identifier. After that I need to process the data for each device 
>>>>> all at once. There is no way to iteratively train the model 
>>>>> unfortunately. The challenge I'm facing is to guarantee that all data 
>>>>> belonging to a certain device is processed in one single step. I'm aware 
>>>>> of the fact that this does not scale well, but for a reasonable amount of 
>>>>> input data per device it should be fine from my perspective.
>>>>> I investigated a lot and I ended up using the Table API and Pandas UDF, 
>>>>> which roughly fulfil my requirements, but there are the following 
>>>>> limitations left, which I wanted to talk about.
>>>>> 
>>>>> 1. Pandas UDF takes multiple Series as input parameters, which is fine 
>>>>> for my purpose, but as far as I can see there is no way to guarantee that 
>>>>> the chunk of data in the Series is "complete". Flink will slice the 
>>>>> Series and maybe call the UDF multiple times for each device. As far as I 
>>>>> can see there are some config options like 
>>>>> "python.fn-execution.arrow.batch.size" and 
>>>>> "python.fn-execution.bundle.time", which might help, but I'm not sure, 
>>>>> whether this is the right path to take.
>>>>> 2. The length of the input Series needs to be of the same size as the 
>>>>> output Series, which isn't nice for my use case. What I would like to do 
>>>>> is to process n rows and emit m rows. There shouldn't be any dependency 
>>>>> between the number of input rows and the number of output rows.
>>>>> 
>>>>> 3. How do I partition the data stream? The Table API offers a groupby, 
>>>>> but this doesn't serve my purpose, because I don't want to aggregate all 
>>>>> the grouped lines. Instead as stated above I want to emit m result lines 
>>>>> per group. Are there other options using the Table API or any other API 
>>>>> to do this kind of grouping. I would need something like a "keyBy()" from 
>>>>> the streaming API. Maybe this can be combined? Can I create a separate 
>>>>> table for each key?
>>>>> 
>>>>> I'm also open to ideas for a completely different approach not using the 
>>>>> Table API or Pandas UDF. Any idea is welcome.
>>>>> 
>>>>> You can find a condensed version of the source code attached.
>>>>> 
>>>>> Kind Regards,
>>>>> Niklas
>>>>> 
>>>>> 
>>>>> 
>>>>> #############################################################
>>>>> 
>>>>> from pyflink.datastream import StreamExecutionEnvironment
>>>>> from pyflink.table import StreamTableEnvironment, DataTypes
>>>>> from pyflink.table.udf import udf
>>>>> 
>>>>> env = StreamExecutionEnvironment.get_execution_environment()
>>>>> env.set_parallelism(1)
>>>>> t_env = StreamTableEnvironment.create(env)
>>>>> t_env.get_config().get_configuration().set_boolean("python.fn-execution.memory.managed", True)
>>>>> 
>>>>> @udf(input_types=[DataTypes.FLOAT(), DataTypes.FLOAT()],
>>>>>     result_type=DataTypes.FLOAT(), udf_type='pandas')
>>>>> def forcast(ds_float_series, y):
>>>>> 
>>>>>    # Train the model and create the forcast
>>>>> 
>>>>>    yhat_ts = forcast['yhat'].tail(input_size)
>>>>>    return yhat_ts
>>>>> 
>>>>> t_env.register_function("forcast", forcast)
>>>>> 
>>>>> # Define sink and source here
>>>>> 
>>>>> t_env.execute_sql(my_source_ddl)
>>>>> t_env.execute_sql(my_sink_ddl)
>>>>> 
>>>>> # TODO: key_by instead of filter
>>>>> t_env.from_path('mySource') \
>>>>>    .where("riid === 'r1i1'") \
>>>>>    .select("ds, riid, y, forcast(ds, y) as yhat_90d") \
>>>>>    .insert_into('mySink')
>>>>> 
>>>>> t_env.execute("pandas_udf_demo")
>>>>> 
>>>>> #############################################################
>>>>> 
>>>>> 
>>>> 
>>> 
>> 
> 
