This is an automated email from the ASF dual-hosted git repository.

jshao pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/gravitino.git


The following commit(s) were added to refs/heads/main by this push:
     new 8bcfba0b4e [#6779] feat(core): Support lineage framework in Gravitino 
(#6782)
8bcfba0b4e is described below

commit 8bcfba0b4ef0b433eee9d1ec8e4891e5c55c69b3
Author: FANNG <xiaoj...@datastrato.com>
AuthorDate: Tue Apr 8 12:18:50 2025 +0800

    [#6779] feat(core): Support lineage framework in Gravitino (#6782)
    
    ### What changes were proposed in this pull request?
    
    Support the lineage framework in Gravitino. The lineage endpoint and
    lineage sink manager will be proposed in separate PRs.
    
    Total workflow draft PR: https://github.com/apache/gravitino/pull/6723
    
    The main work flow:
    1. Gravitino server creates lineage service which manages lineage source
    and lineage sinks.
    2. lineage source implementation receives lineage run event and
    dispatches to lineage service.
    3. lineage service processes the run event and dispatches it to the
    lineage sink manager.
    4. lineage sink manager manages the life cycle of lineage sinks and
    dispatches run events to them.
    
    ### Why are the changes needed?
    
    Fix: #6779
    
    ### Does this PR introduce _any_ user-facing change?
    no
    
    ### How was this patch tested?
    Set up a Spark & Marquez environment and tested the workflow.
---
 LICENSE.bin                                        |   1 +
 conf/log4j2.properties.template                    |  24 ++++
 .../org/apache/gravitino/utils/ClassUtils.java     |  30 +++++
 gradle/libs.versions.toml                          |   2 +
 lineage/build.gradle.kts                           |  41 +++++++
 .../apache/gravitino/lineage/LineageConfig.java    | 132 +++++++++++++++++++++
 .../gravitino/lineage/LineageDispatcher.java       |  57 +++++++++
 .../apache/gravitino/lineage/LineageService.java   |  82 +++++++++++++
 .../lineage/processor/LineageProcessor.java        |  34 ++++++
 .../gravitino/lineage/processor/NoopProcessor.java |  30 +++++
 .../gravitino/lineage/sink/LineageLogSink.java     |  73 ++++++++++++
 .../apache/gravitino/lineage/sink/LineageSink.java |  46 +++++++
 .../gravitino/lineage/sink/LineageSinkManager.java |  41 +++++++
 .../lineage/source/HTTPLineageSource.java          |  36 ++++++
 .../gravitino/lineage/source/LineageSource.java    |  43 +++++++
 .../gravitino/lineage/TestLineageConfig.java       | 101 ++++++++++++++++
 .../gravitino/server/web/SupportsRESTPackages.java |  36 ++++++
 server/build.gradle.kts                            |   1 +
 .../apache/gravitino/server/GravitinoServer.java   |  28 +++--
 settings.gradle.kts                                |   1 +
 20 files changed, 832 insertions(+), 7 deletions(-)

diff --git a/LICENSE.bin b/LICENSE.bin
index d1dddd5279..58f7d0793d 100644
--- a/LICENSE.bin
+++ b/LICENSE.bin
@@ -258,6 +258,7 @@
    Airlift
    The Netty Project
    Open Telemetry
+   Open Lineage
    Trino
    Jakarta Dependency Injection
    Jakarta Bean Validation
diff --git a/conf/log4j2.properties.template b/conf/log4j2.properties.template
index bd26f33e37..f1eb30fcd9 100644
--- a/conf/log4j2.properties.template
+++ b/conf/log4j2.properties.template
@@ -47,6 +47,30 @@ appender.rolling.strategy.delete.ifLastModified.type = 
IfLastModified
 # Delete all files older than 30 days
 appender.rolling.strategy.delete.ifLastModified.age = 30d
 
+## Use a separate file for the lineage log
+appender.lineage_file.type=RollingFile
+appender.lineage_file.name=lineage_file
+appender.lineage_file.fileName=${basePath}/gravitino_lineage.log
+appender.lineage_file.filePattern=${basePath}/gravitino_lineage_%d{yyyyMMdd}.log.gz
+appender.lineage_file.layout.type=PatternLayout
+appender.lineage_file.layout.pattern=[%d{yyyy-MM-dd HH:mm:ss}] %m%n
+appender.lineage_file.policies.type = Policies
+
+appender.lineage_file.policies.time.type = TimeBasedTriggeringPolicy
+appender.lineage_file.policies.time.interval = 1
+appender.lineage_file.policies.time.modulate = true
+appender.lineage_file.strategy.type = DefaultRolloverStrategy
+appender.lineage_file.strategy.delete.type = Delete
+appender.lineage_file.strategy.delete.basePath = ${basePath}
+appender.lineage_file.strategy.delete.maxDepth = 10
+appender.lineage_file.strategy.delete.ifLastModified.type = IfLastModified
+appender.lineage_file.strategy.delete.ifLastModified.age = 30d
+
+logger.lineage.name = org.apache.gravitino.lineage.sink.LineageLogSink$LineageLogger
+logger.lineage.level = info
+logger.lineage.appenderRef.lineage_file.ref = lineage_file
+logger.lineage.additivity = false
+
 # Configure root logger
 rootLogger.level = info
 rootLogger.appenderRef.rolling.ref = fileLogger
diff --git a/core/src/main/java/org/apache/gravitino/utils/ClassUtils.java 
b/core/src/main/java/org/apache/gravitino/utils/ClassUtils.java
new file mode 100644
index 0000000000..7307394eb8
--- /dev/null
+++ b/core/src/main/java/org/apache/gravitino/utils/ClassUtils.java
@@ -0,0 +1,30 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing,
+ *  software distributed under the License is distributed on an
+ *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ *  specific language governing permissions and limitations
+ *  under the License.
+ */
+
+package org.apache.gravitino.utils;
+
+/** Reflection helpers for creating objects from class names. */
+public class ClassUtils {
+
+  /**
+   * Instantiates the class with the given fully qualified name through its no-argument
+   * constructor.
+   *
+   * <p>Despite its name, this method returns a new instance of the class rather than the {@link
+   * Class} object. The cast to {@code T} is unchecked: a mismatch between the instantiated class
+   * and the type the caller expects only surfaces as a {@link ClassCastException} at the call
+   * site.
+   *
+   * @param className the fully qualified name of the class to instantiate.
+   * @param <T> the type the caller expects the new instance to have.
+   * @return a new instance of the named class.
+   * @throws RuntimeException if the class cannot be found or instantiated; the original failure is
+   *     kept as the cause.
+   */
+  @SuppressWarnings("unchecked")
+  public static <T> T loadClass(String className) {
+    try {
+      return (T) Class.forName(className).getDeclaredConstructor().newInstance();
+    } catch (Exception e) {
+      // Name the offending class so a misconfigured class name is easy to spot in the logs.
+      throw new RuntimeException("Failed to instantiate class: " + className, e);
+    }
+  }
+}
diff --git a/gradle/libs.versions.toml b/gradle/libs.versions.toml
index 530d502653..a157bb13d0 100644
--- a/gradle/libs.versions.toml
+++ b/gradle/libs.versions.toml
@@ -118,6 +118,7 @@ datanucleus-jdo = "3.2.0-m3"
 hudi = "0.15.0"
 google-auth = "1.28.0"
 aliyun-credentials = "0.3.12"
+openlineage = "1.29.0"
 
 [libraries]
 aws-iam = { group = "software.amazon.awssdk", name = "iam", version.ref = 
"awssdk" }
@@ -279,6 +280,7 @@ google-auth-credentials = { group = "com.google.auth", name 
= "google-auth-libra
 
 aliyun-credentials-sdk = { group='com.aliyun', name='credentials-java', 
version.ref='aliyun-credentials' }
 flinkjdbc = {group='org.apache.flink',name='flink-connector-jdbc', 
version.ref='flinkjdbc'}
+openlineage-java= { group = "io.openlineage", name = "openlineage-java", 
version.ref = "openlineage" }
 
 [bundles]
 log4j = ["slf4j-api", "log4j-slf4j2-impl", "log4j-api", "log4j-core", 
"log4j-12-api", "log4j-layout-template-json"]
diff --git a/lineage/build.gradle.kts b/lineage/build.gradle.kts
new file mode 100644
index 0000000000..21da55dea3
--- /dev/null
+++ b/lineage/build.gradle.kts
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+plugins {
+  id("java")
+}
+
+dependencies {
+  implementation(project(":core"))
+  implementation(project(":server-common"))
+  implementation(libs.commons.lang3)
+  implementation(libs.guava)
+  implementation(libs.slf4j.api)
+  implementation(libs.jackson.datatype.jdk8)
+  implementation(libs.jackson.datatype.jsr310)
+  implementation(libs.jackson.databind)
+  implementation(libs.openlineage.java) {
+    isTransitive = false
+  }
+
+  testImplementation(libs.junit.jupiter.api)
+  testImplementation(libs.junit.jupiter.params)
+
+  testRuntimeOnly(libs.junit.jupiter.engine)
+}
diff --git 
a/lineage/src/main/java/org/apache/gravitino/lineage/LineageConfig.java 
b/lineage/src/main/java/org/apache/gravitino/lineage/LineageConfig.java
new file mode 100644
index 0000000000..ac7185336b
--- /dev/null
+++ b/lineage/src/main/java/org/apache/gravitino/lineage/LineageConfig.java
@@ -0,0 +1,132 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing,
+ *  software distributed under the License is distributed on an
+ *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ *  specific language governing permissions and limitations
+ *  under the License.
+ */
+
+package org.apache.gravitino.lineage;
+
+import com.google.common.base.Preconditions;
+import com.google.common.base.Splitter;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.gravitino.Config;
+import org.apache.gravitino.config.ConfigBuilder;
+import org.apache.gravitino.config.ConfigConstants;
+import org.apache.gravitino.config.ConfigEntry;
+import org.apache.gravitino.lineage.processor.NoopProcessor;
+import org.apache.gravitino.lineage.sink.LineageLogSink;
+import org.apache.gravitino.lineage.source.HTTPLineageSource;
+
+/** Configuration of the lineage module: the event source, the event processor and the sinks. */
+public class LineageConfig extends Config {
+
+  public static final String LINEAGE_CONFIG_PREFIX = "gravitino.lineage.";
+  public static final String LINEAGE_CONFIG_SINKS = "sinks";
+  public static final String LINEAGE_CONFIG_SOURCE = "source";
+  public static final String LINEAGE_SOURCE_CLASS_NAME = "sourceClass";
+  public static final String LINEAGE_PROCESSOR_CLASS_NAME = "processorClass";
+  public static final String LINEAGE_SINK_CLASS_NAME = "sinkClass";
+  public static final String LINEAGE_HTTP_SOURCE_CLASS_NAME = HTTPLineageSource.class.getName();
+
+  public static final String LINEAGE_LOG_SINK_NAME = "log";
+  public static final String LINEAGE_HTTP_SOURCE_NAME = "http";
+
+  // Splits the comma separated sink list, trimming names and dropping empty entries.
+  private static final Splitter SINK_NAME_SPLITTER =
+      Splitter.on(",").omitEmptyStrings().trimResults();
+
+  public static final ConfigEntry<String> SOURCE_NAME =
+      new ConfigBuilder(LINEAGE_CONFIG_SOURCE)
+          .doc("The name of lineage event source")
+          .version(ConfigConstants.VERSION_0_9_0)
+          .stringConf()
+          .createWithDefault(LINEAGE_HTTP_SOURCE_NAME);
+
+  public static final ConfigEntry<String> PROCESSOR_CLASS =
+      new ConfigBuilder(LINEAGE_PROCESSOR_CLASS_NAME)
+          .doc("The class name of lineage event processor")
+          .version(ConfigConstants.VERSION_0_9_0)
+          .stringConf()
+          .createWithDefault(NoopProcessor.class.getName());
+
+  public static final ConfigEntry<String> SINKS =
+      new ConfigBuilder(LINEAGE_CONFIG_SINKS)
+          .doc("The sinks of lineage event")
+          .version(ConfigConstants.VERSION_0_9_0)
+          .stringConf()
+          .createWithDefault(LINEAGE_LOG_SINK_NAME);
+
+  /**
+   * Creates a lineage configuration from the given properties.
+   *
+   * @param properties the lineage properties; all entries are loaded without filtering.
+   */
+  public LineageConfig(Map<String, String> properties) {
+    super(false);
+    loadFromMap(properties, k -> true);
+  }
+
+  /**
+   * Returns the name of the configured lineage source.
+   *
+   * @return the source name, {@value #LINEAGE_HTTP_SOURCE_NAME} by default.
+   */
+  public String source() {
+    return get(SOURCE_NAME);
+  }
+
+  /**
+   * Resolves the implementation class of the configured lineage source.
+   *
+   * <p>The built-in HTTP source maps to {@link HTTPLineageSource}; any other source must declare
+   * its class under the key {@code <sourceName>.sourceClass}.
+   *
+   * @return the fully qualified class name of the lineage source.
+   * @throws IllegalArgumentException if a custom source has no class configured.
+   */
+  public String sourceClass() {
+    String sourceName = source();
+    if (LINEAGE_HTTP_SOURCE_NAME.equals(sourceName)) {
+      return LINEAGE_HTTP_SOURCE_CLASS_NAME;
+    }
+    String sourceClassKey = sourceName + "." + LINEAGE_SOURCE_CLASS_NAME;
+    String sourceClass = getRawString(sourceClassKey);
+    Preconditions.checkArgument(
+        StringUtils.isNotBlank(sourceClass), "%s is not set", sourceClassKey);
+    return sourceClass;
+  }
+
+  /**
+   * Returns the class name of the lineage event processor.
+   *
+   * @return the configured processor class, {@link NoopProcessor} by default.
+   */
+  public String processorClass() {
+    return get(PROCESSOR_CLASS);
+  }
+
+  /**
+   * Builds the configuration map handed to the sinks.
+   *
+   * <p>The result is a copy of all configuration entries with two defaults filled in when they
+   * are absent: the effective sink list under {@value #LINEAGE_CONFIG_SINKS}, and the class of
+   * the built-in log sink when that sink is enabled.
+   *
+   * @return a mutable copy of the configuration including the defaults above.
+   * @throws IllegalArgumentException if an enabled sink has no {@code <sinkName>.sinkClass} entry.
+   */
+  public Map<String, String> getSinkConfigs() {
+    List<String> sinkNames = sinks();
+    Map<String, String> sinkConfigs = new HashMap<>(getAllConfig());
+
+    // Make the effective sink list visible even when only the default value applies.
+    if (!sinkConfigs.containsKey(LINEAGE_CONFIG_SINKS)) {
+      sinkConfigs.put(LINEAGE_CONFIG_SINKS, get(SINKS));
+    }
+
+    // The built-in log sink does not require an explicit class entry.
+    String logSinkClassKey = LINEAGE_LOG_SINK_NAME + "." + LINEAGE_SINK_CLASS_NAME;
+    if (sinkNames.contains(LINEAGE_LOG_SINK_NAME) && !sinkConfigs.containsKey(logSinkClassKey)) {
+      sinkConfigs.put(logSinkClassKey, LineageLogSink.class.getName());
+    }
+
+    for (String sinkName : sinkNames) {
+      String sinkClassKey = sinkName + "." + LINEAGE_SINK_CLASS_NAME;
+      Preconditions.checkArgument(
+          sinkConfigs.containsKey(sinkClassKey), "%s is not set", sinkClassKey);
+    }
+
+    return sinkConfigs;
+  }
+
+  /**
+   * Returns the names of the enabled sinks.
+   *
+   * @return the sink names parsed from the comma separated {@value #LINEAGE_CONFIG_SINKS} value,
+   *     trimmed and without empty entries.
+   */
+  public List<String> sinks() {
+    return SINK_NAME_SPLITTER.splitToStream(get(SINKS)).collect(Collectors.toList());
+  }
+}
diff --git 
a/lineage/src/main/java/org/apache/gravitino/lineage/LineageDispatcher.java 
b/lineage/src/main/java/org/apache/gravitino/lineage/LineageDispatcher.java
new file mode 100644
index 0000000000..ffa7f92058
--- /dev/null
+++ b/lineage/src/main/java/org/apache/gravitino/lineage/LineageDispatcher.java
@@ -0,0 +1,57 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing,
+ *  software distributed under the License is distributed on an
+ *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ *  specific language governing permissions and limitations
+ *  under the License.
+ */
+
+package org.apache.gravitino.lineage;
+
+import io.openlineage.server.OpenLineage.RunEvent;
+import java.io.Closeable;
+
+/**
+ * Processes lineage events and forwards them to the configured sinks. Besides dispatching, an
+ * implementation owns its initialization and, through {@link Closeable}, the cleanup of its
+ * resources.
+ *
+ * <p>Expected call sequence:
+ *
+ * <ol>
+ *   <li>{@link #initialize(LineageConfig)} once, with the required configuration
+ *   <li>{@link #dispatchLineageEvent(RunEvent)} for every received event
+ *   <li>{@link #close()} once, to release resources
+ * </ol>
+ */
+public interface LineageDispatcher extends Closeable {
+
+  /**
+   * Sets up the dispatcher. This method must be invoked before any event is dispatched.
+   *
+   * @param lineageConfig configuration for lineage source, processor and sinks.
+   */
+  void initialize(LineageConfig lineageConfig);
+
+  /**
+   * Processes a lineage run event and hands it over to the configured sinks.
+   *
+   * <p>A {@code false} result means the event was rejected; callers should retry or log such
+   * events themselves so that the system is not overloaded.
+   *
+   * @param runEvent The OpenLineage run event to be processed and dispatched. Must not be null.
+   * @return {@code true} if the event was successfully processed and dispatched to the sinks,
+   *     {@code false} if the event was rejected due to the overload of lineage sinks.
+   */
+  boolean dispatchLineageEvent(RunEvent runEvent);
+}
diff --git 
a/lineage/src/main/java/org/apache/gravitino/lineage/LineageService.java 
b/lineage/src/main/java/org/apache/gravitino/lineage/LineageService.java
new file mode 100644
index 0000000000..2667d3e5ca
--- /dev/null
+++ b/lineage/src/main/java/org/apache/gravitino/lineage/LineageService.java
@@ -0,0 +1,82 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing,
+ *  software distributed under the License is distributed on an
+ *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ *  specific language governing permissions and limitations
+ *  under the License.
+ */
+
+package org.apache.gravitino.lineage;
+
+import com.google.common.collect.ImmutableSet;
+import io.openlineage.server.OpenLineage;
+import io.openlineage.server.OpenLineage.RunEvent;
+import java.util.Set;
+import org.apache.gravitino.lineage.processor.LineageProcessor;
+import org.apache.gravitino.lineage.sink.LineageSinkManager;
+import org.apache.gravitino.lineage.source.LineageSource;
+import org.apache.gravitino.server.web.SupportsRESTPackages;
+import org.apache.gravitino.utils.ClassUtils;
+
+/**
+ * The LineageService manages the life cycle of lineage sinks, sources, and processors. It provides
+ * {@code dispatchLineageEvent} method for lineage source to dispatch lineage events to the sinks.
+ */
+public class LineageService implements LineageDispatcher, SupportsRESTPackages {
+  private LineageSinkManager sinkManager;
+  private LineageSource source;
+  private LineageProcessor processor;
+
+  /**
+   * Instantiates the configured source and processor, creates the sink manager, and then
+   * initializes the sinks followed by the source.
+   *
+   * @param lineageConfig configuration for lineage source, processor and sinks.
+   */
+  @Override
+  public void initialize(LineageConfig lineageConfig) {
+    String sourceName = lineageConfig.source();
+    this.source = ClassUtils.loadClass(lineageConfig.sourceClass());
+    this.sinkManager = new LineageSinkManager();
+    this.processor = ClassUtils.loadClass(lineageConfig.processorClass());
+
+    // Sinks are initialized before the source is handed this dispatcher.
+    sinkManager.initialize(lineageConfig.sinks(), lineageConfig.getSinkConfigs());
+    // NOTE(review): the prefix is the bare source name without a trailing "."; confirm the key
+    // format the source implementation expects.
+    source.initialize(lineageConfig.getConfigsWithPrefix(sourceName), this);
+  }
+
+  /** Closes the source first and then the sink manager; either may be absent. */
+  @Override
+  public void close() {
+    try {
+      if (source != null) {
+        source.close();
+      }
+    } finally {
+      // Always release the sinks, even when closing the source fails.
+      if (sinkManager != null) {
+        sinkManager.close();
+      }
+    }
+  }
+
+  /**
+   * Runs the event through the processor and passes the result to the sink manager, unless the
+   * sink manager reports a high watermark.
+   *
+   * @param runEvent the run event received from the lineage source.
+   * @return {@code false} if the event was rejected because of the high watermark, {@code true}
+   *     otherwise.
+   */
+  @Override
+  public boolean dispatchLineageEvent(OpenLineage.RunEvent runEvent) {
+    if (sinkManager.isHighWatermark()) {
+      return false;
+    }
+
+    RunEvent processedEvent = processor.process(runEvent);
+    sinkManager.sink(processedEvent);
+    return true;
+  }
+
+  /**
+   * Returns the REST packages contributed by the lineage source.
+   *
+   * @return the packages of the source if it implements {@link SupportsRESTPackages}, otherwise
+   *     an empty set.
+   */
+  @Override
+  public Set<String> getRESTPackages() {
+    if (source instanceof SupportsRESTPackages) {
+      return ((SupportsRESTPackages) source).getRESTPackages();
+    }
+    return ImmutableSet.of();
+  }
+}
diff --git 
a/lineage/src/main/java/org/apache/gravitino/lineage/processor/LineageProcessor.java
 
b/lineage/src/main/java/org/apache/gravitino/lineage/processor/LineageProcessor.java
new file mode 100644
index 0000000000..1a08e0cc0d
--- /dev/null
+++ 
b/lineage/src/main/java/org/apache/gravitino/lineage/processor/LineageProcessor.java
@@ -0,0 +1,34 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing,
+ *  software distributed under the License is distributed on an
+ *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ *  specific language governing permissions and limitations
+ *  under the License.
+ */
+
+package org.apache.gravitino.lineage.processor;
+
+import io.openlineage.server.OpenLineage.RunEvent;
+
+/** Transforms or enriches the lineage data carried by {@link RunEvent} objects. */
+public interface LineageProcessor {
+
+  /**
+   * Applies this processor to a run event.
+   *
+   * @param runEvent The original run event to process.
+   * @return Processed run event instance with updated lineage data.
+   */
+  RunEvent process(RunEvent runEvent);
+}
diff --git 
a/lineage/src/main/java/org/apache/gravitino/lineage/processor/NoopProcessor.java
 
b/lineage/src/main/java/org/apache/gravitino/lineage/processor/NoopProcessor.java
new file mode 100644
index 0000000000..cb09eaf705
--- /dev/null
+++ 
b/lineage/src/main/java/org/apache/gravitino/lineage/processor/NoopProcessor.java
@@ -0,0 +1,30 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing,
+ *  software distributed under the License is distributed on an
+ *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ *  specific language governing permissions and limitations
+ *  under the License.
+ */
+
+package org.apache.gravitino.lineage.processor;
+
+import io.openlineage.server.OpenLineage.RunEvent;
+
+/** A {@link LineageProcessor} that passes every run event through untouched. */
+public class NoopProcessor implements LineageProcessor {
+
+  /**
+   * Returns the given event as is.
+   *
+   * @param runEvent the run event to process.
+   * @return the very same {@code runEvent} instance.
+   */
+  @Override
+  public RunEvent process(RunEvent runEvent) {
+    return runEvent;
+  }
+}
diff --git 
a/lineage/src/main/java/org/apache/gravitino/lineage/sink/LineageLogSink.java 
b/lineage/src/main/java/org/apache/gravitino/lineage/sink/LineageLogSink.java
new file mode 100644
index 0000000000..62367ad234
--- /dev/null
+++ 
b/lineage/src/main/java/org/apache/gravitino/lineage/sink/LineageLogSink.java
@@ -0,0 +1,73 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing,
+ *  software distributed under the License is distributed on an
+ *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ *  specific language governing permissions and limitations
+ *  under the License.
+ */
+
+package org.apache.gravitino.lineage.sink;
+
+import com.fasterxml.jackson.annotation.JsonInclude;
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.MapperFeature;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.SerializationFeature;
+import com.fasterxml.jackson.databind.cfg.EnumFeature;
+import com.fasterxml.jackson.databind.json.JsonMapper;
+import com.fasterxml.jackson.datatype.jdk8.Jdk8Module;
+import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;
+import io.openlineage.server.OpenLineage.Run;
+import io.openlineage.server.OpenLineage.RunEvent;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/** A {@link LineageSink} that serializes run events to JSON and writes them to a log. */
+public class LineageLogSink implements LineageSink {
+  private static final Logger LOG = LoggerFactory.getLogger(LineageLogSink.class);
+
+  // Writes dates as ISO strings instead of timestamps, enums in lower case, and omits nulls.
+  private final ObjectMapper objectMapper =
+      JsonMapper.builder()
+          .configure(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS, false)
+          .configure(EnumFeature.WRITE_ENUMS_TO_LOWERCASE, true)
+          .enable(MapperFeature.ACCEPT_CASE_INSENSITIVE_ENUMS)
+          .build()
+          .setSerializationInclusion(JsonInclude.Include.NON_NULL)
+          .registerModule(new JavaTimeModule())
+          .registerModule(new Jdk8Module());
+  private final LineageLogger logger = new LineageLogger();
+
+  /**
+   * Wrapper that owns a logger named after this nested class, so that the logging configuration
+   * can address lineage events separately from the regular log of the sink.
+   */
+  private static class LineageLogger {
+    private static final Logger LINEAGE_LOG = LoggerFactory.getLogger(LineageLogger.class);
+
+    public void log(String lineageString) {
+      LINEAGE_LOG.info(lineageString);
+    }
+  }
+
+  /**
+   * Serializes the event to JSON and writes it to the lineage logger. A serialization failure is
+   * logged as a warning and the event is dropped.
+   *
+   * @param event The lineage run event to be processed.
+   */
+  @Override
+  public void sink(RunEvent event) {
+    try {
+      logger.log(objectMapper.writeValueAsString(event));
+    } catch (JsonProcessingException e) {
+      // The trailing throwable keeps the stack trace in the log next to the formatted message.
+      LOG.warn(
+          "Process open lineage event failed, run id: {}, error message: {}",
+          getRunId(event),
+          e.getMessage(),
+          e);
+    }
+  }
+
+  // Resolves the run id for error reporting without failing when the run or its id is missing.
+  private String getRunId(RunEvent event) {
+    Run run = event.getRun();
+    if (run == null || run.getRunId() == null) {
+      return "Unknown";
+    }
+    return run.getRunId().toString();
+  }
+}
diff --git 
a/lineage/src/main/java/org/apache/gravitino/lineage/sink/LineageSink.java 
b/lineage/src/main/java/org/apache/gravitino/lineage/sink/LineageSink.java
new file mode 100644
index 0000000000..b765138516
--- /dev/null
+++ b/lineage/src/main/java/org/apache/gravitino/lineage/sink/LineageSink.java
@@ -0,0 +1,46 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing,
+ *  software distributed under the License is distributed on an
+ *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ *  specific language governing permissions and limitations
+ *  under the License.
+ */
+
+package org.apache.gravitino.lineage.sink;
+
+import io.openlineage.server.OpenLineage;
+import java.io.Closeable;
+import java.util.Map;
+
+/** A closable component that receives lineage run events and sinks them. */
+public interface LineageSink extends Closeable {
+
+  /**
+   * Prepares the sink for use. The default implementation does nothing.
+   *
+   * @param configs A map representing the configuration for the sink.
+   */
+  default void initialize(Map<String, String> configs) {}
+
+  /** Releases the resources held by the sink. The default implementation does nothing. */
+  @Override
+  default void close() {}
+
+  /**
+   * Handles a single lineage run event.
+   *
+   * @param event The lineage run event to be processed.
+   */
+  void sink(OpenLineage.RunEvent event);
+}
diff --git 
a/lineage/src/main/java/org/apache/gravitino/lineage/sink/LineageSinkManager.java
 
b/lineage/src/main/java/org/apache/gravitino/lineage/sink/LineageSinkManager.java
new file mode 100644
index 0000000000..f01dcc80e7
--- /dev/null
+++ 
b/lineage/src/main/java/org/apache/gravitino/lineage/sink/LineageSinkManager.java
@@ -0,0 +1,41 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing,
+ *  software distributed under the License is distributed on an
+ *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ *  specific language governing permissions and limitations
+ *  under the License.
+ */
+
+package org.apache.gravitino.lineage.sink;
+
+import io.openlineage.server.OpenLineage.RunEvent;
+import java.io.Closeable;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Entry point for the lineage sinks. Every method in this class is currently an empty
+ * placeholder: nothing is initialized, no event is forwarded and the high watermark is never
+ * reported.
+ */
+@SuppressWarnings("unused")
+public class LineageSinkManager implements Closeable {
+
+  /**
+   * Initializes the sink manager. Currently does nothing.
+   *
+   * @param sinks the names of the enabled sinks.
+   * @param lineageConfigs the configuration passed along for the sinks.
+   */
+  public void initialize(List<String> sinks, Map<String, String> lineageConfigs) {}
+
+  // Checks if the sink queue size surpasses the threshold to avoid overwhelming lineage sinks.
+  public boolean isHighWatermark() {
+    return false;
+  }
+
+  /**
+   * Accepts a run event for the sinks. Currently does nothing.
+   *
+   * @param runEvent the run event to sink.
+   */
+  public void sink(RunEvent runEvent) {}
+
+  /** Closes the sink manager. Currently does nothing. */
+  @Override
+  public void close() {}
+}
diff --git 
a/lineage/src/main/java/org/apache/gravitino/lineage/source/HTTPLineageSource.java
 
b/lineage/src/main/java/org/apache/gravitino/lineage/source/HTTPLineageSource.java
new file mode 100644
index 0000000000..fd15eaa57f
--- /dev/null
+++ 
b/lineage/src/main/java/org/apache/gravitino/lineage/source/HTTPLineageSource.java
@@ -0,0 +1,36 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing,
+ *  software distributed under the License is distributed on an
+ *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ *  specific language governing permissions and limitations
+ *  under the License.
+ */
+
+package org.apache.gravitino.lineage.source;
+
+import com.google.common.collect.ImmutableSet;
+import java.util.Map;
+import java.util.Set;
+import org.apache.gravitino.lineage.LineageDispatcher;
+import org.apache.gravitino.server.web.SupportsRESTPackages;
+
+/**
+ * The HTTP lineage source. At this stage it is a placeholder: initialization does nothing and no
+ * REST packages are contributed.
+ */
+public class HTTPLineageSource implements LineageSource, SupportsRESTPackages {
+
+  /**
+   * Does nothing for now.
+   *
+   * @param configs the configuration of this source.
+   * @param dispatcher the dispatcher that lineage events should be handed to.
+   */
+  @Override
+  public void initialize(Map<String, String> configs, LineageDispatcher dispatcher) {}
+
+  /**
+   * Returns the REST packages provided by this source.
+   *
+   * @return an empty set.
+   */
+  @Override
+  public Set<String> getRESTPackages() {
+    return ImmutableSet.of();
+  }
+}
diff --git 
a/lineage/src/main/java/org/apache/gravitino/lineage/source/LineageSource.java 
b/lineage/src/main/java/org/apache/gravitino/lineage/source/LineageSource.java
new file mode 100644
index 0000000000..d633bdbd2d
--- /dev/null
+++ 
b/lineage/src/main/java/org/apache/gravitino/lineage/source/LineageSource.java
@@ -0,0 +1,43 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing,
+ *  software distributed under the License is distributed on an
+ *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ *  specific language governing permissions and limitations
+ *  under the License.
+ */
+
+package org.apache.gravitino.lineage.source;
+
+import java.io.Closeable;
+import java.util.Map;
+import org.apache.gravitino.lineage.LineageDispatcher;
+
+/**
+ * The LineageSource interface defines a closable data source for receiving 
and dispatching lineage
+ * information.
+ */
+public interface LineageSource extends Closeable {
+
+  /**
+   * Initializes the data source with the given configurations and a lineage 
dispatcher.
+   *
+   * @param configs A map containing configuration information for the data 
source.
+   * @param dispatcher A dispatcher used to distribute lineage events.
+   */
+  default void initialize(Map<String, String> configs, LineageDispatcher 
dispatcher) {}
+
+  /** Closes the data source and releases related resources. */
+  @Override
+  default void close() {}
+}
diff --git 
a/lineage/src/test/java/org/apache/gravitino/lineage/TestLineageConfig.java 
b/lineage/src/test/java/org/apache/gravitino/lineage/TestLineageConfig.java
new file mode 100644
index 0000000000..d4dbfa8a3a
--- /dev/null
+++ b/lineage/src/test/java/org/apache/gravitino/lineage/TestLineageConfig.java
@@ -0,0 +1,101 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing,
+ *  software distributed under the License is distributed on an
+ *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ *  specific language governing permissions and limitations
+ *  under the License.
+ */
+
+package org.apache.gravitino.lineage;
+
+import com.google.common.collect.ImmutableMap;
+import java.util.Map;
+import org.apache.gravitino.lineage.sink.LineageLogSink;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+public class TestLineageConfig {
+
+  @Test
+  void testLineageSource() {
+    // default config with HTTP source
+    LineageConfig lineageConfig = new LineageConfig(ImmutableMap.of());
+    Assertions.assertEquals(LineageConfig.LINEAGE_HTTP_SOURCE_NAME, 
lineageConfig.source());
+    Assertions.assertEquals(
+        LineageConfig.LINEAGE_HTTP_SOURCE_CLASS_NAME, 
lineageConfig.sourceClass());
+
+    // config with custom source
+    lineageConfig =
+        new LineageConfig(
+            ImmutableMap.of(
+                LineageConfig.LINEAGE_CONFIG_SOURCE,
+                "source1",
+                "source1." + LineageConfig.LINEAGE_SOURCE_CLASS_NAME,
+                "test-class"));
+    Assertions.assertEquals("source1", lineageConfig.source());
+    Assertions.assertEquals("test-class", lineageConfig.sourceClass());
+
+    LineageConfig lineageConfig2 =
+        new LineageConfig(ImmutableMap.of(LineageConfig.LINEAGE_CONFIG_SOURCE, 
"source2"));
+
+    Assertions.assertThrowsExactly(
+        IllegalArgumentException.class, () -> lineageConfig2.sourceClass());
+  }
+
+  @Test
+  void testGetSinkConfigs() {
+    // default config with log sink
+    LineageConfig lineageConfig = new LineageConfig(ImmutableMap.of());
+    Map<String, String> sinkConfigs = lineageConfig.getSinkConfigs();
+    String sinks = sinkConfigs.get(LineageConfig.LINEAGE_CONFIG_SINKS);
+    Assertions.assertEquals(LineageConfig.LINEAGE_LOG_SINK_NAME, sinks);
+    String className =
+        sinkConfigs.get(
+            LineageConfig.LINEAGE_LOG_SINK_NAME + "." + 
LineageConfig.LINEAGE_SINK_CLASS_NAME);
+    Assertions.assertEquals(LineageLogSink.class.getName(), className);
+
+    // config multi sinks
+    Map<String, String> config2 =
+        ImmutableMap.of(
+            LineageConfig.LINEAGE_CONFIG_SINKS,
+            "sink1,sink2",
+            "sink1." + LineageConfig.LINEAGE_SINK_CLASS_NAME,
+            "test-class",
+            "sink2." + LineageConfig.LINEAGE_SINK_CLASS_NAME,
+            "test-class2");
+    lineageConfig = new LineageConfig(config2);
+    sinkConfigs = lineageConfig.getSinkConfigs();
+    sinks = sinkConfigs.get(LineageConfig.LINEAGE_CONFIG_SINKS);
+    Assertions.assertEquals("sink1,sink2", sinks);
+    Assertions.assertEquals(
+        "test-class", sinkConfigs.get("sink1." + 
LineageConfig.LINEAGE_SINK_CLASS_NAME));
+    Assertions.assertEquals(
+        "test-class2", sinkConfigs.get("sink2." + 
LineageConfig.LINEAGE_SINK_CLASS_NAME));
+
+    // test missing sink1 class name
+    Map<String, String> config3 =
+        ImmutableMap.of(
+            LineageConfig.LINEAGE_CONFIG_SINKS,
+            "sink1,sink2",
+            "sink2." + LineageConfig.LINEAGE_SINK_CLASS_NAME,
+            "test-class2");
+
+    Assertions.assertThrowsExactly(
+        IllegalArgumentException.class,
+        () -> {
+          LineageConfig lineageConfig1 = new LineageConfig(config3);
+          lineageConfig1.getSinkConfigs();
+        });
+  }
+}
diff --git 
a/server-common/src/main/java/org/apache/gravitino/server/web/SupportsRESTPackages.java
 
b/server-common/src/main/java/org/apache/gravitino/server/web/SupportsRESTPackages.java
new file mode 100644
index 0000000000..80c89f05c9
--- /dev/null
+++ 
b/server-common/src/main/java/org/apache/gravitino/server/web/SupportsRESTPackages.java
@@ -0,0 +1,36 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing,
+ *  software distributed under the License is distributed on an
+ *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ *  specific language governing permissions and limitations
+ *  under the License.
+ */
+
+package org.apache.gravitino.server.web;
+
+import java.util.Set;
+
+/**
+ * This interface provides a method to retrieve a set of REST package names. These package names
+ * can be injected into a Jetty service so that the service can locate and handle REST
+ * resources.
+ */
+public interface SupportsRESTPackages {
+  /**
+   * Retrieves a set of REST package names.
+   *
+   * @return A set containing the names of REST packages.
+   */
+  Set<String> getRESTPackages();
+}
diff --git a/server/build.gradle.kts b/server/build.gradle.kts
index 4fe6ae2707..b58e42773d 100644
--- a/server/build.gradle.kts
+++ b/server/build.gradle.kts
@@ -27,6 +27,7 @@ dependencies {
   implementation(project(":api"))
   implementation(project(":common"))
   implementation(project(":core"))
+  implementation(project(":lineage"))
   implementation(project(":server-common"))
   implementation(libs.bundles.jetty)
   implementation(libs.bundles.jersey)
diff --git 
a/server/src/main/java/org/apache/gravitino/server/GravitinoServer.java 
b/server/src/main/java/org/apache/gravitino/server/GravitinoServer.java
index 0c730439b1..accf65cb5f 100644
--- a/server/src/main/java/org/apache/gravitino/server/GravitinoServer.java
+++ b/server/src/main/java/org/apache/gravitino/server/GravitinoServer.java
@@ -18,9 +18,8 @@
  */
 package org.apache.gravitino.server;
 
-import com.google.common.collect.Lists;
 import java.io.File;
-import java.util.List;
+import java.util.HashSet;
 import java.util.Properties;
 import javax.servlet.Servlet;
 import org.apache.gravitino.Configs;
@@ -33,6 +32,9 @@ import org.apache.gravitino.catalog.SchemaDispatcher;
 import org.apache.gravitino.catalog.TableDispatcher;
 import org.apache.gravitino.catalog.TopicDispatcher;
 import org.apache.gravitino.credential.CredentialOperationDispatcher;
+import org.apache.gravitino.lineage.LineageConfig;
+import org.apache.gravitino.lineage.LineageDispatcher;
+import org.apache.gravitino.lineage.LineageService;
 import org.apache.gravitino.metalake.MetalakeDispatcher;
 import org.apache.gravitino.metrics.MetricsSystem;
 import org.apache.gravitino.metrics.source.MetricsSource;
@@ -75,10 +77,13 @@ public class GravitinoServer extends ResourceConfig {
 
   private final GravitinoEnv gravitinoEnv;
 
+  private final LineageService lineageService;
+
   public GravitinoServer(ServerConfig config, GravitinoEnv gravitinoEnv) {
-    serverConfig = config;
-    server = new JettyServer();
+    this.serverConfig = config;
+    this.server = new JettyServer();
     this.gravitinoEnv = gravitinoEnv;
+    this.lineageService = new LineageService();
   }
 
   public void initialize() {
@@ -90,6 +95,9 @@ public class GravitinoServer extends ResourceConfig {
 
     ServerAuthenticator.getInstance().initialize(serverConfig);
 
+    lineageService.initialize(
+        new 
LineageConfig(serverConfig.getConfigsWithPrefix(LineageConfig.LINEAGE_CONFIG_PREFIX)));
+
     // initialize Jersey REST API resources.
     initializeRestApi();
   }
@@ -99,9 +107,11 @@ public class GravitinoServer extends ResourceConfig {
   }
 
   private void initializeRestApi() {
-    List<String> restApiPackages = 
Lists.newArrayList("org.apache.gravitino.server.web.rest");
-    
restApiPackages.addAll(serverConfig.get(Configs.REST_API_EXTENSION_PACKAGES));
-    packages(restApiPackages.toArray(new String[0]));
+    HashSet<String> restApiPackagesSet = new HashSet<>();
+    restApiPackagesSet.add("org.apache.gravitino.server.web.rest");
+    
restApiPackagesSet.addAll(serverConfig.get(Configs.REST_API_EXTENSION_PACKAGES));
+    restApiPackagesSet.addAll(lineageService.getRESTPackages());
+    packages(restApiPackagesSet.toArray(new String[0]));
 
     boolean enableAuthorization = 
serverConfig.get(Configs.ENABLE_AUTHORIZATION);
     register(
@@ -120,6 +130,7 @@ public class GravitinoServer extends ResourceConfig {
                 .to(CredentialOperationDispatcher.class)
                 .ranked(1);
             
bind(gravitinoEnv.modelDispatcher()).to(ModelDispatcher.class).ranked(1);
+            bind(lineageService).to(LineageDispatcher.class).ranked(1);
           }
         });
     register(JsonProcessingExceptionMapper.class);
@@ -161,6 +172,9 @@ public class GravitinoServer extends ResourceConfig {
   public void stop() {
     server.stop();
     gravitinoEnv.shutdown();
+    if (lineageService != null) {
+      lineageService.close();
+    }
   }
 
   public static void main(String[] args) {
diff --git a/settings.gradle.kts b/settings.gradle.kts
index c865e14e7a..d4262e69c3 100644
--- a/settings.gradle.kts
+++ b/settings.gradle.kts
@@ -82,3 +82,4 @@ include(":bundles:gcp", ":bundles:gcp-bundle")
 include(":bundles:aliyun", ":bundles:aliyun-bundle")
 include(":bundles:azure", ":bundles:azure-bundle")
 include(":catalogs:hadoop-common")
+include(":lineage")


Reply via email to