Hi Dian,

thanks again for your response. In the meantime I tried out your proposal using 
the UDAF feature of PyFlink 1.12.0-rc1 and it is roughly working, but I am 
facing some issues, which I would like to address. If this goes too far, please 
let me know and I will open a new thread for each of the questions. Let me 
share some more information about my current environment, which will maybe help 
to answer the questions. I'm currently using my dev machine with Docker and one 
jobmanager container and one taskmanager container. If needed I can share the 
whole docker environment, but this would involve some more effort on my side. 
Here are my five questions.

1. Where can I find connector libraries for 1.12.0-rc1 or some kind of 
instruction how to build them? I can't find them in the 1.12.0-rc1 release and 
when I build flink from source, I can't find the connector libraries in the 
build target. I need flink-sql-connector-elasticsearch7 and 
flink-sql-connector-kafka.

2. Which steps are needed to properly Setup PyFlink? I followed the 
instructions, but I always get some ClassNotFoundExceptions for some Beam 
related classes in the taskmanager. The job still works fine, but this doesn't 
look good to me. It happens in 1.11.2 and in 1.12.0-rc1. I tried to resolve 
this by adding certain jars, but I wasn't able to fix it. Maybe you have an 
idea. You can find the Dockerfile attached, which lines out the steps I'm 
currently using. The Exceptions signature looks like this.
   Exception in thread "grpc-nio-worker-ELG-3-2" 
java.lang.NoClassDefFoundError: 
org/apache/beam/vendor/grpc/v1p26p0/io/netty/buffer/PoolArena$1

3. When increasing the size of the input data set I get the following Exception 
and the job is canceled. I tried to increase the resources assigned to flink, 
but it didn't help. Do you have an idea why this is happening? You can find a 
more detailed stack trace in apendix.

4. I can't manage to get the SQL UNNEST operation to work. It is quite hard for 
me to debug it and I can't really find any valuable examples or documentation 
on the internet. Currently instead of creating an ARRAY I'm just returning a 
VARCHAR containing a string representation of the array. The relevant code you 
can find in the apendix.

5. How can I obtain the output of the Python interpreter executing the UDF. If 
I put a print statement in the UDF I can't see the output in the log of the 
taskmanager. Is there a way to access it?

I hope these aren't too many questions for this thread. If this is the case I 
can still split some of them out. Please let me know, if this is the case.
Thank you very much. I really appreciate your help.

Kind Regards,
Niklas


Dockerfile for question 2.
####################################################################
# This image has been build based on the Dockerfile used for the flink image on 
docker hub.
# The only change I applied is that I switched to flink 1.12.0-rc1.
FROM flink:1.12.0-rc1-scala_2.12

# Install python
# TODO: Minimize dependencies
RUN apt-get update && apt-get install -y \
    python3 \
    python3-pip \
    python3-dev \
    zip \
  && rm -rf /var/lib/apt/lists/* \
  && ln -s /usr/bin/python3 /usr/bin/python \
  && ln -s /usr/bin/pip3 /usr/bin/pip

# Install pyflink
RUN wget --no-verbose 
https://dist.apache.org/repos/dist/dev/flink/flink-1.12.0-rc1/python/apache_flink-1.12.0-cp37-cp37m-manylinux1_x86_64.whl
 \
  && pip install apache_flink-1.12.0-cp37-cp37m-manylinux1_x86_64.whl \
  && rm apache_flink-1.12.0-cp37-cp37m-manylinux1_x86_64.whl
####################################################################
Stack Trace for question 3.
####################################################################
Caused by: java.lang.RuntimeException: Failed to close remote bundle
        at 
org.apache.flink.streaming.api.runners.python.beam.BeamPythonFunctionRunner.finishBundle(BeamPythonFunctionRunner.java:368)
        at 
org.apache.flink.streaming.api.runners.python.beam.BeamPythonFunctionRunner.flush(BeamPythonFunctionRunner.java:322)
        at 
org.apache.flink.streaming.api.operators.python.AbstractPythonFunctionOperator.invokeFinishBundle(AbstractPythonFunctionOperator.java:283)
        at 
org.apache.flink.streaming.api.operators.python.AbstractPythonFunctionOperator.checkInvokeFinishBundleByCount(AbstractPythonFunctionOperator.java:267)
        at 
org.apache.flink.table.runtime.operators.python.aggregate.arrow.batch.BatchArrowPythonGroupAggregateFunctionOperator.invokeCurrentBatch(BatchArrowPythonGroupAggregateFunctionOperator.java:64)
        at 
org.apache.flink.table.runtime.operators.python.aggregate.arrow.batch.AbstractBatchArrowPythonAggregateFunctionOperator.endInput(AbstractBatchArrowPythonAggregateFunctionOperator.java:94)
        at 
org.apache.flink.table.runtime.operators.python.aggregate.arrow.batch.BatchArrowPythonGroupAggregateFunctionOperator.endInput(BatchArrowPythonGroupAggregateFunctionOperator.java:33)
        at 
org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.endOperatorInput(StreamOperatorWrapper.java:91)
        at 
org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.lambda$close$0(StreamOperatorWrapper.java:127)
        at 
org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:47)
        at 
org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.close(StreamOperatorWrapper.java:127)
        at 
org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.close(StreamOperatorWrapper.java:134)
        at 
org.apache.flink.streaming.runtime.tasks.OperatorChain.closeOperators(OperatorChain.java:412)
        at 
org.apache.flink.streaming.runtime.tasks.StreamTask.afterInvoke(StreamTask.java:587)
        at 
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:549)
        at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:722)
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:547)
        at java.lang.Thread.run(Thread.java:748)
Caused by: java.util.concurrent.ExecutionException: 
org.apache.beam.vendor.grpc.v1p26p0.io.grpc.StatusRuntimeException: CANCELLED: 
cancelled before receiving half close
        at 
java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
        at 
java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1908)
        at org.apache.beam.sdk.util.MoreFutures.get(MoreFutures.java:57)
        at 
org.apache.beam.runners.fnexecution.control.SdkHarnessClient$BundleProcessor$ActiveBundle.close(SdkHarnessClient.java:458)
        at 
org.apache.beam.runners.fnexecution.control.DefaultJobBundleFactory$SimpleStageBundleFactory$1.close(DefaultJobBundleFactory.java:547)
        at 
org.apache.flink.streaming.api.runners.python.beam.BeamPythonFunctionRunner.finishBundle(BeamPythonFunctionRunner.java:366)
        ... 17 more
Caused by: org.apache.beam.vendor.grpc.v1p26p0.io.grpc.StatusRuntimeException: 
CANCELLED: cancelled before receiving half close
        at 
org.apache.beam.vendor.grpc.v1p26p0.io.grpc.Status.asRuntimeException(Status.java:524)
        at 
org.apache.beam.vendor.grpc.v1p26p0.io.grpc.stub.ServerCalls$StreamingServerCallHandler$StreamingServerCallListener.onCancel(ServerCalls.java:275)
        at 
org.apache.beam.vendor.grpc.v1p26p0.io.grpc.PartialForwardingServerCallListener.onCancel(PartialForwardingServerCallListener.java:40)
        at 
org.apache.beam.vendor.grpc.v1p26p0.io.grpc.ForwardingServerCallListener.onCancel(ForwardingServerCallListener.java:23)
        at 
org.apache.beam.vendor.grpc.v1p26p0.io.grpc.ForwardingServerCallListener$SimpleForwardingServerCallListener.onCancel(ForwardingServerCallListener.java:40)
        at 
org.apache.beam.vendor.grpc.v1p26p0.io.grpc.Contexts$ContextualizedServerCallListener.onCancel(Contexts.java:96)
        at 
org.apache.beam.vendor.grpc.v1p26p0.io.grpc.internal.ServerCallImpl$ServerStreamListenerImpl.closedInternal(ServerCallImpl.java:353)
        at 
org.apache.beam.vendor.grpc.v1p26p0.io.grpc.internal.ServerCallImpl$ServerStreamListenerImpl.closed(ServerCallImpl.java:341)
        at 
org.apache.beam.vendor.grpc.v1p26p0.io.grpc.internal.ServerImpl$JumpToApplicationThreadServerStreamListener$1Closed.runInContext(ServerImpl.java:867)
        at 
org.apache.beam.vendor.grpc.v1p26p0.io.grpc.internal.ContextRunnable.run(ContextRunnable.java:37)
        at 
org.apache.beam.vendor.grpc.v1p26p0.io.grpc.internal.SerializingExecutor.run(SerializingExecutor.java:123)
        at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        ... 1 more
################################################################
Code for question 4.
################################################################
# UDAF signature
@udaf(input_types=[DataTypes.FLOAT(), DataTypes.FLOAT()],
     result_type=DataTypes.VARCHAR(10000), func_type='pandas')
def forcast(ds_float_series, y):

# SQL DDL
"create table mySource (ds FLOAT, riid VARCHAR(100), y FLOAT ) with ( ... )"
"create table mySink (riid VARCHAR(100), yhatd VARCHAR(10000)) with ( ... )"

# SQL INSERT
"INSERT INTO mySink SELECT riid, forcast(ds, y) AS yhat FROM mySource GROUP BY 
riid"
################################################################

> On 12. Nov 2020, at 12:53, Dian Fu <dian0511...@gmail.com> wrote:
> 
> Hi Niklas,
> 
> Python DataStream API will also be supported in coming release of 1.12.0 [1]. 
> However, the functionalities are still limited for the time being compared to 
> the Java DataStream API, e.g. it will only support the stateless operations, 
> such as map, flat_map, etc.
> 
> [1] 
> https://ci.apache.org/projects/flink/flink-docs-master/dev/python/datastream_tutorial.html
>  
> <https://ci.apache.org/projects/flink/flink-docs-master/dev/python/datastream_tutorial.html>
>> 在 2020年11月12日,下午7:46,Niklas Wilcke <niklas.wil...@uniberg.com 
>> <mailto:niklas.wil...@uniberg.com>> 写道:
>> 
>> Hi Dian,
>> 
>> thank you very much for this valuable response. I already read about the 
>> UDAF, but I wasn't aware of the fact that it is possible to return and 
>> UNNEST an array. I will definitely have a try and hopefully this will solve 
>> my issue.
>> 
>> Another question that came up to my mind is whether PyFlink supports any 
>> other API except Table and SQL, like the Streaming and Batch API. The 
>> documentation is only covering the Table API, but I'm not sure about that. 
>> Can you maybe tell me whether the Table and SQL API is the only one 
>> supported by PyFlink?
>> 
>> Kind Regards,
>> Niklas
>> 
>>  
>> 
>>> On 11. Nov 2020, at 15:32, Dian Fu <dian0511...@gmail.com 
>>> <mailto:dian0511...@gmail.com>> wrote:
>>> 
>>> Hi Niklas,
>>> 
>>> You are correct that the input/output length of Pandas UDF must be of the 
>>> same size and that Flink will split the input data into multiple bundles 
>>> for Pandas UDF and the bundle size is non-determinstic. Both of the above 
>>> two limitations are by design and so I guess Pandas UDF could not meet your 
>>> requirements.
>>> 
>>> However, you could take a look at if the Pandas UDAF[1] which was supported 
>>> in 1.12 could meet your requirements:
>>> - As group_by only generate one record per group key just as you said, you 
>>> could declare the output type of Pandas UDAF as an array type
>>> - You need then flatten the aggregation results, e.g. using UNNEST
>>> 
>>> NOTE: Flink 1.12 is still not released. You could try the PyFlink package 
>>> of RC1[2] for 1.12.0 or build it yourself according to [3].
>>> 
>>> [1] 
>>> https://ci.apache.org/projects/flink/flink-docs-master/dev/python/table-api-users-guide/udfs/vectorized_python_udfs.html#vectorized-aggregate-functions
>>>  
>>> <https://ci.apache.org/projects/flink/flink-docs-master/dev/python/table-api-users-guide/udfs/vectorized_python_udfs.html#vectorized-aggregate-functions>
>>> [2] https://dist.apache.org/repos/dist/dev/flink/flink-1.12.0-rc1/python/ 
>>> <https://dist.apache.org/repos/dist/dev/flink/flink-1.12.0-rc1/python/>
>>> [3] 
>>> https://ci.apache.org/projects/flink/flink-docs-master/flinkDev/building.html#build-pyflink
>>>  
>>> <https://ci.apache.org/projects/flink/flink-docs-master/flinkDev/building.html#build-pyflink>
>>> 
>>> Regards,
>>> Dian
>>> 
>>>> 在 2020年11月11日,下午9:03,Niklas Wilcke <niklas.wil...@uniberg.com 
>>>> <mailto:niklas.wil...@uniberg.com>> 写道:
>>>> 
>>>> Hi Flink Community,
>>>> 
>>>> I'm currently trying to implement a parallel machine learning job with 
>>>> Flink. The goal is to train models in parallel for independent time series 
>>>> in the same data stream. For that purpose I'm using a Python library, 
>>>> which lead me to PyFlink. Let me explain the use case a bit more.
>>>> I want to implement a batch job, which partitions/groups the data by a 
>>>> device identifier. After that I need to process the data for each device 
>>>> all at once. There is no way to iteratively train the model unfortunately. 
>>>> The challenge I'm facing is to guarantee that all data belonging to a 
>>>> certain device is processed in one single step. I'm aware of the fact that 
>>>> this does not scale well, but for a reasonable amount of input data per 
>>>> device it should be fine from my perspective.
>>>> I investigated a lot and I ended up using the Table API and Pandas UDF, 
>>>> which roughly fulfil my requirements, but there are the following 
>>>> limitations left, which I wanted to talk about.
>>>> 
>>>> 1. Pandas UDF takes multiple Series as input parameters, which is fine for 
>>>> my purpose, but as far as I can see there is no way to guarantee that the 
>>>> chunk of data in the Series is "complete". Flink will slice the Series and 
>>>> maybe call the UDF multiple times for each device. As far as I can see 
>>>> there are some config options like "python.fn-execution.arrow.batch.size" 
>>>> and "python.fn-execution.bundle.time", which might help, but I'm not sure, 
>>>> whether this is the right path to take.
>>>> 2. The length of the input Series needs to be of the same size as the 
>>>> output Series, which isn't nice for my use case. What I would like to do 
>>>> is to process n rows and emit m rows. There shouldn't be any dependency 
>>>> between the number of input rows and the number of output rows.
>>>> 
>>>> 3. How do I partition the data stream. The Table API offers a groupby, but 
>>>> this doesn't serve my purpose, because I don't want to aggregate all the 
>>>> grouped lines. Instead as stated above I want to emit m result lines per 
>>>> group. Are there other options using the Table API or any other API to do 
>>>> this kind of grouping. I would need something like a "keyBy()" from the 
>>>> streaming API. Maybe this can be combined? Can I create a separate table 
>>>> for each key?
>>>> 
>>>> I'm also open to ideas for a completely different approach not using the 
>>>> Table API or Pandas UDF. Any idea is welcome.
>>>> 
>>>> You can find a condensed version of the source code attached.
>>>> 
>>>> Kind Regards,
>>>> Niklas
>>>> 
>>>> 
>>>> 
>>>> #############################################################
>>>> 
>>>> from pyflink.datastream import StreamExecutionEnvironment
>>>> from pyflink.table import StreamTableEnvironment, DataTypes
>>>> from pyflink.table.udf import udf
>>>> 
>>>> env = StreamExecutionEnvironment.get_execution_environment()
>>>> env.set_parallelism(1)
>>>> t_env = StreamTableEnvironment.create(env)
>>>> t_env.get_config().get_configuration().set_boolean("python.fn-execution.memory.managed",
>>>>  True)
>>>> 
>>>> @udf(input_types=[DataTypes.FLOAT(), DataTypes.FLOAT()],
>>>>     result_type=DataTypes.FLOAT(), udf_type='pandas')
>>>> def forcast(ds_float_series, y):
>>>> 
>>>>    # Train the model and create the forcast
>>>> 
>>>>    yhat_ts = forcast['yhat'].tail(input_size)
>>>>    return yhat_ts
>>>> 
>>>> t_env.register_function("forcast", forcast)
>>>> 
>>>> # Define sink and source here
>>>> 
>>>> t_env.execute_sql(my_source_ddl)
>>>> t_env.execute_sql(my_sink_ddl)
>>>> 
>>>> # TODO: key_by instead of filter
>>>> t_env.from_path('mySource') \
>>>>    .where("riid === 'r1i1'") \
>>>>    .select("ds, riid, y, forcast(ds, y) as yhat_90d") \
>>>>    .insert_into('mySink')
>>>> 
>>>> t_env.execute("pandas_udf_demo")
>>>> 
>>>> #############################################################
>>>> 
>>>> 
>>> 
>> 
> 

Attachment: smime.p7s
Description: S/MIME cryptographic signature

Reply via email to