pawel-big-lebowski commented on code in PR #130:
URL: https://github.com/apache/flink-connector-kafka/pull/130#discussion_r1818853390


##########
flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaRecordSerializationSchemaBuilder.java:
##########
@@ -369,5 +416,43 @@ public ProducerRecord<byte[], byte[]> serialize(
                     value,
                     headerProvider != null ? headerProvider.getHeaders(element) : null);
         }
+
+        @Override
+        public Optional<KafkaDatasetFacet> getKafkaDatasetFacet() {
+            if (!(topicSelector instanceof KafkaDatasetIdentifierProvider)) {
+                LOG.warn("Cannot identify topics. Not an TopicsIdentifierProvider");
+                return Optional.empty();
+            }
+
+            Optional<KafkaDatasetIdentifier> topicsIdentifier =
+                    ((KafkaDatasetIdentifierProvider) (topicSelector)).getDatasetIdentifier();
+
+            if (!topicsIdentifier.isPresent()) {
+                LOG.warn("No topics' identifiers provided");
+                return Optional.empty();
+            }
+
+            TypeInformation typeInformation;
+            if (this.valueSerializationSchema instanceof ResultTypeQueryable) {
+                typeInformation =
+                        ((ResultTypeQueryable<?>) this.valueSerializationSchema).getProducedType();
+            } else {
+                // gets type information from serialize method signature
+                typeInformation =

Review Comment:
   This type information is returned within the facet, and a listener (such as 
OpenLineageJobListener) then converts it into a dataset schema description. In 
OpenLineage, that is the `SchemaDatasetFacet`. I don't think this is specific to 
the Kafka connector; there should be a general schema-like facet in Flink core. 
However, I don't think I can achieve that right now. Schema information is 
valuable for both input and output datasets.
   
   I hope the typeInformation approach will work well for `Avro` and `Protobuf`. 
At some point I plan to add separate tests to the OpenLineage job listener to 
verify this.
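   
   To illustrate the idea of reading the element type from the declared 
signature, here is a minimal sketch using plain Java reflection. It is not the 
code in this PR and does not use Flink APIs: `Schema`, `StringSchema`, and 
`elementType` are hypothetical stand-ins, and the real fallback may work 
differently.
   
   ```java
   import java.lang.reflect.ParameterizedType;
   import java.lang.reflect.Type;
   import java.nio.charset.StandardCharsets;
   import java.util.Optional;

   public class SerializedTypeSketch {

       // Hypothetical stand-in for a serialization schema interface (not Flink's).
       public interface Schema<T> {
           byte[] serialize(T element);
       }

       // Hypothetical schema that declares its element type in the implements clause.
       public static class StringSchema implements Schema<String> {
           @Override
           public byte[] serialize(String element) {
               return element.getBytes(StandardCharsets.UTF_8);
           }
       }

       // Returns T from a directly declared Schema<T>, when T is a concrete class.
       public static Optional<Class<?>> elementType(Schema<?> schema) {
           for (Type iface : schema.getClass().getGenericInterfaces()) {
               if (iface instanceof ParameterizedType) {
                   ParameterizedType pt = (ParameterizedType) iface;
                   Type arg = pt.getActualTypeArguments()[0];
                   if (pt.getRawType() == Schema.class && arg instanceof Class) {
                       return Optional.of((Class<?>) arg);
                   }
               }
           }
           // Lambdas, raw types, and deeper hierarchies expose no type argument here.
           return Optional.empty();
       }

       public static void main(String[] args) {
           System.out.println(elementType(new StringSchema()));
           Schema<Integer> lambda = i -> new byte[] {i.byteValue()};
           System.out.println(elementType(lambda));
       }
   }
   ```
   
   The empty result for the lambda case is why a signature-based lookup alone 
may be insufficient for some schemas, and why verifying `Avro` and `Protobuf` 
separately seems worthwhile.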



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
