AHeise commented on code in PR #130: URL: https://github.com/apache/flink-connector-kafka/pull/130#discussion_r1800732462
########## flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/lineage/LineageFacetProvider.java: ########## @@ -0,0 +1,19 @@ +package org.apache.flink.connector.kafka.lineage; + +import org.apache.flink.streaming.api.lineage.LineageDatasetFacet; + +import java.util.List; + +/** + * Contains method which can be used for lineage schema facet extraction. Useful for classes like + * topic selectors or serialization schemas to extract dataset information from. + */ +public interface LineageFacetProvider { Review Comment: Should this be part of flink-core in the future? ########## flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/lineage/LineageUtil.java: ########## @@ -0,0 +1,156 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ * + */ + +package org.apache.flink.connector.kafka.lineage; + +import org.apache.flink.api.connector.source.Boundedness; +import org.apache.flink.connector.kafka.lineage.facets.KafkaTopicListFacet; +import org.apache.flink.connector.kafka.lineage.facets.KafkaTopicPatternFacet; +import org.apache.flink.streaming.api.lineage.LineageDataset; +import org.apache.flink.streaming.api.lineage.LineageDatasetFacet; +import org.apache.flink.streaming.api.lineage.LineageVertex; +import org.apache.flink.streaming.api.lineage.SourceLineageVertex; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Properties; +import java.util.stream.Collectors; + +/** Utility class with useful methods for managing dataset facets. */ +public class LineageUtil { Review Comment: Same question: does `List` provide users of the methods with any advantage over `Collection`? ########## flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/lineage/LineageFacetProvider.java: ########## @@ -0,0 +1,19 @@ +package org.apache.flink.connector.kafka.lineage; + +import org.apache.flink.streaming.api.lineage.LineageDatasetFacet; + +import java.util.List; + +/** + * Contains method which can be used for lineage schema facet extraction. Useful for classes like + * topic selectors or serialization schemas to extract dataset information from. + */ +public interface LineageFacetProvider { + + /** + * List of lineage dataset facets. + * + * @return + */ + List<LineageDatasetFacet> getDatasetFacets(); Review Comment: nit: is Collection sufficient? ########## flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/lineage/LineageUtil.java: ########## @@ -0,0 +1,156 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.flink.connector.kafka.lineage; + +import org.apache.flink.api.connector.source.Boundedness; +import org.apache.flink.connector.kafka.lineage.facets.KafkaTopicListFacet; +import org.apache.flink.connector.kafka.lineage.facets.KafkaTopicPatternFacet; +import org.apache.flink.streaming.api.lineage.LineageDataset; +import org.apache.flink.streaming.api.lineage.LineageDatasetFacet; +import org.apache.flink.streaming.api.lineage.LineageVertex; +import org.apache.flink.streaming.api.lineage.SourceLineageVertex; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Properties; +import java.util.stream.Collectors; + +/** Utility class with useful methods for managing dataset facets. */ +public class LineageUtil { + + private static final String KAFKA_DATASET_PREFIX = "kafka://"; + private static final String COMMA = ","; + private static final String SEMICOLON = ";"; + + /** + * Loads facet from any object implementing @link{DatasetFacetProvider} interface. 
+ * + * @param object + * @return + */ + public static List<LineageDatasetFacet> facetsFrom(Object object) { + return Optional.of(object) + .filter(LineageFacetProvider.class::isInstance) + .map(LineageFacetProvider.class::cast) + .map(LineageFacetProvider::getDatasetFacets) + .orElse(Collections.emptyList()); + } + + /** + * Creates dataset from a list of facets. Uses {@link KafkaTopicListFacet} to extract dataset + * name from. Dataset per each element of topic list is created + * + * @param facets + * @return + */ + public static List<LineageDataset> datasetsFrom( + String namespace, List<LineageDatasetFacet> facets) { + // Check if topic list facet is available -> if so explode the list of facets + Optional<KafkaTopicListFacet> topicList = + facets.stream() + .filter(KafkaTopicListFacet.class::isInstance) + .map(KafkaTopicListFacet.class::cast) + .findAny(); + + List<LineageDataset> datasets = new ArrayList<>(); + + // Explode list of other facets + if (topicList.isPresent()) { + List<LineageDatasetFacet> facetsWithoutTopicList = + facets.stream().filter(f -> !f.equals(topicList)).collect(Collectors.toList()); + + topicList.get().topics.stream() + .forEach(t -> datasets.add(datasetOf(namespace, t, facetsWithoutTopicList))); Review Comment: nit: If you use functional style, `forEach + add` is rather an anti-pattern. You'd instead chain Streams and materialize them at the very end with a `Collector`. 
########## flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaSink.java: ########## @@ -132,4 +140,16 @@ public SimpleVersionedSerializer<KafkaWriterState> getWriterStateSerializer() { protected Properties getKafkaProducerConfig() { return kafkaProducerConfig; } + + @Override + public LineageVertex getLineageVertex() { + List<LineageDatasetFacet> facets = new ArrayList<>(); + + // add all the facets from deserialization schema and subscriber + facets.addAll(LineageUtil.facetsFrom(recordSerializer)); + facets.add(new KafkaPropertiesFacet(this.kafkaProducerConfig)); Review Comment: See feedback on LineageUtil ########## flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaRecordSerializationSchemaBuilder.java: ########## @@ -369,5 +377,23 @@ public ProducerRecord<byte[], byte[]> serialize( value, headerProvider != null ? headerProvider.getHeaders(element) : null); } + + @Override + public List<LineageDatasetFacet> getDatasetFacets() { + List<LineageDatasetFacet> facets = new ArrayList<>(); + facets.add(new KafkaTopicListFacet(Arrays.asList(topicSelector.apply(null)))); + + // gets type information from serialize method signature + Arrays.stream(this.valueSerializationSchema.getClass().getMethods()) Review Comment: Again we should probably check for the serializer to return the TypeInformation directly (by implementing ResultTypeQueryable). If not we could fall back to extract that as you do, but I'd use things like `org.apache.flink.shaded.guava31.com.google.common.reflect.TypeToken` to be more robust. Your implementation fails if you have some intermediate interface that forwards the type parameter. ########## flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/lineage/LineageUtil.java: ########## @@ -0,0 +1,156 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.flink.connector.kafka.lineage; + +import org.apache.flink.api.connector.source.Boundedness; +import org.apache.flink.connector.kafka.lineage.facets.KafkaTopicListFacet; +import org.apache.flink.connector.kafka.lineage.facets.KafkaTopicPatternFacet; +import org.apache.flink.streaming.api.lineage.LineageDataset; +import org.apache.flink.streaming.api.lineage.LineageDatasetFacet; +import org.apache.flink.streaming.api.lineage.LineageVertex; +import org.apache.flink.streaming.api.lineage.SourceLineageVertex; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Properties; +import java.util.stream.Collectors; + +/** Utility class with useful methods for managing dataset facets. */ +public class LineageUtil { + + private static final String KAFKA_DATASET_PREFIX = "kafka://"; + private static final String COMMA = ","; + private static final String SEMICOLON = ";"; + + /** + * Loads facet from any object implementing @link{DatasetFacetProvider} interface. 
+ * + * @param object + * @return + */ + public static List<LineageDatasetFacet> facetsFrom(Object object) { + return Optional.of(object) + .filter(LineageFacetProvider.class::isInstance) + .map(LineageFacetProvider.class::cast) + .map(LineageFacetProvider::getDatasetFacets) + .orElse(Collections.emptyList()); + } + + /** + * Creates dataset from a list of facets. Uses {@link KafkaTopicListFacet} to extract dataset + * name from. Dataset per each element of topic list is created + * + * @param facets + * @return + */ + public static List<LineageDataset> datasetsFrom( Review Comment: This whole information flow around the facets looks a bit unclean to me. Both Source/Sink throw a bunch of information into a list of `LineageDatasetFacet`, then this method is applied to take that list apart and construct the actually intended `LineageDataset`. So we first deliberately lose the information of what the facets are about and then we need to use a lot of (hidden) if-else to extract that information again. WDYT of replacing the `List<LineageDatasetFacet>` instead with a value class that contains all relevant information: ```java class KafkaFacet { @Nullable String topicPattern; @Nullable List<String> topicList; Properties properties; @Nullable TypeInformation typeInformation; } ``` Then you can access all the different pieces of information without the isInstance/cast pattern that you use. You can then in this method still turn all the pieces of information into separate facets. ########## flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaRecordSerializationSchemaBuilder.java: ########## @@ -369,5 +377,23 @@ public ProducerRecord<byte[], byte[]> serialize( value, headerProvider != null ? 
headerProvider.getHeaders(element) : null); } + + @Override + public List<LineageDatasetFacet> getDatasetFacets() { + List<LineageDatasetFacet> facets = new ArrayList<>(); + facets.add(new KafkaTopicListFacet(Arrays.asList(topicSelector.apply(null)))); Review Comment: Is `topicSelector.apply(null)` guaranteed to work? Is this even the right thing to do? TopicSelector could return different topics coming from different inputs. I think we should instead check if TopicSelector is also a LineageProvider and ask it directly. Our TopicSelectors should then implement it and we should add to the javadoc that LineageProvider is encouraged. ########## flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/lineage/facets/KafkaPropertiesFacet.java: ########## @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.kafka.lineage.facets; + +import org.apache.flink.streaming.api.lineage.LineageDatasetFacet; + +import java.util.Objects; +import java.util.Properties; + +/** Facet containing Kafka properties. 
*/ +public class KafkaPropertiesFacet implements LineageDatasetFacet { + + public static final String KAFKA_PROPERTIES_FACET_NAME = "kafkaProperties"; + public Properties properties; Review Comment: What assumptions do we make about the mutability and thread-safety of the facets? Do we need to make defensive copies of the mutable information such as the Properties? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org