This is an automated email from the ASF dual-hosted git repository.

roryqi pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/gravitino.git


The following commit(s) were added to refs/heads/main by this push:
     new 462438d428 [#8856] fix(iceberg): Override table owner with OAuth 
authenticated user (#8857)
462438d428 is described below

commit 462438d4286cfae31d0990c7bb116fe30731e2ce
Author: Bharath Krishna <[email protected]>
AuthorDate: Wed Oct 22 19:26:30 2025 -0700

    [#8856] fix(iceberg): Override table owner with OAuth authenticated user 
(#8857)
    
    ### What changes were proposed in this pull request?
    
    Modified IcebergTableOperationExecutor to override the client-provided
    owner property with the authenticated user from OAuth/JWT tokens when
    creating Iceberg tables.
    
    ### Why are the changes needed?
    
    When tables are created via Spark or other Iceberg clients, the client
    sends its own 'owner' property value (e.g., 'spark' or 'system') in
    the CreateTableRequest. As a result, the stored table metadata records
    an owner that does not reflect the actual authenticated user, which
    makes audit trails unreliable.
    
    The fix ensures that table ownership reflects the actual authenticated
    user rather than the client's environment or default values.
    
    ### Does this PR introduce _any_ user-facing change?
    
    Yes. Tables created through the Iceberg REST API will now show the
    authenticated user (from OAuth/JWT token) as the owner instead of the
    value sent by the client (e.g., 'spark').
    
    ### How was this patch tested?
    
    Tested by creating tables with Spark SQL through the Gravitino Iceberg
    REST service with OAuth authentication enabled. Verified that:
    - Tables now show the correct owner (e.g., '[email protected]')
    - Before the fix, tables showed 'spark' as the owner
    - Behavior is backward compatible when OAuth is disabled
    
    Before fix:
    <img width="1045" height="945" alt="Screenshot 2025-10-20 at 2 45 56 PM"
    src="https://github.com/user-attachments/assets/e912e844-13fd-48df-86af-fe405000c5d8"
    />
    
    After fix:
    <img width="1038" height="922" alt="Screenshot 2025-10-20 at 2 46 04 PM"
    src="https://github.com/user-attachments/assets/819e1822-5439-45ad-80a3-603b8d19124f"
    />
    
    
    Fixes: #8856
---
 .../lakehouse/iceberg/IcebergConstants.java        |   1 +
 .../dispatcher/IcebergTableOperationExecutor.java  |  34 +++++
 .../TestIcebergTableOperationExecutor.java         | 147 +++++++++++++++++++++
 3 files changed, 182 insertions(+)

diff --git 
a/catalogs/catalog-common/src/main/java/org/apache/gravitino/catalog/lakehouse/iceberg/IcebergConstants.java
 
b/catalogs/catalog-common/src/main/java/org/apache/gravitino/catalog/lakehouse/iceberg/IcebergConstants.java
index d4fd8baad4..8e1a520252 100644
--- 
a/catalogs/catalog-common/src/main/java/org/apache/gravitino/catalog/lakehouse/iceberg/IcebergConstants.java
+++ 
b/catalogs/catalog-common/src/main/java/org/apache/gravitino/catalog/lakehouse/iceberg/IcebergConstants.java
@@ -57,6 +57,7 @@ public class IcebergConstants {
 
   public static final String COMMENT = "comment";
   public static final String CREATOR = "creator";
+  public static final String OWNER = "owner";
   public static final String LOCATION = "location";
   public static final String CURRENT_SNAPSHOT_ID = "current-snapshot-id";
   public static final String CHERRY_PICK_SNAPSHOT_ID = 
"cherry-pick-snapshot-id";
diff --git 
a/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/iceberg/service/dispatcher/IcebergTableOperationExecutor.java
 
b/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/iceberg/service/dispatcher/IcebergTableOperationExecutor.java
index cec833136a..bebc9c1afa 100644
--- 
a/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/iceberg/service/dispatcher/IcebergTableOperationExecutor.java
+++ 
b/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/iceberg/service/dispatcher/IcebergTableOperationExecutor.java
@@ -19,6 +19,10 @@
 
 package org.apache.gravitino.iceberg.service.dispatcher;
 
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.gravitino.auth.AuthConstants;
+import org.apache.gravitino.catalog.lakehouse.iceberg.IcebergConstants;
 import org.apache.gravitino.iceberg.service.IcebergCatalogWrapperManager;
 import org.apache.gravitino.listener.api.event.IcebergRequestContext;
 import org.apache.iceberg.catalog.Namespace;
@@ -29,9 +33,13 @@ import org.apache.iceberg.rest.requests.UpdateTableRequest;
 import org.apache.iceberg.rest.responses.ListTablesResponse;
 import org.apache.iceberg.rest.responses.LoadCredentialsResponse;
 import org.apache.iceberg.rest.responses.LoadTableResponse;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 public class IcebergTableOperationExecutor implements 
IcebergTableOperationDispatcher {
 
+  private static final Logger LOG = 
LoggerFactory.getLogger(IcebergTableOperationExecutor.class);
+
   private IcebergCatalogWrapperManager icebergCatalogWrapperManager;
 
   public IcebergTableOperationExecutor(IcebergCatalogWrapperManager 
icebergCatalogWrapperManager) {
@@ -41,6 +49,32 @@ public class IcebergTableOperationExecutor implements 
IcebergTableOperationDispa
   @Override
   public LoadTableResponse createTable(
       IcebergRequestContext context, Namespace namespace, CreateTableRequest 
createTableRequest) {
+    String authenticatedUser = context.userName();
+    if (!AuthConstants.ANONYMOUS_USER.equals(authenticatedUser)) {
+      String existingOwner = 
createTableRequest.properties().get(IcebergConstants.OWNER);
+
+      // Override the owner as the authenticated user if different from 
authenticated user
+      if (!authenticatedUser.equals(existingOwner)) {
+        Map<String, String> properties = new 
HashMap<>(createTableRequest.properties());
+        properties.put(IcebergConstants.OWNER, authenticatedUser);
+        LOG.debug(
+            "Overriding table owner from '{}' to authenticated user: '{}'",
+            existingOwner,
+            authenticatedUser);
+
+        // CreateTableRequest is immutable, so we need to rebuild it with 
modified properties
+        createTableRequest =
+            CreateTableRequest.builder()
+                .withName(createTableRequest.name())
+                .withSchema(createTableRequest.schema())
+                .withPartitionSpec(createTableRequest.spec())
+                .withWriteOrder(createTableRequest.writeOrder())
+                .withLocation(createTableRequest.location())
+                .setProperties(properties)
+                .build();
+      }
+    }
+
     return icebergCatalogWrapperManager
         .getCatalogWrapper(context.catalogName())
         .createTable(namespace, createTableRequest, 
context.requestCredentialVending());
diff --git 
a/iceberg/iceberg-rest-server/src/test/java/org/apache/gravitino/iceberg/service/dispatcher/TestIcebergTableOperationExecutor.java
 
b/iceberg/iceberg-rest-server/src/test/java/org/apache/gravitino/iceberg/service/dispatcher/TestIcebergTableOperationExecutor.java
new file mode 100644
index 0000000000..47c2ffb4d2
--- /dev/null
+++ 
b/iceberg/iceberg-rest-server/src/test/java/org/apache/gravitino/iceberg/service/dispatcher/TestIcebergTableOperationExecutor.java
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.gravitino.iceberg.service.dispatcher;
+
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyBoolean;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.gravitino.catalog.lakehouse.iceberg.IcebergConstants;
+import org.apache.gravitino.iceberg.service.CatalogWrapperForREST;
+import org.apache.gravitino.iceberg.service.IcebergCatalogWrapperManager;
+import org.apache.gravitino.listener.api.event.IcebergRequestContext;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.catalog.Namespace;
+import org.apache.iceberg.rest.requests.CreateTableRequest;
+import org.apache.iceberg.rest.responses.LoadTableResponse;
+import org.apache.iceberg.types.Types.NestedField;
+import org.apache.iceberg.types.Types.StringType;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.mockito.ArgumentCaptor;
+
+public class TestIcebergTableOperationExecutor {
+
+  private static final Schema TABLE_SCHEMA =
+      new Schema(NestedField.required(1, "test_field", StringType.get()));
+
+  private IcebergTableOperationExecutor executor;
+  private IcebergCatalogWrapperManager mockWrapperManager;
+  private CatalogWrapperForREST mockCatalogWrapper;
+  private IcebergRequestContext mockContext;
+
+  @BeforeEach
+  public void setUp() {
+    mockWrapperManager = mock(IcebergCatalogWrapperManager.class);
+    mockCatalogWrapper = mock(CatalogWrapperForREST.class);
+    executor = new IcebergTableOperationExecutor(mockWrapperManager);
+
+    mockContext = mock(IcebergRequestContext.class);
+    when(mockContext.catalogName()).thenReturn("test_catalog");
+    when(mockContext.requestCredentialVending()).thenReturn(false);
+    
when(mockWrapperManager.getCatalogWrapper("test_catalog")).thenReturn(mockCatalogWrapper);
+  }
+
+  @Test
+  public void testCreateTableOverridesOwnerWithAuthenticatedUser() {
+    String authenticatedUser = "[email protected]";
+    String clientProvidedOwner = "spark";
+
+    Map<String, String> properties = new HashMap<>();
+    properties.put(IcebergConstants.OWNER, clientProvidedOwner);
+
+    CreateTableRequest originalRequest =
+        CreateTableRequest.builder()
+            .withName("test_table")
+            .withSchema(TABLE_SCHEMA)
+            .setProperties(properties)
+            .build();
+
+    when(mockContext.userName()).thenReturn(authenticatedUser);
+    LoadTableResponse mockResponse = mock(LoadTableResponse.class);
+    when(mockCatalogWrapper.createTable(any(), any(), 
anyBoolean())).thenReturn(mockResponse);
+
+    executor.createTable(mockContext, Namespace.of("test_namespace"), 
originalRequest);
+
+    ArgumentCaptor<CreateTableRequest> requestCaptor =
+        ArgumentCaptor.forClass(CreateTableRequest.class);
+    verify(mockCatalogWrapper).createTable(any(), requestCaptor.capture(), 
anyBoolean());
+
+    CreateTableRequest capturedRequest = requestCaptor.getValue();
+    String actualOwner = 
capturedRequest.properties().get(IcebergConstants.OWNER);
+
+    Assertions.assertEquals(authenticatedUser, actualOwner);
+    Assertions.assertNotEquals(clientProvidedOwner, actualOwner);
+  }
+
+  @Test
+  public void testCreateTableAddsOwnerWhenMissing() {
+    String authenticatedUser = "[email protected]";
+
+    CreateTableRequest originalRequest =
+        
CreateTableRequest.builder().withName("test_table").withSchema(TABLE_SCHEMA).build();
+
+    when(mockContext.userName()).thenReturn(authenticatedUser);
+    LoadTableResponse mockResponse = mock(LoadTableResponse.class);
+    when(mockCatalogWrapper.createTable(any(), any(), 
anyBoolean())).thenReturn(mockResponse);
+
+    executor.createTable(mockContext, Namespace.of("test_namespace"), 
originalRequest);
+
+    ArgumentCaptor<CreateTableRequest> requestCaptor =
+        ArgumentCaptor.forClass(CreateTableRequest.class);
+    verify(mockCatalogWrapper).createTable(any(), requestCaptor.capture(), 
anyBoolean());
+
+    String actualOwner = 
requestCaptor.getValue().properties().get(IcebergConstants.OWNER);
+    Assertions.assertEquals(authenticatedUser, actualOwner);
+  }
+
+  @Test
+  public void testCreateTablePreservesOwnerForAnonymousUser() {
+    String clientProvidedOwner = "spark";
+
+    Map<String, String> properties = new HashMap<>();
+    properties.put(IcebergConstants.OWNER, clientProvidedOwner);
+
+    CreateTableRequest originalRequest =
+        CreateTableRequest.builder()
+            .withName("test_table")
+            .withSchema(TABLE_SCHEMA)
+            .setProperties(properties)
+            .build();
+
+    when(mockContext.userName()).thenReturn("anonymous");
+    LoadTableResponse mockResponse = mock(LoadTableResponse.class);
+    when(mockCatalogWrapper.createTable(any(), any(), 
anyBoolean())).thenReturn(mockResponse);
+
+    executor.createTable(mockContext, Namespace.of("test_namespace"), 
originalRequest);
+
+    ArgumentCaptor<CreateTableRequest> requestCaptor =
+        ArgumentCaptor.forClass(CreateTableRequest.class);
+    verify(mockCatalogWrapper).createTable(any(), requestCaptor.capture(), 
anyBoolean());
+
+    String actualOwner = 
requestCaptor.getValue().properties().get(IcebergConstants.OWNER);
+    Assertions.assertEquals(clientProvidedOwner, actualOwner);
+  }
+}

Reply via email to