Shiao-An Yuan created FLINK-24587:
-------------------------------------

Summary: Let PubSub source support changing subscriptions
Key: FLINK-24587
URL: https://issues.apache.org/jira/browse/FLINK-24587
Project: Flink
Issue Type: Improvement
Components: Connectors / Google Cloud PubSub
Affects Versions: 1.12.2
Reporter: Shiao-An Yuan
Original post on the user mailing list: [link|https://lists.apache.org/thread.html/ra3047e5105fccbea42de6c37d52d05b492af496bd0bb95cc534630de%40%3Cuser.flink.apache.org%3E]

After resuming a Flink application from a snapshot with a *new subscription*, I repeatedly got the following error:

{code:java}
org.apache.flink.util.SerializedThrowable: INVALID_ARGUMENT: You have passed a subscription that does not belong to the given ack ID (resource=projects/xxxxx/subscriptions/xxxx).
	at io.grpc.stub.ClientCalls.toStatusRuntimeException(ClientCalls.java:244) ~[?:?]
	at io.grpc.stub.ClientCalls.getUnchecked(ClientCalls.java:225) ~[?:?]
	at io.grpc.stub.ClientCalls.blockingUnaryCall(ClientCalls.java:142) ~[?:?]
	at com.google.pubsub.v1.SubscriberGrpc$SubscriberBlockingStub.acknowledge(SubscriberGrpc.java:1628) ~[?:?]
	at org.apache.flink.streaming.connectors.gcp.pubsub.BlockingGrpcPubSubSubscriber.acknowledge(BlockingGrpcPubSubSubscriber.java:99) ~[?:?]
	at org.apache.flink.streaming.connectors.gcp.pubsub.common.AcknowledgeOnCheckpoint.notifyCheckpointComplete(AcknowledgeOnCheckpoint.java:84) ~[?:?]
	at org.apache.flink.streaming.connectors.gcp.pubsub.PubSubSource.notifyCheckpointComplete(PubSubSource.java:208) ~[?:?]
	at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.notifyCheckpointComplete(AbstractUdfStreamOperator.java:130) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
	at org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.notifyCheckpointComplete(StreamOperatorWrapper.java:99) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
	at org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.notifyCheckpointComplete(SubtaskCheckpointCoordinatorImpl.java:319) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
	at org.apache.flink.streaming.runtime.tasks.StreamTask.notifyCheckpointComplete(StreamTask.java:1083) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
	at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$notifyCheckpointCompleteAsync$11(StreamTask.java:1048) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
	at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$notifyCheckpointOperation$13(StreamTask.java:1071) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
	at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:93) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
	at org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:90) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
	at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:317) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
	at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:189) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
	at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:617) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
	at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:581) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
	at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:755) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:570) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
	at java.lang.Thread.run(Thread.java:834) ~[?:?]
{code}

As I see it, the [AckId|https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-gcp-pubsub/src/main/java/org/apache/flink/streaming/connectors/gcp/pubsub/PubSubSource.java#L149] becomes invalid as soon as we switch to another subscription.

I also noticed something interesting. A checkpoint/savepoint is taken as follows:
# Write a checkpoint/savepoint that contains the ackIds of messages that have not been acknowledged yet.
# If the checkpoint/savepoint succeeds, acknowledge those ackIds ([src|https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-gcp-pubsub/src/main/java/org/apache/flink/streaming/connectors/gcp/pubsub/common/AcknowledgeOnCheckpoint.java#L84]).
# Remove those ackIds from the state ([src|https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-gcp-pubsub/src/main/java/org/apache/flink/streaming/connectors/gcp/pubsub/common/AcknowledgeOnCheckpoint.java#L87]).

If we resume a job from a snapshot, the already-acknowledged ackIds (removed from the state in step 3) still exist in the savepoint (created in step 1), so they are acknowledged again when the next checkpoint completes. In my opinion, these ackIds stored in the savepoint are the root cause that prevents us from changing subscriptions.
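The three steps above and the replay on restore can be reproduced with a small, self-contained Java model. This is a hypothetical simplification, not the connector's actual code: `AckReplayDemo`, `FakeSubscriber`, `AckState`, and `ackedAfterRestore` are made-up names. It also assumes, purely for illustration, that an ackId is only accepted by the subscription that issued it, modeled as a `subscription/` prefix on the id, since real Pub/Sub ackIds are opaque strings.

{code:java}
import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical, simplified model of the ack-on-checkpoint flow (not the connector's real code). */
final class AckReplayDemo {

    /** Stand-in for the Pub/Sub acknowledge call; records the ackIds it accepted. */
    static final class FakeSubscriber {
        final String subscription;
        final List<String> acked = new ArrayList<>();

        FakeSubscriber(String subscription) {
            this.subscription = subscription;
        }

        void acknowledge(List<String> ackIds) {
            for (String id : ackIds) {
                // ASSUMPTION: an ackId is only valid for the subscription that issued it.
                // Real ackIds are opaque; the "subscription/" prefix just models that ownership.
                if (!id.startsWith(subscription + "/")) {
                    throw new IllegalArgumentException(
                            "INVALID_ARGUMENT: " + subscription + " does not own ack ID " + id);
                }
                acked.add(id);
            }
        }
    }

    /** Operator state: ackIds grouped by the checkpoint that captured them. */
    static final class AckState {
        final Map<Long, List<String>> perCheckpoint = new LinkedHashMap<>();
        List<String> current = new ArrayList<>();

        /** Step 1: the snapshot holds every ackId that has not been acknowledged yet. */
        Map<Long, List<String>> snapshot(long checkpointId) {
            perCheckpoint.put(checkpointId, current);
            current = new ArrayList<>();
            Map<Long, List<String>> copy = new LinkedHashMap<>();
            for (Map.Entry<Long, List<String>> e : perCheckpoint.entrySet()) {
                copy.put(e.getKey(), new ArrayList<>(e.getValue()));
            }
            return copy;
        }

        /** Steps 2 and 3: acknowledge ids of completed checkpoints, then drop them from live state. */
        void onCheckpointComplete(long checkpointId, FakeSubscriber subscriber) {
            Iterator<Map.Entry<Long, List<String>>> it = perCheckpoint.entrySet().iterator();
            while (it.hasNext()) {
                Map.Entry<Long, List<String>> e = it.next();
                if (e.getKey() <= checkpointId) {
                    subscriber.acknowledge(e.getValue());
                    it.remove(); // only live state shrinks; the step-1 snapshot keeps these ids
                }
            }
        }

        static AckState restore(Map<Long, List<String>> snapshot) {
            AckState state = new AckState();
            for (Map.Entry<Long, List<String>> e : snapshot.entrySet()) {
                state.perCheckpoint.put(e.getKey(), new ArrayList<>(e.getValue()));
            }
            return state;
        }
    }

    /** Acks ack-1 via checkpoint 1, restores from that snapshot, then completes checkpoint 2. */
    static List<String> ackedAfterRestore(String subscriptionAfterRestore) {
        String oldSubscription = "projects/p/subscriptions/old";
        AckState state = new AckState();
        state.current.add(oldSubscription + "/ack-1");
        Map<Long, List<String>> savepoint = state.snapshot(1L);              // step 1
        state.onCheckpointComplete(1L, new FakeSubscriber(oldSubscription)); // steps 2 and 3

        FakeSubscriber after = new FakeSubscriber(subscriptionAfterRestore);
        AckState restored = AckState.restore(savepoint); // ack-1 is back in state
        restored.snapshot(2L);
        restored.onCheckpointComplete(2L, after);        // ack-1 is acknowledged again
        return after.acked;
    }

    public static void main(String[] args) {
        System.out.println("same subscription, re-acked: "
                + ackedAfterRestore("projects/p/subscriptions/old"));
        try {
            ackedAfterRestore("projects/p/subscriptions/new");
        } catch (IllegalArgumentException e) {
            System.out.println("new subscription: " + e.getMessage());
        }
    }
}
{code}

In this model, restoring with the same subscription simply acknowledges ack-1 a second time. Restoring with a different subscription makes the replayed acknowledgement fail, mirroring the INVALID_ARGUMENT error in the stack trace.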