This is an automated email from the ASF dual-hosted git repository. jshao pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/gravitino.git
The following commit(s) were added to refs/heads/main by this push: new 8bcfba0b4e [#6779] feat(core): Support lineage framework in Gravitino (#6782) 8bcfba0b4e is described below commit 8bcfba0b4ef0b433eee9d1ec8e4891e5c55c69b3 Author: FANNG <xiaoj...@datastrato.com> AuthorDate: Tue Apr 8 12:18:50 2025 +0800 [#6779] feat(core): Support lineage framework in Gravitino (#6782) ### What changes were proposed in this pull request? Support the lineage framework in Gravitino; the lineage endpoint and the lineage sink manager will be proposed in separate PRs. Full workflow draft PR: https://github.com/apache/gravitino/pull/6723 The main workflow: 1. The Gravitino server creates a lineage service, which manages the lineage source and the lineage sinks. 2. The lineage source implementation receives lineage run events and dispatches them to the lineage service. 3. The lineage service processes each run event and dispatches it to the lineage sink manager. 4. The lineage sink manager manages the life cycle of the lineage sinks and dispatches run events to them. ### Why are the changes needed? Fix: #6779 ### Does this PR introduce _any_ user-facing change? no ### How was this patch tested? Set up a Spark & Marquez environment and tested the workflow.
--- LICENSE.bin | 1 + conf/log4j2.properties.template | 24 ++++ .../org/apache/gravitino/utils/ClassUtils.java | 30 +++++ gradle/libs.versions.toml | 2 + lineage/build.gradle.kts | 41 +++++++ .../apache/gravitino/lineage/LineageConfig.java | 132 +++++++++++++++++++++ .../gravitino/lineage/LineageDispatcher.java | 57 +++++++++ .../apache/gravitino/lineage/LineageService.java | 82 +++++++++++++ .../lineage/processor/LineageProcessor.java | 34 ++++++ .../gravitino/lineage/processor/NoopProcessor.java | 30 +++++ .../gravitino/lineage/sink/LineageLogSink.java | 73 ++++++++++++ .../apache/gravitino/lineage/sink/LineageSink.java | 46 +++++++ .../gravitino/lineage/sink/LineageSinkManager.java | 41 +++++++ .../lineage/source/HTTPLineageSource.java | 36 ++++++ .../gravitino/lineage/source/LineageSource.java | 43 +++++++ .../gravitino/lineage/TestLineageConfig.java | 101 ++++++++++++++++ .../gravitino/server/web/SupportsRESTPackages.java | 36 ++++++ server/build.gradle.kts | 1 + .../apache/gravitino/server/GravitinoServer.java | 28 +++-- settings.gradle.kts | 1 + 20 files changed, 832 insertions(+), 7 deletions(-) diff --git a/LICENSE.bin b/LICENSE.bin index d1dddd5279..58f7d0793d 100644 --- a/LICENSE.bin +++ b/LICENSE.bin @@ -258,6 +258,7 @@ Airlift The Netty Project Open Telemetry + Open Lineage Trino Jakarta Dependency Injection Jakarta Bean Validation diff --git a/conf/log4j2.properties.template b/conf/log4j2.properties.template index bd26f33e37..f1eb30fcd9 100644 --- a/conf/log4j2.properties.template +++ b/conf/log4j2.properties.template @@ -47,6 +47,30 @@ appender.rolling.strategy.delete.ifLastModified.type = IfLastModified # Delete all files older than 30 days appender.rolling.strategy.delete.ifLastModified.age = 30d + +## use a separate file for lineage logs +appender.lineage_file.type=RollingFile +appender.lineage_file.name=lineage_file +appender.lineage_file.fileName=${basePath}/gravitino_lineage.log
+appender.lineage_file.filePattern=${basePath}/gravitino_lineage_%d{yyyyMMdd}.log.gz +appender.lineage_file.layout.type=PatternLayout +appender.lineage_file.layout.pattern=[%d{yyyy-MM-dd HH:mm:ss}] %m%n +appender.lineage_file.policies.type = Policies +appender.lineage_file.policies.time.type = TimeBasedTriggeringPolicy +appender.lineage_file.policies.time.interval = 1 +appender.lineage_file.policies.time.modulate = true +appender.lineage_file.strategy.type = DefaultRolloverStrategy +appender.lineage_file.strategy.delete.type = Delete +appender.lineage_file.strategy.delete.basePath = ${basePath} +appender.lineage_file.strategy.delete.maxDepth = 10 +appender.lineage_file.strategy.delete.ifLastModified.type = IfLastModified +appender.lineage_file.strategy.delete.ifLastModified.age = 30d + +logger.lineage.name = org.apache.gravitino.lineage.sink.LineageLogSink$LineageLogger +logger.lineage.level = info +logger.lineage.appenderRef.lineage_file.ref = lineage_file +logger.lineage.additivity = false + # Configure root logger rootLogger.level = info rootLogger.appenderRef.rolling.ref = fileLogger diff --git a/core/src/main/java/org/apache/gravitino/utils/ClassUtils.java b/core/src/main/java/org/apache/gravitino/utils/ClassUtils.java new file mode 100644 index 0000000000..7307394eb8 --- /dev/null +++ b/core/src/main/java/org/apache/gravitino/utils/ClassUtils.java @@ -0,0 +1,30 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.gravitino.utils; + +public class ClassUtils { + public static <T> T loadClass(String className) { + try { + return (T) Class.forName(className).getDeclaredConstructor().newInstance(); + } catch (Exception e) { + throw new RuntimeException("Failed to instantiate class " + className, e); + } + } +} diff --git a/gradle/libs.versions.toml b/gradle/libs.versions.toml index 530d502653..a157bb13d0 100644 --- a/gradle/libs.versions.toml +++ b/gradle/libs.versions.toml @@ -118,6 +118,7 @@ datanucleus-jdo = "3.2.0-m3" hudi = "0.15.0" google-auth = "1.28.0" aliyun-credentials = "0.3.12" +openlineage = "1.29.0" [libraries] aws-iam = { group = "software.amazon.awssdk", name = "iam", version.ref = "awssdk" } @@ -279,6 +280,7 @@ google-auth-credentials = { group = "com.google.auth", name = "google-auth-libra aliyun-credentials-sdk = { group='com.aliyun', name='credentials-java', version.ref='aliyun-credentials' } flinkjdbc = {group='org.apache.flink',name='flink-connector-jdbc', version.ref='flinkjdbc'} +openlineage-java = { group = "io.openlineage", name = "openlineage-java", version.ref = "openlineage" } [bundles] log4j = ["slf4j-api", "log4j-slf4j2-impl", "log4j-api", "log4j-core", "log4j-12-api", "log4j-layout-template-json"] diff --git a/lineage/build.gradle.kts b/lineage/build.gradle.kts new file mode 100644 index 0000000000..21da55dea3 --- /dev/null +++ b/lineage/build.gradle.kts @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements.
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +plugins { + id("java") +} + +dependencies { + implementation(project(":core")) + implementation(project(":server-common")) + implementation(libs.commons.lang3) + implementation(libs.guava) + implementation(libs.slf4j.api) + implementation(libs.jackson.datatype.jdk8) + implementation(libs.jackson.datatype.jsr310) + implementation(libs.jackson.databind) + implementation(libs.openlineage.java) { + isTransitive = false + } + + testImplementation(libs.junit.jupiter.api) + testImplementation(libs.junit.jupiter.params) + + testRuntimeOnly(libs.junit.jupiter.engine) +} diff --git a/lineage/src/main/java/org/apache/gravitino/lineage/LineageConfig.java b/lineage/src/main/java/org/apache/gravitino/lineage/LineageConfig.java new file mode 100644 index 0000000000..ac7185336b --- /dev/null +++ b/lineage/src/main/java/org/apache/gravitino/lineage/LineageConfig.java @@ -0,0 +1,132 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.gravitino.lineage; + +import com.google.common.base.Preconditions; +import com.google.common.base.Splitter; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; +import org.apache.commons.lang3.StringUtils; +import org.apache.gravitino.Config; +import org.apache.gravitino.config.ConfigBuilder; +import org.apache.gravitino.config.ConfigConstants; +import org.apache.gravitino.config.ConfigEntry; +import org.apache.gravitino.lineage.processor.NoopProcessor; +import org.apache.gravitino.lineage.sink.LineageLogSink; +import org.apache.gravitino.lineage.source.HTTPLineageSource; + +public class LineageConfig extends Config { + + public static final String LINEAGE_CONFIG_PREFIX = "gravitino.lineage."; + public static final String LINEAGE_CONFIG_SINKS = "sinks"; + public static final String LINEAGE_CONFIG_SOURCE = "source"; + public static final String LINEAGE_SOURCE_CLASS_NAME = "sourceClass"; + public static final String LINEAGE_PROCESSOR_CLASS_NAME = "processorClass"; + public static final String LINEAGE_SINK_CLASS_NAME = "sinkClass"; + public static final String LINEAGE_HTTP_SOURCE_CLASS_NAME = HTTPLineageSource.class.getName(); + + public static final String LINEAGE_LOG_SINK_NAME = "log"; + public static final String LINEAGE_HTTP_SOURCE_NAME = "http"; + + private static final Splitter SPLITTER = Splitter.on(","); + + public static final ConfigEntry<String> SOURCE_NAME = + new ConfigBuilder(LINEAGE_CONFIG_SOURCE) + .doc("The name of lineage event source") +
.version(ConfigConstants.VERSION_0_9_0) + .stringConf() + .createWithDefault(LINEAGE_HTTP_SOURCE_NAME); + + public static final ConfigEntry<String> PROCESSOR_CLASS = + new ConfigBuilder(LINEAGE_PROCESSOR_CLASS_NAME) + .doc("The class name of lineage event processor") + .version(ConfigConstants.VERSION_0_9_0) + .stringConf() + .createWithDefault(NoopProcessor.class.getName()); + + public static final ConfigEntry<String> SINKS = + new ConfigBuilder(LINEAGE_CONFIG_SINKS) + .doc("The sinks of lineage event") + .version(ConfigConstants.VERSION_0_9_0) + .stringConf() + .createWithDefault(LINEAGE_LOG_SINK_NAME); + + public LineageConfig(Map<String, String> properties) { + super(false); + loadFromMap(properties, k -> true); + } + + public String source() { + return get(SOURCE_NAME); + } + + public String sourceClass() { + if (source().equals(LINEAGE_HTTP_SOURCE_NAME)) { + return LINEAGE_HTTP_SOURCE_CLASS_NAME; + } + String sourceConfig = source() + "." + LINEAGE_SOURCE_CLASS_NAME; + String sourceClass = getRawString(sourceConfig); + Preconditions.checkArgument(StringUtils.isNotBlank(sourceClass), sourceConfig + " is not set"); + return sourceClass; + } + + public String processorClass() { + return get(PROCESSOR_CLASS); + } + + public Map<String, String> getSinkConfigs() { + List<String> sinks = sinks(); + + Map<String, String> config = getAllConfig(); + Map<String, String> m = new HashMap<>(config); + + String sinkString = get(SINKS); + if (!m.containsKey(LINEAGE_CONFIG_SINKS)) { + m.put(LINEAGE_CONFIG_SINKS, sinkString); + } + + String logClassConfigKey = + LineageConfig.LINEAGE_LOG_SINK_NAME + "." + LineageConfig.LINEAGE_SINK_CLASS_NAME; + if (sinks.contains(LINEAGE_LOG_SINK_NAME) && !config.containsKey(logClassConfigKey)) { + m.put(logClassConfigKey, LineageLogSink.class.getName()); + } + + sinks.stream() + .forEach( + sinkName -> { + String sinkClassConfig = sinkName + "."
+ LineageConfig.LINEAGE_SINK_CLASS_NAME; + Preconditions.checkArgument( + StringUtils.isNotBlank(m.get(sinkClassConfig)), sinkClassConfig + " is not set"); + }); + + return m; + } + + public List<String> sinks() { + String sinks = get(SINKS); + return SPLITTER + .omitEmptyStrings() + .trimResults() + .splitToStream(sinks) + .collect(Collectors.toList()); + } +} diff --git a/lineage/src/main/java/org/apache/gravitino/lineage/LineageDispatcher.java b/lineage/src/main/java/org/apache/gravitino/lineage/LineageDispatcher.java new file mode 100644 index 0000000000..ffa7f92058 --- /dev/null +++ b/lineage/src/main/java/org/apache/gravitino/lineage/LineageDispatcher.java @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.gravitino.lineage; + +import io.openlineage.server.OpenLineage.RunEvent; +import java.io.Closeable; + +/** + * Dispatches lineage events to configured sinks after processing. Implementations should handle + * initialization, event processing, and resource cleanup through {@link Closeable}.
+ * + * <p>Typical lifecycle: + * + * <ol> + * <li>{@link #initialize(LineageConfig)} with required configurations + * <li>Repeated calls to {@link #dispatchLineageEvent(RunEvent)} + * <li>{@link #close()} for resource cleanup + * </ol> + */ +public interface LineageDispatcher extends Closeable { + + /** + * Initializes the dispatcher with configuration. Must be called before event dispatching. + * + * @param lineageConfig configuration for the lineage source, processor and sinks. + */ + void initialize(LineageConfig lineageConfig); + + /** + * Dispatches a lineage run event to the configured sinks after processing. + * + * <p>Callers should implement appropriate retry/logging mechanisms for rejected events to prevent + * system overload. + * + * @param runEvent The OpenLineage run event to be processed and dispatched. Must not be null. + * @return {@code true} if the event was successfully processed and dispatched to the sinks, + * {@code false} if the event was rejected because the lineage sinks are overloaded. + */ + boolean dispatchLineageEvent(RunEvent runEvent); +} diff --git a/lineage/src/main/java/org/apache/gravitino/lineage/LineageService.java b/lineage/src/main/java/org/apache/gravitino/lineage/LineageService.java new file mode 100644 index 0000000000..2667d3e5ca --- /dev/null +++ b/lineage/src/main/java/org/apache/gravitino/lineage/LineageService.java @@ -0,0 +1,82 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.gravitino.lineage; + +import com.google.common.collect.ImmutableSet; +import io.openlineage.server.OpenLineage; +import io.openlineage.server.OpenLineage.RunEvent; +import java.util.Set; +import org.apache.gravitino.lineage.processor.LineageProcessor; +import org.apache.gravitino.lineage.sink.LineageSinkManager; +import org.apache.gravitino.lineage.source.LineageSource; +import org.apache.gravitino.server.web.SupportsRESTPackages; +import org.apache.gravitino.utils.ClassUtils; + +/** + * The LineageService manages the life cycle of lineage sinks, sources, and processors. It provides + * {@code dispatchLineageEvent} method for lineage source to dispatch lineage events to the sinks.
+ */ +public class LineageService implements LineageDispatcher, SupportsRESTPackages { + private LineageSinkManager sinkManager; + private LineageSource source; + private LineageProcessor processor; + + @Override + public void initialize(LineageConfig lineageConfig) { + String sourceName = lineageConfig.source(); + String sourceClass = lineageConfig.sourceClass(); + this.source = ClassUtils.loadClass(sourceClass); + this.sinkManager = new LineageSinkManager(); + String processorClassName = lineageConfig.processorClass(); + this.processor = ClassUtils.loadClass(processorClassName); + + sinkManager.initialize(lineageConfig.sinks(), lineageConfig.getSinkConfigs()); + source.initialize(lineageConfig.getConfigsWithPrefix(sourceName + "."), this); + } + + @Override + public void close() { + if (source != null) { + source.close(); + } + if (sinkManager != null) { + sinkManager.close(); + } + } + + @Override + public boolean dispatchLineageEvent(OpenLineage.RunEvent runEvent) { + if (sinkManager.isHighWatermark()) { + return false; + } + + RunEvent newEvent = processor.process(runEvent); + sinkManager.sink(newEvent); + return true; + } + + @Override + public Set<String> getRESTPackages() { + if (source instanceof SupportsRESTPackages) { + return ((SupportsRESTPackages) source).getRESTPackages(); + } + return ImmutableSet.of(); + } +} diff --git a/lineage/src/main/java/org/apache/gravitino/lineage/processor/LineageProcessor.java b/lineage/src/main/java/org/apache/gravitino/lineage/processor/LineageProcessor.java new file mode 100644 index 0000000000..1a08e0cc0d --- /dev/null +++ b/lineage/src/main/java/org/apache/gravitino/lineage/processor/LineageProcessor.java @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership.
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.gravitino.lineage.processor; + +import io.openlineage.server.OpenLineage.RunEvent; + +/** Processes {@link RunEvent} objects to transform or enrich their lineage data. */ +public interface LineageProcessor { + + /** + * Processes a run event and returns the event that should be forwarded to the sinks. + * + * @param runEvent The original run event to process. + * @return The processed run event; may be the same instance as {@code runEvent}. + */ + RunEvent process(RunEvent runEvent); +} diff --git a/lineage/src/main/java/org/apache/gravitino/lineage/processor/NoopProcessor.java b/lineage/src/main/java/org/apache/gravitino/lineage/processor/NoopProcessor.java new file mode 100644 index 0000000000..cb09eaf705 --- /dev/null +++ b/lineage/src/main/java/org/apache/gravitino/lineage/processor/NoopProcessor.java @@ -0,0 +1,30 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.gravitino.lineage.processor; + +import io.openlineage.server.OpenLineage.RunEvent; + +public class NoopProcessor implements LineageProcessor { + // Pass-through processor: returns every run event unchanged; it is the default processorClass. + @Override + public RunEvent process(RunEvent runEvent) { + return runEvent; + } +} diff --git a/lineage/src/main/java/org/apache/gravitino/lineage/sink/LineageLogSink.java b/lineage/src/main/java/org/apache/gravitino/lineage/sink/LineageLogSink.java new file mode 100644 index 0000000000..62367ad234 --- /dev/null +++ b/lineage/src/main/java/org/apache/gravitino/lineage/sink/LineageLogSink.java @@ -0,0 +1,73 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License.
+ */ + +package org.apache.gravitino.lineage.sink; + +import com.fasterxml.jackson.annotation.JsonInclude; +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.MapperFeature; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.SerializationFeature; +import com.fasterxml.jackson.databind.cfg.EnumFeature; +import com.fasterxml.jackson.databind.json.JsonMapper; +import com.fasterxml.jackson.datatype.jdk8.Jdk8Module; +import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule; +import io.openlineage.server.OpenLineage.Run; +import io.openlineage.server.OpenLineage.RunEvent; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class LineageLogSink implements LineageSink { + private static final Logger LOG = LoggerFactory.getLogger(LineageLogSink.class); + private final ObjectMapper objectMapper = + JsonMapper.builder() + .configure(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS, false) + .configure(EnumFeature.WRITE_ENUMS_TO_LOWERCASE, true) + .enable(MapperFeature.ACCEPT_CASE_INSENSITIVE_ENUMS) + .build() + .setSerializationInclusion(JsonInclude.Include.NON_NULL) + .registerModule(new JavaTimeModule()) + .registerModule(new Jdk8Module()); + private final LineageLogger logger = new LineageLogger(); + + private static class LineageLogger { + private static final Logger LINEAGE_LOG = LoggerFactory.getLogger(LineageLogger.class); + + public void log(String lineageString) { + LINEAGE_LOG.info(lineageString); + } + } + + @Override + public void sink(RunEvent event) { + try { + logger.log(objectMapper.writeValueAsString(event)); + } catch (JsonProcessingException e) { + LOG.warn( + "Process open lineage event failed, run id: {}, error message: {}", + getRunId(event), + e.getMessage()); + } + } + + private String getRunId(RunEvent event) { + Run run = event.getRun(); + return run == null || run.getRunId() == null ?
"Unknown" : run.getRunId().toString(); + } +} diff --git a/lineage/src/main/java/org/apache/gravitino/lineage/sink/LineageSink.java b/lineage/src/main/java/org/apache/gravitino/lineage/sink/LineageSink.java new file mode 100644 index 0000000000..b765138516 --- /dev/null +++ b/lineage/src/main/java/org/apache/gravitino/lineage/sink/LineageSink.java @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.gravitino.lineage.sink; + +import io.openlineage.server.OpenLineage; +import java.io.Closeable; +import java.util.Map; + +/** A closable component responsible for sinking lineage events. */ +public interface LineageSink extends Closeable { + + /** + * Initializes the lineage sink with the provided configuration. + * + * @param configs A map representing the configuration for the sink. + */ + default void initialize(Map<String, String> configs) {} + + /** Closes the lineage sink and releases associated resources. */ + @Override + default void close() {} + + /** + * Sinks the given lineage run event. + * + * @param event The lineage run event to be processed.
+ */ + void sink(OpenLineage.RunEvent event); +} diff --git a/lineage/src/main/java/org/apache/gravitino/lineage/sink/LineageSinkManager.java b/lineage/src/main/java/org/apache/gravitino/lineage/sink/LineageSinkManager.java new file mode 100644 index 0000000000..f01dcc80e7 --- /dev/null +++ b/lineage/src/main/java/org/apache/gravitino/lineage/sink/LineageSinkManager.java @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.gravitino.lineage.sink; + +import io.openlineage.server.OpenLineage.RunEvent; +import java.io.Closeable; +import java.util.List; +import java.util.Map; + +@SuppressWarnings("unused") +public class LineageSinkManager implements Closeable { + // Placeholder: the real sink manager implementation is proposed in a separate PR. + public void initialize(List<String> sinks, Map<String, String> lineageConfigs) {} + + // Checks if the sink queue size surpasses the threshold to avoid overwhelming lineage sinks.
+ public boolean isHighWatermark() { + return false; + } + + public void sink(RunEvent runEvent) {} + + @Override + public void close() {} +} diff --git a/lineage/src/main/java/org/apache/gravitino/lineage/source/HTTPLineageSource.java b/lineage/src/main/java/org/apache/gravitino/lineage/source/HTTPLineageSource.java new file mode 100644 index 0000000000..fd15eaa57f --- /dev/null +++ b/lineage/src/main/java/org/apache/gravitino/lineage/source/HTTPLineageSource.java @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License.
+ */ + +package org.apache.gravitino.lineage.source; + +import com.google.common.collect.ImmutableSet; +import java.util.Map; +import java.util.Set; +import org.apache.gravitino.lineage.LineageDispatcher; +import org.apache.gravitino.server.web.SupportsRESTPackages; + +public class HTTPLineageSource implements LineageSource, SupportsRESTPackages { + @Override + public void initialize(Map<String, String> configs, LineageDispatcher dispatcher) {} + // No REST packages yet: the HTTP lineage endpoint is proposed in a separate PR. + @Override + public Set<String> getRESTPackages() { + return ImmutableSet.of(); + } +} diff --git a/lineage/src/main/java/org/apache/gravitino/lineage/source/LineageSource.java b/lineage/src/main/java/org/apache/gravitino/lineage/source/LineageSource.java new file mode 100644 index 0000000000..d633bdbd2d --- /dev/null +++ b/lineage/src/main/java/org/apache/gravitino/lineage/source/LineageSource.java @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.gravitino.lineage.source; + +import java.io.Closeable; +import java.util.Map; +import org.apache.gravitino.lineage.LineageDispatcher; + +/** + * The LineageSource interface defines a closable data source for receiving and dispatching lineage + * information.
+ */ +public interface LineageSource extends Closeable { + + /** + * Initializes the data source with the given configurations and a lineage dispatcher. + * + * @param configs A map containing configuration information for the data source. + * @param dispatcher A dispatcher used to distribute lineage events. + */ + default void initialize(Map<String, String> configs, LineageDispatcher dispatcher) {} + + /** Closes the data source and releases related resources. */ + @Override + default void close() {} +} diff --git a/lineage/src/test/java/org/apache/gravitino/lineage/TestLineageConfig.java b/lineage/src/test/java/org/apache/gravitino/lineage/TestLineageConfig.java new file mode 100644 index 0000000000..d4dbfa8a3a --- /dev/null +++ b/lineage/src/test/java/org/apache/gravitino/lineage/TestLineageConfig.java @@ -0,0 +1,101 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License.
+ */ + +package org.apache.gravitino.lineage; + +import com.google.common.collect.ImmutableMap; +import java.util.Map; +import org.apache.gravitino.lineage.sink.LineageLogSink; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; + +public class TestLineageConfig { + + @Test + void testLineageSource() { + // default config: the built-in HTTP source and its class are used + LineageConfig lineageConfig = new LineageConfig(ImmutableMap.of()); + Assertions.assertEquals(LineageConfig.LINEAGE_HTTP_SOURCE_NAME, lineageConfig.source()); + Assertions.assertEquals( + LineageConfig.LINEAGE_HTTP_SOURCE_CLASS_NAME, lineageConfig.sourceClass()); + + // custom source: its class name is read from the key "<source>.<LINEAGE_SOURCE_CLASS_NAME>" + lineageConfig = + new LineageConfig( + ImmutableMap.of( + LineageConfig.LINEAGE_CONFIG_SOURCE, + "source1", + "source1." + LineageConfig.LINEAGE_SOURCE_CLASS_NAME, + "test-class")); + Assertions.assertEquals("source1", lineageConfig.source()); + Assertions.assertEquals("test-class", lineageConfig.sourceClass()); + // a custom source without a class entry must make sourceClass() throw IllegalArgumentException + LineageConfig lineageConfig2 = + new LineageConfig(ImmutableMap.of(LineageConfig.LINEAGE_CONFIG_SOURCE, "source2")); + + Assertions.assertThrowsExactly( + IllegalArgumentException.class, () -> lineageConfig2.sourceClass()); + } + + @Test + void testGetSinkConfigs() { + // default config: a single sink, the built-in log sink backed by LineageLogSink + LineageConfig lineageConfig = new LineageConfig(ImmutableMap.of()); + Map<String, String> sinkConfigs = lineageConfig.getSinkConfigs(); + String sinks = sinkConfigs.get(LineageConfig.LINEAGE_CONFIG_SINKS); + Assertions.assertEquals(LineageConfig.LINEAGE_LOG_SINK_NAME, sinks); + String className = + sinkConfigs.get( + LineageConfig.LINEAGE_LOG_SINK_NAME + "." + LineageConfig.LINEAGE_SINK_CLASS_NAME); + Assertions.assertEquals(LineageLogSink.class.getName(), className); + + // config with multiple sinks, each with its own class entry + Map<String, String> config2 = + ImmutableMap.of( + LineageConfig.LINEAGE_CONFIG_SINKS, + "sink1,sink2", + "sink1." + LineageConfig.LINEAGE_SINK_CLASS_NAME, + "test-class", + "sink2."
+ LineageConfig.LINEAGE_SINK_CLASS_NAME, + "test-class2"); + lineageConfig = new LineageConfig(config2); + sinkConfigs = lineageConfig.getSinkConfigs(); + sinks = sinkConfigs.get(LineageConfig.LINEAGE_CONFIG_SINKS); + Assertions.assertEquals("sink1,sink2", sinks); + Assertions.assertEquals( + "test-class", sinkConfigs.get("sink1." + LineageConfig.LINEAGE_SINK_CLASS_NAME)); + Assertions.assertEquals( + "test-class2", sinkConfigs.get("sink2." + LineageConfig.LINEAGE_SINK_CLASS_NAME)); + + // sink1 is listed without a class entry, which must be rejected with IllegalArgumentException + Map<String, String> config3 = + ImmutableMap.of( + LineageConfig.LINEAGE_CONFIG_SINKS, + "sink1,sink2", + "sink2." + LineageConfig.LINEAGE_SINK_CLASS_NAME, + "test-class2"); + + Assertions.assertThrowsExactly( + IllegalArgumentException.class, + () -> { + LineageConfig lineageConfig1 = new LineageConfig(config3); + lineageConfig1.getSinkConfigs(); + }); + } +} diff --git a/server-common/src/main/java/org/apache/gravitino/server/web/SupportsRESTPackages.java b/server-common/src/main/java/org/apache/gravitino/server/web/SupportsRESTPackages.java new file mode 100644 index 0000000000..80c89f05c9 --- /dev/null +++ b/server-common/src/main/java/org/apache/gravitino/server/web/SupportsRESTPackages.java @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied.
See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.gravitino.server.web; + +import java.util.Set; + +/** + * Implemented by components that provide REST resources. The package names they return are meant to + * be registered with the Jetty-hosted REST service, so that the service can locate and handle the + * REST resources defined in those packages. + */ +public interface SupportsRESTPackages { + /** + * Retrieves the names of the packages that contain this component's REST resources. + * + * @return A set containing the names of REST packages; it may be empty. + */ + Set<String> getRESTPackages(); +} diff --git a/server/build.gradle.kts b/server/build.gradle.kts index 4fe6ae2707..b58e42773d 100644 --- a/server/build.gradle.kts +++ b/server/build.gradle.kts @@ -27,6 +27,7 @@ dependencies { implementation(project(":api")) implementation(project(":common")) implementation(project(":core")) + implementation(project(":lineage")) implementation(project(":server-common")) implementation(libs.bundles.jetty) implementation(libs.bundles.jersey) diff --git a/server/src/main/java/org/apache/gravitino/server/GravitinoServer.java b/server/src/main/java/org/apache/gravitino/server/GravitinoServer.java index 0c730439b1..accf65cb5f 100644 --- a/server/src/main/java/org/apache/gravitino/server/GravitinoServer.java +++ b/server/src/main/java/org/apache/gravitino/server/GravitinoServer.java @@ -18,9 +18,8 @@ */ package org.apache.gravitino.server; -import com.google.common.collect.Lists; import java.io.File; -import java.util.List; +import java.util.HashSet; import java.util.Properties; import javax.servlet.Servlet; import org.apache.gravitino.Configs; @@ -33,6 +32,9 @@ import org.apache.gravitino.catalog.SchemaDispatcher; import org.apache.gravitino.catalog.TableDispatcher; import org.apache.gravitino.catalog.TopicDispatcher; import org.apache.gravitino.credential.CredentialOperationDispatcher; +import org.apache.gravitino.lineage.LineageConfig; +import
org.apache.gravitino.lineage.LineageDispatcher; +import org.apache.gravitino.lineage.LineageService; import org.apache.gravitino.metalake.MetalakeDispatcher; import org.apache.gravitino.metrics.MetricsSystem; import org.apache.gravitino.metrics.source.MetricsSource; @@ -75,10 +77,13 @@ public class GravitinoServer extends ResourceConfig { private final GravitinoEnv gravitinoEnv; + /** Created in the constructor, initialized in initialize(), and closed in stop(). */ private final LineageService lineageService; + public GravitinoServer(ServerConfig config, GravitinoEnv gravitinoEnv) { - serverConfig = config; - server = new JettyServer(); + this.serverConfig = config; + this.server = new JettyServer(); this.gravitinoEnv = gravitinoEnv; + this.lineageService = new LineageService(); } public void initialize() { @@ -90,6 +95,9 @@ public class GravitinoServer extends ResourceConfig { ServerAuthenticator.getInstance().initialize(serverConfig); + /* Runs before initializeRestApi() below, which reads lineageService.getRESTPackages(). */ lineageService.initialize( + new LineageConfig(serverConfig.getConfigsWithPrefix(LineageConfig.LINEAGE_CONFIG_PREFIX))); + // initialize Jersey REST API resources.
initializeRestApi(); } @@ -99,9 +107,11 @@ public class GravitinoServer extends ResourceConfig { } private void initializeRestApi() { - List<String> restApiPackages = Lists.newArrayList("org.apache.gravitino.server.web.rest"); - restApiPackages.addAll(serverConfig.get(Configs.REST_API_EXTENSION_PACKAGES)); - packages(restApiPackages.toArray(new String[0])); + HashSet<String> restApiPackagesSet = new HashSet<>(); /* a set, so a package contributed more than once is registered only once */ + restApiPackagesSet.add("org.apache.gravitino.server.web.rest"); + restApiPackagesSet.addAll(serverConfig.get(Configs.REST_API_EXTENSION_PACKAGES)); + restApiPackagesSet.addAll(lineageService.getRESTPackages()); + packages(restApiPackagesSet.toArray(new String[0])); boolean enableAuthorization = serverConfig.get(Configs.ENABLE_AUTHORIZATION); register( @@ -120,6 +130,7 @@ public class GravitinoServer extends ResourceConfig { .to(CredentialOperationDispatcher.class) .ranked(1); bind(gravitinoEnv.modelDispatcher()).to(ModelDispatcher.class).ranked(1); + bind(lineageService).to(LineageDispatcher.class).ranked(1); } }); register(JsonProcessingExceptionMapper.class); @@ -161,6 +172,9 @@ public class GravitinoServer extends ResourceConfig { public void stop() { server.stop(); gravitinoEnv.shutdown(); + if (lineageService != null) { /* NOTE(review): always true, since lineageService is a final field assigned in the constructor; this close is also skipped if server.stop() or gravitinoEnv.shutdown() throws, so consider try/finally. */ + lineageService.close(); + } } public static void main(String[] args) { diff --git a/settings.gradle.kts b/settings.gradle.kts index c865e14e7a..d4262e69c3 100644 --- a/settings.gradle.kts +++ b/settings.gradle.kts @@ -82,3 +82,4 @@ include(":bundles:gcp", ":bundles:gcp-bundle") include(":bundles:aliyun", ":bundles:aliyun-bundle") include(":bundles:azure", ":bundles:azure-bundle") include(":catalogs:hadoop-common") +include(":lineage")