This is an automated email from the ASF dual-hosted git repository.

yuqi4733 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/gravitino.git
The following commit(s) were added to refs/heads/main by this push: new 842cfc3a46 [#6780] feat(lineage): support lineage endpoint for Gravitino server (#6865) 842cfc3a46 is described below commit 842cfc3a4666cd5f1cbc171f1e1bf9bd7d42a928 Author: FANNG <xiaoj...@datastrato.com> AuthorDate: Fri Apr 11 11:29:13 2025 +0800 [#6780] feat(lineage): support lineage endpoint for Gravitino server (#6865) ### What changes were proposed in this pull request? support lineage endpoint for Gravitino server ### Why are the changes needed? Fix: #6780 ### Does this PR introduce _any_ user-facing change? no ### How was this patch tested? local test --- lineage/build.gradle.kts | 18 ++- .../{source/HTTPLineageSource.java => Utils.java} | 26 ++-- .../gravitino/lineage/sink/LineageLogSink.java | 9 +- .../lineage/source/HTTPLineageSource.java | 3 +- .../lineage/source/rest/LineageOperations.java | 77 +++++++++++ .../lineage/source/TestLineageOperations.java | 148 +++++++++++++++++++++ .../org/apache/gravitino/server/web/Utils.java | 9 ++ .../org/apache/gravitino/server/web/TestUtils.java | 0 8 files changed, 269 insertions(+), 21 deletions(-) diff --git a/lineage/build.gradle.kts b/lineage/build.gradle.kts index 21da55dea3..dd462e3bf3 100644 --- a/lineage/build.gradle.kts +++ b/lineage/build.gradle.kts @@ -22,20 +22,36 @@ plugins { } dependencies { + implementation(project(":common")) implementation(project(":core")) implementation(project(":server-common")) + implementation(libs.bundles.jersey) implementation(libs.commons.lang3) implementation(libs.guava) - implementation(libs.slf4j.api) implementation(libs.jackson.datatype.jdk8) implementation(libs.jackson.datatype.jsr310) implementation(libs.jackson.databind) + implementation(libs.metrics.jersey2) implementation(libs.openlineage.java) { isTransitive = false } + implementation(libs.slf4j.api) + + annotationProcessor(libs.lombok) + compileOnly(libs.lombok) + testAnnotationProcessor(libs.lombok) + testCompileOnly(libs.lombok) + 
testImplementation(libs.jersey.test.framework.core) { + exclude(group = "org.junit.jupiter") + } + testImplementation(libs.jersey.test.framework.provider.jetty) { + exclude(group = "org.junit.jupiter") + } testImplementation(libs.junit.jupiter.api) testImplementation(libs.junit.jupiter.params) + testImplementation(libs.mockito.core) + testImplementation(libs.mockito.inline) testRuntimeOnly(libs.junit.jupiter.engine) } diff --git a/lineage/src/main/java/org/apache/gravitino/lineage/source/HTTPLineageSource.java b/lineage/src/main/java/org/apache/gravitino/lineage/Utils.java similarity index 61% copy from lineage/src/main/java/org/apache/gravitino/lineage/source/HTTPLineageSource.java copy to lineage/src/main/java/org/apache/gravitino/lineage/Utils.java index fd15eaa57f..cb3c90232e 100644 --- a/lineage/src/main/java/org/apache/gravitino/lineage/source/HTTPLineageSource.java +++ b/lineage/src/main/java/org/apache/gravitino/lineage/Utils.java @@ -17,20 +17,22 @@ * under the License. */ -package org.apache.gravitino.lineage.source; +package org.apache.gravitino.lineage; -import com.google.common.collect.ImmutableSet; -import java.util.Map; -import java.util.Set; -import org.apache.gravitino.lineage.LineageDispatcher; -import org.apache.gravitino.server.web.SupportsRESTPackages; +import io.openlineage.server.OpenLineage.Job; +import io.openlineage.server.OpenLineage.Run; +import io.openlineage.server.OpenLineage.RunEvent; -public class HTTPLineageSource implements LineageSource, SupportsRESTPackages { - @Override - public void initialize(Map<String, String> configs, LineageDispatcher dispatcher) {} +public class Utils { + private Utils() {} - @Override - public Set<String> getRESTPackages() { - return ImmutableSet.of(); + public static String getRunID(RunEvent event) { + Run run = event.getRun(); + return run == null ? "Unknown" : run.getRunId().toString(); + } + + public static String getJobName(RunEvent event) { + Job job = event.getJob(); + return job == null ? 
"Unknown" : job.getName().toString(); } } diff --git a/lineage/src/main/java/org/apache/gravitino/lineage/sink/LineageLogSink.java b/lineage/src/main/java/org/apache/gravitino/lineage/sink/LineageLogSink.java index 62367ad234..cdff7822f7 100644 --- a/lineage/src/main/java/org/apache/gravitino/lineage/sink/LineageLogSink.java +++ b/lineage/src/main/java/org/apache/gravitino/lineage/sink/LineageLogSink.java @@ -28,8 +28,8 @@ import com.fasterxml.jackson.databind.cfg.EnumFeature; import com.fasterxml.jackson.databind.json.JsonMapper; import com.fasterxml.jackson.datatype.jdk8.Jdk8Module; import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule; -import io.openlineage.server.OpenLineage.Run; import io.openlineage.server.OpenLineage.RunEvent; +import org.apache.gravitino.lineage.Utils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -61,13 +61,8 @@ public class LineageLogSink implements LineageSink { } catch (JsonProcessingException e) { LOG.warn( "Process open lineage event failed, run id: {}, error message: {}", - getRunId(event), + Utils.getRunID(event), e.getMessage()); } } - - private String getRunId(RunEvent event) { - Run run = event.getRun(); - return run == null ? 
"Unknown" : run.getRunId().toString(); - } } diff --git a/lineage/src/main/java/org/apache/gravitino/lineage/source/HTTPLineageSource.java b/lineage/src/main/java/org/apache/gravitino/lineage/source/HTTPLineageSource.java index fd15eaa57f..337a967420 100644 --- a/lineage/src/main/java/org/apache/gravitino/lineage/source/HTTPLineageSource.java +++ b/lineage/src/main/java/org/apache/gravitino/lineage/source/HTTPLineageSource.java @@ -23,6 +23,7 @@ import com.google.common.collect.ImmutableSet; import java.util.Map; import java.util.Set; import org.apache.gravitino.lineage.LineageDispatcher; +import org.apache.gravitino.lineage.source.rest.LineageOperations; import org.apache.gravitino.server.web.SupportsRESTPackages; public class HTTPLineageSource implements LineageSource, SupportsRESTPackages { @@ -31,6 +32,6 @@ public class HTTPLineageSource implements LineageSource, SupportsRESTPackages { @Override public Set<String> getRESTPackages() { - return ImmutableSet.of(); + return ImmutableSet.of(LineageOperations.class.getPackage().getName()); } } diff --git a/lineage/src/main/java/org/apache/gravitino/lineage/source/rest/LineageOperations.java b/lineage/src/main/java/org/apache/gravitino/lineage/source/rest/LineageOperations.java new file mode 100644 index 0000000000..2f5fa472b6 --- /dev/null +++ b/lineage/src/main/java/org/apache/gravitino/lineage/source/rest/LineageOperations.java @@ -0,0 +1,77 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.gravitino.lineage.source.rest; + +import com.codahale.metrics.annotation.ResponseMetered; +import com.codahale.metrics.annotation.Timed; +import io.openlineage.server.OpenLineage; +import javax.inject.Inject; +import javax.servlet.http.HttpServletRequest; +import javax.ws.rs.POST; +import javax.ws.rs.Path; +import javax.ws.rs.Produces; +import javax.ws.rs.core.Context; +import javax.ws.rs.core.MediaType; +import javax.ws.rs.core.Response; +import org.apache.gravitino.lineage.LineageDispatcher; +import org.apache.gravitino.metrics.MetricNames; +import org.apache.gravitino.server.web.Utils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +@Path("/lineage") +public class LineageOperations { + + private static final Logger LOG = LoggerFactory.getLogger(LineageOperations.class); + private LineageDispatcher lineageDispatcher; + + @Context private HttpServletRequest httpRequest; + + @Inject + public LineageOperations(LineageDispatcher lineageDispatcher) { + this.lineageDispatcher = lineageDispatcher; + } + + @POST + @Produces(MediaType.APPLICATION_JSON) + @Timed(name = "post-lineage." 
+ MetricNames.HTTP_PROCESS_DURATION, absolute = true) + @ResponseMetered(name = "post-lineage", absolute = true) + public Response postLineage(OpenLineage.RunEvent event) { + LOG.info( + "Open lineage event, run id:{}, job name:{}", + org.apache.gravitino.lineage.Utils.getRunID(event), + org.apache.gravitino.lineage.Utils.getJobName(event)); + + try { + return Utils.doAs( + httpRequest, + () -> { + if (lineageDispatcher.dispatchLineageEvent(event)) { + return Utils.created(); + } else { + return Utils.tooManyRequests(); + } + }); + } catch (Exception e) { + LOG.warn("Process lineage failed,", e); + return Utils.internalError(e.getMessage(), e); + } + } +} diff --git a/lineage/src/test/java/org/apache/gravitino/lineage/source/TestLineageOperations.java b/lineage/src/test/java/org/apache/gravitino/lineage/source/TestLineageOperations.java new file mode 100644 index 0000000000..d2f428e3ee --- /dev/null +++ b/lineage/src/test/java/org/apache/gravitino/lineage/source/TestLineageOperations.java @@ -0,0 +1,148 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.gravitino.lineage.source; + +import io.openlineage.client.OpenLineage; +import io.openlineage.client.OpenLineage.InputDataset; +import io.openlineage.client.OpenLineage.Job; +import io.openlineage.client.OpenLineage.JobFacets; +import io.openlineage.client.OpenLineage.OutputDataset; +import io.openlineage.client.OpenLineage.Run; +import io.openlineage.client.OpenLineage.RunEvent; +import io.openlineage.client.OpenLineage.RunFacets; +import java.io.IOException; +import java.net.URI; +import java.time.ZoneOffset; +import java.time.ZonedDateTime; +import java.util.Arrays; +import java.util.List; +import java.util.UUID; +import java.util.function.Supplier; +import javax.servlet.http.HttpServletRequest; +import javax.ws.rs.client.Entity; +import javax.ws.rs.core.Application; +import javax.ws.rs.core.MediaType; +import javax.ws.rs.core.Response; +import javax.ws.rs.core.Response.Status; +import lombok.SneakyThrows; +import org.apache.gravitino.lineage.LineageDispatcher; +import org.apache.gravitino.lineage.source.rest.LineageOperations; +import org.apache.gravitino.rest.RESTUtils; +import org.glassfish.hk2.api.Factory; +import org.glassfish.jersey.internal.inject.AbstractBinder; +import org.glassfish.jersey.server.ResourceConfig; +import org.glassfish.jersey.test.JerseyTest; +import org.glassfish.jersey.test.TestProperties; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; +import org.mockito.ArgumentMatchers; +import org.mockito.Mockito; + +public class TestLineageOperations extends JerseyTest { + + private ZonedDateTime now = ZonedDateTime.now(ZoneOffset.UTC); + + private static class MockServletRequestFactory + implements Factory<HttpServletRequest>, Supplier<HttpServletRequest> { + + @Override + public HttpServletRequest provide() { + return get(); + } + + @Override + public HttpServletRequest get() { + HttpServletRequest request = Mockito.mock(HttpServletRequest.class); + 
Mockito.when(request.getRemoteUser()).thenReturn(null); + return request; + } + + @Override + public void dispose(HttpServletRequest instance) {} + } + + private LineageDispatcher lineageDispatcher = Mockito.mock(LineageDispatcher.class); + + @Override + protected Application configure() { + try { + forceSet( + TestProperties.CONTAINER_PORT, String.valueOf(RESTUtils.findAvailablePort(2000, 3000))); + } catch (IOException e) { + throw new RuntimeException(e); + } + + ResourceConfig resourceConfig = new ResourceConfig(); + resourceConfig.register(LineageOperations.class); + resourceConfig.register( + new AbstractBinder() { + @Override + protected void configure() { + bind(lineageDispatcher).to(LineageDispatcher.class).ranked(2); + bindFactory(MockServletRequestFactory.class).to(HttpServletRequest.class); + } + }); + + return resourceConfig; + } + + @SneakyThrows + @Test + public void testUpdateLineageSucc() { + RunEvent runEvent = createRunEvent(); + Mockito.when(lineageDispatcher.dispatchLineageEvent(ArgumentMatchers.any())).thenReturn(true); + Response resp = + target("/lineage") + .request(MediaType.APPLICATION_JSON_TYPE) + .accept(MediaType.APPLICATION_JSON_TYPE) + .post(Entity.entity(runEvent, MediaType.APPLICATION_JSON_TYPE)); + Assertions.assertEquals(Status.CREATED.getStatusCode(), resp.getStatus()); + } + + @SneakyThrows + @Test + public void testUpdateLineageFailed() { + RunEvent runEvent = createRunEvent(); + Mockito.when(lineageDispatcher.dispatchLineageEvent(ArgumentMatchers.any())).thenReturn(false); + Response resp = + target("/lineage") + .request(MediaType.APPLICATION_JSON_TYPE) + .accept(MediaType.APPLICATION_JSON_TYPE) + .post(Entity.entity(runEvent, MediaType.APPLICATION_JSON_TYPE)); + Assertions.assertEquals(Status.TOO_MANY_REQUESTS.getStatusCode(), resp.getStatus()); + } + + private RunEvent createRunEvent() { + URI producer = URI.create("producer"); + OpenLineage ol = new OpenLineage(producer); + UUID runId = UUID.randomUUID(); + RunFacets 
runFacets = + ol.newRunFacetsBuilder().nominalTime(ol.newNominalTimeRunFacet(now, now)).build(); + Run run = ol.newRun(runId, runFacets); + String name = "jobName"; + String namespace = "namespace"; + JobFacets jobFacets = ol.newJobFacetsBuilder().build(); + Job job = ol.newJob(namespace, name, jobFacets); + List<InputDataset> inputs = Arrays.asList(ol.newInputDataset("ins", "input", null, null)); + List<OutputDataset> outputs = Arrays.asList(ol.newOutputDataset("ons", "output", null, null)); + RunEvent runStateUpdate = + ol.newRunEvent(now, OpenLineage.RunEvent.EventType.START, run, job, inputs, outputs); + return runStateUpdate; + } +} diff --git a/server/src/main/java/org/apache/gravitino/server/web/Utils.java b/server-common/src/main/java/org/apache/gravitino/server/web/Utils.java similarity index 96% rename from server/src/main/java/org/apache/gravitino/server/web/Utils.java rename to server-common/src/main/java/org/apache/gravitino/server/web/Utils.java index 69ec64daa0..80c2cd537d 100644 --- a/server/src/main/java/org/apache/gravitino/server/web/Utils.java +++ b/server-common/src/main/java/org/apache/gravitino/server/web/Utils.java @@ -25,6 +25,7 @@ import java.util.Optional; import javax.servlet.http.HttpServletRequest; import javax.ws.rs.core.MediaType; import javax.ws.rs.core.Response; +import javax.ws.rs.core.Response.Status; import org.apache.commons.lang3.StringUtils; import org.apache.gravitino.UserPrincipal; import org.apache.gravitino.audit.FilesetAuditConstants; @@ -48,6 +49,14 @@ public class Utils { return Response.status(Response.Status.OK).entity(t).type(MediaType.APPLICATION_JSON).build(); } + public static Response created() { + return Response.status(Response.Status.CREATED).type(MediaType.APPLICATION_JSON).build(); + } + + public static Response tooManyRequests() { + return Response.status(Status.TOO_MANY_REQUESTS).type(MediaType.APPLICATION_JSON).build(); + } + public static Response ok() { return 
Response.status(Response.Status.NO_CONTENT).type(MediaType.APPLICATION_JSON).build(); } diff --git a/server/src/test/java/org/apache/gravitino/server/web/TestUtils.java b/server-common/src/test/java/org/apache/gravitino/server/web/TestUtils.java similarity index 100% rename from server/src/test/java/org/apache/gravitino/server/web/TestUtils.java rename to server-common/src/test/java/org/apache/gravitino/server/web/TestUtils.java