Hi Curt,

Could you check whether it works after reducing python.fn-execution.bundle.size to 1000 or 100?
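(For reference, this option can go in flink-conf.yaml, or be passed on the `flink run` command line with `-D`; the value below is only an illustration, not a recommendation for your workload:)

```yaml
# flink-conf.yaml
python.fn-execution.bundle.size: 1000
```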
Regards,
Dian

On Thu, Oct 14, 2021 at 2:47 AM Curt Buechter <tricksho...@gmail.com> wrote:

> Hi guys,
> I'm still running into this problem. I checked the logs, and there is no
> evidence that the python process crashed. I checked the process IDs and
> they are still active after the error. No `killed process` messages in
> /var/log/messages.
>
> I don't think it's necessarily related to checkpointing. I noticed
> https://issues.apache.org/jira/browse/FLINK-24123 and thought it was
> possibly related. I tried upgrading to Flink 1.14.0, but I get mostly the
> same error; now, though, it happens outside the context of the
> checkpointing operation.
>
> I tried reducing python.fn-execution.bundle.size to 10,000 (default
> 100,000), with no luck there, either.
>
> 2021-10-13 13:39:19
> java.lang.RuntimeException: Error while waiting for BeamPythonFunctionRunner flush
>     at org.apache.flink.streaming.api.operators.python.AbstractPythonFunctionOperator.invokeFinishBundle(AbstractPythonFunctionOperator.java:361)
>     at org.apache.flink.streaming.api.operators.python.AbstractPythonFunctionOperator.checkInvokeFinishBundleByCount(AbstractPythonFunctionOperator.java:321)
>     at org.apache.flink.streaming.api.operators.python.AbstractOneInputPythonFunctionOperator.processElement(AbstractOneInputPythonFunctionOperator.java:139)
>     at org.apache.flink.streaming.api.operators.python.PythonKeyedProcessOperator.processElement(PythonKeyedProcessOperator.java:176)
>     at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:233)
>     at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.processElement(AbstractStreamTaskNetworkInput.java:134)
>     at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:105)
>     at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:496)
>     at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:203)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:809)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:761)
>     at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:958)
>     at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:937)
>     at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:766)
>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:575)
>     at java.lang.Thread.run(Thread.java:748)
> Caused by: java.lang.RuntimeException: Failed to close remote bundle
>     at org.apache.flink.streaming.api.runners.python.beam.BeamPythonFunctionRunner.finishBundle(BeamPythonFunctionRunner.java:377)
>     at org.apache.flink.streaming.api.runners.python.beam.BeamPythonFunctionRunner.flush(BeamPythonFunctionRunner.java:361)
>     at org.apache.flink.streaming.api.operators.python.AbstractPythonFunctionOperator.lambda$invokeFinishBundle$2(AbstractPythonFunctionOperator.java:340)
>     at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>     at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>     ... 1 more
> Caused by: java.util.concurrent.ExecutionException: org.apache.beam.vendor.grpc.v1p26p0.io.grpc.StatusRuntimeException: CANCELLED: cancelled before receiving half close
>     at java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
>     at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1908)
>     at org.apache.beam.sdk.util.MoreFutures.get(MoreFutures.java:60)
>     at org.apache.beam.runners.fnexecution.control.SdkHarnessClient$BundleProcessor$ActiveBundle.close(SdkHarnessClient.java:504)
>     at org.apache.beam.runners.fnexecution.control.DefaultJobBundleFactory$SimpleStageBundleFactory$1.close(DefaultJobBundleFactory.java:555)
>     at org.apache.flink.streaming.api.runners.python.beam.BeamPythonFunctionRunner.finishBundle(BeamPythonFunctionRunner.java:375)
>     ... 7 more
> Caused by: org.apache.beam.vendor.grpc.v1p26p0.io.grpc.StatusRuntimeException: CANCELLED: cancelled before receiving half close
>     at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.Status.asRuntimeException(Status.java:524)
>     at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.stub.ServerCalls$StreamingServerCallHandler$StreamingServerCallListener.onCancel(ServerCalls.java:275)
>     at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.PartialForwardingServerCallListener.onCancel(PartialForwardingServerCallListener.java:40)
>     at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.ForwardingServerCallListener.onCancel(ForwardingServerCallListener.java:23)
>     at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.ForwardingServerCallListener$SimpleForwardingServerCallListener.onCancel(ForwardingServerCallListener.java:40)
>     at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.Contexts$ContextualizedServerCallListener.onCancel(Contexts.java:96)
>     at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.internal.ServerCallImpl$ServerStreamListenerImpl.closedInternal(ServerCallImpl.java:353)
>     at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.internal.ServerCallImpl$ServerStreamListenerImpl.closed(ServerCallImpl.java:341)
>     at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.internal.ServerImpl$JumpToApplicationThreadServerStreamListener$1Closed.runInContext(ServerImpl.java:867)
>     at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.internal.ContextRunnable.run(ContextRunnable.java:37)
>     at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.internal.SerializingExecutor.run(SerializingExecutor.java:123)
>     ... 3 more
>
> On Thu, Sep 23, 2021 at 9:07 PM Dian Fu <dian0511...@gmail.com> wrote:
>
>> I agree with Roman that it seems that the Python process has crashed.
>>
>> Besides the suggestions from Roman, I guess you could also try to
>> configure the bundle size to a smaller value via
>> "python.fn-execution.bundle.size".
>>
>> Regards,
>> Dian
>>
>> > On Sep 24, 2021, at 3:48 AM, Roman Khachatryan <ro...@apache.org> wrote:
>> >
>> > Hi,
>> >
>> > Is it possible that the python process crashed or hung up? (probably
>> > while performing a snapshot)
>> > Could you validate this by checking the OS logs for OOM killer
>> > messages or process status?
>> >
>> > Regards,
>> > Roman
>> >
>> > On Wed, Sep 22, 2021 at 6:30 PM Curt Buechter <tricksho...@gmail.com> wrote:
>> >>
>> >> Hi,
>> >> I'm getting an error after enabling checkpointing in my pyflink
>> >> application that uses a keyed stream and rocksdb state.
>> >>
>> >> Here is the error message:
>> >>
>> >> 2021-09-22 16:18:14,408 INFO  org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend [] - Closed RocksDB State Backend.
>> >> Cleaning up RocksDB working directory /mnt/yarn/usercache/hadoop/appcache/application_1631886817823_0058/flink-io-db923140-fce6-433e-b26b-b7d53afbc38f/job_383b888cc0d49bbd7464906e45b88187_op_PythonKeyedProcessOperator_90bea66de1c231edf33913ecd54406c1__1_1__uuid_16eefb37-8bae-43ef-b680-d6a4f9660c39.
>> >> 2021-09-22 16:18:14,409 WARN  org.apache.flink.runtime.taskmanager.Task [] - KEYED PROCESS -> Sink: Unnamed (1/1)#34 (8f4fd40e863dd058822060dc3cf98831) switched from RUNNING to FAILED with failure cause: java.io.IOException: Could not perform checkpoint 2 for operator KEYED PROCESS -> Sink: Unnamed (1/1)#34.
>> >>     at org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointOnBarrier(StreamTask.java:1045)
>> >>     at org.apache.flink.streaming.runtime.io.checkpointing.CheckpointBarrierHandler.notifyCheckpoint(CheckpointBarrierHandler.java:135)
>> >>     at org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler.triggerCheckpoint(SingleCheckpointBarrierHandler.java:250)
>> >>     at org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler.access$100(SingleCheckpointBarrierHandler.java:61)
>> >>     at org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler$ControllerImpl.triggerGlobalCheckpoint(SingleCheckpointBarrierHandler.java:431)
>> >>     at org.apache.flink.streaming.runtime.io.checkpointing.AbstractAlignedBarrierHandlerState.barrierReceived(AbstractAlignedBarrierHandlerState.java:61)
>> >>     at org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler.processBarrier(SingleCheckpointBarrierHandler.java:227)
>> >>     at org.apache.flink.streaming.runtime.io.checkpointing.CheckpointedInputGate.handleEvent(CheckpointedInputGate.java:180)
>> >>     at org.apache.flink.streaming.runtime.io.checkpointing.CheckpointedInputGate.pollNext(CheckpointedInputGate.java:158)
>> >>     at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:110)
>> >>     at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:66)
>> >>     at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:423)
>> >>     at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:204)
>> >>     at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:681)
>> >>     at org.apache.flink.streaming.runtime.tasks.StreamTask.executeInvoke(StreamTask.java:636)
>> >>     at org.apache.flink.streaming.runtime.tasks.StreamTask.runWithCleanUpOnFail(StreamTask.java:647)
>> >>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:620)
>> >>     at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:779)
>> >>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:566)
>> >>     at java.lang.Thread.run(Thread.java:748)
>> >> Caused by: java.lang.RuntimeException: Failed to close remote bundle
>> >>     at org.apache.flink.streaming.api.runners.python.beam.BeamPythonFunctionRunner.finishBundle(BeamPythonFunctionRunner.java:383)
>> >>     at org.apache.flink.streaming.api.runners.python.beam.BeamPythonFunctionRunner.flush(BeamPythonFunctionRunner.java:331)
>> >>     at org.apache.flink.streaming.api.operators.python.AbstractPythonFunctionOperator.invokeFinishBundle(AbstractPythonFunctionOperator.java:320)
>> >>     at org.apache.flink.streaming.api.operators.python.AbstractPythonFunctionOperator.prepareSnapshotPreBarrier(AbstractPythonFunctionOperator.java:175)
>> >>     at org.apache.flink.streaming.runtime.tasks.OperatorChain.prepareSnapshotPreBarrier(OperatorChain.java:415)
>> >>     at org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.checkpointState(SubtaskCheckpointCoordinatorImpl.java:292)
>> >>     at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$performCheckpoint$8(StreamTask.java:1089)
>> >>     at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:50)
>> >>     at org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(StreamTask.java:1073)
>> >>     at org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointOnBarrier(StreamTask.java:1029)
>> >>     ... 19 more
>> >> Caused by: java.util.concurrent.ExecutionException: org.apache.beam.vendor.grpc.v1p26p0.io.grpc.StatusRuntimeException: CANCELLED: cancelled before receiving half close
>> >>     at java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
>> >>     at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1908)
>> >>     at org.apache.beam.sdk.util.MoreFutures.get(MoreFutures.java:60)
>> >>     at org.apache.beam.runners.fnexecution.control.SdkHarnessClient$BundleProcessor$ActiveBundle.close(SdkHarnessClient.java:504)
>> >>     at org.apache.beam.runners.fnexecution.control.DefaultJobBundleFactory$SimpleStageBundleFactory$1.close(DefaultJobBundleFactory.java:555)
>> >>     at org.apache.flink.streaming.api.runners.python.beam.BeamPythonFunctionRunner.finishBundle(BeamPythonFunctionRunner.java:381)
>> >>     ... 28 more
>> >> Caused by: org.apache.beam.vendor.grpc.v1p26p0.io.grpc.StatusRuntimeException: CANCELLED: cancelled before receiving half close
>> >>     at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.Status.asRuntimeException(Status.java:524)
>> >>     at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.stub.ServerCalls$StreamingServerCallHandler$StreamingServerCallListener.onCancel(ServerCalls.java:275)
>> >>     at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.PartialForwardingServerCallListener.onCancel(PartialForwardingServerCallListener.java:40)
>> >>     at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.ForwardingServerCallListener.onCancel(ForwardingServerCallListener.java:23)
>> >>     at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.ForwardingServerCallListener$SimpleForwardingServerCallListener.onCancel(ForwardingServerCallListener.java:40)
>> >>     at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.Contexts$ContextualizedServerCallListener.onCancel(Contexts.java:96)
>> >>     at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.internal.ServerCallImpl$ServerStreamListenerImpl.closedInternal(ServerCallImpl.java:353)
>> >>     at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.internal.ServerCallImpl$ServerStreamListenerImpl.closed(ServerCallImpl.java:341)
>> >>     at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.internal.ServerImpl$JumpToApplicationThreadServerStreamListener$1Closed.runInContext(ServerImpl.java:867)
>> >>     at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.internal.ContextRunnable.run(ContextRunnable.java:37)
>> >>     at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.internal.SerializingExecutor.run(SerializingExecutor.java:123)
>> >>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>> >>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>> >>     ... 1 more
>> >>
>> >> 2021-09-22 16:18:14,410 INFO  org.apache.flink.runtime.taskmanager.Task [] - Freeing task resources for KEYED PROCESS -> Sink: Unnamed (1/1)#34 (8f4fd40e863dd058822060dc3cf98831).
>> >> 2021-09-22 16:18:14,411 INFO  org.apache.flink.runtime.taskexecutor.TaskExecutor [] - Un-registering task and sending final execution state FAILED to JobManager for task KEYED PROCESS -> Sink: Unnamed (1/1)#34 8f4fd40e863dd058822060dc3cf98831.
>> >> 2021-09-22 16:18:14,426 INFO  org.apache.flink.runtime.taskmanager.Task [] - Attempting to cancel task Source: Custom Source -> _stream_key_by_map_operator (1/1)#34 (290905523bfbf344381b7a04a8d36831).
>> >> 2021-09-22 16:18:14,426 INFO  org.apache.flink.runtime.taskmanager.Task [] - Source: Custom Source -> _stream_key_by_map_operator (1/1)#34 (290905523bfbf344381b7a04a8d36831) switched from RUNNING to CANCELING.
>> >> 2021-09-22 16:18:14,426 INFO  org.apache.flink.runtime.taskmanager.Task [] - Triggering cancellation of task code Source: Custom Source -> _stream_key_by_map_operator (1/1)#34 (290905523bfbf344381b7a04a8d36831).
>> >> 2021-09-22 16:18:14,428 INFO  org.apache.flink.runtime.taskmanager.Task [] - Ignoring checkpoint aborted notification for non-running task Source: Custom Source -> _stream_key_by_map_operator (1/1)#34.
>> >> 2021-09-22 16:18:14,430 INFO  org.apache.kafka.common.metrics.Metrics [] - Metrics scheduler closed
>> >> 2021-09-22 16:18:14,430 INFO  org.apache.kafka.common.metrics.Metrics [] - Closing reporter org.apache.kafka.common.metrics.JmxReporter
>> >> 2021-09-22 16:18:14,430 INFO  org.apache.kafka.common.metrics.Metrics [] - Metrics reporters closed
>> >>
>> >> And, here is some source code:
>> >>
>> >> class DedupeFunction(FlatMapFunction):
>> >>     def __init__(self, schema):
>> >>         super().__init__()
>> >>         self.logger = None
>> >>         self.state = None
>> >>         self.my_state = None
>> >>         self.schema = schema
>> >>         self.metric_columns = [c.column_name for c in schema.columns if c.is_metric]
>> >>
>> >>     def open(self, runtime_context: RuntimeContext):
>> >>         self.logger = logging
>> >>         self.logger.info('Opening the FlatMapFunction')
>> >>         descriptor = MapStateDescriptor("my_map_state_descriptor", Types.PICKLED_BYTE_ARRAY(), Types.PICKLED_BYTE_ARRAY())
>> >>         self.state = runtime_context.get_map_state(descriptor)
>> >>
>> >>     def flat_map(self, value):
>> >>         try:
>> >>             if not self.state.is_empty():
>> >>                 # self.logger.info('key in state')
>> >>                 previous_dict = {}
>> >>                 for k, v in self.state.items():
>> >>                     # reverse the metric columns
>> >>                     if k in self.metric_columns:
>> >>                         if v:
>> >>                             v = -v
>> >>                     previous_dict[k] = v
>> >>                 yield Row(**previous_dict)
>> >>             new_dict = value.as_dict()
>> >>             self.state.put_all(new_dict.items())
>> >>             yield value
>> >>         except Exception as ex:
>> >>             self.logger.error(f'ERROR in dedupe_datastream: {str(ex)}')
>> >>
>> >> class PrimaryKeySelector(KeySelector):
>> >>
>> >>     def __init__(self, primary_key):
>> >>         self.__primary_key__ = primary_key
>> >>
>> >>     def get_key(self, kv_obj):
>> >>         return kv_obj.as_dict().get(self.__primary_key__)
>> >>
>> >>
>> >> backend = RocksDBStateBackend(self.__conf__.get('CHECKPOINT_DIR'))
>> >> self.__env__.set_state_backend(backend)
>> >>
>> >> input_ds = input_ds.key_by(PrimaryKeySelector(self.__primary_key__), key_type_info=primary_key_type_info)
>> >> deduped_ds = input_ds.flat_map(DedupeFunction(self.__schema__), output_type=type_info)
>> >>
>> >> This program works fine if checkpointing is not enabled. Any advice here?
>> >>
>> >> Thanks
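(For readers skimming the thread: the DedupeFunction above implements a retract-then-emit pattern, i.e. before emitting a new version of a key it emits the previous version with its metric columns sign-flipped. A minimal pure-Python sketch of that logic, with a plain dict standing in for Flink's keyed MapState; the function and variable names here are illustrative, not from the job above:)

```python
def dedupe(records, primary_key, metric_columns):
    """For each record, first emit a retraction of the previously seen
    version of the same key (metric columns negated), then the new record."""
    state = {}  # key -> last seen record (stand-in for Flink MapState)
    out = []
    for rec in records:
        key = rec[primary_key]
        previous = state.get(key)
        if previous is not None:
            # Negate only truthy metric values, mirroring the `if v: v = -v`
            # guard in the original flat_map.
            retraction = {
                k: (-v if k in metric_columns and v else v)
                for k, v in previous.items()
            }
            out.append(retraction)
        state[key] = dict(rec)
        out.append(rec)
    return out


rows = [
    {"id": 1, "amount": 10},
    {"id": 1, "amount": 15},  # update: retraction of amount=10 comes first
]
result = dedupe(rows, "id", {"amount"})
# result[1] is the retraction {"id": 1, "amount": -10}
```

(Downstream consumers can then sum rows per key and always see the latest metric values, which is presumably why the job emits retractions.)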