Actually, I built/compiled
- pyarrow==2.0.0 (tests skipped)
- apache-beam==2.27.0 (tests skipped)
on Python 3.9 and tested with the example Python job
( bin/flink run -pyclientexec python3.7 -pyexec python3.9 -py
examples/python/table/word_count.py ),
but got the following exceptions:

Caused by: java.util.concurrent.ExecutionException:
java.lang.RuntimeException: Error received from SDK harness for instruction
1: Traceback (most recent call last):
  File
"/usr/local/lib/python3.9/dist-packages/apache_beam/runners/worker/sdk_worker.py",
line 289, in _execute
    response = task()
  File
"/usr/local/lib/python3.9/dist-packages/apache_beam/runners/worker/sdk_worker.py",
line 362, in <lambda>
    lambda: self.create_worker().do_instruction(request), request)
  File
"/usr/local/lib/python3.9/dist-packages/apache_beam/runners/worker/sdk_worker.py",
line 606, in do_instruction
    return getattr(self, request_type)(
  File
"/usr/local/lib/python3.9/dist-packages/apache_beam/runners/worker/sdk_worker.py",
line 637, in process_bundle
    bundle_processor = self.bundle_processor_cache.get(
  File
"/usr/local/lib/python3.9/dist-packages/apache_beam/runners/worker/sdk_worker.py",
line 463, in get
    processor = bundle_processor.BundleProcessor(
  File
"/usr/local/lib/python3.9/dist-packages/apache_beam/runners/worker/bundle_processor.py",
line 868, in __init__
    self.ops = self.create_execution_tree(self.process_bundle_descriptor)
  File
"/usr/local/lib/python3.9/dist-packages/apache_beam/runners/worker/bundle_processor.py",
line 921, in create_execution_tree
    return collections.OrderedDict([(
  File
"/usr/local/lib/python3.9/dist-packages/apache_beam/runners/worker/bundle_processor.py",
line 924, in <listcomp>
    get_operation(transform_id))) for transform_id in sorted(
  File
"/usr/local/lib/python3.9/dist-packages/apache_beam/runners/worker/bundle_processor.py",
line 812, in wrapper
    result = cache[args] = func(*args)
  File
"/usr/local/lib/python3.9/dist-packages/apache_beam/runners/worker/bundle_processor.py",
line 903, in get_operation
    transform_consumers = {
  File
"/usr/local/lib/python3.9/dist-packages/apache_beam/runners/worker/bundle_processor.py",
line 904, in <dictcomp>
    tag: [get_operation(op) for op in pcoll_consumers[pcoll_id]]
  File
"/usr/local/lib/python3.9/dist-packages/apache_beam/runners/worker/bundle_processor.py",
line 904, in <listcomp>
    tag: [get_operation(op) for op in pcoll_consumers[pcoll_id]]
  File
"/usr/local/lib/python3.9/dist-packages/apache_beam/runners/worker/bundle_processor.py",
line 812, in wrapper
    result = cache[args] = func(*args)
  File
"/usr/local/lib/python3.9/dist-packages/apache_beam/runners/worker/bundle_processor.py",
line 908, in get_operation
    return transform_factory.create_operation(
  File
"/usr/local/lib/python3.9/dist-packages/apache_beam/runners/worker/bundle_processor.py",
line 1198, in create_operation
    return creator(self, transform_id, transform_proto, payload, consumers)
  File
"/usr/local/lib/python3.9/dist-packages/pyflink/fn_execution/beam/beam_operations.py",
line 55, in create_table_function
    return _create_user_defined_function_operation(
  File
"/usr/local/lib/python3.9/dist-packages/pyflink/fn_execution/beam/beam_operations.py",
line 199, in _create_user_defined_function_operation
    return beam_operation_cls(
  File "pyflink/fn_execution/beam/beam_operations_fast.pyx", line 198, in
pyflink.fn_execution.beam.beam_operations_fast.StatelessFunctionOperation.__init__
  File "pyflink/fn_execution/beam/beam_operations_fast.pyx", line 129, in
pyflink.fn_execution.beam.beam_operations_fast.FunctionOperation.__init__
  File "pyflink/fn_execution/beam/beam_operations_fast.pyx", line 202, in
pyflink.fn_execution.beam.beam_operations_fast.StatelessFunctionOperation.generate_operation
  File
"/usr/local/lib/python3.9/dist-packages/pyflink/fn_execution/table/operations.py",
line 124, in __init__
    super(TableFunctionOperation, self).__init__(spec)
  File
"/usr/local/lib/python3.9/dist-packages/pyflink/fn_execution/table/operations.py",
line 81, in __init__
    self.func, self.user_defined_funcs =
self.generate_func(self.spec.serialized_fn)
  File
"/usr/local/lib/python3.9/dist-packages/pyflink/fn_execution/table/operations.py",
line 134, in generate_func
    operation_utils.extract_user_defined_function(serialized_fn.udfs[0])
  File
"/usr/local/lib/python3.9/dist-packages/pyflink/fn_execution/utils/operation_utils.py",
line 137, in extract_user_defined_function
    user_defined_func = pickle.loads(user_defined_function_proto.payload)
  File
"/usr/local/lib/python3.9/dist-packages/pyflink/fn_execution/pickle.py",
line 29, in loads
    return cloudpickle.loads(payload)
TypeError: an integer is required (got type bytes)

        at
java.base/java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:395)
        at
java.base/java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1999)
        at org.apache.beam.sdk.util.MoreFutures.get(MoreFutures.java:60)
        at
org.apache.beam.runners.fnexecution.control.SdkHarnessClient$BundleProcessor$ActiveBundle.close(SdkHarnessClient.java:504)
      at
org.apache.beam.runners.fnexecution.control.DefaultJobBundleFactory$SimpleStageBundleFactory$1.close(DefaultJobBundleFactory.java:555)
        at
org.apache.flink.streaming.api.runners.python.beam.BeamPythonFunctionRunner.finishBundle(BeamPythonFunctionRunner.java:375)
        ... 7 more
Caused by: java.lang.RuntimeException: Error received from SDK harness for
instruction 1: Traceback (most recent call last):
  File
"/usr/local/lib/python3.9/dist-packages/apache_beam/runners/worker/sdk_worker.py",
line 289, in _execute
    response = task()
  File
"/usr/local/lib/python3.9/dist-packages/apache_beam/runners/worker/sdk_worker.py",
line 362, in <lambda>
    lambda: self.create_worker().do_instruction(request), request)
  File
"/usr/local/lib/python3.9/dist-packages/apache_beam/runners/worker/sdk_worker.py",
line 606, in do_instruction
    return getattr(self, request_type)(
  File
"/usr/local/lib/python3.9/dist-packages/apache_beam/runners/worker/sdk_worker.py",
line 637, in process_bundle
    bundle_processor = self.bundle_processor_cache.get(
  File
"/usr/local/lib/python3.9/dist-packages/apache_beam/runners/worker/sdk_worker.py",
line 463, in get
    processor = bundle_processor.BundleProcessor(
  File
"/usr/local/lib/python3.9/dist-packages/apache_beam/runners/worker/bundle_processor.py",
line 868, in __init__
    self.ops = self.create_execution_tree(self.process_bundle_descriptor)
  File
"/usr/local/lib/python3.9/dist-packages/apache_beam/runners/worker/bundle_processor.py",
line 921, in create_execution_tree
    return collections.OrderedDict([(
  File
"/usr/local/lib/python3.9/dist-packages/apache_beam/runners/worker/bundle_processor.py",
line 924, in <listcomp>
    get_operation(transform_id))) for transform_id in sorted(
  File
"/usr/local/lib/python3.9/dist-packages/apache_beam/runners/worker/bundle_processor.py",
line 812, in wrapper
    result = cache[args] = func(*args)
  File
"/usr/local/lib/python3.9/dist-packages/apache_beam/runners/worker/bundle_processor.py",
line 903, in get_operation
    transform_consumers = {
  File
"/usr/local/lib/python3.9/dist-packages/apache_beam/runners/worker/bundle_processor.py",
line 904, in <dictcomp>
    tag: [get_operation(op) for op in pcoll_consumers[pcoll_id]]
  File
"/usr/local/lib/python3.9/dist-packages/apache_beam/runners/worker/bundle_processor.py",
line 904, in <listcomp>
    tag: [get_operation(op) for op in pcoll_consumers[pcoll_id]]
  File
"/usr/local/lib/python3.9/dist-packages/apache_beam/runners/worker/bundle_processor.py",
line 812, in wrapper
    result = cache[args] = func(*args)
  File
"/usr/local/lib/python3.9/dist-packages/apache_beam/runners/worker/bundle_processor.py",
line 908, in get_operation
    return transform_factory.create_operation(
  File
"/usr/local/lib/python3.9/dist-packages/apache_beam/runners/worker/bundle_processor.py",
line 1198, in create_operation
    return creator(self, transform_id, transform_proto, payload, consumers)
  File
"/usr/local/lib/python3.9/dist-packages/pyflink/fn_execution/beam/beam_operations.py",
line 55, in create_table_function
    return _create_user_defined_function_operation(
  File
"/usr/local/lib/python3.9/dist-packages/pyflink/fn_execution/beam/beam_operations.py",
line 199, in _create_user_defined_function_operation
    return beam_operation_cls(
  File "pyflink/fn_execution/beam/beam_operations_fast.pyx", line 198, in
pyflink.fn_execution.beam.beam_operations_fast.StatelessFunctionOperation.__init__
  File "pyflink/fn_execution/beam/beam_operations_fast.pyx", line 129, in
pyflink.fn_execution.beam.beam_operations_fast.FunctionOperation.__init__
  File "pyflink/fn_execution/beam/beam_operations_fast.pyx", line 202, in
pyflink.fn_execution.beam.beam_operations_fast.StatelessFunctionOperation.generate_operation
  File
"/usr/local/lib/python3.9/dist-packages/pyflink/fn_execution/table/operations.py",
line 124, in __init__
    super(TableFunctionOperation, self).__init__(spec)
  File
"/usr/local/lib/python3.9/dist-packages/pyflink/fn_execution/table/operations.py",
line 81, in __init__
    self.func, self.user_defined_funcs =
self.generate_func(self.spec.serialized_fn)
  File
"/usr/local/lib/python3.9/dist-packages/pyflink/fn_execution/table/operations.py",
line 134, in generate_func
    operation_utils.extract_user_defined_function(serialized_fn.udfs[0])
  File
"/usr/local/lib/python3.9/dist-packages/pyflink/fn_execution/utils/operation_utils.py",
line 137, in extract_user_defined_function
    user_defined_func = pickle.loads(user_defined_function_proto.payload)
  File
"/usr/local/lib/python3.9/dist-packages/pyflink/fn_execution/pickle.py",
line 29, in loads
    return cloudpickle.loads(payload)
TypeError: an integer is required (got type bytes)

        at
org.apache.beam.runners.fnexecution.control.FnApiControlClient$ResponseStreamObserver.onNext(FnApiControlClient.java:180)
        at
org.apache.beam.runners.fnexecution.control.FnApiControlClient$ResponseStreamObserver.onNext(FnApiControlClient.java:160)
        at
org.apache.beam.vendor.grpc.v1p26p0.io.grpc.stub.ServerCalls$StreamingServerCallHandler$StreamingServerCallListener.onMessage(ServerCalls.java:251)
        at
org.apache.beam.vendor.grpc.v1p26p0.io.grpc.ForwardingServerCallListener.onMessage(ForwardingServerCallListener.java:33)
        at
org.apache.beam.vendor.grpc.v1p26p0.io.grpc.Contexts$ContextualizedServerCallListener.onMessage(Contexts.java:76)
        at
org.apache.beam.vendor.grpc.v1p26p0.io.grpc.internal.ServerCallImpl$ServerStreamListenerImpl.messagesAvailableInternal(ServerCallImpl.java:309)
        at
org.apache.beam.vendor.grpc.v1p26p0.io.grpc.internal.ServerCallImpl$ServerStreamListenerImpl.messagesAvailable(ServerCallImpl.java:292)
        at
org.apache.beam.vendor.grpc.v1p26p0.io.grpc.internal.ServerImpl$JumpToApplicationThreadServerStreamListener$1MessagesAvailable.runInContext(ServerImpl.java:782)
        at
org.apache.beam.vendor.grpc.v1p26p0.io.grpc.internal.ContextRunnable.run(ContextRunnable.java:37)
        at
org.apache.beam.vendor.grpc.v1p26p0.io.grpc.internal.SerializingExecutor.run(SerializingExecutor.java:123)
        ... 3 more

08:14:10.118 [main] ERROR org.apache.flink.client.python.PythonDriver - Run
python process failed
java.lang.RuntimeException: Python process exits with code: 1
        at
org.apache.flink.client.python.PythonDriver.main(PythonDriver.java:130)
~[flink-python_2.11-1.14.3.jar:1.14.3]
        at jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native
Method) ~[?:?]
        at
jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
~[?:?]
        at
jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
~[?:?]
        at java.lang.reflect.Method.invoke(Method.java:566) ~[?:?]
        at
org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:355)
~[flink-dist_2.11-1.14.3.jar:1.14.3]
        at
org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:222)
~[flink-dist_2.11-1.14.3.jar:1.14.3]
        at
org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:114)
~[flink-dist_2.11-1.14.3.jar:1.14.3]
        at
org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:812)
~[flink-dist_2.11-1.14.3.jar:1.14.3]
        at
org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:246)
~[flink-dist_2.11-1.14.3.jar:1.14.3]
        at
org.apache.flink.client.cli.CliFrontend.parseAndRun(CliFrontend.java:1054)
~[flink-dist_2.11-1.14.3.jar:1.14.3]
        at
org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:1132)
~[flink-dist_2.11-1.14.3.jar:1.14.3]
        at
org.apache.flink.runtime.security.contexts.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:28)
[flink-dist_2.11-1.14.3.jar:1.14.3]
        at
org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1132)
[flink-dist_2.11-1.14.3.jar:1.14.3]
08:14:10.129 [main] ERROR org.apache.flink.client.cli.CliFrontend - Fatal
error while running command line interface.
org.apache.flink.client.program.ProgramAbortException:
java.lang.RuntimeException: Python process exits with code: 1
        at
org.apache.flink.client.python.PythonDriver.main(PythonDriver.java:140)
~[?:?]
        at jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native
Method) ~[?:?]
        at
jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
~[?:?]
        at
jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
~[?:?]
        at java.lang.reflect.Method.invoke(Method.java:566) ~[?:?]
        at
org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:355)
~[flink-dist_2.11-1.14.3.jar:1.14.3]
        at
org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:222)
~[flink-dist_2.11-1.14.3.jar:1.14.3]
        at
org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:114)
~[flink-dist_2.11-1.14.3.jar:1.14.3]
        at
org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:812)
~[flink-dist_2.11-1.14.3.jar:1.14.3]
        at
org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:246)
~[flink-dist_2.11-1.14.3.jar:1.14.3]
        at
org.apache.flink.client.cli.CliFrontend.parseAndRun(CliFrontend.java:1054)
~[flink-dist_2.11-1.14.3.jar:1.14.3]
        at
org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:1132)
~[flink-dist_2.11-1.14.3.jar:1.14.3]
        at
org.apache.flink.runtime.security.contexts.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:28)
~[flink-dist_2.11-1.14.3.jar:1.14.3]
        at
org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1132)
[flink-dist_2.11-1.14.3.jar:1.14.3]
Caused by: java.lang.RuntimeException: Python process exits with code: 1
        at
org.apache.flink.client.python.PythonDriver.main(PythonDriver.java:130)
~[?:?]
        ... 13 more
org.apache.flink.client.program.ProgramAbortException:
java.lang.RuntimeException: Python process exits with code: 1
        at
org.apache.flink.client.python.PythonDriver.main(PythonDriver.java:140)
        at
java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native
Method)
        at
java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at
java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.base/java.lang.reflect.Method.invoke(Method.java:566)
        at
org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:355)
        at
org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:222)
        at
org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:114)
        at
org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:812)
        at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:246)
        at
org.apache.flink.client.cli.CliFrontend.parseAndRun(CliFrontend.java:1054)
        at
org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:1132)
        at
org.apache.flink.runtime.security.contexts.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:28)
        at
org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1132)
Caused by: java.lang.RuntimeException: Python process exits with code: 1
        at
org.apache.flink.client.python.PythonDriver.main(PythonDriver.java:130)
        ... 13 more


It seems I should follow https://issues.apache.org/jira/browse/FLINK-25188
and test on Python 3.9
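
A possible cause worth ruling out (an assumption, not confirmed anywhere in
this thread): the command above pickles the UDF with python3.7 on the client
(-pyclientexec) and unpickles it with python3.9 in the worker (-pyexec).
cloudpickle serializes functions by value, including their code objects, and
the constructor arguments of code objects changed in Python 3.8, so loading a
pickle across these versions can fail with a TypeError like the one in the
traceback. A minimal sketch for checking that two interpreters agree on
major.minor; the helper names are hypothetical and not part of PyFlink:

```python
import subprocess
import sys


def parse_major_minor(output):
    """Parse 'X.Y' (as printed by the probe below) into an (int, int) tuple."""
    major, minor = output.strip().split(".")[:2]
    return int(major), int(minor)


def interpreter_version(executable):
    """Ask the given interpreter for its own major.minor version."""
    probe = "import sys; print('%d.%d' % sys.version_info[:2])"
    result = subprocess.run(
        [executable, "-c", probe],
        check=True,
        capture_output=True,
        text=True,
    )
    return parse_major_minor(result.stdout)


def versions_match(client_exe, worker_exe):
    """Return True if both interpreters report the same major.minor version."""
    return interpreter_version(client_exe) == interpreter_version(worker_exe)
```

Calling versions_match("python3.7", "python3.9") with the interpreters from
the command above should return False, assuming both are on PATH; passing
python3.9 for both -pyclientexec and -pyexec would be the matching setup to
retry with.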

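Since pyarrow and apache-beam were built from source here, it may also help
to confirm which versions the python3.9 environment actually resolves before
retrying. A small sketch using only the standard library (importlib.metadata,
Python 3.8+); the function name is hypothetical, and the package list is an
assumption based on the dependencies named in this thread plus apache-flink,
the PyPI name of PyFlink:

```python
from importlib import metadata  # available in the standard library since Python 3.8


def installed_versions(names):
    """Map each distribution name to its installed version, or None if absent."""
    versions = {}
    for name in names:
        try:
            versions[name] = metadata.version(name)
        except metadata.PackageNotFoundError:
            versions[name] = None
    return versions


# Distributions discussed in this thread; extend the list as needed.
PACKAGES = ["pyarrow", "apache-beam", "apache-flink"]

for package, version in installed_versions(PACKAGES).items():
    print(package, version if version is not None else "not installed")
```

Run it with the same interpreter that is passed to -pyexec so the report
reflects the worker's environment rather than the client's.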

On Wed, Apr 6, 2022 at 10:27 AM Xingbo Huang <hxbks...@gmail.com> wrote:

> Hi Martijn and Luan,
>
> As of now, the main reason PyFlink has not declared support for Python
> 3.9 is that apache-beam, which PyFlink depends on, and the versions of
> numpy and pyarrow that apache-beam depends on do not provide whl
> packages for Python 3.9. Users need to install them from source, but a
> source installation is really difficult to get working, especially for
> pyarrow.
> If you can successfully install these dependencies on Python 3.9 from
> source, then as far as I know you can successfully run the PyFlink
> job. We are also upgrading the versions of these dependencies[1], and then
> we can easily provide support for Python 3.9 and Mac M1.
>
> [1] https://issues.apache.org/jira/browse/FLINK-25188
>
> Best,
> Xingbo
>
> Martijn Visser <martijnvis...@apache.org> wrote on Tue, Apr 5, 2022 at 18:50:
>
> > Hi Luan,
> >
> > According to the documentation Python 3.9 is currently indeed not
> > supported. I briefly checked the Jira tickets and also couldn't find one
> > about adding support for this, so I've created
> > https://issues.apache.org/jira/browse/FLINK-27058 for that.
> >
> > @dian0511...@gmail.com <dian0511...@gmail.com> @hxbks...@gmail.com
> > <hxbks...@gmail.com> can you let us know your thoughts on this?
> > Especially if it's "just" a matter of upgrading dependencies, since we
> > also have another PyFlink ticket for that because it currently can't
> > compile on Mac M1 [1]
> >
> > Best regards,
> >
> > Martijn Visser
> > https://twitter.com/MartijnVisser82
> > https://github.com/MartijnVisser
> >
> > [1] https://issues.apache.org/jira/browse/FLINK-25188
> >
> >
> > On Tue, 5 Apr 2022 at 11:40, Luan Cooper <gc.su...@gmail.com> wrote:
> >
> >> Hi
> >>
> >> I currently need to run PyFlink UDFs on Python 3.9, which is not
> >> supported right now.
> >>
> >> I tried building
> >> - pyarrow==2.0.0
> >> - apache-beam==2.27.0
> >> on Python 3.9 and testing Python jobs, but it failed.
> >>
> >> Have there been any discussions or a git branch on Python 3.9 before? (I
> >> didn't find any on this dev list.) If so, I can continue that work to
> >> support 3.9.
> >>
> >
>
