lowka commented on code in PR #2192: URL: https://github.com/apache/ignite-3/pull/2192#discussion_r1229388536
########## modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/SqlQueryProcessor.java: ########## @@ -374,121 +389,190 @@ private void registerIndexListener(IndexEvent evt, AbstractIndexEventListener ls private CompletableFuture<AsyncSqlCursor<List<Object>>> querySingle0( SessionId sessionId, - QueryContext context, + QueryContext queryContext, String sql, Object... params ) { Session session = sessionManager.session(sessionId); if (session == null) { - return CompletableFuture.failedFuture( + return failedFuture( new SqlException(SESSION_NOT_FOUND_ERR, format("Session not found [{}]", sessionId))); } + QueryCancel queryCancel; + + try { + queryCancel = createQueryCancel(session); + } catch (IllegalStateException ex) { + return failedFuture(new IgniteInternalException(SESSION_EXPIRED_ERR, + format("Session has been expired [{}]", session.sessionId()), ex)); + } + String schemaName = session.properties().get(QueryProperty.DEFAULT_SCHEMA); - InternalTransaction outerTx = context.unwrap(InternalTransaction.class); + InternalTransaction outerTx = queryContext.unwrap(InternalTransaction.class); + long timestamp = outerTx == null ? clock.nowLong() : outerTx.startTimestamp().longValue(); - QueryCancel queryCancel = new QueryCancel(); + boolean implicitTx = outerTx == null; - AsyncCloseable closeableResource = () -> CompletableFuture.runAsync( - queryCancel::cancel, - taskExecutor - ); + // TODO: IGNITE-19497 Switch to Catalog manager and uncomment next lines. 
+ // int plannerCatalogVersion = catalogManager.activeCatalogVersion(timestamp); + // int schemaId = catalogManager.schema(plannerCatalogVersion).id(); + // SchemaPlus schema = sqlSchemaManager.schema(schemaName, plannerCatalogVersion); - queryCancel.add(() -> session.unregisterResource(closeableResource)); + int plannerCatalogVersion = 0; + SchemaPlus schema = sqlSchemaManager.schema(schemaName); - try { - session.registerResource(closeableResource); - } catch (IllegalStateException ex) { - return CompletableFuture.failedFuture(new IgniteInternalException(SESSION_EXPIRED_ERR, - format("Session has been expired [{}]", session.sessionId()), ex)); + if (schema == null) { + return failedFuture(new SchemaNotFoundException(schemaName)); } - CompletableFuture<Void> start = new CompletableFuture<>(); + CacheKey cacheKey = new CacheKey(plannerCatalogVersion, schemaName, sql, params); + + BaseQueryContext plannerContext = BaseQueryContext.builder() + .frameworkConfig(Frameworks.newConfigBuilder(FRAMEWORK_CONFIG).defaultSchema(schema).build()) + .logger(LOG) + .cancel(queryCancel) + .parameters(params) + .plannerTimeout(PLANNER_TIMEOUT) + .build(); + + CompletableFuture<QueryPlan> planFuture = queryCache.get(cacheKey); + + if (planFuture == null) { + planFuture = CompletableFuture.supplyAsync(() -> { Review Comment: I think it would be better to specify an executor here, since `CompletableFuture.supplyAsync(task)` without an executor argument runs the task on the shared `ForkJoinPool.commonPool()`. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: notifications-unsubscr...@ignite.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org