Guess my last reply didn't go through, so here goes again... Possibly, but I don't think so. Since I submitted this, I have done some more testing: checkpointing works fine with the filesystem or in-memory state backends, but not with RocksDB. I will try again and check the OS logs, though. I've also tested RocksDB checkpointing on other jobs, and it works fine. It's only the combination of RocksDB and the keyed stream that fails.
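In case it helps anyone reproducing this: the knobs discussed in this thread can be set in flink-conf.yaml. A minimal sketch (the checkpoint directory is a placeholder and the bundle values are illustrative examples, not recommendations):

```yaml
# Sketch of the relevant flink-conf.yaml settings from this thread.
# The directory path is a placeholder.
state.backend: rocksdb
state.checkpoints.dir: hdfs:///path/to/checkpoints

# Dian's suggestion: smaller bundles mean the Python worker is flushed
# more often, so there is less buffered work to finish when a
# checkpoint barrier arrives.
python.fn-execution.bundle.size: 1000
python.fn-execution.bundle.time: 1000
```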
Thanks for the suggestions, I'll look into them.

On Thu, Sep 23, 2021 at 9:07 PM Dian Fu <dian0511...@gmail.com> wrote:

> I agree with Roman that it seems that the Python process has crashed.
>
> Besides the suggestions from Roman, I guess you could also try to configure the bundle size to a smaller value via "python.fn-execution.bundle.size".
>
> Regards,
> Dian
>
> > On Sep 24, 2021, at 3:48 AM, Roman Khachatryan <ro...@apache.org> wrote:
> >
> > Hi,
> >
> > Is it possible that the python process crashed or hung up? (probably performing a snapshot)
> > Could you validate this by checking the OS logs for OOM killer messages or process status?
> >
> > Regards,
> > Roman
> >
> > On Wed, Sep 22, 2021 at 6:30 PM Curt Buechter <tricksho...@gmail.com> wrote:
> >>
> >> Hi,
> >> I'm getting an error after enabling checkpointing in my pyflink application that uses a keyed stream and rocksdb state.
> >>
> >> Here is the error message:
> >>
> >> 2021-09-22 16:18:14,408 INFO  org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend [] - Closed RocksDB State Backend. Cleaning up RocksDB working directory /mnt/yarn/usercache/hadoop/appcache/application_1631886817823_0058/flink-io-db923140-fce6-433e-b26b-b7d53afbc38f/job_383b888cc0d49bbd7464906e45b88187_op_PythonKeyedProcessOperator_90bea66de1c231edf33913ecd54406c1__1_1__uuid_16eefb37-8bae-43ef-b680-d6a4f9660c39.
> >> 2021-09-22 16:18:14,409 WARN  org.apache.flink.runtime.taskmanager.Task [] - KEYED PROCESS -> Sink: Unnamed (1/1)#34 (8f4fd40e863dd058822060dc3cf98831) switched from RUNNING to FAILED with failure cause: java.io.IOException: Could not perform checkpoint 2 for operator KEYED PROCESS -> Sink: Unnamed (1/1)#34.
> >>     at org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointOnBarrier(StreamTask.java:1045)
> >>     at org.apache.flink.streaming.runtime.io.checkpointing.CheckpointBarrierHandler.notifyCheckpoint(CheckpointBarrierHandler.java:135)
> >>     at org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler.triggerCheckpoint(SingleCheckpointBarrierHandler.java:250)
> >>     at org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler.access$100(SingleCheckpointBarrierHandler.java:61)
> >>     at org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler$ControllerImpl.triggerGlobalCheckpoint(SingleCheckpointBarrierHandler.java:431)
> >>     at org.apache.flink.streaming.runtime.io.checkpointing.AbstractAlignedBarrierHandlerState.barrierReceived(AbstractAlignedBarrierHandlerState.java:61)
> >>     at org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler.processBarrier(SingleCheckpointBarrierHandler.java:227)
> >>     at org.apache.flink.streaming.runtime.io.checkpointing.CheckpointedInputGate.handleEvent(CheckpointedInputGate.java:180)
> >>     at org.apache.flink.streaming.runtime.io.checkpointing.CheckpointedInputGate.pollNext(CheckpointedInputGate.java:158)
> >>     at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:110)
> >>     at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:66)
> >>     at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:423)
> >>     at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:204)
> >>     at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:681)
> >>     at org.apache.flink.streaming.runtime.tasks.StreamTask.executeInvoke(StreamTask.java:636)
> >>     at org.apache.flink.streaming.runtime.tasks.StreamTask.runWithCleanUpOnFail(StreamTask.java:647)
> >>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:620)
> >>     at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:779)
> >>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:566)
> >>     at java.lang.Thread.run(Thread.java:748)
> >> Caused by: java.lang.RuntimeException: Failed to close remote bundle
> >>     at org.apache.flink.streaming.api.runners.python.beam.BeamPythonFunctionRunner.finishBundle(BeamPythonFunctionRunner.java:383)
> >>     at org.apache.flink.streaming.api.runners.python.beam.BeamPythonFunctionRunner.flush(BeamPythonFunctionRunner.java:331)
> >>     at org.apache.flink.streaming.api.operators.python.AbstractPythonFunctionOperator.invokeFinishBundle(AbstractPythonFunctionOperator.java:320)
> >>     at org.apache.flink.streaming.api.operators.python.AbstractPythonFunctionOperator.prepareSnapshotPreBarrier(AbstractPythonFunctionOperator.java:175)
> >>     at org.apache.flink.streaming.runtime.tasks.OperatorChain.prepareSnapshotPreBarrier(OperatorChain.java:415)
> >>     at org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.checkpointState(SubtaskCheckpointCoordinatorImpl.java:292)
> >>     at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$performCheckpoint$8(StreamTask.java:1089)
> >>     at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:50)
> >>     at org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(StreamTask.java:1073)
> >>     at org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointOnBarrier(StreamTask.java:1029)
> >>     ... 19 more
> >> Caused by: java.util.concurrent.ExecutionException: org.apache.beam.vendor.grpc.v1p26p0.io.grpc.StatusRuntimeException: CANCELLED: cancelled before receiving half close
> >>     at java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
> >>     at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1908)
> >>     at org.apache.beam.sdk.util.MoreFutures.get(MoreFutures.java:60)
> >>     at org.apache.beam.runners.fnexecution.control.SdkHarnessClient$BundleProcessor$ActiveBundle.close(SdkHarnessClient.java:504)
> >>     at org.apache.beam.runners.fnexecution.control.DefaultJobBundleFactory$SimpleStageBundleFactory$1.close(DefaultJobBundleFactory.java:555)
> >>     at org.apache.flink.streaming.api.runners.python.beam.BeamPythonFunctionRunner.finishBundle(BeamPythonFunctionRunner.java:381)
> >>     ... 28 more
> >> Caused by: org.apache.beam.vendor.grpc.v1p26p0.io.grpc.StatusRuntimeException: CANCELLED: cancelled before receiving half close
> >>     at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.Status.asRuntimeException(Status.java:524)
> >>     at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.stub.ServerCalls$StreamingServerCallHandler$StreamingServerCallListener.onCancel(ServerCalls.java:275)
> >>     at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.PartialForwardingServerCallListener.onCancel(PartialForwardingServerCallListener.java:40)
> >>     at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.ForwardingServerCallListener.onCancel(ForwardingServerCallListener.java:23)
> >>     at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.ForwardingServerCallListener$SimpleForwardingServerCallListener.onCancel(ForwardingServerCallListener.java:40)
> >>     at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.Contexts$ContextualizedServerCallListener.onCancel(Contexts.java:96)
> >>     at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.internal.ServerCallImpl$ServerStreamListenerImpl.closedInternal(ServerCallImpl.java:353)
> >>     at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.internal.ServerCallImpl$ServerStreamListenerImpl.closed(ServerCallImpl.java:341)
> >>     at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.internal.ServerImpl$JumpToApplicationThreadServerStreamListener$1Closed.runInContext(ServerImpl.java:867)
> >>     at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.internal.ContextRunnable.run(ContextRunnable.java:37)
> >>     at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.internal.SerializingExecutor.run(SerializingExecutor.java:123)
> >>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
> >>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
> >>     ... 1 more
> >>
> >> 2021-09-22 16:18:14,410 INFO  org.apache.flink.runtime.taskmanager.Task [] - Freeing task resources for KEYED PROCESS -> Sink: Unnamed (1/1)#34 (8f4fd40e863dd058822060dc3cf98831).
> >> 2021-09-22 16:18:14,411 INFO  org.apache.flink.runtime.taskexecutor.TaskExecutor [] - Un-registering task and sending final execution state FAILED to JobManager for task KEYED PROCESS -> Sink: Unnamed (1/1)#34 8f4fd40e863dd058822060dc3cf98831.
> >> 2021-09-22 16:18:14,426 INFO  org.apache.flink.runtime.taskmanager.Task [] - Attempting to cancel task Source: Custom Source -> _stream_key_by_map_operator (1/1)#34 (290905523bfbf344381b7a04a8d36831).
> >> 2021-09-22 16:18:14,426 INFO  org.apache.flink.runtime.taskmanager.Task [] - Source: Custom Source -> _stream_key_by_map_operator (1/1)#34 (290905523bfbf344381b7a04a8d36831) switched from RUNNING to CANCELING.
> >> 2021-09-22 16:18:14,426 INFO  org.apache.flink.runtime.taskmanager.Task [] - Triggering cancellation of task code Source: Custom Source -> _stream_key_by_map_operator (1/1)#34 (290905523bfbf344381b7a04a8d36831).
> >> 2021-09-22 16:18:14,428 INFO  org.apache.flink.runtime.taskmanager.Task [] - Ignoring checkpoint aborted notification for non-running task Source: Custom Source -> _stream_key_by_map_operator (1/1)#34.
> >> 2021-09-22 16:18:14,430 INFO  org.apache.kafka.common.metrics.Metrics [] - Metrics scheduler closed
> >> 2021-09-22 16:18:14,430 INFO  org.apache.kafka.common.metrics.Metrics [] - Closing reporter org.apache.kafka.common.metrics.JmxReporter
> >> 2021-09-22 16:18:14,430 INFO  org.apache.kafka.common.metrics.Metrics [] - Metrics reporters closed
> >>
> >> And, here is some source code:
> >>
> >> class DedupeFunction(FlatMapFunction):
> >>     def __init__(self, schema):
> >>         super().__init__()
> >>         self.logger = None
> >>         self.state = None
> >>         self.my_state = None
> >>         self.schema = schema
> >>         self.metric_columns = [c.column_name for c in schema.columns if c.is_metric]
> >>
> >>     def open(self, runtime_context: RuntimeContext):
> >>         self.logger = logging
> >>         self.logger.info('Opening the FlatMapFunction')
> >>         descriptor = MapStateDescriptor("my_map_state_descriptor", Types.PICKLED_BYTE_ARRAY(), Types.PICKLED_BYTE_ARRAY())
> >>         self.state = runtime_context.get_map_state(descriptor)
> >>
> >>     def flat_map(self, value):
> >>         try:
> >>             if not self.state.is_empty():
> >>                 # self.logger.info('key in state')
> >>                 previous_dict = {}
> >>                 for k, v in self.state.items():
> >>                     # reverse the metric columns
> >>                     if k in self.metric_columns:
> >>                         if v:
> >>                             v = -v
> >>                     previous_dict[k] = v
> >>                 yield Row(**previous_dict)
> >>             new_dict = value.as_dict()
> >>             self.state.put_all(new_dict.items())
> >>             yield value
> >>         except Exception as ex:
> >>             self.logger.error(f'ERROR in dedupe_datastream: {str(ex)}')
> >>
> >> class PrimaryKeySelector(KeySelector):
> >>
> >>     def __init__(self, primary_key):
> >>         self.__primary_key__ = primary_key
> >>
> >>     def get_key(self, kv_obj):
> >>         return kv_obj.as_dict().get(self.__primary_key__)
> >>
> >>
> >> backend = RocksDBStateBackend(self.__conf__.get('CHECKPOINT_DIR'))
> >> self.__env__.set_state_backend(backend)
> >>
> >> input_ds = input_ds.key_by(PrimaryKeySelector(self.__primary_key__), key_type_info=primary_key_type_info)
> >> deduped_ds = input_ds.flat_map(DedupeFunction(self.__schema__), output_type=type_info)
> >>
> >> This program works fine if checkpointing is not enabled. Any advice here?
> >>
> >> Thanks
> >
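For anyone reading along, the retraction logic inside that flat_map can be isolated from Flink entirely, which helps rule the logic itself out as the cause. A minimal plain-Python sketch, with the MapState simulated by an ordinary dict (the `dedupe` function and its test values are hypothetical, for illustration only):

```python
def dedupe(state, value, metric_columns):
    """Emit a sign-reversed copy of the previously stored row (a retraction),
    then the new row, updating `state` in place.

    `state` is a plain dict standing in for Flink's keyed MapState."""
    out = []
    if state:
        previous = {}
        for k, v in state.items():
            # Reverse the metric columns so the old row cancels out downstream.
            if k in metric_columns and v:
                v = -v
            previous[k] = v
        out.append(previous)
    # Mirror of self.state.put_all(new_dict.items()) in the job above.
    state.update(value)
    out.append(value)
    return out
```

Since this runs without any Flink dependency, the same inputs that fail in the job can be replayed against it directly; if it behaves correctly, that points back at the checkpoint/bundle interaction rather than the function body.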