AHeise commented on code in PR #130:
URL: https://github.com/apache/flink-connector-kafka/pull/130#discussion_r1815533794


##########
flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/lineage/facets/KafkaDatasetFacet.java:
##########
@@ -0,0 +1,97 @@
+package org.apache.flink.connector.kafka.lineage.facets;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.streaming.api.lineage.LineageDatasetFacet;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Objects;
+import java.util.Properties;
+import java.util.regex.Pattern;
+
+/** Facet containing all information related to sources and sinks on Kafka. */
+public class KafkaDatasetFacet implements LineageDatasetFacet {
+
+    public static final String KAFKA_FACET_NAME = "kafka";
+
+    public final Properties properties;
+    public final TypeInformation typeInformation;
+    public final KafkaDatasetIdentifier topicIdentifier;
+
+    public KafkaDatasetFacet(
+            KafkaDatasetIdentifier topicIdentifier,
+            Properties properties,
+            TypeInformation typeInformation) {
+        this.topicIdentifier = topicIdentifier;
+        this.properties = properties;
+        this.typeInformation = typeInformation;
+    }
+
+    public void addProperties(Properties properties) {
+        this.properties.putAll(properties);
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (o == null || getClass() != o.getClass()) {
+            return false;
+        }
+        KafkaDatasetFacet that = (KafkaDatasetFacet) o;
+        return Objects.equals(properties, that.properties)
+                && Objects.equals(typeInformation, that.typeInformation)
+                && Objects.equals(topicIdentifier, that.topicIdentifier);
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(properties, typeInformation, topicIdentifier);
+    }
+
+    @Override
+    public String name() {
+        return KAFKA_FACET_NAME;
+    }
+
+    /**
+     * Record class to contain topics' identifier information which can be either a list of topics
+     * or a topic pattern.
+     */
+    public static class KafkaDatasetIdentifier {
+        public final List<String> topics;
+        public final Pattern topicPattern;
+
+        public KafkaDatasetIdentifier(List<String> fixedTopics, Pattern topicPattern) {
+            this.topics = fixedTopics;
+            this.topicPattern = topicPattern;
+        }
+
+        public static KafkaDatasetIdentifier of(Pattern pattern) {
+            return new KafkaDatasetIdentifier(Collections.emptyList(), pattern);
+        }
+
+        public static KafkaDatasetIdentifier of(List<String> fixedTopics) {

Review Comment:
   
   ```suggestion
           public static KafkaDatasetIdentifier ofTopics(List<String> fixedTopics) {
   ```



##########
flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/lineage/facets/KafkaDatasetFacet.java:
##########
@@ -0,0 +1,97 @@
+package org.apache.flink.connector.kafka.lineage.facets;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.streaming.api.lineage.LineageDatasetFacet;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Objects;
+import java.util.Properties;
+import java.util.regex.Pattern;
+
+/** Facet containing all information related to sources and sinks on Kafka. */
+public class KafkaDatasetFacet implements LineageDatasetFacet {
+
+    public static final String KAFKA_FACET_NAME = "kafka";
+
+    public final Properties properties;

Review Comment:
   We usually avoid public fields and rather use the full getter jazz. It just makes it easier to later add more validation or defensive copies when needed.



##########
flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/lineage/KafkaDatasetFacetProvider.java:
##########
@@ -0,0 +1,16 @@
+package org.apache.flink.connector.kafka.lineage;
+
+import org.apache.flink.connector.kafka.lineage.facets.KafkaDatasetFacet;
+
+import java.util.Optional;
+
+/** Contains method to extract {@link KafkaDatasetFacet}. */
+public interface KafkaDatasetFacetProvider {
+
+    /**
+     * List of lineage dataset facets.

Review Comment:
   Adjust comment.



##########
flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/lineage/facets/KafkaDatasetFacet.java:
##########
@@ -0,0 +1,97 @@
+package org.apache.flink.connector.kafka.lineage.facets;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.streaming.api.lineage.LineageDatasetFacet;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Objects;
+import java.util.Properties;
+import java.util.regex.Pattern;
+
+/** Facet containing all information related to sources and sinks on Kafka. */
+public class KafkaDatasetFacet implements LineageDatasetFacet {
+
+    public static final String KAFKA_FACET_NAME = "kafka";
+
+    public final Properties properties;
+    public final TypeInformation typeInformation;
+    public final KafkaDatasetIdentifier topicIdentifier;
+
+    public KafkaDatasetFacet(
+            KafkaDatasetIdentifier topicIdentifier,
+            Properties properties,
+            TypeInformation typeInformation) {

Review Comment:
   
   ```suggestion
               @Nullable TypeInformation typeInformation) {
   ```



##########
flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/lineage/facets/KafkaDatasetFacet.java:
##########
@@ -0,0 +1,97 @@
+package org.apache.flink.connector.kafka.lineage.facets;

Review Comment:
   Are more facets coming that would justify a separate package? If not, I'd use a single package for all lineage classes. There are not that many.



##########
flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/lineage/facets/KafkaDatasetFacet.java:
##########
@@ -0,0 +1,97 @@
+package org.apache.flink.connector.kafka.lineage.facets;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.streaming.api.lineage.LineageDatasetFacet;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Objects;
+import java.util.Properties;
+import java.util.regex.Pattern;
+
+/** Facet containing all information related to sources and sinks on Kafka. */
+public class KafkaDatasetFacet implements LineageDatasetFacet {
+
+    public static final String KAFKA_FACET_NAME = "kafka";
+
+    public final Properties properties;
+    public final TypeInformation typeInformation;
+    public final KafkaDatasetIdentifier topicIdentifier;
+
+    public KafkaDatasetFacet(
+            KafkaDatasetIdentifier topicIdentifier,
+            Properties properties,
+            TypeInformation typeInformation) {
+        this.topicIdentifier = topicIdentifier;
+        this.properties = properties;
+        this.typeInformation = typeInformation;
+    }
+
+    public void addProperties(Properties properties) {
+        this.properties.putAll(properties);
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (o == null || getClass() != o.getClass()) {
+            return false;
+        }
+        KafkaDatasetFacet that = (KafkaDatasetFacet) o;
+        return Objects.equals(properties, that.properties)
+                && Objects.equals(typeInformation, that.typeInformation)
+                && Objects.equals(topicIdentifier, that.topicIdentifier);
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(properties, typeInformation, topicIdentifier);
+    }
+
+    @Override
+    public String name() {
+        return KAFKA_FACET_NAME;
+    }
+
+    /**
+     * Record class to contain topics' identifier information which can be either a list of topics
+     * or a topic pattern.
+     */
+    public static class KafkaDatasetIdentifier {

Review Comment:
   Any reason to not make it top-level?



##########
flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/lineage/facets/KafkaDatasetFacet.java:
##########
@@ -0,0 +1,97 @@
+package org.apache.flink.connector.kafka.lineage.facets;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.streaming.api.lineage.LineageDatasetFacet;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Objects;
+import java.util.Properties;
+import java.util.regex.Pattern;
+
+/** Facet containing all information related to sources and sinks on Kafka. */
+public class KafkaDatasetFacet implements LineageDatasetFacet {
+
+    public static final String KAFKA_FACET_NAME = "kafka";
+
+    public final Properties properties;
+    public final TypeInformation typeInformation;
+    public final KafkaDatasetIdentifier topicIdentifier;
+
+    public KafkaDatasetFacet(
+            KafkaDatasetIdentifier topicIdentifier,
+            Properties properties,
+            TypeInformation typeInformation) {
+        this.topicIdentifier = topicIdentifier;
+        this.properties = properties;
+        this.typeInformation = typeInformation;
+    }
+
+    public void addProperties(Properties properties) {

Review Comment:
   Since this method modifies the properties, the ctor should make a copy.
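   For illustration, the defensive copy being asked for could look roughly like this standalone sketch (the class name mirrors the one under review, but the body is illustrative, not the PR's code):

   ```java
   import java.util.Properties;

   // Illustrative sketch only: copy the incoming Properties in the constructor,
   // so addProperties never mutates an object still owned by the caller.
   class KafkaDatasetFacetSketch {
       private final Properties properties = new Properties();

       KafkaDatasetFacetSketch(Properties properties) {
           this.properties.putAll(properties); // defensive copy
       }

       void addProperties(Properties extra) {
           this.properties.putAll(extra); // mutates only the internal copy
       }

       Properties getProperties() {
           Properties copy = new Properties();
           copy.putAll(properties); // hand out a copy, not the internal state
           return copy;
       }
   }
   ```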



##########
flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaRecordSerializationSchemaBuilder.java:
##########
@@ -369,5 +416,43 @@ public ProducerRecord<byte[], byte[]> serialize(
                     value,
                    headerProvider != null ? headerProvider.getHeaders(element) : null);
         }
+
+        @Override
+        public Optional<KafkaDatasetFacet> getKafkaDatasetFacet() {
+            if (!(topicSelector instanceof KafkaDatasetIdentifierProvider)) {
+                LOG.warn("Cannot identify topics. Not an TopicsIdentifierProvider");
+                return Optional.empty();
+            }
+
+            Optional<KafkaDatasetIdentifier> topicsIdentifier =
+                    ((KafkaDatasetIdentifierProvider) (topicSelector)).getDatasetIdentifier();
+
+            if (!topicsIdentifier.isPresent()) {
+                LOG.warn("No topics' identifiers provided");
+                return Optional.empty();
+            }
+
+            TypeInformation typeInformation;
+            if (this.valueSerializationSchema instanceof ResultTypeQueryable) {
+                typeInformation =
+                        ((ResultTypeQueryable<?>) this.valueSerializationSchema).getProducedType();
+            } else {
+                // gets type information from serialize method signature
+                typeInformation =
+                        Arrays.stream(this.valueSerializationSchema.getClass().getMethods())
+                                .map(m -> Invokable.from(m))
+                                .filter(m -> "serialize".equalsIgnoreCase(m.getName()))
+                                .map(m -> m.getParameters().get(0))
+                                .filter(p -> !p.getType().equals(TypeToken.of(Object.class)))
+                                .findFirst()
+                                .map(p -> p.getType())
+                                .map(t -> TypeInformation.of(t.getRawType()))
+                                .orElse(null);

Review Comment:
   This looks way more complicated than it should be. Here is what I had in mind:
   
   ```java
   TypeToken<? extends SerializationSchema> serializationSchemaType =
           TypeToken.of(valueSerializationSchema.getClass());
   Class<?> parameterType =
           serializationSchemaType
                   .resolveType(SerializationSchema.class.getTypeParameters()[0])
                   .getRawType();
   if (parameterType != Object.class) {
       typeInformation = TypeInformation.of(parameterType);
   }
   ```
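   For readers without Guava at hand, the core of what `TypeToken.resolveType` does here can be sketched with plain `java.lang.reflect` for the common case of a class that implements the schema interface directly. The `SerializationSchema` below is a local stand-in for the demo, not Flink's interface, and Guava's version additionally resolves types through deeper class hierarchies:

   ```java
   import java.lang.reflect.ParameterizedType;
   import java.lang.reflect.Type;
   import java.nio.charset.StandardCharsets;

   // Hypothetical stand-in for Flink's SerializationSchema, just for this demo.
   interface SerializationSchema<T> {
       byte[] serialize(T element);
   }

   class StringSchema implements SerializationSchema<String> {
       @Override
       public byte[] serialize(String element) {
           return element.getBytes(StandardCharsets.UTF_8);
       }
   }

   class TypeResolver {
       /** Resolves the T in SerializationSchema<T> for a class implementing it directly. */
       static Class<?> resolveElementType(Class<?> schemaClass) {
           for (Type iface : schemaClass.getGenericInterfaces()) {
               if (iface instanceof ParameterizedType) {
                   ParameterizedType p = (ParameterizedType) iface;
                   if (p.getRawType() == SerializationSchema.class
                           && p.getActualTypeArguments()[0] instanceof Class) {
                       return (Class<?>) p.getActualTypeArguments()[0];
                   }
               }
           }
           return Object.class; // raw/erased or indirect implementation: type unknown
       }
   }
   ```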



##########
flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/lineage/facets/KafkaDatasetFacet.java:
##########
@@ -0,0 +1,97 @@
+package org.apache.flink.connector.kafka.lineage.facets;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.streaming.api.lineage.LineageDatasetFacet;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Objects;
+import java.util.Properties;
+import java.util.regex.Pattern;
+
+/** Facet containing all information related to sources and sinks on Kafka. */
+public class KafkaDatasetFacet implements LineageDatasetFacet {
+
+    public static final String KAFKA_FACET_NAME = "kafka";
+
+    public final Properties properties;
+    public final TypeInformation typeInformation;

Review Comment:
   
   ```suggestion
       @Nullable  
       public final TypeInformation typeInformation;
   ```



##########
flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/lineage/KafkaDatasetFacetProvider.java:
##########
@@ -0,0 +1,16 @@
+package org.apache.flink.connector.kafka.lineage;
+
+import org.apache.flink.connector.kafka.lineage.facets.KafkaDatasetFacet;
+
+import java.util.Optional;
+
+/** Contains method to extract {@link KafkaDatasetFacet}. */
+public interface KafkaDatasetFacetProvider {
+
+    /**
+     * List of lineage dataset facets.
+     *
+     * @return

Review Comment:
   Remove if you don't want to fill it out. Would be good to document when it's returning empty.



##########
flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaSink.java:
##########
@@ -132,4 +142,30 @@ public SimpleVersionedSerializer<KafkaWriterState> getWriterStateSerializer() {
     protected Properties getKafkaProducerConfig() {
         return kafkaProducerConfig;
     }
+
+    @Override
+    public LineageVertex getLineageVertex() {
+        // enrich dataset facet with properties
+        if (recordSerializer instanceof KafkaDatasetFacetProvider) {
+            Optional<KafkaDatasetFacet> kafkaDatasetFacet =
+                    ((KafkaDatasetFacetProvider) recordSerializer).getKafkaDatasetFacet();
+
+            if (!kafkaDatasetFacet.isPresent()) {
+                LOG.warn("Provided did not return kafka dataset facet");
+                return null;

Review Comment:
   I don't think we are allowed to return null. The interface doesn't specify @Nullable.
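   One way to honor that contract is to fall back to a vertex with no datasets instead of null. The interfaces below are hypothetical, trimmed-down stand-ins for the Flink lineage API (the real ones carry more methods), so this is a sketch of the pattern, not the actual fix:

   ```java
   import java.util.Collections;
   import java.util.List;

   // Hypothetical minimal stand-ins for the Flink lineage interfaces.
   interface LineageDataset { String name(); }
   interface LineageVertex { List<LineageDataset> datasets(); }

   class LineageVertexes {
       /** Fallback vertex for when no facet can be derived; avoids returning null. */
       static LineageVertex empty() {
           return Collections::emptyList;
       }
   }
   ```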



##########
flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/lineage/facets/KafkaDatasetFacet.java:
##########
@@ -0,0 +1,97 @@
+package org.apache.flink.connector.kafka.lineage.facets;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.streaming.api.lineage.LineageDatasetFacet;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Objects;
+import java.util.Properties;
+import java.util.regex.Pattern;
+
+/** Facet containing all information related to sources and sinks on Kafka. */
+public class KafkaDatasetFacet implements LineageDatasetFacet {
+
+    public static final String KAFKA_FACET_NAME = "kafka";
+
+    public final Properties properties;
+    public final TypeInformation typeInformation;
+    public final KafkaDatasetIdentifier topicIdentifier;
+
+    public KafkaDatasetFacet(
+            KafkaDatasetIdentifier topicIdentifier,
+            Properties properties,
+            TypeInformation typeInformation) {
+        this.topicIdentifier = topicIdentifier;
+        this.properties = properties;
+        this.typeInformation = typeInformation;
+    }
+
+    public void addProperties(Properties properties) {
+        this.properties.putAll(properties);
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (o == null || getClass() != o.getClass()) {
+            return false;
+        }
+        KafkaDatasetFacet that = (KafkaDatasetFacet) o;
+        return Objects.equals(properties, that.properties)
+                && Objects.equals(typeInformation, that.typeInformation)
+                && Objects.equals(topicIdentifier, that.topicIdentifier);
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(properties, typeInformation, topicIdentifier);
+    }
+
+    @Override
+    public String name() {
+        return KAFKA_FACET_NAME;
+    }
+
+    /**
+     * Record class to contain topics' identifier information which can be either a list of topics
+     * or a topic pattern.
+     */
+    public static class KafkaDatasetIdentifier {
+        public final List<String> topics;

Review Comment:
   
   ```suggestion
           @Nullable
           public final List<String> topics;
   ```



##########
flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaRecordSerializationSchemaBuilder.java:
##########
@@ -369,5 +416,43 @@ public ProducerRecord<byte[], byte[]> serialize(
                     value,
                    headerProvider != null ? headerProvider.getHeaders(element) : null);
         }
+
+        @Override
+        public Optional<KafkaDatasetFacet> getKafkaDatasetFacet() {
+            if (!(topicSelector instanceof KafkaDatasetIdentifierProvider)) {
+                LOG.warn("Cannot identify topics. Not an TopicsIdentifierProvider");

Review Comment:
   I'd not use warn in these cases. I think info is good enough.



##########
flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/lineage/LineageUtil.java:
##########
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.flink.connector.kafka.lineage;
+
+import org.apache.flink.api.connector.source.Boundedness;
+import org.apache.flink.connector.kafka.lineage.facets.KafkaDatasetFacet;
+import org.apache.flink.streaming.api.lineage.LineageDataset;
+import org.apache.flink.streaming.api.lineage.LineageDatasetFacet;
+import org.apache.flink.streaming.api.lineage.LineageVertex;
+import org.apache.flink.streaming.api.lineage.SourceLineageVertex;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.stream.Collectors;
+
+/** Utility class with useful methods for managing dataset facets. */
+public class LineageUtil {
+
+    private static final String KAFKA_DATASET_PREFIX = "kafka://";
+    private static final String COMMA = ",";
+    private static final String SEMICOLON = ";";
+
+    public static LineageDataset datasetOf(String namespace, KafkaDatasetFacet kafkaDatasetFacet) {
+        return new LineageDataset() {
+            @Override
+            public String name() {
+                if (kafkaDatasetFacet.topicIdentifier.topicPattern != null) {
+                    return kafkaDatasetFacet.topicIdentifier.toString();
+                }
+
+                return String.join(",", kafkaDatasetFacet.topicIdentifier.topics);
+            }
+
+            @Override
+            public String namespace() {
+                return namespace;
+            }
+
+            @Override
+            public Map<String, LineageDatasetFacet> facets() {
+                return Collections.singletonMap(
+                        KafkaDatasetFacet.KAFKA_FACET_NAME, kafkaDatasetFacet);

Review Comment:
   Is this correct? This looks like it duplicates the names and other properties. I'd expect two facets for properties and type info instead.



##########
flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/lineage/KafkaDatasetIdentifierProvider.java:
##########
@@ -0,0 +1,16 @@
+package org.apache.flink.connector.kafka.lineage;
+
+import org.apache.flink.connector.kafka.lineage.facets.KafkaDatasetFacet.KafkaDatasetIdentifier;
+
+import java.util.Optional;
+
+/** Contains method which allows extracting topic identifier. */
+public interface KafkaDatasetIdentifierProvider {

Review Comment:
   Since we have 
   ```
   interface A {}
   interface AProvider{}
   ```
   there is also the pattern of using
   ```
   interface A {
       interface Provider {}
   }
   ```
   
   I don't have a strong opinion on that. Just wanted to point out the option.



##########
flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaRecordSerializationSchemaBuilder.java:
##########
@@ -369,5 +416,43 @@ public ProducerRecord<byte[], byte[]> serialize(
                     value,
                    headerProvider != null ? headerProvider.getHeaders(element) : null);
         }
+
+        @Override
+        public Optional<KafkaDatasetFacet> getKafkaDatasetFacet() {
+            if (!(topicSelector instanceof KafkaDatasetIdentifierProvider)) {
+                LOG.warn("Cannot identify topics. Not an TopicsIdentifierProvider");
+                return Optional.empty();
+            }
+
+            Optional<KafkaDatasetIdentifier> topicsIdentifier =
+                    ((KafkaDatasetIdentifierProvider) (topicSelector)).getDatasetIdentifier();
+
+            if (!topicsIdentifier.isPresent()) {
+                LOG.warn("No topics' identifiers provided");
+                return Optional.empty();
+            }
+
+            TypeInformation typeInformation;
+            if (this.valueSerializationSchema instanceof ResultTypeQueryable) {
+                typeInformation =
+                        ((ResultTypeQueryable<?>) this.valueSerializationSchema).getProducedType();
+            } else {
+                // gets type information from serialize method signature
+                typeInformation =

Review Comment:
   How do we use this type information later? This is the input type, right?



##########
flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaSink.java:
##########
@@ -132,4 +142,30 @@ public SimpleVersionedSerializer<KafkaWriterState> getWriterStateSerializer() {
     protected Properties getKafkaProducerConfig() {
         return kafkaProducerConfig;
     }
+
+    @Override
+    public LineageVertex getLineageVertex() {
+        // enrich dataset facet with properties
+        if (recordSerializer instanceof KafkaDatasetFacetProvider) {
+            Optional<KafkaDatasetFacet> kafkaDatasetFacet =
+                    ((KafkaDatasetFacetProvider) recordSerializer).getKafkaDatasetFacet();
+
+            if (!kafkaDatasetFacet.isPresent()) {
+                LOG.warn("Provided did not return kafka dataset facet");
+                return null;
+            }
+
+            kafkaDatasetFacet.get().addProperties(this.kafkaProducerConfig);

Review Comment:
   Do we ever actually get the properties from the recordSerializer? So are we actually just setting them here?



##########
flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaRecordSerializationSchemaBuilderTest.java:
##########
@@ -256,6 +267,102 @@ public void testSerializeRecordWithTimestamp() {
         assertThat(recordWithInvalidTimestamp.timestamp()).isNull();
     }
 
+    @Test
+    public void testGetLineageDatasetFacetsWhenTopicSelectorNotKafkaTopicsIdentifierProvider() {
+        SerializationSchema<String> serializationSchema = new SimpleStringSchema();
+        KafkaRecordSerializationSchema<String> schema =
+                KafkaRecordSerializationSchema.builder()
+                        .setTopicSelector(mock(TopicSelector.class))
+                        .setValueSerializationSchema(serializationSchema)
+                        .setKeySerializationSchema(serializationSchema)
+                        .build();
+
+        assertThat(((KafkaDatasetFacetProvider) schema).getKafkaDatasetFacet()).isEmpty();
+    }
+
+    @Test
+    public void testGetLineageDatasetFacetsWhenNoTopicsIdentifiersFound() {
+        TopicSelector topicSelector =
+                mock(
+                        TopicSelector.class,
+                        withSettings().extraInterfaces(KafkaDatasetIdentifierProvider.class));
+        when(((KafkaDatasetIdentifierProvider) topicSelector).getDatasetIdentifier())
+                .thenReturn(Optional.empty());

Review Comment:
   I haven't looked too closely at the tests. But a high-level comment: In Flink, we don't use mockito (anymore). The main idea is that we use interfaces (as you did) and then just explicitly create our MockImplementation.
   
   ```
   class MockTopicSelector implements TopicSelector, KafkaDatasetIdentifierProvider {
     KafkaDatasetIdentifier id; // init with ctor or factory method
   
     KafkaDatasetIdentifier getDatasetIdentifier() { return id; }
   }
   ```
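   Fleshed out into something compilable, that hand-rolled double might look like the sketch below. `TopicSelector` and the provider interface are simplified stand-ins here (the production provider returns `Optional<KafkaDatasetIdentifier>`, modeled with `Optional<String>` for brevity):

   ```java
   import java.util.Optional;

   // Simplified stand-ins for the production interfaces, for illustration only.
   interface TopicSelector { String apply(Object record); }
   interface DatasetIdentifierProvider { Optional<String> getDatasetIdentifier(); }

   /** Hand-rolled test double replacing a Mockito mock with extraInterfaces. */
   class MockTopicSelector implements TopicSelector, DatasetIdentifierProvider {
       private final String id; // null models "no identifier available"

       MockTopicSelector(String id) { this.id = id; }

       @Override
       public String apply(Object record) { return "fixed-topic"; }

       @Override
       public Optional<String> getDatasetIdentifier() { return Optional.ofNullable(id); }
   }
   ```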



##########
flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/lineage/LineageUtil.java:
##########
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.flink.connector.kafka.lineage;
+
+import org.apache.flink.api.connector.source.Boundedness;
+import org.apache.flink.connector.kafka.lineage.facets.KafkaDatasetFacet;
+import org.apache.flink.streaming.api.lineage.LineageDataset;
+import org.apache.flink.streaming.api.lineage.LineageDatasetFacet;
+import org.apache.flink.streaming.api.lineage.LineageVertex;
+import org.apache.flink.streaming.api.lineage.SourceLineageVertex;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.stream.Collectors;
+
+/** Utility class with useful methods for managing dataset facets. */
+public class LineageUtil {
+
+    private static final String KAFKA_DATASET_PREFIX = "kafka://";
+    private static final String COMMA = ",";
+    private static final String SEMICOLON = ";";
+
+    public static LineageDataset datasetOf(String namespace, KafkaDatasetFacet kafkaDatasetFacet) {
+        return new LineageDataset() {
+            @Override
+            public String name() {
+                if (kafkaDatasetFacet.topicIdentifier.topicPattern != null) {
+                    return kafkaDatasetFacet.topicIdentifier.toString();
+                }
+
+                return String.join(",", kafkaDatasetFacet.topicIdentifier.topics);

Review Comment:
   A more OOP approach would be to model the KafkaDatasetIdentifier as an interface and have two implementations for pattern/list. Then have a method `toLineageName` (or so) that implements these two ways.
   
   I already asked offline if it wouldn't be easier to just use `String identifier` in the `KafkaDatasetFacet` and let the IdentifierProvider already perform these operations. You mentioned that you would like to differentiate between topicPattern and topicList but I haven't actually seen the need for it. Can you please point me to where it's important to differentiate outside of this method?
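   The interface-plus-two-implementations idea could be sketched roughly as below. `toLineageName` is the reviewer's placeholder name; the class names and bodies are illustrative, not part of the PR:

   ```java
   import java.util.List;
   import java.util.regex.Pattern;

   /** One identifier abstraction, two concrete shapes, one place that renders the name. */
   interface KafkaDatasetIdentifier {
       String toLineageName();
   }

   final class TopicListIdentifier implements KafkaDatasetIdentifier {
       private final List<String> topics;

       TopicListIdentifier(List<String> topics) { this.topics = topics; }

       @Override
       public String toLineageName() { return String.join(",", topics); }
   }

   final class TopicPatternIdentifier implements KafkaDatasetIdentifier {
       private final Pattern pattern;

       TopicPatternIdentifier(Pattern pattern) { this.pattern = pattern; }

       @Override
       public String toLineageName() { return pattern.pattern(); }
   }
   ```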



##########
flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/lineage/KafkaDatasetIdentifierProvider.java:
##########
@@ -0,0 +1,16 @@
+package org.apache.flink.connector.kafka.lineage;
+
+import org.apache.flink.connector.kafka.lineage.facets.KafkaDatasetFacet.KafkaDatasetIdentifier;
+
+import java.util.Optional;
+
+/** Contains method which allows extracting topic identifier. */
+public interface KafkaDatasetIdentifierProvider {
+
+    /**
+     * List of lineage dataset facets.
+     *
+     * @return

Review Comment:
   Same comments as for the other provider.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
