senegalo commented on pull request #12056: URL: https://github.com/apache/flink/pull/12056#issuecomment-640193242
@dawidwys @austince no worries at all. I've been having some computer trouble lately as well: my 7-year-old laptop decided it was time! Lucky for me, that was an excuse for a new XPS 15!

It seems I misunderstood you before, so here is another iteration based on what I understood from your comments above.

> we should not expose the concrete internal implementation of RMQCollector from RMQSource. It's better to have an interface defined in RMQDeserializationSchema

Done. We now have a new interface that lives in `RMQDeserializationSchema`.

> the correlation check will pass only for the first element of a correlationId from a batch. We should account for a situation when from a single RMQ record (single correlationId) we produce multiple flink record

You are absolutely right! I have no clue how this slipped by. I fixed it by moving all the checks into one method, `setMessageIdentifiers`, which takes both the `correlationID` and the `deliveryTag` of the message and sets a flag depending on whether the conditions were met. Calling `collect` without calling this method first will throw an exception.

> In the end I think with @austince we discussed a slightly different RMQCollector interface

Sorry again for the misunderstanding :) Let me know if this does the deed or not.
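
To make the `setMessageIdentifiers` / `collect` contract described in this comment concrete, here is a rough, self-contained sketch. It is not the code from the PR: the class names `CollectorSketch` and `DedupCollector`, the method signatures, the boolean return value, and the in-memory sets are all assumptions made for illustration, and it does not depend on Flink or the RabbitMQ client.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

public class CollectorSketch {

    /** Hypothetical stand-in for the collector interface discussed in this comment. */
    interface RMQCollector<T> {
        /** Runs the per-message checks once; returns whether records will be accepted. */
        boolean setMessageIdentifiers(String correlationId, long deliveryTag);

        /** Emits one record derived from the current message. */
        void collect(T record);
    }

    /** Illustrative implementation; the state handling here is an assumption. */
    static class DedupCollector<T> implements RMQCollector<T> {
        private final Set<String> seenCorrelationIds = new HashSet<>();
        private final List<Long> pendingDeliveryTags = new ArrayList<>();
        private final List<T> emitted = new ArrayList<>();
        private boolean identifiersSet = false;
        private boolean accepted = false;

        @Override
        public boolean setMessageIdentifiers(String correlationId, long deliveryTag) {
            identifiersSet = true;
            // Set.add returns false when the correlation ID was already seen,
            // so a redelivered message is rejected as a whole.
            accepted = seenCorrelationIds.add(correlationId);
            if (accepted) {
                // Assumed: the tag is remembered so the message can be acknowledged later.
                pendingDeliveryTags.add(deliveryTag);
            }
            return accepted;
        }

        @Override
        public void collect(T record) {
            if (!identifiersSet) {
                throw new IllegalStateException(
                        "setMessageIdentifiers must be called before collect");
            }
            // The check ran once per message, so every record of an accepted
            // message passes, not only the first one.
            if (accepted) {
                emitted.add(record);
            }
        }

        List<T> emitted() {
            return emitted;
        }
    }

    public static void main(String[] args) {
        DedupCollector<String> collector = new DedupCollector<>();

        // One RMQ message that yields two Flink records.
        collector.setMessageIdentifiers("corr-1", 1L);
        collector.collect("a");
        collector.collect("b");

        // The same correlation ID delivered again: its records are dropped.
        collector.setMessageIdentifiers("corr-1", 2L);
        collector.collect("a-duplicate");

        System.out.println(collector.emitted());
    }
}
```

Because the flag is set once per message rather than once per record, a deserialization schema can call `collect` several times for the same message without tripping the correlation check.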