This is an automated email from the ASF dual-hosted git repository.

yuqi4733 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/gravitino.git


The following commit(s) were added to refs/heads/main by this push:
     new 842cfc3a46 [#6780] feat(lineage):  support lineage endpoint for Gravitino server (#6865)
842cfc3a46 is described below

commit 842cfc3a4666cd5f1cbc171f1e1bf9bd7d42a928
Author: FANNG <xiaoj...@datastrato.com>
AuthorDate: Fri Apr 11 11:29:13 2025 +0800

    [#6780] feat(lineage):  support lineage endpoint for Gravitino server (#6865)
    
    ### What changes were proposed in this pull request?
    
    support lineage endpoint for Gravitino server
    
    ### Why are the changes needed?
    
    Fix: #6780
    
    ### Does this PR introduce _any_ user-facing change?
    no
    
    ### How was this patch tested?
    local test
---
 lineage/build.gradle.kts                           |  18 ++-
 .../{source/HTTPLineageSource.java => Utils.java}  |  26 ++--
 .../gravitino/lineage/sink/LineageLogSink.java     |   9 +-
 .../lineage/source/HTTPLineageSource.java          |   3 +-
 .../lineage/source/rest/LineageOperations.java     |  77 +++++++++++
 .../lineage/source/TestLineageOperations.java      | 148 +++++++++++++++++++++
 .../org/apache/gravitino/server/web/Utils.java     |   9 ++
 .../org/apache/gravitino/server/web/TestUtils.java |   0
 8 files changed, 269 insertions(+), 21 deletions(-)

diff --git a/lineage/build.gradle.kts b/lineage/build.gradle.kts
index 21da55dea3..dd462e3bf3 100644
--- a/lineage/build.gradle.kts
+++ b/lineage/build.gradle.kts
@@ -22,20 +22,36 @@ plugins {
 }
 
 dependencies {
+  implementation(project(":common"))
   implementation(project(":core"))
   implementation(project(":server-common"))
+  implementation(libs.bundles.jersey)
   implementation(libs.commons.lang3)
   implementation(libs.guava)
-  implementation(libs.slf4j.api)
   implementation(libs.jackson.datatype.jdk8)
   implementation(libs.jackson.datatype.jsr310)
   implementation(libs.jackson.databind)
+  implementation(libs.metrics.jersey2)
   implementation(libs.openlineage.java) {
     isTransitive = false
   }
+  implementation(libs.slf4j.api)
+
+  annotationProcessor(libs.lombok)
+  compileOnly(libs.lombok)
+  testAnnotationProcessor(libs.lombok)
+  testCompileOnly(libs.lombok)
 
+  testImplementation(libs.jersey.test.framework.core) {
+    exclude(group = "org.junit.jupiter")
+  }
+  testImplementation(libs.jersey.test.framework.provider.jetty) {
+    exclude(group = "org.junit.jupiter")
+  }
   testImplementation(libs.junit.jupiter.api)
   testImplementation(libs.junit.jupiter.params)
+  testImplementation(libs.mockito.core)
+  testImplementation(libs.mockito.inline)
 
   testRuntimeOnly(libs.junit.jupiter.engine)
 }
diff --git a/lineage/src/main/java/org/apache/gravitino/lineage/source/HTTPLineageSource.java b/lineage/src/main/java/org/apache/gravitino/lineage/Utils.java
similarity index 61%
copy from lineage/src/main/java/org/apache/gravitino/lineage/source/HTTPLineageSource.java
copy to lineage/src/main/java/org/apache/gravitino/lineage/Utils.java
index fd15eaa57f..cb3c90232e 100644
--- a/lineage/src/main/java/org/apache/gravitino/lineage/source/HTTPLineageSource.java
+++ b/lineage/src/main/java/org/apache/gravitino/lineage/Utils.java
@@ -17,20 +17,22 @@
  *  under the License.
  */
 
-package org.apache.gravitino.lineage.source;
+package org.apache.gravitino.lineage;
 
-import com.google.common.collect.ImmutableSet;
-import java.util.Map;
-import java.util.Set;
-import org.apache.gravitino.lineage.LineageDispatcher;
-import org.apache.gravitino.server.web.SupportsRESTPackages;
+import io.openlineage.server.OpenLineage.Job;
+import io.openlineage.server.OpenLineage.Run;
+import io.openlineage.server.OpenLineage.RunEvent;
 
-public class HTTPLineageSource implements LineageSource, SupportsRESTPackages {
-  @Override
-  public void initialize(Map<String, String> configs, LineageDispatcher dispatcher) {}
+public class Utils {
+  private Utils() {}
 
-  @Override
-  public Set<String> getRESTPackages() {
-    return ImmutableSet.of();
+  public static String getRunID(RunEvent event) {
+    Run run = event.getRun();
+    return run == null ? "Unknown" : run.getRunId().toString();
+  }
+
+  public static String getJobName(RunEvent event) {
+    Job job = event.getJob();
+    return job == null ? "Unknown" : job.getName().toString();
   }
 }
diff --git a/lineage/src/main/java/org/apache/gravitino/lineage/sink/LineageLogSink.java b/lineage/src/main/java/org/apache/gravitino/lineage/sink/LineageLogSink.java
index 62367ad234..cdff7822f7 100644
--- a/lineage/src/main/java/org/apache/gravitino/lineage/sink/LineageLogSink.java
+++ b/lineage/src/main/java/org/apache/gravitino/lineage/sink/LineageLogSink.java
@@ -28,8 +28,8 @@ import com.fasterxml.jackson.databind.cfg.EnumFeature;
 import com.fasterxml.jackson.databind.json.JsonMapper;
 import com.fasterxml.jackson.datatype.jdk8.Jdk8Module;
 import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;
-import io.openlineage.server.OpenLineage.Run;
 import io.openlineage.server.OpenLineage.RunEvent;
+import org.apache.gravitino.lineage.Utils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -61,13 +61,8 @@ public class LineageLogSink implements LineageSink {
     } catch (JsonProcessingException e) {
       LOG.warn(
           "Process open lineage event failed, run id: {}, error message: {}",
-          getRunId(event),
+          Utils.getRunID(event),
           e.getMessage());
     }
   }
-
-  private String getRunId(RunEvent event) {
-    Run run = event.getRun();
-    return run == null ? "Unknown" : run.getRunId().toString();
-  }
 }
diff --git a/lineage/src/main/java/org/apache/gravitino/lineage/source/HTTPLineageSource.java b/lineage/src/main/java/org/apache/gravitino/lineage/source/HTTPLineageSource.java
index fd15eaa57f..337a967420 100644
--- a/lineage/src/main/java/org/apache/gravitino/lineage/source/HTTPLineageSource.java
+++ b/lineage/src/main/java/org/apache/gravitino/lineage/source/HTTPLineageSource.java
@@ -23,6 +23,7 @@ import com.google.common.collect.ImmutableSet;
 import java.util.Map;
 import java.util.Set;
 import org.apache.gravitino.lineage.LineageDispatcher;
+import org.apache.gravitino.lineage.source.rest.LineageOperations;
 import org.apache.gravitino.server.web.SupportsRESTPackages;
 
 public class HTTPLineageSource implements LineageSource, SupportsRESTPackages {
@@ -31,6 +32,6 @@ public class HTTPLineageSource implements LineageSource, SupportsRESTPackages {
 
   @Override
   public Set<String> getRESTPackages() {
-    return ImmutableSet.of();
+    return ImmutableSet.of(LineageOperations.class.getPackage().getName());
   }
 }
diff --git a/lineage/src/main/java/org/apache/gravitino/lineage/source/rest/LineageOperations.java b/lineage/src/main/java/org/apache/gravitino/lineage/source/rest/LineageOperations.java
new file mode 100644
index 0000000000..2f5fa472b6
--- /dev/null
+++ b/lineage/src/main/java/org/apache/gravitino/lineage/source/rest/LineageOperations.java
@@ -0,0 +1,77 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing,
+ *  software distributed under the License is distributed on an
+ *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ *  specific language governing permissions and limitations
+ *  under the License.
+ */
+
+package org.apache.gravitino.lineage.source.rest;
+
+import com.codahale.metrics.annotation.ResponseMetered;
+import com.codahale.metrics.annotation.Timed;
+import io.openlineage.server.OpenLineage;
+import javax.inject.Inject;
+import javax.servlet.http.HttpServletRequest;
+import javax.ws.rs.POST;
+import javax.ws.rs.Path;
+import javax.ws.rs.Produces;
+import javax.ws.rs.core.Context;
+import javax.ws.rs.core.MediaType;
+import javax.ws.rs.core.Response;
+import org.apache.gravitino.lineage.LineageDispatcher;
+import org.apache.gravitino.metrics.MetricNames;
+import org.apache.gravitino.server.web.Utils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+@Path("/lineage")
+public class LineageOperations {
+
+  private static final Logger LOG = LoggerFactory.getLogger(LineageOperations.class);
+  private LineageDispatcher lineageDispatcher;
+
+  @Context private HttpServletRequest httpRequest;
+
+  @Inject
+  public LineageOperations(LineageDispatcher lineageDispatcher) {
+    this.lineageDispatcher = lineageDispatcher;
+  }
+
+  @POST
+  @Produces(MediaType.APPLICATION_JSON)
+  @Timed(name = "post-lineage." + MetricNames.HTTP_PROCESS_DURATION, absolute = true)
+  @ResponseMetered(name = "post-lineage", absolute = true)
+  public Response postLineage(OpenLineage.RunEvent event) {
+    LOG.info(
+        "Open lineage event, run id:{}, job name:{}",
+        org.apache.gravitino.lineage.Utils.getRunID(event),
+        org.apache.gravitino.lineage.Utils.getJobName(event));
+
+    try {
+      return Utils.doAs(
+          httpRequest,
+          () -> {
+            if (lineageDispatcher.dispatchLineageEvent(event)) {
+              return Utils.created();
+            } else {
+              return Utils.tooManyRequests();
+            }
+          });
+    } catch (Exception e) {
+      LOG.warn("Process lineage failed,", e);
+      return Utils.internalError(e.getMessage(), e);
+    }
+  }
+}
diff --git a/lineage/src/test/java/org/apache/gravitino/lineage/source/TestLineageOperations.java b/lineage/src/test/java/org/apache/gravitino/lineage/source/TestLineageOperations.java
new file mode 100644
index 0000000000..d2f428e3ee
--- /dev/null
+++ b/lineage/src/test/java/org/apache/gravitino/lineage/source/TestLineageOperations.java
@@ -0,0 +1,148 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing,
+ *  software distributed under the License is distributed on an
+ *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ *  specific language governing permissions and limitations
+ *  under the License.
+ */
+package org.apache.gravitino.lineage.source;
+
+import io.openlineage.client.OpenLineage;
+import io.openlineage.client.OpenLineage.InputDataset;
+import io.openlineage.client.OpenLineage.Job;
+import io.openlineage.client.OpenLineage.JobFacets;
+import io.openlineage.client.OpenLineage.OutputDataset;
+import io.openlineage.client.OpenLineage.Run;
+import io.openlineage.client.OpenLineage.RunEvent;
+import io.openlineage.client.OpenLineage.RunFacets;
+import java.io.IOException;
+import java.net.URI;
+import java.time.ZoneOffset;
+import java.time.ZonedDateTime;
+import java.util.Arrays;
+import java.util.List;
+import java.util.UUID;
+import java.util.function.Supplier;
+import javax.servlet.http.HttpServletRequest;
+import javax.ws.rs.client.Entity;
+import javax.ws.rs.core.Application;
+import javax.ws.rs.core.MediaType;
+import javax.ws.rs.core.Response;
+import javax.ws.rs.core.Response.Status;
+import lombok.SneakyThrows;
+import org.apache.gravitino.lineage.LineageDispatcher;
+import org.apache.gravitino.lineage.source.rest.LineageOperations;
+import org.apache.gravitino.rest.RESTUtils;
+import org.glassfish.hk2.api.Factory;
+import org.glassfish.jersey.internal.inject.AbstractBinder;
+import org.glassfish.jersey.server.ResourceConfig;
+import org.glassfish.jersey.test.JerseyTest;
+import org.glassfish.jersey.test.TestProperties;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+import org.mockito.ArgumentMatchers;
+import org.mockito.Mockito;
+
+public class TestLineageOperations extends JerseyTest {
+
+  private ZonedDateTime now = ZonedDateTime.now(ZoneOffset.UTC);
+
+  private static class MockServletRequestFactory
+      implements Factory<HttpServletRequest>, Supplier<HttpServletRequest> {
+
+    @Override
+    public HttpServletRequest provide() {
+      return get();
+    }
+
+    @Override
+    public HttpServletRequest get() {
+      HttpServletRequest request = Mockito.mock(HttpServletRequest.class);
+      Mockito.when(request.getRemoteUser()).thenReturn(null);
+      return request;
+    }
+
+    @Override
+    public void dispose(HttpServletRequest instance) {}
+  }
+
+  private LineageDispatcher lineageDispatcher = Mockito.mock(LineageDispatcher.class);
+
+  @Override
+  protected Application configure() {
+    try {
+      forceSet(
+          TestProperties.CONTAINER_PORT, String.valueOf(RESTUtils.findAvailablePort(2000, 3000)));
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+
+    ResourceConfig resourceConfig = new ResourceConfig();
+    resourceConfig.register(LineageOperations.class);
+    resourceConfig.register(
+        new AbstractBinder() {
+          @Override
+          protected void configure() {
+            bind(lineageDispatcher).to(LineageDispatcher.class).ranked(2);
+            bindFactory(MockServletRequestFactory.class).to(HttpServletRequest.class);
+          }
+        });
+
+    return resourceConfig;
+  }
+
+  @SneakyThrows
+  @Test
+  public void testUpdateLineageSucc() {
+    RunEvent runEvent = createRunEvent();
+    Mockito.when(lineageDispatcher.dispatchLineageEvent(ArgumentMatchers.any())).thenReturn(true);
+    Response resp =
+        target("/lineage")
+            .request(MediaType.APPLICATION_JSON_TYPE)
+            .accept(MediaType.APPLICATION_JSON_TYPE)
+            .post(Entity.entity(runEvent, MediaType.APPLICATION_JSON_TYPE));
+    Assertions.assertEquals(Status.CREATED.getStatusCode(), resp.getStatus());
+  }
+
+  @SneakyThrows
+  @Test
+  public void testUpdateLineageFailed() {
+    RunEvent runEvent = createRunEvent();
+    Mockito.when(lineageDispatcher.dispatchLineageEvent(ArgumentMatchers.any())).thenReturn(false);
+    Response resp =
+        target("/lineage")
+            .request(MediaType.APPLICATION_JSON_TYPE)
+            .accept(MediaType.APPLICATION_JSON_TYPE)
+            .post(Entity.entity(runEvent, MediaType.APPLICATION_JSON_TYPE));
+    Assertions.assertEquals(Status.TOO_MANY_REQUESTS.getStatusCode(), resp.getStatus());
+  }
+
+  private RunEvent createRunEvent() {
+    URI producer = URI.create("producer");
+    OpenLineage ol = new OpenLineage(producer);
+    UUID runId = UUID.randomUUID();
+    RunFacets runFacets =
+        ol.newRunFacetsBuilder().nominalTime(ol.newNominalTimeRunFacet(now, now)).build();
+    Run run = ol.newRun(runId, runFacets);
+    String name = "jobName";
+    String namespace = "namespace";
+    JobFacets jobFacets = ol.newJobFacetsBuilder().build();
+    Job job = ol.newJob(namespace, name, jobFacets);
+    List<InputDataset> inputs = Arrays.asList(ol.newInputDataset("ins", "input", null, null));
+    List<OutputDataset> outputs = Arrays.asList(ol.newOutputDataset("ons", "output", null, null));
+    RunEvent runStateUpdate =
+        ol.newRunEvent(now, OpenLineage.RunEvent.EventType.START, run, job, inputs, outputs);
+    return runStateUpdate;
+  }
+}
diff --git a/server/src/main/java/org/apache/gravitino/server/web/Utils.java b/server-common/src/main/java/org/apache/gravitino/server/web/Utils.java
similarity index 96%
rename from server/src/main/java/org/apache/gravitino/server/web/Utils.java
rename to server-common/src/main/java/org/apache/gravitino/server/web/Utils.java
index 69ec64daa0..80c2cd537d 100644
--- a/server/src/main/java/org/apache/gravitino/server/web/Utils.java
+++ b/server-common/src/main/java/org/apache/gravitino/server/web/Utils.java
@@ -25,6 +25,7 @@ import java.util.Optional;
 import javax.servlet.http.HttpServletRequest;
 import javax.ws.rs.core.MediaType;
 import javax.ws.rs.core.Response;
+import javax.ws.rs.core.Response.Status;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.gravitino.UserPrincipal;
 import org.apache.gravitino.audit.FilesetAuditConstants;
@@ -48,6 +49,14 @@ public class Utils {
     return Response.status(Response.Status.OK).entity(t).type(MediaType.APPLICATION_JSON).build();
   }
 
+  public static Response created() {
+    return Response.status(Response.Status.CREATED).type(MediaType.APPLICATION_JSON).build();
+  }
+
+  public static Response tooManyRequests() {
+    return Response.status(Status.TOO_MANY_REQUESTS).type(MediaType.APPLICATION_JSON).build();
+  }
+
   public static Response ok() {
     return Response.status(Response.Status.NO_CONTENT).type(MediaType.APPLICATION_JSON).build();
   }
diff --git a/server/src/test/java/org/apache/gravitino/server/web/TestUtils.java b/server-common/src/test/java/org/apache/gravitino/server/web/TestUtils.java
similarity index 100%
rename from server/src/test/java/org/apache/gravitino/server/web/TestUtils.java
rename to server-common/src/test/java/org/apache/gravitino/server/web/TestUtils.java

Reply via email to