morningman commented on code in PR #51185:
URL: https://github.com/apache/doris/pull/51185#discussion_r2113607866
##########
fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/source/IcebergScanNode.java:
##########
@@ -229,19 +231,21 @@ public void startSplit(int numBackends) throws UserException {
public void doStartSplit() {
TableScan scan = createTableScan();
CompletableFuture.runAsync(() -> {
+ AtomicReference<CloseableIterable<FileScanTask>> taskRef = new AtomicReference<>();
try {
preExecutionAuthenticator.execute(
() -> {
CloseableIterable<FileScanTask> fileScanTasks = planFileScanTask(scan);
-
- // 1. this task should stop when all splits are assigned
- // 2. if we want to stop this plan, we can close the fileScanTasks to stop
- splitAssignment.addCloseable(fileScanTasks);
-
- fileScanTasks.forEach(fileScanTask ->
- splitAssignment.addToQueue(Lists.newArrayList(createIcebergSplit(fileScanTask))));
-
- return null;
+ taskRef.set(planFileScanTask(scan));
Review Comment:
BTW, the `threadPoolWithPreAuth` in `ExternalCatalog.java` is only used in
##########
fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/source/IcebergScanNode.java:
##########
@@ -229,19 +231,21 @@ public void startSplit(int numBackends) throws UserException {
public void doStartSplit() {
TableScan scan = createTableScan();
CompletableFuture.runAsync(() -> {
+ AtomicReference<CloseableIterable<FileScanTask>> taskRef = new AtomicReference<>();
try {
preExecutionAuthenticator.execute(
() -> {
CloseableIterable<FileScanTask> fileScanTasks = planFileScanTask(scan);
-
- // 1. this task should stop when all splits are assigned
- // 2. if we want to stop this plan, we can close the fileScanTasks to stop
- splitAssignment.addCloseable(fileScanTasks);
-
- fileScanTasks.forEach(fileScanTask ->
- splitAssignment.addToQueue(Lists.newArrayList(createIcebergSplit(fileScanTask))));
-
- return null;
+ taskRef.set(planFileScanTask(scan));
Review Comment:
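`planFileScanTask(scan)` is called twice here: once to initialize `fileScanTasks` just above, and again inside `taskRef.set(...)`. Should this be `taskRef.set(fileScanTasks)`?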
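To illustrate the concern, here is a minimal sketch of running a planning call once and publishing the same result through an `AtomicReference`. The class `PlanOnceSketch` and the method `planOnce` are hypothetical names used only for illustration. They are not part of Doris or Iceberg, and the `Supplier` merely stands in for a call such as `planFileScanTask(scan)`.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;

// Hypothetical illustration only; not Doris code.
class PlanOnceSketch {

    // Runs the (possibly expensive) planner exactly once, stores the result
    // in the shared reference, and returns that same object to the caller.
    static <T> T planOnce(Supplier<T> planner, AtomicReference<T> ref) {
        T result = planner.get();
        ref.set(result);
        return result;
    }

    public static void main(String[] args) {
        AtomicInteger calls = new AtomicInteger();
        AtomicReference<List<String>> taskRef = new AtomicReference<>();

        // Stand-in planner that counts how often it is invoked.
        Supplier<List<String>> planner = () -> {
            calls.incrementAndGet();
            return Arrays.asList("task-1", "task-2");
        };

        List<String> tasks = planOnce(planner, taskRef);
        System.out.println("planner calls: " + calls.get());
        System.out.println("same object shared: " + (tasks == taskRef.get()));
    }
}
```

With this shape, the object that later gets closed through the reference is the same one being iterated, which would not hold if the planner ran a second time.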
##########
fe/fe-core/src/main/java/org/apache/doris/datasource/SplitAssignment.java:
##########
@@ -100,10 +108,22 @@ private void appendBatch(Multimap<Backend, Split> batch) throws UserException {
for (Split split : splits) {
locations.add(splitToScanRange.getScanRange(backend, locationProperties, split, pathPartitionKeys));
}
- try {
- assignment.computeIfAbsent(backend, be -> new LinkedBlockingQueue<>(10000)).put(locations);
- } catch (Exception e) {
- throw new UserException("Failed to offer batch split", e);
+ while (true) {
+ BlockingQueue<Collection<TScanRangeLocations>> queue =
+ assignment.computeIfAbsent(backend, be -> new LinkedBlockingQueue<>(10000));
+ try {
+ if (queue.offer(locations, 100, TimeUnit.MILLISECONDS)) {
+ return;
+ }
+ } catch (InterruptedException e) {
+ throw new UserException("Failed to offer batch split by interrupted", e);
+ }
+ if (needMoreSplit()) {
Review Comment:
This logic is wrong
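The hunk is cut off at `if (needMoreSplit()) {`, so the branch under discussion is not visible here. For reference only, a generic timed-offer retry loop on a bounded queue might look like the sketch below. `OfferLoopSketch`, `offerWhileNeeded`, and the `stillNeeded` supplier are hypothetical names, and the choice to stop retrying once the consumer no longer needs items is an assumption, not the behavior the PR or the reviewer specifies.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.function.BooleanSupplier;

// Hypothetical illustration only; not Doris code.
class OfferLoopSketch {

    // Retries a timed offer while the consumer still needs items.
    // Returns true if the item was enqueued, false if the producer gave up
    // because stillNeeded turned false or the thread was interrupted.
    static <T> boolean offerWhileNeeded(BlockingQueue<T> queue, T item,
                                        BooleanSupplier stillNeeded, long pollMillis) {
        while (stillNeeded.getAsBoolean()) {
            try {
                if (queue.offer(item, pollMillis, TimeUnit.MILLISECONDS)) {
                    return true;
                }
            } catch (InterruptedException e) {
                // Restore the interrupt flag so callers can observe it.
                Thread.currentThread().interrupt();
                return false;
            }
        }
        return false;
    }

    public static void main(String[] args) {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>(1);
        // The queue has room, so the first offer succeeds.
        System.out.println(offerWhileNeeded(queue, "a", () -> true, 100));
        // The consumer no longer needs items, so the producer gives up.
        System.out.println(offerWhileNeeded(queue, "b", () -> false, 100));
    }
}
```

The point of the timed `offer` over a blocking `put` is that the producer wakes up periodically and can re-check a stop condition instead of blocking indefinitely on a full queue.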
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]