lowka commented on code in PR #2192:
URL: https://github.com/apache/ignite-3/pull/2192#discussion_r1229388536


##########
modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/SqlQueryProcessor.java:
##########
@@ -374,121 +389,190 @@ private void registerIndexListener(IndexEvent evt, AbstractIndexEventListener ls
 
     private CompletableFuture<AsyncSqlCursor<List<Object>>> querySingle0(
             SessionId sessionId,
-            QueryContext context,
+            QueryContext queryContext,
             String sql,
             Object... params
     ) {
         Session session = sessionManager.session(sessionId);
 
         if (session == null) {
-            return CompletableFuture.failedFuture(
+            return failedFuture(
                     new SqlException(SESSION_NOT_FOUND_ERR, format("Session not found [{}]", sessionId)));
         }
 
+        QueryCancel queryCancel;
+
+        try {
+            queryCancel = createQueryCancel(session);
+        } catch (IllegalStateException ex) {
+            return failedFuture(new IgniteInternalException(SESSION_EXPIRED_ERR,
+                    format("Session has been expired [{}]", session.sessionId()), ex));
+        }
+
         String schemaName = session.properties().get(QueryProperty.DEFAULT_SCHEMA);
 
-        InternalTransaction outerTx = context.unwrap(InternalTransaction.class);
+        InternalTransaction outerTx = queryContext.unwrap(InternalTransaction.class);
+        long timestamp = outerTx == null ? clock.nowLong() : outerTx.startTimestamp().longValue();
 
-        QueryCancel queryCancel = new QueryCancel();
+        boolean implicitTx = outerTx == null;
 
-        AsyncCloseable closeableResource = () -> CompletableFuture.runAsync(
-                queryCancel::cancel,
-                taskExecutor
-        );
+        // TODO: IGNITE-19497 Switch to Catalog manager and uncomment next lines.
+        // int plannerCatalogVersion = catalogManager.activeCatalogVersion(timestamp);
+        // int schemaId = catalogManager.schema(plannerCatalogVersion).id();
+        // SchemaPlus schema = sqlSchemaManager.schema(schemaName, plannerCatalogVersion);
 
-        queryCancel.add(() -> session.unregisterResource(closeableResource));
+        int plannerCatalogVersion = 0;
+        SchemaPlus schema = sqlSchemaManager.schema(schemaName);
 
-        try {
-            session.registerResource(closeableResource);
-        } catch (IllegalStateException ex) {
-            return CompletableFuture.failedFuture(new IgniteInternalException(SESSION_EXPIRED_ERR,
-                    format("Session has been expired [{}]", session.sessionId()), ex));
+        if (schema == null) {
+            return failedFuture(new SchemaNotFoundException(schemaName));
         }
 
-        CompletableFuture<Void> start = new CompletableFuture<>();
+        CacheKey cacheKey = new CacheKey(plannerCatalogVersion, schemaName, sql, params);
+
+        BaseQueryContext plannerContext = BaseQueryContext.builder()
+                .frameworkConfig(Frameworks.newConfigBuilder(FRAMEWORK_CONFIG).defaultSchema(schema).build())
+                .logger(LOG)
+                .cancel(queryCancel)
+                .parameters(params)
+                .plannerTimeout(PLANNER_TIMEOUT)
+                .build();
+
+        CompletableFuture<QueryPlan> planFuture = queryCache.get(cacheKey);
+
+        if (planFuture == null) {
+            planFuture = CompletableFuture.supplyAsync(() -> {

Review Comment:
   I think it would be better to pass an explicit executor here, since the 
single-argument `CompletableFuture.supplyAsync(task)` runs the task on 
`ForkJoinPool.commonPool()`, which is shared with unrelated work in the JVM.
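
   To illustrate the difference, here is a minimal standalone sketch rather 
than the actual change for this PR. The class name, the thread name 
`sql-planning-sketch` and the pool size are made up for the example. Which 
executor `SqlQueryProcessor` should really use is left to the author; the 
removed lines reference a `taskExecutor` field, but whether it is suitable for 
planning work is an assumption that would need checking.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class ExplicitExecutorSketch {
    /** Single-argument overload: the task runs on the JVM-wide default async pool. */
    static String runOnDefaultPool() {
        return CompletableFuture.supplyAsync(() -> Thread.currentThread().getName()).join();
    }

    /** Two-argument overload: the task runs on the executor supplied by the caller. */
    static String runOn(ExecutorService executor) {
        return CompletableFuture.supplyAsync(() -> Thread.currentThread().getName(), executor).join();
    }

    public static void main(String[] args) {
        // Hypothetical dedicated pool; the name and size are illustrative only.
        ExecutorService planningExecutor = Executors.newFixedThreadPool(2, r -> {
            Thread t = new Thread(r, "sql-planning-sketch");
            t.setDaemon(true);
            return t;
        });

        try {
            System.out.println("default:  " + runOnDefaultPool());
            System.out.println("explicit: " + runOn(planningExecutor));
        } finally {
            planningExecutor.shutdown();
        }
    }
}
```

   With the two-argument overload the owner of the executor controls the 
thread count, thread naming and shutdown, so long-running planning tasks do not 
compete with other users of the common pool.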


