AHeise commented on code in PR #130:
URL: https://github.com/apache/flink-connector-kafka/pull/130#discussion_r1800732462


##########
flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/lineage/LineageFacetProvider.java:
##########
@@ -0,0 +1,19 @@
+package org.apache.flink.connector.kafka.lineage;
+
+import org.apache.flink.streaming.api.lineage.LineageDatasetFacet;
+
+import java.util.List;
+
+/**
+ * Contains a method which can be used for lineage schema facet extraction. Useful for classes like
+ * topic selectors or serialization schemas to extract dataset information from.
+ */
+public interface LineageFacetProvider {

Review Comment:
   Should this be part of flink-core in the future?



##########
flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/lineage/LineageUtil.java:
##########
@@ -0,0 +1,156 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.flink.connector.kafka.lineage;
+
+import org.apache.flink.api.connector.source.Boundedness;
+import org.apache.flink.connector.kafka.lineage.facets.KafkaTopicListFacet;
+import org.apache.flink.connector.kafka.lineage.facets.KafkaTopicPatternFacet;
+import org.apache.flink.streaming.api.lineage.LineageDataset;
+import org.apache.flink.streaming.api.lineage.LineageDatasetFacet;
+import org.apache.flink.streaming.api.lineage.LineageVertex;
+import org.apache.flink.streaming.api.lineage.SourceLineageVertex;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Properties;
+import java.util.stream.Collectors;
+
+/** Utility class with useful methods for managing dataset facets. */
+public class LineageUtil {

Review Comment:
   Same question: does `List` provide users of the methods with any advantage over `Collection`?



##########
flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/lineage/LineageFacetProvider.java:
##########
@@ -0,0 +1,19 @@
+package org.apache.flink.connector.kafka.lineage;
+
+import org.apache.flink.streaming.api.lineage.LineageDatasetFacet;
+
+import java.util.List;
+
+/**
+ * Contains a method which can be used for lineage schema facet extraction. Useful for classes like
+ * topic selectors or serialization schemas to extract dataset information from.
+ */
+public interface LineageFacetProvider {
+
+    /**
+     * Returns the lineage dataset facets describing this object.
+     *
+     * @return list of lineage dataset facets
+     */
+    List<LineageDatasetFacet> getDatasetFacets();

Review Comment:
   nit: is Collection sufficient?
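   For illustration, the relaxed signature would simply be:
   ```java
   // Hypothetical variant: Collection leaves implementations free to return
   // any collection type (e.g. a Set) without copying into a List.
   Collection<LineageDatasetFacet> getDatasetFacets();
   ```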



##########
flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/lineage/LineageUtil.java:
##########
@@ -0,0 +1,156 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.flink.connector.kafka.lineage;
+
+import org.apache.flink.api.connector.source.Boundedness;
+import org.apache.flink.connector.kafka.lineage.facets.KafkaTopicListFacet;
+import org.apache.flink.connector.kafka.lineage.facets.KafkaTopicPatternFacet;
+import org.apache.flink.streaming.api.lineage.LineageDataset;
+import org.apache.flink.streaming.api.lineage.LineageDatasetFacet;
+import org.apache.flink.streaming.api.lineage.LineageVertex;
+import org.apache.flink.streaming.api.lineage.SourceLineageVertex;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Properties;
+import java.util.stream.Collectors;
+
+/** Utility class with useful methods for managing dataset facets. */
+public class LineageUtil {
+
+    private static final String KAFKA_DATASET_PREFIX = "kafka://";
+    private static final String COMMA = ",";
+    private static final String SEMICOLON = ";";
+
+    /**
+     * Loads facets from any object implementing the {@link LineageFacetProvider} interface.
+     *
+     * @param object object to extract facets from
+     * @return list of facets, or an empty list if the object is not a {@link LineageFacetProvider}
+     */
+    public static List<LineageDatasetFacet> facetsFrom(Object object) {
+        return Optional.of(object)
+                .filter(LineageFacetProvider.class::isInstance)
+                .map(LineageFacetProvider.class::cast)
+                .map(LineageFacetProvider::getDatasetFacets)
+                .orElse(Collections.emptyList());
+    }
+
+    /**
+     * Creates datasets from a list of facets. Uses {@link KafkaTopicListFacet} to extract the
+     * dataset names; one dataset is created per element of the topic list.
+     *
+     * @param namespace dataset namespace
+     * @param facets facets to build the datasets from
+     * @return list of datasets
+     */
+    public static List<LineageDataset> datasetsFrom(
+            String namespace, List<LineageDatasetFacet> facets) {
+        // Check if topic list facet is available -> if so, explode the list of facets
+        Optional<KafkaTopicListFacet> topicList =
+                facets.stream()
+                        .filter(KafkaTopicListFacet.class::isInstance)
+                        .map(KafkaTopicListFacet.class::cast)
+                        .findAny();
+
+        List<LineageDataset> datasets = new ArrayList<>();
+
+        // Explode list of other facets
+        if (topicList.isPresent()) {
+            List<LineageDatasetFacet> facetsWithoutTopicList =
+                    facets.stream()
+                            .filter(f -> !f.equals(topicList.get()))
+                            .collect(Collectors.toList());
+
+            topicList.get().topics.stream()
+                    .forEach(t -> datasets.add(datasetOf(namespace, t, facetsWithoutTopicList)));

Review Comment:
   nit: If you use functional style, `forEach + add` is rather an anti-pattern. You'd instead chain Streams and materialize them at the very end with a `Collector`.
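   A minimal sketch of that style for the snippet above (assuming the surrounding variables; not the author's code):
   ```java
   // Chain the stream and materialize once at the end instead of forEach + add.
   List<LineageDataset> datasets =
           topicList.get().topics.stream()
                   .map(t -> datasetOf(namespace, t, facetsWithoutTopicList))
                   .collect(Collectors.toList());
   ```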



##########
flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaSink.java:
##########
@@ -132,4 +140,16 @@ public SimpleVersionedSerializer<KafkaWriterState> getWriterStateSerializer() {
     protected Properties getKafkaProducerConfig() {
         return kafkaProducerConfig;
     }
+
+    @Override
+    public LineageVertex getLineageVertex() {
+        List<LineageDatasetFacet> facets = new ArrayList<>();
+
+        // add all the facets from the record serializer and producer config
+        facets.addAll(LineageUtil.facetsFrom(recordSerializer));
+        facets.add(new KafkaPropertiesFacet(this.kafkaProducerConfig));

Review Comment:
   See feedback on LineageUtil



##########
flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaRecordSerializationSchemaBuilder.java:
##########
@@ -369,5 +377,23 @@ public ProducerRecord<byte[], byte[]> serialize(
                     value,
                     headerProvider != null ? headerProvider.getHeaders(element) : null);
         }
+
+        @Override
+        public List<LineageDatasetFacet> getDatasetFacets() {
+            List<LineageDatasetFacet> facets = new ArrayList<>();
+            facets.add(new KafkaTopicListFacet(Arrays.asList(topicSelector.apply(null))));
+
+            // gets type information from serialize method signature
+            Arrays.stream(this.valueSerializationSchema.getClass().getMethods())

Review Comment:
   Again, we should probably check whether the serializer returns the TypeInformation directly (by implementing ResultTypeQueryable).
   If not, we could fall back to extracting it as you do, but I'd use something like `org.apache.flink.shaded.guava31.com.google.common.reflect.TypeToken` to be more robust. Your implementation fails if you have some intermediate interface that forwards the type parameter.
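   A rough sketch of that two-step lookup (helper name hypothetical; assumes imports of `ResultTypeQueryable`, `TypeInformation`, `SerializationSchema`, `java.lang.reflect.Type`, and the shaded `TypeToken`):
   ```java
   private static Optional<TypeInformation<?>> typeInfoOf(SerializationSchema<?> schema) {
       if (schema instanceof ResultTypeQueryable) {
           // The schema can report its produced type directly.
           return Optional.of(((ResultTypeQueryable<?>) schema).getProducedType());
       }
       // Resolve the type variable T of SerializationSchema<T>; TypeToken also
       // handles intermediate interfaces that merely forward the type parameter.
       Type resolved =
               TypeToken.of(schema.getClass())
                       .resolveType(SerializationSchema.class.getTypeParameters()[0])
                       .getType();
       return resolved instanceof Class
               ? Optional.of(TypeInformation.of((Class<?>) resolved))
               : Optional.empty();
   }
   ```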



##########
flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/lineage/LineageUtil.java:
##########
@@ -0,0 +1,156 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.flink.connector.kafka.lineage;
+
+import org.apache.flink.api.connector.source.Boundedness;
+import org.apache.flink.connector.kafka.lineage.facets.KafkaTopicListFacet;
+import org.apache.flink.connector.kafka.lineage.facets.KafkaTopicPatternFacet;
+import org.apache.flink.streaming.api.lineage.LineageDataset;
+import org.apache.flink.streaming.api.lineage.LineageDatasetFacet;
+import org.apache.flink.streaming.api.lineage.LineageVertex;
+import org.apache.flink.streaming.api.lineage.SourceLineageVertex;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Properties;
+import java.util.stream.Collectors;
+
+/** Utility class with useful methods for managing dataset facets. */
+public class LineageUtil {
+
+    private static final String KAFKA_DATASET_PREFIX = "kafka://";
+    private static final String COMMA = ",";
+    private static final String SEMICOLON = ";";
+
+    /**
+     * Loads facets from any object implementing the {@link LineageFacetProvider} interface.
+     *
+     * @param object object to extract facets from
+     * @return list of facets, or an empty list if the object is not a {@link LineageFacetProvider}
+     */
+    public static List<LineageDatasetFacet> facetsFrom(Object object) {
+        return Optional.of(object)
+                .filter(LineageFacetProvider.class::isInstance)
+                .map(LineageFacetProvider.class::cast)
+                .map(LineageFacetProvider::getDatasetFacets)
+                .orElse(Collections.emptyList());
+    }
+
+    /**
+     * Creates datasets from a list of facets. Uses {@link KafkaTopicListFacet} to extract the
+     * dataset names; one dataset is created per element of the topic list.
+     *
+     * @param namespace dataset namespace
+     * @param facets facets to build the datasets from
+     * @return list of datasets
+     */
+    public static List<LineageDataset> datasetsFrom(

Review Comment:
   This whole information flow around the facets looks a bit unclean to me.
   Both Source/Sink throw a bunch of information into a list of `LineageDatasetFacet`, then this method is applied to take that list apart and construct the actually intended `LineageDataset`. So we first deliberately lose the information of what the facets are about and then we need to use a lot of (hidden) if-else to extract that information again.
   
   WDYT of replacing the `List<LineageDatasetFacet>` instead with a value class that contains all relevant information:
   ```java
   class KafkaFacet {
     @Nullable
     String topicPattern;
     @Nullable
     List<String> topicList;
     Properties properties;
     @Nullable
     TypeInformation typeInformation;
   }
   ```
   
   Then you can access all the different pieces of information without the isInstance/cast pattern that you use.
   You can then in this method still turn all the pieces of information into separate facets.
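   For illustration, the conversion back into separate facets could then be a plain null-checked mapping (sketch; facet constructor shapes assumed):
   ```java
   static List<LineageDatasetFacet> toFacets(KafkaFacet kafkaFacet) {
       List<LineageDatasetFacet> facets = new ArrayList<>();
       if (kafkaFacet.topicList != null) {
           facets.add(new KafkaTopicListFacet(kafkaFacet.topicList)); // constructor shape assumed
       }
       if (kafkaFacet.topicPattern != null) {
           facets.add(new KafkaTopicPatternFacet(kafkaFacet.topicPattern)); // constructor shape assumed
       }
       facets.add(new KafkaPropertiesFacet(kafkaFacet.properties));
       return facets;
   }
   ```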



##########
flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaRecordSerializationSchemaBuilder.java:
##########
@@ -369,5 +377,23 @@ public ProducerRecord<byte[], byte[]> serialize(
                     value,
                     headerProvider != null ? headerProvider.getHeaders(element) : null);
         }
+
+        @Override
+        public List<LineageDatasetFacet> getDatasetFacets() {
+            List<LineageDatasetFacet> facets = new ArrayList<>();
+            facets.add(new KafkaTopicListFacet(Arrays.asList(topicSelector.apply(null))));

Review Comment:
   Is `topicSelector.apply(null)` guaranteed to work?
   Is this even the right thing to do? TopicSelector could return different topics coming from different inputs.
   I think we should instead check if TopicSelector is also a LineageFacetProvider and ask it directly, as in the sketch below.
   Our TopicSelectors should then implement it, and we should add to the javadoc that implementing LineageFacetProvider is encouraged.
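   A minimal sketch of that check (assuming this class's fields; not the author's code):
   ```java
   // Ask the selector for its own facets when it can provide them; only
   // fall back to probing with a null element otherwise.
   if (topicSelector instanceof LineageFacetProvider) {
       facets.addAll(((LineageFacetProvider) topicSelector).getDatasetFacets());
   } else {
       // Fallback: apply(null) is not guaranteed to work for every selector.
       facets.add(new KafkaTopicListFacet(Arrays.asList(topicSelector.apply(null))));
   }
   ```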



##########
flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/lineage/facets/KafkaPropertiesFacet.java:
##########
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.kafka.lineage.facets;
+
+import org.apache.flink.streaming.api.lineage.LineageDatasetFacet;
+
+import java.util.Objects;
+import java.util.Properties;
+
+/** Facet containing Kafka properties. */
+public class KafkaPropertiesFacet implements LineageDatasetFacet {
+
+    public static final String KAFKA_PROPERTIES_FACET_NAME = "kafkaProperties";
+    public Properties properties;

Review Comment:
   What assumptions do we make about the mutability and thread-safety of the facets? Do we need to make defensive copies of mutable information such as the Properties?
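   If we do want to be defensive, a sketch of a copying constructor (field made private and final here for illustration; assumes `LineageDatasetFacet` exposes a `name()` method):
   ```java
   public class KafkaPropertiesFacet implements LineageDatasetFacet {

       public static final String KAFKA_PROPERTIES_FACET_NAME = "kafkaProperties";
       private final Properties properties;

       public KafkaPropertiesFacet(Properties properties) {
           // Defensive copy: later mutation of the caller's Properties
           // can no longer change the facet's state.
           this.properties = new Properties();
           this.properties.putAll(properties);
       }

       @Override
       public String name() {
           return KAFKA_PROPERTIES_FACET_NAME;
       }
   }
   ```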



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
