This is an automated email from the ASF dual-hosted git repository.
fanng pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/gravitino.git
The following commit(s) were added to refs/heads/main by this push:
new 7b58fca49a [#7396] feat(lineage): Add http lineage sink (#7765)
7b58fca49a is described below
commit 7b58fca49a09a1de1c60ee59de9b83e119887555
Author: Pacman <[email protected]>
AuthorDate: Wed Aug 6 09:33:05 2025 +0530
[#7396] feat(lineage): Add http lineage sink (#7765)
### What changes were proposed in this pull request?
Add http lineage sink
### Why the changes are needed?
Fixes: https://github.com/apache/gravitino/issues/7396
### How was this patch tested?
This PR was tested by integrating with Marquez in local.
Writing unit test cases is still pending. Will be added once the PR is
reviewed for required code changes.
---------
Co-authored-by: vishnu <[email protected]>
---
docs/lineage/gravitino-server-lineage.md | 11 ++++
gradle/libs.versions.toml | 9 ++-
lineage/build.gradle.kts | 4 ++
.../java/org/apache/gravitino/lineage/Utils.java | 11 ++++
.../gravitino/lineage/auth/ApiKeyAuthStrategy.java | 55 ++++++++++++++++
.../lineage/auth/AuthenticationFactory.java | 38 +++++++++++
.../auth/LineageServerAuthenticationStrategy.java | 39 ++++++++++++
.../gravitino/lineage/auth/NoAuthStrategy.java | 38 +++++++++++
.../gravitino/lineage/sink/LineageHttpSink.java | 74 ++++++++++++++++++++++
9 files changed, 278 insertions(+), 1 deletion(-)
diff --git a/docs/lineage/gravitino-server-lineage.md
b/docs/lineage/gravitino-server-lineage.md
index c5ccb468f1..03f998d372 100644
--- a/docs/lineage/gravitino-server-lineage.md
+++ b/docs/lineage/gravitino-server-lineage.md
@@ -55,6 +55,17 @@ curl -X POST \
Log sink prints the log in a separate log file `gravitino_lineage.log`, you
could change the default behavior in `conf/log4j2.properties`.
+## Lineage HTTP sink
+
+The HTTP sink sends lineage events to an HTTP server that
follows the OpenLineage REST specification, such as Marquez.
+
+| Property Name | Description
| Default Value | Required |
Since Version |
+|-----------------------------------|----------------------------------------------------------------------------------------------------------------------------------------|----------------------------------------------------|----------|---------------|
+| gravitino.lineage.sinks | Specifies the lineage sink
implementation to use. Use `http` for the HTTP sink.
| `log`
| Yes | 0.9.0 |
+| gravitino.lineage.http.sinkClass | Fully qualified class name of the HTTP
lineage sink implementation
(`org.apache.gravitino.lineage.sink.LineageHttpSink`) |
`org.apache.gravitino.lineage.sink.LineageLogSink` | Yes | 0.9.0 |
+| gravitino.lineage.http.url | URL of the HTTP sink server endpoint for
lineage collection (e.g., `http://localhost:5000`)
| none | Yes |
1.0.0 |
+| gravitino.lineage.http.authType | Authentication type for http sink
(options: `apiKey` or `none`)
| none | Yes
| 1.0.0 |
+| gravitino.lineage.http.apiKey | API key for authenticating with http
sink (required if authType=`apiKey`)
| none | No
| 1.0.0 |
+
## High watermark status
When the lineage sink operates slowly, lineage events accumulate in the async
queue. Once the queue size exceeds 90% of its capacity (high watermark
threshold), the lineage system enters a high watermark status. In this state,
the lineage source must implement retry and logging mechanisms for rejected
events to prevent system overload. For the HTTP source, it returns the `429 Too
Many Requests` status code to the client.
\ No newline at end of file
diff --git a/gradle/libs.versions.toml b/gradle/libs.versions.toml
index 85c1a07427..3cde9cd7f1 100644
--- a/gradle/libs.versions.toml
+++ b/gradle/libs.versions.toml
@@ -43,7 +43,10 @@ hadoop3-aliyun = "3.3.1"
hadoop-minikdc = "3.3.1"
htrace-core4 = "4.1.0-incubating"
httpclient = "4.4.1"
-httpclient5 = "5.2.1"
+httpclient5 = "5.4.4"
+httpcore5 = "5.3.4"
+micrometer-core = "1.12.2"
+jackson-dataformat-yaml = "2.16.1"
mockserver = "5.15.0"
commons-csv = "1.12.0"
commons-lang3 = "3.14.0"
@@ -141,6 +144,7 @@ jackson-databind = { group = "com.fasterxml.jackson.core",
name = "jackson-datab
jackson-annotations = { group = "com.fasterxml.jackson.core", name =
"jackson-annotations", version.ref = "jackson" }
jackson-datatype-jdk8 = { group = "com.fasterxml.jackson.datatype", name =
"jackson-datatype-jdk8", version.ref = "jackson" }
jackson-datatype-jsr310 = { group = "com.fasterxml.jackson.datatype", name =
"jackson-datatype-jsr310", version.ref = "jackson" }
+jackson-dataformat-yaml = { group = "com.fasterxml.jackson.dataformat", name =
"jackson-dataformat-yaml", version.ref = "jackson-dataformat-yaml" }
guava = { group = "com.google.guava", name = "guava", version.ref = "guava" }
kerby-core = { group = "org.apache.kerby", name = "kerb-core", version.ref =
"kerby"}
kerby-simplekdc = { group = "org.apache.kerby", name = "kerb-simplekdc",
version.ref = "kerby"}
@@ -195,6 +199,9 @@ airlift-json = { group = "io.airlift", name = "json",
version.ref = "airlift-jso
airlift-resolver = { group = "io.airlift.resolver", name = "resolver",
version.ref = "airlift-resolver"}
httpclient = { group = "org.apache.httpcomponents", name = "httpclient",
version.ref = "httpclient" }
httpclient5 = { group = "org.apache.httpcomponents.client5", name =
"httpclient5", version.ref = "httpclient5" }
+httpcore5 = { group = "org.apache.httpcomponents.core5", name = "httpcore5",
version.ref = "httpcore5" }
+micrometer-core = { group = "io.micrometer", name = "micrometer-core",
version.ref = "micrometer-core" }
+
mockserver-netty = { group = "org.mock-server", name = "mockserver-netty",
version.ref = "mockserver" }
mockserver-client-java = { group = "org.mock-server", name =
"mockserver-client-java", version.ref = "mockserver" }
commons-csv = { group = "org.apache.commons", name = "commons-csv",
version.ref = "commons-csv" }
diff --git a/lineage/build.gradle.kts b/lineage/build.gradle.kts
index 3706d42c70..a85ee88ecc 100644
--- a/lineage/build.gradle.kts
+++ b/lineage/build.gradle.kts
@@ -33,6 +33,10 @@ dependencies {
isTransitive = false
}
implementation(libs.slf4j.api)
+ implementation(libs.httpclient5)
+ implementation(libs.httpcore5)
+ implementation(libs.jackson.dataformat.yaml)
+ implementation(libs.micrometer.core)
annotationProcessor(libs.lombok)
compileOnly(libs.lombok)
diff --git a/lineage/src/main/java/org/apache/gravitino/lineage/Utils.java
b/lineage/src/main/java/org/apache/gravitino/lineage/Utils.java
index 087acf231f..cf2e904544 100644
--- a/lineage/src/main/java/org/apache/gravitino/lineage/Utils.java
+++ b/lineage/src/main/java/org/apache/gravitino/lineage/Utils.java
@@ -19,9 +19,13 @@
package org.apache.gravitino.lineage;
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.core.type.TypeReference;
+import io.openlineage.client.OpenLineage;
import io.openlineage.server.OpenLineage.Job;
import io.openlineage.server.OpenLineage.Run;
import io.openlineage.server.OpenLineage.RunEvent;
+import org.apache.gravitino.server.web.ObjectMapperProvider;
public class Utils {
private Utils() {}
@@ -35,4 +39,11 @@ public class Utils {
Job job = event.getJob();
return job == null ? "Unknown" : job.getName();
}
+
+  /**
+   * Converts a run event of the server model ({@code io.openlineage.server}) into the client
+   * model by writing it to a JSON string and reading that string back as a client
+   * {@link OpenLineage.RunEvent}.
+   *
+   * @param event the run event in the server model
+   * @return the same event re-read as a client {@code OpenLineage.RunEvent}
+   * @throws JsonProcessingException if the event cannot be written to or read from JSON
+   */
+  public static OpenLineage.RunEvent getClientRunEvent(RunEvent event)
+      throws JsonProcessingException {
+    TypeReference<OpenLineage.RunEvent> clientEventType =
+        new TypeReference<OpenLineage.RunEvent>() {};
+    String serializedEvent = ObjectMapperProvider.objectMapper().writeValueAsString(event);
+    return ObjectMapperProvider.objectMapper().readValue(serializedEvent, clientEventType);
+  }
}
diff --git
a/lineage/src/main/java/org/apache/gravitino/lineage/auth/ApiKeyAuthStrategy.java
b/lineage/src/main/java/org/apache/gravitino/lineage/auth/ApiKeyAuthStrategy.java
new file mode 100644
index 0000000000..4a8f18e5f5
--- /dev/null
+++
b/lineage/src/main/java/org/apache/gravitino/lineage/auth/ApiKeyAuthStrategy.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.gravitino.lineage.auth;
+
+import io.openlineage.client.transports.HttpConfig;
+import io.openlineage.client.transports.TokenProvider;
+import java.net.URI;
+import java.util.Map;
+import javax.ws.rs.BadRequestException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+class ApiKeyAuthStrategy implements LineageServerAuthenticationStrategy {
+ private static final Logger LOG =
LoggerFactory.getLogger(ApiKeyAuthStrategy.class);
+
+ @Override
+ public HttpConfig configureHttpConfig(String url, Map<String, String>
configs) {
+ String apiKey = configs.get("apiKey");
+ if (apiKey == null || apiKey.trim().isEmpty()) {
+ throw new BadRequestException("API key is required when auth type is
apiKey");
+ }
+
+ HttpConfig config = new HttpConfig();
+ config.setUrl(URI.create(url));
+
+ // Create TokenProvider for API key
+ TokenProvider tokenProvider =
+ new TokenProvider() {
+ @Override
+ public String getToken() {
+ return apiKey;
+ }
+ };
+ config.setAuth(tokenProvider);
+
+ LOG.info("Configured API key authentication for OpenLineage client");
+ return config;
+ }
+}
diff --git
a/lineage/src/main/java/org/apache/gravitino/lineage/auth/AuthenticationFactory.java
b/lineage/src/main/java/org/apache/gravitino/lineage/auth/AuthenticationFactory.java
new file mode 100644
index 0000000000..5c660d85e1
--- /dev/null
+++
b/lineage/src/main/java/org/apache/gravitino/lineage/auth/AuthenticationFactory.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.gravitino.lineage.auth;
+
+import javax.ws.rs.BadRequestException;
+
+/** Creates the {@link LineageServerAuthenticationStrategy} that matches a configured auth type. */
+public class AuthenticationFactory {
+  /**
+   * Returns the strategy for the given authentication type.
+   *
+   * <p>A {@code null} or blank value, as well as {@code "none"}, selects no authentication;
+   * {@code "apiKey"} selects API key authentication. Leading and trailing whitespace is ignored.
+   *
+   * @param authType the configured authentication type, may be {@code null}
+   * @return the matching strategy
+   * @throws BadRequestException if the type is not one of the supported values
+   */
+  public static LineageServerAuthenticationStrategy createStrategy(String authType) {
+    if (authType == null || authType.trim().isEmpty()) {
+      return new NoAuthStrategy();
+    }
+
+    // Trim so the lookup agrees with the blank check above, which already ignores whitespace.
+    switch (authType.trim()) {
+      case "apiKey":
+        return new ApiKeyAuthStrategy();
+      case "none":
+        return new NoAuthStrategy();
+      default:
+        // Name the rejected value and the accepted ones so a misconfiguration is easy to spot.
+        throw new BadRequestException(
+            "Unsupported authentication type: '" + authType + "', supported types: apiKey, none");
+    }
+  }
+}
diff --git
a/lineage/src/main/java/org/apache/gravitino/lineage/auth/LineageServerAuthenticationStrategy.java
b/lineage/src/main/java/org/apache/gravitino/lineage/auth/LineageServerAuthenticationStrategy.java
new file mode 100644
index 0000000000..f9f4eab61c
--- /dev/null
+++
b/lineage/src/main/java/org/apache/gravitino/lineage/auth/LineageServerAuthenticationStrategy.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.gravitino.lineage.auth;
+
+import io.openlineage.client.transports.HttpConfig;
+import java.util.Map;
+
+/**
+ * Strategy for preparing the HTTP transport configuration used to send lineage events to a
+ * lineage server. Each implementation decides which authentication settings, if any, are put on
+ * the returned {@link HttpConfig}.
+ */
+public interface LineageServerAuthenticationStrategy {
+
+  /**
+   * Builds the {@link HttpConfig} used to submit lineage events to the given server.
+   *
+   * @param url the lineage server endpoint the events are sent to
+   * @param configs configuration properties the implementation may read for authentication
+   * @return the {@link HttpConfig} with the endpoint and any authentication settings applied
+   */
+  HttpConfig configureHttpConfig(String url, Map<String, String> configs);
+}
diff --git
a/lineage/src/main/java/org/apache/gravitino/lineage/auth/NoAuthStrategy.java
b/lineage/src/main/java/org/apache/gravitino/lineage/auth/NoAuthStrategy.java
new file mode 100644
index 0000000000..cb9c9e81b4
--- /dev/null
+++
b/lineage/src/main/java/org/apache/gravitino/lineage/auth/NoAuthStrategy.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.gravitino.lineage.auth;
+
+import io.openlineage.client.transports.HttpConfig;
+import java.net.URI;
+import java.util.Map;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/** Authentication strategy that only sets the target URL and applies no authentication. */
+class NoAuthStrategy implements LineageServerAuthenticationStrategy {
+  private static final Logger LOG = LoggerFactory.getLogger(NoAuthStrategy.class);
+
+  /**
+   * Creates an {@link HttpConfig} pointing at {@code url} without any authentication settings.
+   *
+   * @param url the lineage server endpoint
+   * @param configs configuration properties; not read by this strategy
+   * @return the {@link HttpConfig} with only the URL set
+   */
+  @Override
+  public HttpConfig configureHttpConfig(String url, Map<String, String> configs) {
+    LOG.info("Using no authentication for OpenLineage client");
+    HttpConfig httpConfig = new HttpConfig();
+    URI endpoint = URI.create(url);
+    httpConfig.setUrl(endpoint);
+    return httpConfig;
+  }
+}
diff --git
a/lineage/src/main/java/org/apache/gravitino/lineage/sink/LineageHttpSink.java
b/lineage/src/main/java/org/apache/gravitino/lineage/sink/LineageHttpSink.java
new file mode 100644
index 0000000000..8c4aab100a
--- /dev/null
+++
b/lineage/src/main/java/org/apache/gravitino/lineage/sink/LineageHttpSink.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.gravitino.lineage.sink;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import io.openlineage.client.OpenLineageClient;
+import io.openlineage.client.transports.HttpConfig;
+import io.openlineage.client.transports.HttpTransport;
+import io.openlineage.server.OpenLineage;
+import java.util.Map;
+import org.apache.gravitino.lineage.Utils;
+import org.apache.gravitino.lineage.auth.AuthenticationFactory;
+import org.apache.gravitino.lineage.auth.LineageServerAuthenticationStrategy;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Lineage sink that forwards run events to an HTTP server through an {@link OpenLineageClient}
+ * backed by an {@link HttpTransport}.
+ */
+public class LineageHttpSink implements LineageSink {
+
+  private static final Logger LOG = LoggerFactory.getLogger(LineageHttpSink.class);
+
+  /** Configuration key holding the endpoint of the HTTP server. */
+  private static final String URL_KEY = "url";
+
+  /** Configuration key holding the authentication type. */
+  private static final String AUTH_TYPE_KEY = "authType";
+
+  /** Endpoint used when no URL is configured. */
+  private static final String DEFAULT_URL = "http://localhost:5000/";
+
+  /** Client used to emit events; {@code null} until {@link #initialize(Map)} has run. */
+  private OpenLineageClient client;
+
+  /**
+   * Builds the OpenLineage client from the sink configuration.
+   *
+   * <p>Reads the {@code url} entry (falling back to {@code http://localhost:5000/}) and the
+   * {@code authType} entry, lets the matching authentication strategy build the
+   * {@link HttpConfig}, and creates a client on top of an {@link HttpTransport}.
+   *
+   * @param configs the sink configuration properties
+   */
+  @Override
+  public void initialize(Map<String, String> configs) {
+    String httpSinkUrl = configs.getOrDefault(URL_KEY, DEFAULT_URL);
+    String authType = configs.get(AUTH_TYPE_KEY);
+    LOG.info("Http sink URL: {}, authentication type: {}", httpSinkUrl, authType);
+
+    LineageServerAuthenticationStrategy authStrategy =
+        AuthenticationFactory.createStrategy(authType);
+    HttpConfig httpConfig = authStrategy.configureHttpConfig(httpSinkUrl, configs);
+    HttpTransport transport = new HttpTransport(httpConfig);
+
+    client = OpenLineageClient.builder().transport(transport).build();
+  }
+
+  /**
+   * Converts the run event to the client model and emits it through the OpenLineage client.
+   *
+   * @param runEvent the run event to send
+   * @throws IllegalStateException if the sink has not been initialized
+   * @throws RuntimeException if the event cannot be converted to the client model
+   */
+  @Override
+  public void sink(OpenLineage.RunEvent runEvent) {
+    // Fail with a clear message instead of a bare NullPointerException on client.emit().
+    if (client == null) {
+      throw new IllegalStateException(
+          "LineageHttpSink is not initialized, call initialize() before sink()");
+    }
+
+    try {
+      client.emit(Utils.getClientRunEvent(runEvent));
+      LOG.info("Sent lineage event to http sink: {}", runEvent);
+    } catch (JsonProcessingException e) {
+      LOG.warn("Could not parse lineage run event", e);
+      throw new RuntimeException("Failed to convert lineage run event for the http sink", e);
+    }
+  }
+
+  /**
+   * Closes the OpenLineage client if one was created.
+   *
+   * @throws RuntimeException if closing the client fails
+   */
+  @Override
+  public void close() {
+    if (client == null) {
+      return;
+    }
+
+    try {
+      client.close();
+    } catch (Exception e) {
+      throw new RuntimeException("Failed to close the OpenLineage client of the http sink", e);
+    }
+  }
+}