becketqin commented on issue #6594: [FLINK-9311] [pubsub] Added PubSub source connector with support for checkpointing (ATLEAST_ONCE)
URL: https://github.com/apache/flink/pull/6594#issuecomment-471952495

> Hello @rmetzger
>
> Let me address them point by point:
>
> > I really like the idea of providing an out-of-the-box exactly-once implementation.
>
> I am worried about the state size on high-volume PubSub subscriptions. In my experience, PubSub _will_ send messages multiple times, and this can happen up to 7 days after the initial message. Seven days is the upper bound because it is the maximum PubSub retention time, so we would have to store message IDs for 7 days.
>
> Implementation-wise this should still be very doable, though. We could replace the `DeserializationSchema`, which has `Object deserialize(byte[])`, with a class that has something like `Object deserialize(PubSubMessage)`, so we can decide if and how we want to pass the messageId on to the next operator. I think if we add this behavior in this PR, we can always provide an exactly-once implementation later without breaking backward compatibility. What do you think?
>
> I think either way it still makes sense to use the `MultipleIdsMessageAcknowledgingSourceBase` class for the acknowledge-on-checkpoint behavior, just so we don't duplicate this behavior in code. The most elegant solution might be to split the `acknowledge-on-checkpoint` and `exactly-once on parallelism 1` code, so the RabbitMQ connector can use both parts while PubSub only uses the `acknowledge-on-checkpoint` part. But this would require some large changes to the RabbitMQ connector, which I cannot test myself, so I'm a bit hesitant to go in this direction. What do you think?
>
> > I'm going to test this myself this weekend.
>
> In the previous (async) implementation I would certainly expect this behavior, but the new implementation should only be bounded by memory (the number of acknowledge IDs it has to store between checkpoints).
> I fully agree we should not force users to lower their checkpoint frequency if they want a higher throughput. I think this is fixable; I'll come back to this!
>
> When I first created the PR, I had added `modes` to the PubSubSource: a NONE / ATLEAST_ONCE / EXACTLY_ONCE enum,
> where NONE would be the behavior you describe: acknowledge immediately, with the risk of losing messages. If we can't fix the performance for jobs with large checkpoint intervals, we might consider adding these modes back.
>
> > Could you play with the `withMaxMessagesPerPull` setting? I can imagine each pull has some overhead, and reducing the number of pulls needed might save CPU cycles.
>
> Do you see this high CPU consumption on empty subscriptions as well?
>
> We could always give the JSON API a try, but I would be surprised if it performed better.

@Xeli I tend to agree with Stephan on getting rid of the `MultipleIdsMessageAcknowledgingSourceBase`. The fact that its memory consumption is unbounded worries me.
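To make the memory concern concrete, here is a minimal sketch of acknowledge-on-checkpoint bookkeeping, assuming a source that receives one ack ID per pulled message and may only acknowledge messages once the checkpoint covering them has completed. The class and method names (`AckOnCheckpointSketch`, `onMessage`, `snapshot`, `bufferedIds`) are hypothetical; `notifyCheckpointComplete` only loosely mirrors the name of Flink's checkpoint hook. This is plain Java with no Flink or PubSub dependencies, and it is not how `MultipleIdsMessageAcknowledgingSourceBase` is implemented.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.NavigableMap;
import java.util.TreeMap;

/**
 * Hypothetical sketch of acknowledge-on-checkpoint bookkeeping.
 * Not Flink's MultipleIdsMessageAcknowledgingSourceBase and not the PubSub client API.
 */
public class AckOnCheckpointSketch {

    /** Ack IDs received since the last checkpoint was taken. */
    private List<String> currentAckIds = new ArrayList<>();

    /** Ack IDs frozen per checkpoint, waiting for that checkpoint to complete. */
    private final NavigableMap<Long, List<String>> pendingByCheckpoint = new TreeMap<>();

    /** Called for every pulled message: the message is emitted but not yet acknowledged. */
    public void onMessage(String ackId) {
        currentAckIds.add(ackId);
    }

    /** Called when a checkpoint is taken: file the IDs seen so far under its ID. */
    public void snapshot(long checkpointId) {
        pendingByCheckpoint.put(checkpointId, currentAckIds);
        currentAckIds = new ArrayList<>();
    }

    /**
     * Called when a checkpoint completes. Returns the ack IDs that are now safe to
     * acknowledge (all checkpoints up to and including checkpointId) and forgets them.
     */
    public List<String> notifyCheckpointComplete(long checkpointId) {
        List<String> toAck = new ArrayList<>();
        NavigableMap<Long, List<String>> done = pendingByCheckpoint.headMap(checkpointId, true);
        for (List<String> ids : done.values()) {
            toAck.addAll(ids);
        }
        done.clear(); // clearing the view also removes these entries from pendingByCheckpoint
        return toAck;
    }

    /** Total number of ack IDs currently held in memory. */
    public int bufferedIds() {
        int n = currentAckIds.size();
        for (List<String> ids : pendingByCheckpoint.values()) {
            n += ids.size();
        }
        return n;
    }

    public static void main(String[] args) {
        AckOnCheckpointSketch tracker = new AckOnCheckpointSketch();
        tracker.onMessage("a");
        tracker.onMessage("b");
        tracker.snapshot(1L);
        tracker.onMessage("c");
        System.out.println(tracker.bufferedIds());                 // 3
        System.out.println(tracker.notifyCheckpointComplete(1L));  // [a, b]
        System.out.println(tracker.bufferedIds());                 // 1
    }
}
```

If the job fails before a checkpoint completes, the buffered IDs are never acknowledged and PubSub redelivers those messages, which gives at-least-once delivery. The buffer grows with messages per checkpoint interval, so a long interval on a high-throughput subscription means many IDs in memory; an exactly-once variant that deduplicates by message ID would additionally have to retain IDs for up to the 7-day retention window mentioned above.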