Hi Arvid,

Thank you for the suggestion. I have created a ticket:
https://issues.apache.org/jira/browse/FLINK-24587

Thanks,
sayuan

On Mon, Oct 18, 2021 at 4:45 PM Arvid Heise <ar...@apache.org> wrote:

> Hi Sayuan,
>
> I'm not familiar with PubSub and can't assess if that's a valid request or
> not. Maybe Niels can help as he worked on the last connector feature.
>
> In any case, you can create a ticket and even submit a PR if you want once
> the ticket is assigned to you.
>
> Best,
>
> Arvid
>
> On Thu, Oct 14, 2021 at 12:08 PM Shiao-An Yuan <shiao.an.y...@gmail.com>
> wrote:
>
>> Hi community,
>>
>> Google Cloud PubSub has a feature called snapshot[1], which allows us to
>> apply snapshots to subscriptions.
>>
>> I recently needed to update the "filter" of a subscription, but the
>> filter cannot be modified once the subscription is created.
>> Therefore, I created a snapshot of the current subscription and applied
>> it to a new subscription.
>>
>> After resuming the Flink application with the new subscription, I got
>> the following error repeatedly:
>> ```
>> org.apache.flink.util.SerializedThrowable: INVALID_ARGUMENT: You have passed a subscription that does not belong to the given ack ID (resource=projects/xxxxx/subscriptions/xxxx).
>>     at io.grpc.stub.ClientCalls.toStatusRuntimeException(ClientCalls.java:244) ~[?:?]
>>     at io.grpc.stub.ClientCalls.getUnchecked(ClientCalls.java:225) ~[?:?]
>>     at io.grpc.stub.ClientCalls.blockingUnaryCall(ClientCalls.java:142) ~[?:?]
>>     at com.google.pubsub.v1.SubscriberGrpc$SubscriberBlockingStub.acknowledge(SubscriberGrpc.java:1628) ~[?:?]
>>     at org.apache.flink.streaming.connectors.gcp.pubsub.BlockingGrpcPubSubSubscriber.acknowledge(BlockingGrpcPubSubSubscriber.java:99) ~[?:?]
>>     at org.apache.flink.streaming.connectors.gcp.pubsub.common.AcknowledgeOnCheckpoint.notifyCheckpointComplete(AcknowledgeOnCheckpoint.java:84) ~[?:?]
>>     at org.apache.flink.streaming.connectors.gcp.pubsub.PubSubSource.notifyCheckpointComplete(PubSubSource.java:208) ~[?:?]
>>     at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.notifyCheckpointComplete(AbstractUdfStreamOperator.java:130) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
>>     at org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.notifyCheckpointComplete(StreamOperatorWrapper.java:99) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
>>     at org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.notifyCheckpointComplete(SubtaskCheckpointCoordinatorImpl.java:319) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.notifyCheckpointComplete(StreamTask.java:1083) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$notifyCheckpointCompleteAsync$11(StreamTask.java:1048) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$notifyCheckpointOperation$13(StreamTask.java:1071) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
>>     at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:93) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
>>     at org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:90) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
>>     at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:317) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
>>     at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:189) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:617) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:581) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
>>     at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:755) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
>>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:570) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
>>     at java.lang.Thread.run(Thread.java:834) ~[?:?]
>> ```
>>
>> I think the ack IDs stored in the savepoint became invalid after I
>> changed the subscription.
>> Since PubSub has an at-least-once guarantee, it seems safe to just ignore
>> these errors, or even to not save the ack IDs in the checkpoint/savepoint
>> at all?
>>
>> I am new here. Do you have any suggestions for how to follow up?
>> Can I just create a Jira ticket for this feature request?
>>
>> [1] https://cloud.google.com/pubsub/docs/replay-overview
>>
>> Thanks,
>> sayuan
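
For reference, the "just ignore these errors" idea from the original message could look roughly like the sketch below. It is only an illustration under stated assumptions: `TolerantAck`, `Acknowledger`, and `AckRejectedException` are hypothetical stand-ins for the subscriber's acknowledge call and the gRPC status failure seen in the stack trace, not the actual Flink connector or gRPC API. The sketch drops ack IDs that the service rejects with INVALID_ARGUMENT and relies on PubSub redelivering unacknowledged messages; whether that is acceptable for the connector is the open question for the ticket.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical sketch; none of these types exist in Flink or gRPC.
final class TolerantAck {

    /** Hypothetical stand-in for a gRPC status failure such as INVALID_ARGUMENT. */
    static final class AckRejectedException extends RuntimeException {
        final String statusCode;

        AckRejectedException(String statusCode, String message) {
            super(statusCode + ": " + message);
            this.statusCode = statusCode;
        }
    }

    /** Hypothetical stand-in for the subscriber's acknowledge call. */
    interface Acknowledger {
        void acknowledge(List<String> ackIds);
    }

    /**
     * Tries to acknowledge the given IDs.
     * Returns true if the call succeeded (or there was nothing to acknowledge),
     * false if the IDs were rejected as INVALID_ARGUMENT and therefore dropped.
     * Any other failure is rethrown so that real outages still surface.
     */
    static boolean acknowledgeTolerantly(Acknowledger subscriber, List<String> ackIds) {
        if (ackIds.isEmpty()) {
            return true;
        }
        try {
            subscriber.acknowledge(ackIds);
            return true;
        } catch (AckRejectedException e) {
            if (!"INVALID_ARGUMENT".equals(e.statusCode)) {
                throw e;
            }
            // Assumption: the IDs came from a savepoint taken against a different
            // subscription. They are dropped here; unacknowledged messages are
            // expected to be redelivered (at-least-once).
            return false;
        }
    }

    public static void main(String[] args) {
        // Simulates the new subscription rejecting ack IDs restored from the savepoint.
        Acknowledger newSubscription = ids -> {
            throw new AckRejectedException(
                    "INVALID_ARGUMENT", "subscription does not belong to the given ack ID");
        };
        boolean acknowledged =
                acknowledgeTolerantly(newSubscription, Arrays.asList("ack-1", "ack-2"));
        System.out.println("acknowledged=" + acknowledged);
    }
}
```

Rethrowing every status other than INVALID_ARGUMENT is a deliberate choice in this sketch, so that a stale-ID tolerance does not also hide unrelated failures.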