AHeise commented on a change in pull request #17792: URL: https://github.com/apache/flink/pull/17792#discussion_r751969408
##########
File path: flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/filesystem/LimitableBulkFormat.java
##########
@@ -105,8 +111,16 @@ private boolean reachLimit() {
                 return null;
             }
 
-            RecordIterator<T> batch = reader.readBatch();
-            return batch == null ? null : new LimitableIterator(batch);
+            try {
+                RecordIterator<T> batch = reader.readBatch();
+                return batch == null ? null : new LimitableIterator(batch);
+            } catch (Exception e) {

Review comment:
This change is a no-op in a sequential threading model. If `reachLimit()` returns true, then the first `if` in `readBatch` already returns `null`. In all other cases, the exception is rethrown.

So I'm assuming you are actually guarding against a concurrent modification from another thread. If so, I'd rather fix the threading model. This class already uses two different mechanisms for dealing with concurrency (`synchronized` and `AtomicLong`), and you are now adding optimistic invocation with exception handling as a third. This is hard to reason about. You should probably settle on a `Lock` and use it consistently throughout the class.

Btw, I don't understand many parts of this class:
- What's the purpose of `globalNumberRead`? The bulk format instance will be replicated per subtask, so why would you need it?
- Do you ensure that only one subtask is executed, or how do you consolidate the lists produced by the different subtasks? `LIMIT N` would produce a list of `N` records in each subtask, right?
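To make the `Lock` suggestion concrete, here is a minimal sketch of a limit-enforcing reader in which a single `ReentrantLock` guards both the limit check and the counter update. It is not the code from this PR and does not use Flink's `BulkFormat` API: `BatchSource`, `LockedLimitReader`, and `LimitDemo` are hypothetical names made up for illustration, and batches are modeled as plain `List`s.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

/** Hypothetical stand-in for a batch reader; this is NOT Flink's BulkFormat.Reader. */
interface BatchSource<T> {
    /** Returns the next batch, or null when the input is exhausted. */
    List<T> readBatch();
}

/** Caps the number of records handed out; one Lock guards all shared state. */
final class LockedLimitReader<T> {
    private final BatchSource<T> source;
    private final long limit;
    private final Lock lock = new ReentrantLock();
    private long numRead; // guarded by lock

    LockedLimitReader(BatchSource<T> source, long limit) {
        this.source = source;
        this.limit = limit;
    }

    /** Returns at most the remaining quota, or null once the limit is reached or the input ends. */
    List<T> readBatch() {
        lock.lock();
        try {
            if (numRead >= limit) {
                return null;
            }
            // Exceptions from the underlying source propagate unchanged;
            // the limit check and the read happen under the same lock.
            List<T> batch = source.readBatch();
            if (batch == null) {
                return null;
            }
            int take = (int) Math.min(batch.size(), limit - numRead);
            numRead += take;
            return new ArrayList<>(batch.subList(0, take));
        } finally {
            lock.unlock();
        }
    }

    /** Number of records handed out so far. */
    long numRead() {
        lock.lock();
        try {
            return numRead;
        } finally {
            lock.unlock();
        }
    }
}

class LimitDemo {
    public static void main(String[] args) {
        Iterator<List<Integer>> batches =
                Arrays.asList(Arrays.asList(1, 2, 3), Arrays.asList(4, 5, 6)).iterator();
        LockedLimitReader<Integer> reader =
                new LockedLimitReader<Integer>(() -> batches.hasNext() ? batches.next() : null, 4);
        System.out.println(reader.readBatch()); // [1, 2, 3]
        System.out.println(reader.readBatch()); // [4]
        System.out.println(reader.readBatch()); // null
    }
}
```

Holding the lock across the underlying read serializes readers, which is the simplest model to reason about. Whether that cost is acceptable here depends on how many threads actually call into one reader instance, and that is part of what the questions above are trying to establish. Note also that the counter in this sketch is per instance, so it says nothing about enforcing `LIMIT N` across subtasks.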