dimas-b commented on code in PR #4061: URL: https://github.com/apache/polaris/pull/4061#discussion_r3030697196
########## runtime/service/src/main/java/org/apache/polaris/service/task/PrincipalContextPropagator.java: ########## @@ -0,0 +1,78 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.polaris.service.task; + +import jakarta.enterprise.context.ApplicationScoped; +import jakarta.enterprise.context.ContextNotActiveException; +import jakarta.enterprise.inject.Instance; +import jakarta.inject.Inject; +import org.apache.polaris.core.auth.ImmutablePolarisPrincipal; +import org.apache.polaris.core.auth.PolarisPrincipal; +import org.apache.polaris.service.context.catalog.PolarisPrincipalHolder; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Propagates the authenticated principal across the async task boundary via {@link + * PolarisPrincipalHolder}. + * + * <p>A clone of the principal is captured at submission time so the task thread uses a stable + * snapshot that is independent of the originating request scope's lifecycle. 
+ */ +@ApplicationScoped +public class PrincipalContextPropagator implements AsyncContextPropagator { + + private static final Logger LOGGER = LoggerFactory.getLogger(PrincipalContextPropagator.class); + + private final PolarisPrincipalHolder polarisPrincipalHolder; + private final Instance<PolarisPrincipal> polarisPrincipal; Review Comment: Why `Instance`? Most code is able to deal with simple `PolarisPrincipal` typed fields 🤔 ########## runtime/service/src/main/java/org/apache/polaris/service/task/AsyncContextPropagator.java: ########## @@ -0,0 +1,78 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.polaris.service.task; + +import jakarta.annotation.Nonnull; + +/** + * Extension point for propagating request-scoped context across the async task boundary. + * + * <p>Each implementation is responsible for a single piece of request-scoped context (e.g. realm + * identity, authenticated principal, request ID). Implementations are CDI beans (typically + * {@code @ApplicationScoped}). {@link TaskExecutorImpl} discovers all implementations via CDI + * {@code Instance} injection, so adding a new propagation concern requires only a new bean — no + * existing code needs to change. 
+ * + * <p>Lifecycle: + * + * <ol> + * <li>{@link #capture()} is called on the request thread (active request scope). The + * implementation reads its relevant context and returns a {@link RestoreAction} that + * encapsulates the captured state and knows how to restore it. + * <li>The action is carried across the async boundary. + * <li>{@link RestoreAction#restore()} is called inside the task thread's new CDI request scope. + * <li>{@link RestoreAction#close()} is called after the task finishes, for optional cleanup (e.g. + * MDC restoration). The default implementation is a no-op. + * </ol> + */ +public interface AsyncContextPropagator { + + /** + * Captures relevant context from the current request scope. + * + * <p>The returned action may be restored multiple times across retries and different threads. + * Implementations must ensure the captured state within the action is <strong>immutable</strong> + * and <strong>thread-safe</strong>. + * + * @return an action that can restore the captured context, or {@link RestoreAction#NOOP} if no + * context is available to capture. + */ + @Nonnull + RestoreAction capture(); + + /** + * Encapsulates captured context and the logic to restore it on a task thread. + * + * <p>Implementations that need cleanup after task completion (e.g. MDC restoration) override + * {@link #close()}. The default {@code close()} is a no-op, so propagators with no cleanup + * requirement need not implement it. + */ + interface RestoreAction extends AutoCloseable { + + /** Shared no-op instance for propagators that have nothing to capture. */ + RestoreAction NOOP = () -> {}; + + /** Restores the captured context into the task thread's active request scope. */ + void restore(); + + /** Optional cleanup after the task finishes. Default is a no-op. */ Review Comment: Optional cleanup sounds like a misnomer, TBH... What is optional? Calling it or implementing? 
########## runtime/service/src/main/java/org/apache/polaris/service/task/RequestIdPropagator.java: ########## @@ -0,0 +1,95 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.polaris.service.task; + +import jakarta.enterprise.context.ApplicationScoped; +import jakarta.enterprise.context.ContextNotActiveException; +import jakarta.inject.Inject; +import org.apache.polaris.service.context.catalog.RequestIdHolder; +import org.apache.polaris.service.tracing.RequestIdFilter; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.slf4j.MDC; + +/** + * Propagates the request ID across the async task boundary. + * + * <p>At capture time the request ID is read from {@link RequestIdHolder}, which is populated by + * {@code RequestIdFilter} on HTTP request threads and by this propagator's {@link + * RestoreAction#restore()} path on task threads (enabling nested task submission). + * + * <p>At restore time the ID is written to both the {@link RequestIdHolder} (so that {@code + * RequestIdSupplier} works in task threads) and to the SLF4J MDC (so that log messages emitted by + * the task carry the originating request ID). 
+ * + * <p>MDC cleanup is performed by the action's {@link RestoreAction#close()} so that thread-pool + * threads are left in a clean state after the task completes. + */ +@ApplicationScoped +public class RequestIdPropagator implements AsyncContextPropagator { + + private static final Logger LOGGER = LoggerFactory.getLogger(RequestIdPropagator.class); + + private final RequestIdHolder requestIdHolder; + + @SuppressWarnings("unused") // Required by CDI + protected RequestIdPropagator() { + this(null); + } + + @Inject + public RequestIdPropagator(RequestIdHolder requestIdHolder) { + this.requestIdHolder = requestIdHolder; + } + + @Override + public RestoreAction capture() { + String id = null; + try { + id = requestIdHolder.get(); + } catch (ContextNotActiveException e) { + // scope not active + } + LOGGER.trace("capture requestId={}", id); + if (id == null) { + return RestoreAction.NOOP; + } + String captured = id; + return new RestoreAction() { + private String previous; + + @Override + public void restore() { + LOGGER.trace("restore requestId={}", captured); + requestIdHolder.set(captured); + previous = MDC.get(RequestIdFilter.REQUEST_ID_KEY); + MDC.put(RequestIdFilter.REQUEST_ID_KEY, captured); + } + + @Override + public void close() { + if (previous != null) { + MDC.put(RequestIdFilter.REQUEST_ID_KEY, previous); Review Comment: I do not think MDC should be mixed with CDI context propagation. Moreover, why is request ID in MDC but Principal is not? This is not to say that Principal should be in MDC, but to illustrate that putting request IDs into MDC goes beyond the scope of CDI context propagation. If you're striving to have a unified way of setting request IDs in MDC, I propose to do that inside the `requestIdHolder.set()` method (and never remove it from MDC). If the idea is to ensure try/finally style cleanup for request ID in MDC, I do not think context propagators are the right place for that conceptually. Please consider a different approach. 
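To make the first option concrete, here is a minimal sketch of a holder whose `set()` also publishes the request ID to the logging context. It is only an illustration under stated assumptions: `RequestIdHolderSketch` and `FakeMdc` are hypothetical stand-ins (the real holder is a CDI bean and the real MDC is `org.slf4j.MDC`), and the key name `"requestId"` is assumed rather than taken from `RequestIdFilter`.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch, not the Polaris implementation. The real RequestIdHolder is a CDI
// bean and the real MDC is org.slf4j.MDC; both are replaced by small stand-ins here so
// that the example compiles without any dependencies.
final class RequestIdHolderSketch {

  /** Stand-in for org.slf4j.MDC: a per-thread key/value map. */
  static final class FakeMdc {
    private static final ThreadLocal<Map<String, String>> CTX =
        ThreadLocal.withInitial(HashMap::new);

    static void put(String key, String value) {
      CTX.get().put(key, value);
    }

    static String get(String key) {
      return CTX.get().get(key);
    }
  }

  // Assumed key name; the PR uses RequestIdFilter.REQUEST_ID_KEY instead.
  static final String REQUEST_ID_KEY = "requestId";

  private String requestId;

  String get() {
    return requestId;
  }

  /** Single place where the request ID reaches both the holder and the logging context. */
  void set(String requestId) {
    this.requestId = requestId;
    // Never removed: the next set() on this thread simply overwrites the previous value.
    FakeMdc.put(REQUEST_ID_KEY, requestId);
  }

  public static void main(String[] args) {
    RequestIdHolderSketch firstTask = new RequestIdHolderSketch();
    firstTask.set("req-1");
    System.out.println(FakeMdc.get(REQUEST_ID_KEY));

    // A later task on the same pooled thread overwrites the value; no cleanup step needed.
    RequestIdHolderSketch secondTask = new RequestIdHolderSketch();
    secondTask.set("req-2");
    System.out.println(FakeMdc.get(REQUEST_ID_KEY));
  }
}
```

With this shape the propagator only has to call `set()` on restore, and nothing in the propagation code needs to know about MDC.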
That said, I do not see a reason to ever remove request IDs from MDC. Do you have a use case where that is required? ########## runtime/service/src/main/java/org/apache/polaris/service/task/PrincipalContextPropagator.java: ########## @@ -0,0 +1,78 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.polaris.service.task; + +import jakarta.enterprise.context.ApplicationScoped; +import jakarta.enterprise.context.ContextNotActiveException; +import jakarta.enterprise.inject.Instance; +import jakarta.inject.Inject; +import org.apache.polaris.core.auth.ImmutablePolarisPrincipal; +import org.apache.polaris.core.auth.PolarisPrincipal; +import org.apache.polaris.service.context.catalog.PolarisPrincipalHolder; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Propagates the authenticated principal across the async task boundary via {@link + * PolarisPrincipalHolder}. + * + * <p>A clone of the principal is captured at submission time so the task thread uses a stable + * snapshot that is independent of the originating request scope's lifecycle. 
+ */ +@ApplicationScoped +public class PrincipalContextPropagator implements AsyncContextPropagator { + + private static final Logger LOGGER = LoggerFactory.getLogger(PrincipalContextPropagator.class); + + private final PolarisPrincipalHolder polarisPrincipalHolder; + private final Instance<PolarisPrincipal> polarisPrincipal; + + @SuppressWarnings("unused") // Required by CDI + protected PrincipalContextPropagator() { + this(null, null); + } + + @Inject + public PrincipalContextPropagator( + PolarisPrincipalHolder polarisPrincipalHolder, Instance<PolarisPrincipal> polarisPrincipal) { + this.polarisPrincipalHolder = polarisPrincipalHolder; + this.polarisPrincipal = polarisPrincipal; + } + + @Override + public RestoreAction capture() { + PolarisPrincipal clone = null; + try { + // Clone to allow task thread get a stable snapshot regardless of the request scope + // lifecycle. + clone = ImmutablePolarisPrincipal.builder().from(polarisPrincipal.get()).build(); + } catch (ContextNotActiveException e) { + // scope not active Review Comment: Capturing out of scope is a mistake. We should not ignore this exception. ########## runtime/service/src/main/java/org/apache/polaris/service/task/TaskExecutorImpl.java: ########## @@ -96,22 +91,18 @@ public TaskExecutorImpl( Clock clock, MetaStoreManagerFactory metaStoreManagerFactory, TaskFileIOSupplier fileIOSupplier, - RealmContextHolder realmContextHolder, PolarisEventDispatcher polarisEventDispatcher, PolarisEventMetadataFactory eventMetadataFactory, @Nullable Tracer tracer, - PolarisPrincipalHolder polarisPrincipalHolder, - PolarisPrincipal polarisPrincipal) { + Instance<AsyncContextPropagator> contextPropagators) { Review Comment: I believe a `List<AsyncContextPropagator>` will work, won't it? Using `Instance` should be reserved to infrastructure-level code. Normal beans can get instances of direct java types injected in most cases. 
########## runtime/service/src/main/java/org/apache/polaris/service/task/PrincipalContextPropagator.java: ########## @@ -0,0 +1,78 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.polaris.service.task; + +import jakarta.enterprise.context.ApplicationScoped; +import jakarta.enterprise.context.ContextNotActiveException; +import jakarta.enterprise.inject.Instance; +import jakarta.inject.Inject; +import org.apache.polaris.core.auth.ImmutablePolarisPrincipal; +import org.apache.polaris.core.auth.PolarisPrincipal; +import org.apache.polaris.service.context.catalog.PolarisPrincipalHolder; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Propagates the authenticated principal across the async task boundary via {@link + * PolarisPrincipalHolder}. + * + * <p>A clone of the principal is captured at submission time so the task thread uses a stable + * snapshot that is independent of the originating request scope's lifecycle. 
+ */ +@ApplicationScoped +public class PrincipalContextPropagator implements AsyncContextPropagator { + + private static final Logger LOGGER = LoggerFactory.getLogger(PrincipalContextPropagator.class); + + private final PolarisPrincipalHolder polarisPrincipalHolder; + private final Instance<PolarisPrincipal> polarisPrincipal; + + @SuppressWarnings("unused") // Required by CDI + protected PrincipalContextPropagator() { + this(null, null); + } + + @Inject + public PrincipalContextPropagator( + PolarisPrincipalHolder polarisPrincipalHolder, Instance<PolarisPrincipal> polarisPrincipal) { + this.polarisPrincipalHolder = polarisPrincipalHolder; + this.polarisPrincipal = polarisPrincipal; + } + + @Override + public RestoreAction capture() { + PolarisPrincipal clone = null; + try { + // Clone to allow task thread get a stable snapshot regardless of the request scope + // lifecycle. + clone = ImmutablePolarisPrincipal.builder().from(polarisPrincipal.get()).build(); Review Comment: Why not `polarisPrincipalHolder.get()`? ########## runtime/service/src/main/java/org/apache/polaris/service/task/AsyncContextPropagator.java: ########## @@ -0,0 +1,78 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.polaris.service.task; + +import jakarta.annotation.Nonnull; + +/** + * Extension point for propagating request-scoped context across the async task boundary. + * + * <p>Each implementation is responsible for a single piece of request-scoped context (e.g. realm + * identity, authenticated principal, request ID). Implementations are CDI beans (typically + * {@code @ApplicationScoped}). {@link TaskExecutorImpl} discovers all implementations via CDI + * {@code Instance} injection, so adding a new propagation concern requires only a new bean — no + * existing code needs to change. + * + * <p>Lifecycle: + * + * <ol> + * <li>{@link #capture()} is called on the request thread (active request scope). The + * implementation reads its relevant context and returns a {@link RestoreAction} that + * encapsulates the captured state and knows how to restore it. + * <li>The action is carried across the async boundary. + * <li>{@link RestoreAction#restore()} is called inside the task thread's new CDI request scope. + * <li>{@link RestoreAction#close()} is called after the task finishes, for optional cleanup (e.g. + * MDC restoration). The default implementation is a no-op. + * </ol> + */ +public interface AsyncContextPropagator { + + /** + * Captures relevant context from the current request scope. + * + * <p>The returned action may be restored multiple times across retries and different threads. + * Implementations must ensure the captured state within the action is <strong>immutable</strong> + * and <strong>thread-safe</strong>. + * + * @return an action that can restore the captured context, or {@link RestoreAction#NOOP} if no + * context is available to capture. + */ + @Nonnull + RestoreAction capture(); + + /** + * Encapsulates captured context and the logic to restore it on a task thread. + * + * <p>Implementations that need cleanup after task completion (e.g. MDC restoration) override + * {@link #close()}. 
The default {@code close()} is a no-op, so propagators with no cleanup + * requirement need not implement it. + */ + interface RestoreAction extends AutoCloseable { + + /** Shared no-op instance for propagators that have nothing to capture. */ + RestoreAction NOOP = () -> {}; + + /** Restores the captured context into the task thread's active request scope. */ + void restore(); + + /** Optional cleanup after the task finishes. Default is a no-op. */ + @Override + default void close() throws Exception {} Review Comment: Having a `close()` method is logically inconsistent with the idea of restoring state multiple times (javadoc on line 49). To be clear: I support the idea of repeated calls to `restore()`. I believe this interface should not have a `close()` method. ########## runtime/service/src/main/java/org/apache/polaris/service/task/TaskExecutorImpl.java: ########## @@ -260,37 +256,52 @@ protected void handleTask( @ActivateRequestContext protected void handleTaskWithTracing( - String realmId, long taskEntityId, CallContext callContext, - PolarisPrincipal principal, + List<AsyncContextPropagator.RestoreAction> actions, PolarisEventMetadata eventMetadata, int attempt) { - // Note: each call to this method runs in a new CDI request context - - realmContextHolder.set(() -> realmId); - // since this is now a different context we store clone of the principal in a holder object - // which essentially reauthenticates the principal. PolarisPrincipal bean always looks for a - // principal set in PolarisPrincipalHolder first and assumes that identity if set. - polarisPrincipalHolder.set(principal); + // Note: each call to this method runs in a new CDI request context. + // Restore all propagated context into the fresh request scope. + // A restore failure is fatal: running a task without proper realm or principal context + // risks wrong-tenant or wrong-identity execution, so we abort rather than continue. 
+ // The restore loop is inside the try block so the finally block cleans up any + // actions that were already restored before the failure. + int restored = 0; + try { + for (AsyncContextPropagator.RestoreAction action : actions) { + action.restore(); + restored++; + } - if (tracer == null) { - handleTask(taskEntityId, callContext, eventMetadata, attempt); - } else { - Span span = - tracer - .spanBuilder("polaris.task") - .setParent(Context.current()) - .setAttribute( - TracingFilter.REALM_ID_ATTRIBUTE, - callContext.getRealmContext().getRealmIdentifier()) - .setAttribute("polaris.task.entity.id", taskEntityId) - .setAttribute("polaris.task.attempt", attempt) - .startSpan(); - try (Scope ignored = span.makeCurrent()) { + if (tracer == null) { handleTask(taskEntityId, callContext, eventMetadata, attempt); - } finally { - span.end(); + } else { + Span span = + tracer + .spanBuilder("polaris.task") + .setParent(Context.current()) + .setAttribute( + TracingFilter.REALM_ID_ATTRIBUTE, + callContext.getRealmContext().getRealmIdentifier()) + .setAttribute("polaris.task.entity.id", taskEntityId) + .setAttribute("polaris.task.attempt", attempt) + .startSpan(); + try (Scope ignored = span.makeCurrent()) { + handleTask(taskEntityId, callContext, eventMetadata, attempt); + } finally { + span.end(); + } + } + } finally { + // Close in reverse order (LIFO) so that later actions clean up before + // earlier ones, matching standard stacked-context conventions. + for (int i = restored - 1; i >= 0; i--) { + try { + actions.get(i).close(); Review Comment: I think I understand what you'd like to build, but I believe the term "context propagation" (from the PR title) would be really confusing in that case. If the idea is to establish a convenience framework for running code inside some context and to make sure the current thread removes traces of that context on completion, I believe a better approach would be to follow the pattern of `Bootstrapper` (suggested new name: `ContextualTaskRunner`).
https://github.com/apache/polaris/blob/6c1a12bfd25a77b34e9e803295cc92492dda47b6/runtime/service/src/main/java/org/apache/polaris/service/config/Bootstrapper.java#L39
Inject `RequestIdHolder`, `PolarisPrincipalHolder` and `RealmContextHolder` into it. Use them to propagate context internally. No need for a new SPI. `TaskExecutorImpl` would call:
1. (on task creation) `ContextualAction action = ContextualTaskRunner.capture()`
2. (on execution) `action.runWithNewContext( { ... } );`
MDC can be handled internally and cleared upon exit from `runWithNewContext`.
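A rough sketch of the capture-then-run shape described above, to show how little surface it needs. Everything in it is hypothetical: the class and method names are illustrative, the three `ThreadLocal` fields are stand-ins for the injected holders (and for MDC), and a real version would run the task inside a fresh CDI request context rather than on bare thread-locals.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Supplier;

// Hypothetical sketch: none of these types exist in Polaris under these names.
final class ContextualTaskRunnerSketch {

  // Stand-ins for RealmContextHolder, PolarisPrincipalHolder and RequestIdHolder.
  static final ThreadLocal<String> REALM = new ThreadLocal<>();
  static final ThreadLocal<String> PRINCIPAL = new ThreadLocal<>();
  static final ThreadLocal<String> REQUEST_ID = new ThreadLocal<>();

  /** Immutable snapshot of the caller's context; reusable across retries and threads. */
  static final class ContextualAction {
    private final String realm;
    private final String principal;
    private final String requestId;

    ContextualAction(String realm, String principal, String requestId) {
      this.realm = realm;
      this.principal = principal;
      this.requestId = requestId;
    }

    /** Installs the snapshot on the current thread, runs the task, then removes every trace. */
    <T> T runWithNewContext(Supplier<T> task) {
      REALM.set(realm);
      PRINCIPAL.set(principal);
      REQUEST_ID.set(requestId);
      try {
        return task.get();
      } finally {
        REALM.remove();
        PRINCIPAL.remove();
        REQUEST_ID.remove();
      }
    }
  }

  /** Captures the calling thread's context; fails fast instead of capturing nothing. */
  static ContextualAction capture() {
    String realm = REALM.get();
    String principal = PRINCIPAL.get();
    if (realm == null || principal == null) {
      throw new IllegalStateException("capture() called outside an active request context");
    }
    return new ContextualAction(realm, principal, REQUEST_ID.get());
  }

  public static void main(String[] args) throws Exception {
    // "Request thread": the context is present while the task is created.
    REALM.set("realm-a");
    PRINCIPAL.set("alice");
    REQUEST_ID.set("req-1");
    ContextualAction action = capture();

    ExecutorService pool = Executors.newSingleThreadExecutor();
    try {
      // Two runs of the same action model a retry; each run sees the same snapshot.
      for (int attempt = 1; attempt <= 2; attempt++) {
        int n = attempt;
        Callable<String> job =
            () ->
                action.runWithNewContext(
                    () -> n + ": " + REALM.get() + "/" + PRINCIPAL.get() + "/" + REQUEST_ID.get());
        System.out.println(pool.submit(job).get());
      }
      // Probe the pooled thread after the tasks have returned.
      Callable<String> probe = REALM::get;
      System.out.println("left behind: " + pool.submit(probe).get());
    } finally {
      pool.shutdown();
    }
  }
}
```

The caller never sees `restore()` or `close()`; setup and cleanup are paired inside one method, so the same action can be run repeatedly without a separate lifecycle to get wrong.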
