[ https://issues.apache.org/jira/browse/FLINK-21208?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17279400#comment-17279400 ]

Huang Xingbo commented on FLINK-21208:
--------------------------------------

 [~liuyufei] The serialization protocol provided by Arrow writes the schema 
info into a header before transmitting the data, so it is effectively a 
stateful serializer. Beam, on the other hand, requires serializers to be 
stateless. Neither design is wrong and each has its own considerations, but 
combining them causes problems unless you transmit a schema with every 
Arrow batch.
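
To make the mismatch concrete, here is a minimal pyarrow sketch (independent 
of the Flink/Beam code paths, names chosen for illustration) showing both the 
stateful stream format and the schema-per-batch workaround mentioned above:

{code:python}
import pyarrow as pa

batch = pa.RecordBatch.from_arrays([pa.array([1, 2, 3])], names=["x"])

# Stateful: the stream format writes the schema once, up front; every
# following message is a record batch that assumes the reader has seen it.
sink = pa.BufferOutputStream()
with pa.ipc.new_stream(sink, batch.schema) as writer:
    writer.write_batch(batch)
    writer.write_batch(batch)
reader = pa.ipc.open_stream(sink.getvalue())  # works: schema comes first
print(reader.read_all())

# Stateless workaround: emit a complete (schema + batch) stream per batch,
# so that each payload can be decoded without any prior state.
def encode_one_batch(b):
    s = pa.BufferOutputStream()
    with pa.ipc.new_stream(s, b.schema) as w:
        w.write_batch(b)
    return s.getvalue()

payload = encode_one_batch(batch)
print(pa.ipc.open_stream(payload).read_all())  # decodes in isolation
{code}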


> pyarrow exception when using window with pandas udaf
> ----------------------------------------------------
>
>                 Key: FLINK-21208
>                 URL: https://issues.apache.org/jira/browse/FLINK-21208
>             Project: Flink
>          Issue Type: Bug
>          Components: API / Python
>    Affects Versions: 1.12.0, 1.12.1
>            Reporter: YufeiLiu
>            Priority: Major
>              Labels: pull-request-available
>
> I wrote a PyFlink demo and executed it in a local environment. The logic is 
> simple: generate records and aggregate them in a 100-second tumbling window 
> using a pandas UDAF.
> The job failed after several minutes. I don't think it's a resource problem 
> because the amount of data is small. Here is the error trace:
> {code:java}
> Caused by: org.apache.flink.streaming.runtime.tasks.AsynchronousException: 
> Caught exception while processing timer.
>       at 
> org.apache.flink.streaming.runtime.tasks.StreamTask$StreamTaskAsyncExceptionHandler.handleAsyncException(StreamTask.java:1108)
>       at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.handleAsyncException(StreamTask.java:1082)
>       at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invokeProcessingTimeCallback(StreamTask.java:1213)
>       at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$null$17(StreamTask.java:1202)
>       at 
> org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:47)
>       at 
> org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:78)
>       at 
> org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:302)
>       at 
> org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:184)
>       at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:575)
>       at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:539)
>       at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:722)
>       at org.apache.flink.runtime.taskmanager.Task.run(Task.java:547)
>       at java.lang.Thread.run(Thread.java:748)
> Caused by: TimerException{java.lang.RuntimeException: Failed to close remote 
> bundle}
>       ... 11 more
> Caused by: java.lang.RuntimeException: Failed to close remote bundle
>       at 
> org.apache.flink.streaming.api.runners.python.beam.BeamPythonFunctionRunner.finishBundle(BeamPythonFunctionRunner.java:371)
>       at 
> org.apache.flink.streaming.api.runners.python.beam.BeamPythonFunctionRunner.flush(BeamPythonFunctionRunner.java:325)
>       at 
> org.apache.flink.streaming.api.operators.python.AbstractPythonFunctionOperator.invokeFinishBundle(AbstractPythonFunctionOperator.java:291)
>       at 
> org.apache.flink.streaming.api.operators.python.AbstractPythonFunctionOperator.checkInvokeFinishBundleByTime(AbstractPythonFunctionOperator.java:285)
>       at 
> org.apache.flink.streaming.api.operators.python.AbstractPythonFunctionOperator.lambda$open$0(AbstractPythonFunctionOperator.java:134)
>       at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invokeProcessingTimeCallback(StreamTask.java:1211)
>       ... 10 more
> Caused by: java.util.concurrent.ExecutionException: 
> java.lang.RuntimeException: Error received from SDK harness for instruction 
> 3: Traceback (most recent call last):
>   File 
> "/Users/lyf/Library/Python/3.7/lib/python/site-packages/apache_beam/runners/worker/sdk_worker.py",
>  line 253, in _execute
>     response = task()
>   File 
> "/Users/lyf/Library/Python/3.7/lib/python/site-packages/apache_beam/runners/worker/sdk_worker.py",
>  line 310, in <lambda>
>     lambda: self.create_worker().do_instruction(request), request)
>   File 
> "/Users/lyf/Library/Python/3.7/lib/python/site-packages/apache_beam/runners/worker/sdk_worker.py",
>  line 480, in do_instruction
>     getattr(request, request_type), request.instruction_id)
>   File 
> "/Users/lyf/Library/Python/3.7/lib/python/site-packages/apache_beam/runners/worker/sdk_worker.py",
>  line 515, in process_bundle
>     bundle_processor.process_bundle(instruction_id))
>   File 
> "/Users/lyf/Library/Python/3.7/lib/python/site-packages/apache_beam/runners/worker/bundle_processor.py",
>  line 978, in process_bundle
>     element.data)
>   File 
> "/Users/lyf/Library/Python/3.7/lib/python/site-packages/apache_beam/runners/worker/bundle_processor.py",
>  line 218, in process_encoded
>     self.output(decoded_value)
>   File "apache_beam/runners/worker/operations.py", line 330, in 
> apache_beam.runners.worker.operations.Operation.output
>   File "apache_beam/runners/worker/operations.py", line 332, in 
> apache_beam.runners.worker.operations.Operation.output
>   File "apache_beam/runners/worker/operations.py", line 195, in 
> apache_beam.runners.worker.operations.SingletonConsumerSet.receive
>   File "apache_beam/runners/worker/operations.py", line 292, in 
> apache_beam.runners.worker.operations.Operation.process
>   File 
> "/Users/lyf/Library/Python/3.7/lib/python/site-packages/pyflink/fn_execution/beam/beam_operations_slow.py",
>  line 73, in process
>     for value in o.value:
>   File 
> "/Users/lyf/Library/Python/3.7/lib/python/site-packages/pyflink/fn_execution/beam/beam_coder_impl_slow.py",
>  line 625, in decode_from_stream
>     yield self._decode_one_batch_from_stream(in_stream, 
> in_stream.read_var_int64())
>   File 
> "/Users/lyf/Library/Python/3.7/lib/python/site-packages/pyflink/fn_execution/beam/beam_coder_impl_slow.py",
>  line 636, in _decode_one_batch_from_stream
>     return arrow_to_pandas(self._timezone, self._field_types, 
> [next(self._batch_reader)])
>   File 
> "/Users/lyf/Library/Python/3.7/lib/python/site-packages/pyflink/fn_execution/beam/beam_coder_impl_slow.py",
>  line 629, in _load_from_stream
>     reader = pa.ipc.open_stream(stream)
>   File 
> "/Users/lyf/Library/Python/3.7/lib/python/site-packages/pyarrow/ipc.py", line 
> 146, in open_stream
>     return RecordBatchStreamReader(source)
>   File 
> "/Users/lyf/Library/Python/3.7/lib/python/site-packages/pyarrow/ipc.py", line 
> 62, in __init__
>     self._open(source)
>   File "pyarrow/ipc.pxi", line 360, in 
> pyarrow.lib._RecordBatchStreamReader._open
>   File "pyarrow/error.pxi", line 123, in 
> pyarrow.lib.pyarrow_internal_check_status
>   File "pyarrow/error.pxi", line 100, in pyarrow.lib.check_status
> OSError: Expected IPC message of type schema but got record batch
>       at 
> java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
>       at 
> java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1895)
>       at org.apache.beam.sdk.util.MoreFutures.get(MoreFutures.java:57)
>       at 
> org.apache.beam.runners.fnexecution.control.SdkHarnessClient$BundleProcessor$ActiveBundle.close(SdkHarnessClient.java:458)
>       at 
> org.apache.beam.runners.fnexecution.control.DefaultJobBundleFactory$SimpleStageBundleFactory$1.close(DefaultJobBundleFactory.java:547)
>       at 
> org.apache.flink.streaming.api.runners.python.beam.BeamPythonFunctionRunner.finishBundle(BeamPythonFunctionRunner.java:369)
>       ... 15 more
> Caused by: java.lang.RuntimeException: Error received from SDK harness for 
> instruction 3: Traceback (most recent call last):
>   File 
> "/Users/lyf/Library/Python/3.7/lib/python/site-packages/apache_beam/runners/worker/sdk_worker.py",
>  line 253, in _execute
>     response = task()
>   File 
> "/Users/lyf/Library/Python/3.7/lib/python/site-packages/apache_beam/runners/worker/sdk_worker.py",
>  line 310, in <lambda>
>     lambda: self.create_worker().do_instruction(request), request)
>   File 
> "/Users/lyf/Library/Python/3.7/lib/python/site-packages/apache_beam/runners/worker/sdk_worker.py",
>  line 480, in do_instruction
>     getattr(request, request_type), request.instruction_id)
>   File 
> "/Users/lyf/Library/Python/3.7/lib/python/site-packages/apache_beam/runners/worker/sdk_worker.py",
>  line 515, in process_bundle
>     bundle_processor.process_bundle(instruction_id))
>   File 
> "/Users/lyf/Library/Python/3.7/lib/python/site-packages/apache_beam/runners/worker/bundle_processor.py",
>  line 978, in process_bundle
>     element.data)
>   File 
> "/Users/lyf/Library/Python/3.7/lib/python/site-packages/apache_beam/runners/worker/bundle_processor.py",
>  line 218, in process_encoded
>     self.output(decoded_value)
>   File "apache_beam/runners/worker/operations.py", line 330, in 
> apache_beam.runners.worker.operations.Operation.output
>   File "apache_beam/runners/worker/operations.py", line 332, in 
> apache_beam.runners.worker.operations.Operation.output
>   File "apache_beam/runners/worker/operations.py", line 195, in 
> apache_beam.runners.worker.operations.SingletonConsumerSet.receive
>   File "apache_beam/runners/worker/operations.py", line 292, in 
> apache_beam.runners.worker.operations.Operation.process
>   File 
> "/Users/lyf/Library/Python/3.7/lib/python/site-packages/pyflink/fn_execution/beam/beam_operations_slow.py",
>  line 73, in process
>     for value in o.value:
>   File 
> "/Users/lyf/Library/Python/3.7/lib/python/site-packages/pyflink/fn_execution/beam/beam_coder_impl_slow.py",
>  line 625, in decode_from_stream
>     yield self._decode_one_batch_from_stream(in_stream, 
> in_stream.read_var_int64())
>   File 
> "/Users/lyf/Library/Python/3.7/lib/python/site-packages/pyflink/fn_execution/beam/beam_coder_impl_slow.py",
>  line 636, in _decode_one_batch_from_stream
>     return arrow_to_pandas(self._timezone, self._field_types, 
> [next(self._batch_reader)])
>   File 
> "/Users/lyf/Library/Python/3.7/lib/python/site-packages/pyflink/fn_execution/beam/beam_coder_impl_slow.py",
>  line 629, in _load_from_stream
>     reader = pa.ipc.open_stream(stream)
>   File 
> "/Users/lyf/Library/Python/3.7/lib/python/site-packages/pyarrow/ipc.py", line 
> 146, in open_stream
>     return RecordBatchStreamReader(source)
>   File 
> "/Users/lyf/Library/Python/3.7/lib/python/site-packages/pyarrow/ipc.py", line 
> 62, in __init__
>     self._open(source)
>   File "pyarrow/ipc.pxi", line 360, in 
> pyarrow.lib._RecordBatchStreamReader._open
>   File "pyarrow/error.pxi", line 123, in 
> pyarrow.lib.pyarrow_internal_check_status
>   File "pyarrow/error.pxi", line 100, in pyarrow.lib.check_status
> OSError: Expected IPC message of type schema but got record batch
>       at 
> org.apache.beam.runners.fnexecution.control.FnApiControlClient$ResponseStreamObserver.onNext(FnApiControlClient.java:177)
>       at 
> org.apache.beam.runners.fnexecution.control.FnApiControlClient$ResponseStreamObserver.onNext(FnApiControlClient.java:157)
>       at 
> org.apache.beam.vendor.grpc.v1p26p0.io.grpc.stub.ServerCalls$StreamingServerCallHandler$StreamingServerCallListener.onMessage(ServerCalls.java:251)
>       at 
> org.apache.beam.vendor.grpc.v1p26p0.io.grpc.ForwardingServerCallListener.onMessage(ForwardingServerCallListener.java:33)
>       at 
> org.apache.beam.vendor.grpc.v1p26p0.io.grpc.Contexts$ContextualizedServerCallListener.onMessage(Contexts.java:76)
>       at 
> org.apache.beam.vendor.grpc.v1p26p0.io.grpc.internal.ServerCallImpl$ServerStreamListenerImpl.messagesAvailableInternal(ServerCallImpl.java:309)
>       at 
> org.apache.beam.vendor.grpc.v1p26p0.io.grpc.internal.ServerCallImpl$ServerStreamListenerImpl.messagesAvailable(ServerCallImpl.java:292)
>       at 
> org.apache.beam.vendor.grpc.v1p26p0.io.grpc.internal.ServerImpl$JumpToApplicationThreadServerStreamListener$1MessagesAvailable.runInContext(ServerImpl.java:782)
>       at 
> org.apache.beam.vendor.grpc.v1p26p0.io.grpc.internal.ContextRunnable.run(ContextRunnable.java:37)
>       at 
> org.apache.beam.vendor.grpc.v1p26p0.io.grpc.internal.SerializingExecutor.run(SerializingExecutor.java:123)
>       at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>       at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>       ... 1 more
> {code}
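> For reference, the pyarrow error at the bottom of the trace can be 
> reproduced in isolation by handing the stream reader a record-batch message 
> with no preceding schema message (a minimal sketch, independent of 
> Flink/Beam):
> {code:python}
> import pyarrow as pa
>
> batch = pa.RecordBatch.from_arrays([pa.array([1, 2, 3])], names=["x"])
>
> # RecordBatch.serialize() yields only the record-batch IPC message, without
> # the schema message that a stream reader expects to see first.
> pa.ipc.open_stream(batch.serialize())
> # OSError: Expected IPC message of type schema but got record batch
> {code}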
> And here is my test code:
> {code:python}
> from pyflink.datastream import StreamExecutionEnvironment
> from pyflink.table import *
> from pyflink.table.udf import udaf, AggregateFunction
> from pyflink.table.window import Tumble
>
>
> class MyTestAggregateFunction(AggregateFunction):
>     """Counts the rows of each window via a pandas UDAF."""
>
>     def get_value(self, accumulator):
>         return accumulator[0]
>
>     def create_accumulator(self):
>         return Row(0)
>
>     def accumulate(self, accumulator, *args):
>         accumulator[0] = len(args[0])
>
>     def get_result_type(self):
>         return DataTypes.BIGINT()
>
>
> if __name__ == '__main__':
>     env = StreamExecutionEnvironment.get_execution_environment()
>     f_s_settings = EnvironmentSettings.new_instance() \
>         .use_blink_planner().in_streaming_mode().build()
>     t_env = StreamTableEnvironment.create(env, None, f_s_settings)
>
>     my_udaf = udaf(MyTestAggregateFunction(), func_type="pandas")
>     t_env.register_function('my_udaf', my_udaf)
>
>     # Source: 100 rows per second from the datagen connector, with a
>     # processing-time attribute for the window.
>     t_env.sql_update("""
>     CREATE TABLE `source_table` (
>         `header` STRING,
>         ts AS PROCTIME()
>     ) WITH (
>           'connector' = 'datagen',
>           'rows-per-second' = '100'
>     )
>     """)
>     t_env.sql_update("""
>     CREATE TABLE `sink_table` (
>         `content` BIGINT,
>         `wstart` TIMESTAMP(3)
>     ) WITH (
>         'connector' = 'print'
>     )
>     """)
>
>     # Aggregate in a 100-second tumbling window with the pandas UDAF.
>     t_env.scan("source_table") \
>         .window(Tumble.over("100.second").on("ts").alias("w")) \
>         .group_by('w') \
>         .select("my_udaf(header), w.start") \
>         .insert_into("sink_table")
>     t_env.execute("test_job")
> {code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)