[ https://issues.apache.org/jira/browse/FLINK-12376?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Robert Metzger updated FLINK-12376:
-----------------------------------
    Component/s:     (was: FileSystems)
                 Connectors / Google Cloud PubSub

> GCS runtime exn: Request payload size exceeds the limit
> -------------------------------------------------------
>
>                 Key: FLINK-12376
>                 URL: https://issues.apache.org/jira/browse/FLINK-12376
>             Project: Flink
>          Issue Type: Bug
>          Components: Connectors / Google Cloud PubSub
>    Affects Versions: 1.7.2
>         Environment: FROM flink:1.8.0-scala_2.11
> ARG version=0.17
> ADD https://storage.googleapis.com/hadoop-lib/gcs/gcs-connector-latest-hadoop2.jar /opt/flink/lib
> COPY target/analytics-${version}.jar /opt/flink/lib/analytics.jar
>            Reporter: Henrik
>            Assignee: Richard Deurwaarder
>            Priority: Major
>         Attachments: Screenshot 2019-04-30 at 22.32.34.png
>
>
> I'm trying to use the google cloud storage file system, but it would seem
> that the FLINK / GCS client libs are creating too-large requests far down in
> the GCS Java client.
> The Java client is added to the lib folder with this command in Dockerfile
> (probably
> [hadoop2-1.9.16|https://search.maven.org/artifact/com.google.cloud.bigdataoss/gcs-connector/hadoop2-1.9.16/jar]
> at the time of writing):
>
> {code:java}
> ADD https://storage.googleapis.com/hadoop-lib/gcs/gcs-connector-latest-hadoop2.jar /opt/flink/lib{code}
> This is the crash output. Focus lines:
> {code:java}
> java.lang.RuntimeException: Error while confirming checkpoint{code}
> and
> {code:java}
> Caused by: com.google.api.gax.rpc.InvalidArgumentException: io.grpc.StatusRuntimeException: INVALID_ARGUMENT: Request payload size exceeds the limit: 524288 bytes.{code}
> Full stacktrace:
>
> {code:java}
> [analytics-867c867ff6-l622h taskmanager] 2019-04-30 20:23:14,532 INFO org.apache.flink.runtime.taskmanager.Task - Source: Custom Source -> Process -> Timestamps/Watermarks -> app_events (1/1) (9a01e96c0271025d5ba73b735847cd4c) switched from RUNNING to FAILED.
> [analytics-867c867ff6-l622h taskmanager] java.lang.RuntimeException: Error while confirming checkpoint
> [analytics-867c867ff6-l622h taskmanager]     at org.apache.flink.runtime.taskmanager.Task$2.run(Task.java:1211)
> [analytics-867c867ff6-l622h taskmanager]     at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
> [analytics-867c867ff6-l622h taskmanager]     at java.util.concurrent.FutureTask.run(FutureTask.java:266)
> [analytics-867c867ff6-l622h taskmanager]     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
> [analytics-867c867ff6-l622h taskmanager]     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
> [analytics-867c867ff6-l622h taskmanager]     at java.lang.Thread.run(Thread.java:748)
> [analytics-867c867ff6-l622h taskmanager] Caused by: com.google.api.gax.rpc.InvalidArgumentException: io.grpc.StatusRuntimeException: INVALID_ARGUMENT: Request payload size exceeds the limit: 524288 bytes.
> [analytics-867c867ff6-l622h taskmanager]     at com.google.api.gax.rpc.ApiExceptionFactory.createException(ApiExceptionFactory.java:49)
> [analytics-867c867ff6-l622h taskmanager]     at com.google.api.gax.grpc.GrpcApiExceptionFactory.create(GrpcApiExceptionFactory.java:72)
> [analytics-867c867ff6-l622h taskmanager]     at com.google.api.gax.grpc.GrpcApiExceptionFactory.create(GrpcApiExceptionFactory.java:60)
> [analytics-867c867ff6-l622h taskmanager]     at com.google.api.gax.grpc.GrpcExceptionCallable$ExceptionTransformingFuture.onFailure(GrpcExceptionCallable.java:97)
> [analytics-867c867ff6-l622h taskmanager]     at com.google.api.core.ApiFutures$1.onFailure(ApiFutures.java:68)
> [analytics-867c867ff6-l622h taskmanager]     at com.google.common.util.concurrent.Futures$CallbackListener.run(Futures.java:1056)
> [analytics-867c867ff6-l622h taskmanager]     at com.google.common.util.concurrent.DirectExecutor.execute(DirectExecutor.java:30)
> [analytics-867c867ff6-l622h taskmanager]     at com.google.common.util.concurrent.AbstractFuture.executeListener(AbstractFuture.java:1138)
> [analytics-867c867ff6-l622h taskmanager]     at com.google.common.util.concurrent.AbstractFuture.complete(AbstractFuture.java:958)
> [analytics-867c867ff6-l622h taskmanager]     at com.google.common.util.concurrent.AbstractFuture.setException(AbstractFuture.java:748)
> [analytics-867c867ff6-l622h taskmanager]     at io.grpc.stub.ClientCalls$GrpcFuture.setException(ClientCalls.java:507)
> [analytics-867c867ff6-l622h taskmanager]     at io.grpc.stub.ClientCalls$UnaryStreamToFuture.onClose(ClientCalls.java:482)
> [analytics-867c867ff6-l622h taskmanager]     at io.grpc.PartialForwardingClientCallListener.onClose(PartialForwardingClientCallListener.java:39)
> [analytics-867c867ff6-l622h taskmanager]     at io.grpc.ForwardingClientCallListener.onClose(ForwardingClientCallListener.java:23)
> [analytics-867c867ff6-l622h taskmanager]     at io.grpc.ForwardingClientCallListener$SimpleForwardingClientCallListener.onClose(ForwardingClientCallListener.java:40)
> [analytics-867c867ff6-l622h taskmanager]     at io.grpc.internal.CensusStatsModule$StatsClientInterceptor$1$1.onClose(CensusStatsModule.java:694)
> [analytics-867c867ff6-l622h taskmanager]     at io.grpc.PartialForwardingClientCallListener.onClose(PartialForwardingClientCallListener.java:39)
> [analytics-867c867ff6-l622h taskmanager]     at io.grpc.ForwardingClientCallListener.onClose(ForwardingClientCallListener.java:23)
> [analytics-867c867ff6-l622h taskmanager]     at io.grpc.ForwardingClientCallListener$SimpleForwardingClientCallListener.onClose(ForwardingClientCallListener.java:40)
> [analytics-867c867ff6-l622h taskmanager]     at io.grpc.internal.CensusTracingModule$TracingClientInterceptor$1$1.onClose(CensusTracingModule.java:397)
> [analytics-867c867ff6-l622h taskmanager]     at io.grpc.internal.ClientCallImpl.closeObserver(ClientCallImpl.java:459)
> [analytics-867c867ff6-l622h taskmanager]     at io.grpc.internal.ClientCallImpl.access$300(ClientCallImpl.java:63)
> [analytics-867c867ff6-l622h taskmanager]     at io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl.close(ClientCallImpl.java:546)
> [analytics-867c867ff6-l622h taskmanager]     at io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl.access$600(ClientCallImpl.java:467)
> [analytics-867c867ff6-l622h taskmanager]     at io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl$1StreamClosed.runInContext(ClientCallImpl.java:584)
> [analytics-867c867ff6-l622h taskmanager]     at io.grpc.internal.ContextRunnable.run(ContextRunnable.java:37)
> [analytics-867c867ff6-l622h taskmanager]     at io.grpc.internal.SerializingExecutor.run(SerializingExecutor.java:123)
> [analytics-867c867ff6-l622h taskmanager]     at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
> [analytics-867c867ff6-l622h taskmanager]     at java.util.concurrent.FutureTask.run(FutureTask.java:266)
> [analytics-867c867ff6-l622h taskmanager]     at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
> [analytics-867c867ff6-l622h taskmanager]     at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
> [analytics-867c867ff6-l622h taskmanager]     ... 3 more
> [analytics-867c867ff6-l622h taskmanager]     Suppressed: com.google.api.gax.rpc.AsyncTaskException: Asynchronous task failed
> [analytics-867c867ff6-l622h taskmanager]         at com.google.api.gax.rpc.ApiExceptions.callAndTranslateApiException(ApiExceptions.java:57)
> [analytics-867c867ff6-l622h taskmanager]         at com.google.api.gax.rpc.UnaryCallable.call(UnaryCallable.java:112)
> [analytics-867c867ff6-l622h taskmanager]         at okr.sources.PubSubSource.acknowledgeSessionIDs(PubSubSource.java:122)
> [analytics-867c867ff6-l622h taskmanager]         at org.apache.flink.streaming.api.functions.source.MultipleIdsMessageAcknowledgingSourceBase.acknowledgeIDs(MultipleIdsMessageAcknowledgingSourceBase.java:122)
> [analytics-867c867ff6-l622h taskmanager]         at org.apache.flink.streaming.api.functions.source.MessageAcknowledgingSourceBase.notifyCheckpointComplete(MessageAcknowledgingSourceBase.java:231)
> [analytics-867c867ff6-l622h taskmanager]         at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.notifyCheckpointComplete(AbstractUdfStreamOperator.java:130)
> [analytics-867c867ff6-l622h taskmanager]         at org.apache.flink.streaming.runtime.tasks.StreamTask.notifyCheckpointComplete(StreamTask.java:684)
> [analytics-867c867ff6-l622h taskmanager]         at org.apache.flink.runtime.taskmanager.Task$2.run(Task.java:1206)
> [analytics-867c867ff6-l622h taskmanager]         at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
> [analytics-867c867ff6-l622h taskmanager]         at java.util.concurrent.FutureTask.run(FutureTask.java:266)
> [analytics-867c867ff6-l622h taskmanager]         ... 3 more
> {code}
> The file system is configured as such in `conf/flink-conf.yaml`:
>
> {code:java}
> state.backend: rocksdb
> state.checkpoints.num-retained: 3
> state.checkpoints.dir: gs://example_bucket/flink/checkpoints
> state.savepoints.dir: gs://example_bucket/flink/savepoints
> state.backend.incremental: true
> {code}
> ...and the checkpoints that are created before the crash are small in size:
>
> !Screenshot 2019-04-30 at 22.32.34.png!
>
> I'll be testing with Flink 1.8.0 as well.
> The pom.xml config:
> {code:java}
> <!-- https://stackoverflow.com/questions/51860988/flink-checkpoints-to-google-cloud-storage -->
> <!-- https://search.maven.org/search?q=a:flink-statebackend-rocksdb_2.11 -->
> <dependency>
>   <groupId>org.apache.flink</groupId>
>   <artifactId>flink-statebackend-rocksdb_${scala.binary.version}</artifactId>
>   <version>${flink.version}</version>
> </dependency>
> <!-- https://search.maven.org/search?q=g:com.google.cloud.bigdataoss -->
> <!-- https://github.com/GoogleCloudPlatform/bigdata-interop/blob/master/pubsub/README.md -->
> <!-- Cloud Storage: -->
> <dependency>
>   <groupId>com.google.cloud.bigdataoss</groupId>
>   <artifactId>gcs-connector</artifactId>
>   <version>hadoop2-1.9.16</version>
> </dependency>
> <dependency>
>   <groupId>org.apache.flink</groupId>
>   <artifactId>flink-connector-filesystem_2.11</artifactId>
>   <version>${flink.version}</version>
> </dependency>
> {code}

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
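
For readers of this thread: the suppressed frames in the trace point at `okr.sources.PubSubSource.acknowledgeSessionIDs`, called from `notifyCheckpointComplete`, so the oversized request appears to be a single acknowledge call carrying every ID collected since the last checkpoint, rather than a GCS write. One possible mitigation is to split the IDs into several smaller requests. The sketch below only illustrates that batching idea. `AckBatcher`, `split`, and the per-ID overhead are hypothetical names and estimates, not part of Flink or the Pub/Sub client; the only number taken from the report is the 524288-byte limit in the error message.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Hypothetical helper (not a Flink or Pub/Sub API): groups ack IDs into size-bounded batches. */
final class AckBatcher {

    private AckBatcher() {}

    /**
     * Splits ackIds into consecutive batches whose estimated size is at most maxBytes.
     * The estimate is the UTF-8 length of each ID plus perIdOverhead, an assumed
     * allowance for per-entry framing in the serialized request.
     */
    static List<List<String>> split(List<String> ackIds, int maxBytes, int perIdOverhead) {
        List<List<String>> batches = new ArrayList<>();
        List<String> current = new ArrayList<>();
        int currentBytes = 0;

        for (String id : ackIds) {
            int idBytes = id.getBytes(StandardCharsets.UTF_8).length + perIdOverhead;
            if (idBytes > maxBytes) {
                throw new IllegalArgumentException("single ack ID exceeds the batch limit");
            }
            // Start a new batch when adding this ID would push the estimate over the limit.
            if (!current.isEmpty() && currentBytes + idBytes > maxBytes) {
                batches.add(current);
                current = new ArrayList<>();
                currentBytes = 0;
            }
            current.add(id);
            currentBytes += idBytes;
        }
        if (!current.isEmpty()) {
            batches.add(current);
        }
        return batches;
    }

    public static void main(String[] args) {
        // Tiny limit so the split is visible; a real caller would stay below 524288 bytes.
        List<String> ids = Arrays.asList("aaaa", "bbbb", "cccc");
        List<List<String>> batches = split(ids, 12, 2);
        System.out.println(ids.size() + " IDs -> " + batches.size() + " batches");
    }
}
```

A caller would then issue one acknowledge request per returned batch. Leaving headroom below the limit seems prudent, because the estimate ignores the subscription name and other request fields.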