curcur commented on pull request #11725: URL: https://github.com/apache/flink/pull/11725#issuecomment-618879789
> So I had an offline discussion with Stephan to clarify the scope. Indeed, `KafkaShuffle` has been requested by users and serves as a bridge until we get fully persistent channels. > > We both agree that it would have been nice to also support reading from non-Flink shuffles (a.k.a from any partitioned Kafka topic) by making the serializer pluggable at the composition level. Please have a look at `StreamElementSerializer` and see if we can use it. If that doesn't work for some reason, then I can live with a pure `KafkaShuffle` in the first iteration. > > Implementation-wise, we are both a bit skeptical that an API change (to `SinkFunction`) is the best course as that requires more coordination and should have probably been triggered already if you want this feature in 1.11. Using custom operators would give you all freedom without that the need of coordiation. It would also avoid the changes to `KafkaProducer`/`KafkaConsumer` on the cost of replicating some logic. > > Lastly, I have strong headaches on how checkpoints and savepoints are working with `KafkaShuffle`. I think for storing checkpoints and recovery in terms of fault tolerance, the approach is good as-is. However, for savepoints, we should probably ensure that no unconsumed data is still in lingering in the shuffle topic as that would translate to in-flight data. Hence, restoring from an old savepoint would completely screw up the data. At this point, we also need to ensure that the topic is purged (probably with some assertion). Not supporting going back in checkpoints should be save from current guarantees. Alternatively, we also need to implement some recovery logic for older check/savepoints that ignores "future" data somehow (so some higher level Kafka offset management). Hey Arvid, thanks so much for the quick response! I think you have several concerns about 1. why `StreamElementSerializer` can not be reused 2. why I have to have a different `KafkaProducer/KafkaConsumer` 3. Have a better way instead of changing `SinkFunction` (that's exactly my concern, and why I want to get early feedback, I am hesitating as well) 4. `save points`, which I do not completely get it. For the first two, I have reasons; for the third one, I have concerns as well. For the forth, not completely sure I understand it correctly. Do, do you have time to chat a bit on these four points on Monday? BTW, I am not insisting to get this in 1.11. Instead, I really want to do it in the right way. ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org