Hi Yik San,

Makes sense to me. :)
Regards,
Dian

> On Apr 27, 2021, at 9:50 PM, Yik San Chan <evan.chanyik...@gmail.com> wrote:
>
> Hi Dian,
>
> Wow, this is unexpected 😮 How about adding documentation about this to the Python UDF docs? Again, it can be time-consuming to figure this out. Maybe:
>
> To be able to run Python UDFs in any non-local mode, it is recommended to include your UDF definitions using the -pyfs command-line option if your UDFs live outside of the file where the main() function is defined.
>
> What do you think?
>
> Best,
> Yik San
>
> On Tue, Apr 27, 2021 at 9:24 PM Dian Fu <dian0511...@gmail.com> wrote:
> I guess this is the magic of cloudpickle. PyFlink depends on cloudpickle to serialize the Python UDF.
>
> For the latter case, I guess the whole Python UDF implementation will be serialized. However, for the former case, only the path of the class is serialized.
>
> Regards,
> Dian
>
>> On Apr 27, 2021, at 8:52 PM, Yik San Chan <evan.chanyik...@gmail.com> wrote:
>>
>> Hi Dian,
>>
>> Thanks! Adding -pyfs definitely helps.
>>
>> However, I am curious. If I define my UDF this way:
>>
>> ```python
>> from pyflink.table import DataTypes
>> from pyflink.table.udf import udf
>>
>> @udf(input_types=[DataTypes.STRING()], result_type=DataTypes.STRING())
>> def decrypt(s):
>>     import pandas as pd
>>     d = pd.read_csv('resources.zip/resources/crypt.csv', header=None,
>>                     index_col=0, squeeze=True).to_dict()
>>     return d.get(s, "unknown")
>> ```
>>
>> I can `flink run` without having to specify the -pyfs option. The code can also be found in the commit https://github.com/YikSanChan/pyflink-quickstart/commit/cd003ca7d36583999dbb5ffd45958762e4323607. I wonder why?
>>
>> Best,
>> Yik San
>>
>> On Tue, Apr 27, 2021 at 8:13 PM Dian Fu <dian0511...@gmail.com> wrote:
>> Hi Yik San,
>>
>> From the exception message, it's clear that it could not find the module `decrypt_fun` during execution.
>>
>> You need to specify the file `decrypt_fun.py` as a dependency when submitting the job, e.g. via the -pyfs command-line argument. Otherwise, this file will not be available during execution.
>>
>> Regards,
>> Dian
>>
>>> On Apr 27, 2021, at 8:01 PM, Yik San Chan <evan.chanyik...@gmail.com> wrote:
>>>
>>> Hi,
>>>
>>> Here's the reproducible code sample: https://github.com/YikSanChan/pyflink-quickstart/tree/83526abca832f9ed5b8ce20be52fd506c45044d3
>>>
>>> I implement my Python UDF by extending the ScalarFunction base class in a separate file named decrypt_fun.py, and try to import the UDF into my main Python file named udf_use_resource.py.
>>>
>>> However, after I `flink run`, I find the following error in the TaskManager log:
>>>
>>> ```
>>> Caused by: java.lang.RuntimeException: Error received from SDK harness for instruction 1: Traceback (most recent call last):
>>>   File "/usr/local/anaconda3/envs/featflow-ml-env/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py", line 376, in get
>>>     processor = self.cached_bundle_processors[bundle_descriptor_id].pop()
>>> IndexError: pop from empty list
>>>
>>> During handling of the above exception, another exception occurred:
>>>
>>> Traceback (most recent call last):
>>>   File "/usr/local/anaconda3/envs/featflow-ml-env/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py", line 253, in _execute
>>>     response = task()
>>>   File "/usr/local/anaconda3/envs/featflow-ml-env/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py", line 310, in <lambda>
>>>     lambda: self.create_worker().do_instruction(request), request)
>>>   File "/usr/local/anaconda3/envs/featflow-ml-env/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py", line 480, in do_instruction
>>>     getattr(request, request_type), request.instruction_id)
>>>   File "/usr/local/anaconda3/envs/featflow-ml-env/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py", line 509, in process_bundle
>>>     instruction_id, request.process_bundle_descriptor_id)
>>>   File "/usr/local/anaconda3/envs/featflow-ml-env/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py", line 382, in get
>>>     self.data_channel_factory)
>>>   File "/usr/local/anaconda3/envs/featflow-ml-env/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py", line 847, in __init__
>>>     self.ops = self.create_execution_tree(self.process_bundle_descriptor)
>>>   File "/usr/local/anaconda3/envs/featflow-ml-env/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py", line 902, in create_execution_tree
>>>     descriptor.transforms, key=topological_height, reverse=True)
>>>   File "/usr/local/anaconda3/envs/featflow-ml-env/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py", line 901, in <listcomp>
>>>     (transform_id, get_operation(transform_id)) for transform_id in sorted(
>>>   File "/usr/local/anaconda3/envs/featflow-ml-env/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py", line 791, in wrapper
>>>     result = cache[args] = func(*args)
>>>   File "/usr/local/anaconda3/envs/featflow-ml-env/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py", line 885, in get_operation
>>>     pcoll_id in descriptor.transforms[transform_id].outputs.items()
>>>   File "/usr/local/anaconda3/envs/featflow-ml-env/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py", line 885, in <dictcomp>
>>>     pcoll_id in descriptor.transforms[transform_id].outputs.items()
>>>   File "/usr/local/anaconda3/envs/featflow-ml-env/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py", line 883, in <listcomp>
>>>     tag: [get_operation(op) for op in pcoll_consumers[pcoll_id]]
>>>   File "/usr/local/anaconda3/envs/featflow-ml-env/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py", line 791, in wrapper
>>>     result = cache[args] = func(*args)
>>>   File "/usr/local/anaconda3/envs/featflow-ml-env/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py", line 888, in get_operation
>>>     transform_id, transform_consumers)
>>>   File "/usr/local/anaconda3/envs/featflow-ml-env/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py", line 1174, in create_operation
>>>     return creator(self, transform_id, transform_proto, payload, consumers)
>>>   File "/usr/local/anaconda3/envs/featflow-ml-env/lib/python3.7/site-packages/pyflink/fn_execution/beam/beam_operations.py", line 39, in create_scalar_function
>>>     operations.ScalarFunctionOperation)
>>>   File "/usr/local/anaconda3/envs/featflow-ml-env/lib/python3.7/site-packages/pyflink/fn_execution/beam/beam_operations.py", line 166, in _create_user_defined_function_operation
>>>     internal_operation_cls)
>>>   File "pyflink/fn_execution/beam/beam_operations_fast.pyx", line 110, in pyflink.fn_execution.beam.beam_operations_fast.StatelessFunctionOperation.__init__
>>>   File "pyflink/fn_execution/beam/beam_operations_fast.pyx", line 49, in pyflink.fn_execution.beam.beam_operations_fast.FunctionOperation.__init__
>>>   File "pyflink/fn_execution/beam/beam_operations_fast.pyx", line 114, in pyflink.fn_execution.beam.beam_operations_fast.StatelessFunctionOperation.generate_operation
>>>   File "/usr/local/anaconda3/envs/featflow-ml-env/lib/python3.7/site-packages/pyflink/fn_execution/operations.py", line 91, in __init__
>>>     super(ScalarFunctionOperation, self).__init__(spec)
>>>   File "/usr/local/anaconda3/envs/featflow-ml-env/lib/python3.7/site-packages/pyflink/fn_execution/operations.py", line 56, in __init__
>>>     self.func, self.user_defined_funcs = self.generate_func(self.spec.serialized_fn)
>>>   File "/usr/local/anaconda3/envs/featflow-ml-env/lib/python3.7/site-packages/pyflink/fn_execution/operations.py", line 105, in generate_func
>>>     [operation_utils.extract_user_defined_function(udf) for udf in serialized_fn.udfs])
>>>   File "/usr/local/anaconda3/envs/featflow-ml-env/lib/python3.7/site-packages/pyflink/fn_execution/operations.py", line 105, in <listcomp>
>>>     [operation_utils.extract_user_defined_function(udf) for udf in serialized_fn.udfs])
>>>   File "/usr/local/anaconda3/envs/featflow-ml-env/lib/python3.7/site-packages/pyflink/fn_execution/operation_utils.py", line 86, in extract_user_defined_function
>>>     user_defined_func = pickle.loads(user_defined_function_proto.payload)
>>>   File "/usr/local/anaconda3/envs/featflow-ml-env/lib/python3.7/site-packages/pyflink/fn_execution/pickle.py", line 29, in loads
>>>     return cloudpickle.loads(payload)
>>> ModuleNotFoundError: No module named 'decrypt_fun'
>>>
>>>     at org.apache.beam.runners.fnexecution.control.FnApiControlClient$ResponseStreamObserver.onNext(FnApiControlClient.java:177) ~[blob_p-c18fee26bdebc8cb6523e7161974631be9f3b3d0-8f27cc9e92a718bc9d3d138d1d2d49ca:1.12.0]
>>>     at org.apache.beam.runners.fnexecution.control.FnApiControlClient$ResponseStreamObserver.onNext(FnApiControlClient.java:157) ~[blob_p-c18fee26bdebc8cb6523e7161974631be9f3b3d0-8f27cc9e92a718bc9d3d138d1d2d49ca:1.12.0]
>>>     at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.stub.ServerCalls$StreamingServerCallHandler$StreamingServerCallListener.onMessage(ServerCalls.java:251) ~[blob_p-c18fee26bdebc8cb6523e7161974631be9f3b3d0-8f27cc9e92a718bc9d3d138d1d2d49ca:1.12.0]
>>>     at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.ForwardingServerCallListener.onMessage(ForwardingServerCallListener.java:33) ~[blob_p-c18fee26bdebc8cb6523e7161974631be9f3b3d0-8f27cc9e92a718bc9d3d138d1d2d49ca:1.12.0]
>>>     at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.Contexts$ContextualizedServerCallListener.onMessage(Contexts.java:76) ~[blob_p-c18fee26bdebc8cb6523e7161974631be9f3b3d0-8f27cc9e92a718bc9d3d138d1d2d49ca:1.12.0]
>>>     at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.internal.ServerCallImpl$ServerStreamListenerImpl.messagesAvailableInternal(ServerCallImpl.java:309) ~[blob_p-c18fee26bdebc8cb6523e7161974631be9f3b3d0-8f27cc9e92a718bc9d3d138d1d2d49ca:1.12.0]
>>>     at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.internal.ServerCallImpl$ServerStreamListenerImpl.messagesAvailable(ServerCallImpl.java:292) ~[blob_p-c18fee26bdebc8cb6523e7161974631be9f3b3d0-8f27cc9e92a718bc9d3d138d1d2d49ca:1.12.0]
>>>     at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.internal.ServerImpl$JumpToApplicationThreadServerStreamListener$1MessagesAvailable.runInContext(ServerImpl.java:782) ~[blob_p-c18fee26bdebc8cb6523e7161974631be9f3b3d0-8f27cc9e92a718bc9d3d138d1d2d49ca:1.12.0]
>>>     at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.internal.ContextRunnable.run(ContextRunnable.java:37) ~[blob_p-c18fee26bdebc8cb6523e7161974631be9f3b3d0-8f27cc9e92a718bc9d3d138d1d2d49ca:1.12.0]
>>>     at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.internal.SerializingExecutor.run(SerializingExecutor.java:123) ~[blob_p-c18fee26bdebc8cb6523e7161974631be9f3b3d0-8f27cc9e92a718bc9d3d138d1d2d49ca:1.12.0]
>>>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) ~[?:1.8.0_282]
>>>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) ~[?:1.8.0_282]
>>>     ... 1 more
>>> ```
>>>
>>> I wonder why? If I move the Decrypt class into udf_use_resource.py, everything works just fine.
>>>
>>> Thank you!
>>>
>>> Best,
>>> Yik San
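Dian's explanation (only the path of the class is serialized when the UDF lives in a separate module) can be reproduced with the standard library alone. The sketch below is a minimal illustration, not PyFlink code: it uses plain `pickle`, which always serializes a class by reference (module path plus name). cloudpickle, which PyFlink uses, behaves the same way for classes in importable modules, but serializes functions and classes defined in `__main__` by value, which is consistent with the UDF working once it was moved into the main file. The module name `decrypt_fun_demo` and the function `demo_pickle_by_reference` are hypothetical stand-ins for `decrypt_fun.py`; removing the module from `sys.path` and `sys.modules` is an assumption-level simulation of a TaskManager worker that never received the file.

```python
import importlib
import os
import pickle
import sys
import tempfile


def demo_pickle_by_reference():
    """Pickle an instance of a class from a separate module, then try to
    unpickle it after that module has become unimportable."""
    with tempfile.TemporaryDirectory() as tmp_dir:
        # Hypothetical stand-in for decrypt_fun.py.
        with open(os.path.join(tmp_dir, "decrypt_fun_demo.py"), "w") as f:
            f.write(
                "class Decrypt:\n"
                "    def eval(self, s):\n"
                "        return s[::-1]\n"
            )

        sys.path.insert(0, tmp_dir)
        importlib.invalidate_caches()
        try:
            module = importlib.import_module("decrypt_fun_demo")
            # The source file that would need to be shipped, e.g. via -pyfs.
            shipped_file = os.path.basename(module.__file__)
            payload = pickle.dumps(module.Decrypt())
        finally:
            # Simulate a worker that never received decrypt_fun_demo.py.
            sys.path.remove(tmp_dir)
            sys.modules.pop("decrypt_fun_demo", None)

    # The payload names the module and class but carries none of the code.
    by_reference = b"decrypt_fun_demo" in payload and b"s[::-1]" not in payload

    try:
        pickle.loads(payload)
        error = None
    except ModuleNotFoundError as exc:
        # Same failure mode as "No module named 'decrypt_fun'" in the log.
        error = str(exc)
    return shipped_file, by_reference, error


if __name__ == "__main__":
    print(demo_pickle_by_reference())
```

Running it prints `('decrypt_fun_demo.py', True, "No module named 'decrypt_fun_demo'")`. In the thread's setup, the corresponding remedy is to ship the module alongside the job, for example with something like `flink run -py udf_use_resource.py -pyfs decrypt_fun.py`.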