pawel-big-lebowski commented on code in PR #130: URL: https://github.com/apache/flink-connector-kafka/pull/130#discussion_r1818853390
##########
flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaRecordSerializationSchemaBuilder.java:
##########
@@ -369,5 +416,43 @@ public ProducerRecord<byte[], byte[]> serialize(
                     value,
                     headerProvider != null ? headerProvider.getHeaders(element) : null);
         }
+
+        @Override
+        public Optional<KafkaDatasetFacet> getKafkaDatasetFacet() {
+            if (!(topicSelector instanceof KafkaDatasetIdentifierProvider)) {
+                LOG.warn("Cannot identify topics. Not an TopicsIdentifierProvider");
+                return Optional.empty();
+            }
+
+            Optional<KafkaDatasetIdentifier> topicsIdentifier =
+                    ((KafkaDatasetIdentifierProvider) (topicSelector)).getDatasetIdentifier();
+
+            if (!topicsIdentifier.isPresent()) {
+                LOG.warn("No topics' identifiers provided");
+                return Optional.empty();
+            }
+
+            TypeInformation typeInformation;
+            if (this.valueSerializationSchema instanceof ResultTypeQueryable) {
+                typeInformation =
+                        ((ResultTypeQueryable<?>) this.valueSerializationSchema).getProducedType();
+            } else {
+                // gets type information from serialize method signature
+                typeInformation =

Review Comment:
The type information is returned within the facet, and a listener (such as `OpenLineageJobListener`) then converts it into a dataset schema description. In OpenLineage, that facet is called `SchemaDatasetFacet`. I think this is not specific to the Kafka connector, and there should be a general schema-like facet in Flink core. However, I don't think I can achieve that right now. Schema information is valuable for both input and output datasets. I hope the `typeInformation` approach will work well for `Avro` and `Protobuf`. Hopefully, at some point, I will create separate tests in the OpenLineage job listener to verify this.
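The flow discussed in this comment (resolve the value type, then describe it as a schema for a listener) can be illustrated without Flink. The block below is a minimal JDK-only sketch under loud assumptions: `ValueSchema`, `DeclaresType` (a stand-in for `ResultTypeQueryable`), `SchemaField`, `Order`, `OrderSchema` and `GenericSchema` are hypothetical types invented for illustration, not Flink or OpenLineage APIs, and plain `Class` objects stand in for Flink's `TypeInformation`. It mirrors the two branches in the diff: prefer a type the schema declares, otherwise read the parameter type of `serialize` via reflection. It then flattens the class into name/type pairs, roughly the shape of a schema facet.

```java
import java.lang.reflect.Field;
import java.lang.reflect.Method;
import java.lang.reflect.Modifier;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;

public class SchemaFacetSketch {

    // Hypothetical stand-in for a value serialization schema (NOT Flink's SerializationSchema).
    interface ValueSchema<T> {
        byte[] serialize(T element);
    }

    // Hypothetical stand-in for ResultTypeQueryable: a schema that declares its element type.
    interface DeclaresType {
        Class<?> producedType();
    }

    // Hypothetical schema-facet entry: one (name, type) pair per field.
    static final class SchemaField {
        final String name;
        final String type;

        SchemaField(String name, String type) { this.name = name; this.type = type; }

        @Override
        public String toString() { return name + ":" + type; }
    }

    // Mirrors the diff: prefer a declared type, else read the parameter type of serialize(T).
    static Optional<Class<?>> resolveType(ValueSchema<?> schema) {
        if (schema instanceof DeclaresType) {
            return Optional.of(((DeclaresType) schema).producedType());
        }
        for (Method m : schema.getClass().getMethods()) {
            // Skip the compiler-generated bridge method serialize(Object).
            if (m.getName().equals("serialize") && !m.isBridge() && m.getParameterCount() == 1) {
                return Optional.of(m.getParameterTypes()[0]);
            }
        }
        return Optional.empty();
    }

    // Flattens the resolved class into schema fields (declared, non-static, non-synthetic only).
    static List<SchemaField> toSchemaFields(Class<?> type) {
        List<SchemaField> fields = new ArrayList<>();
        for (Field f : type.getDeclaredFields()) {
            if (!Modifier.isStatic(f.getModifiers()) && !f.isSynthetic()) {
                fields.add(new SchemaField(f.getName(), f.getType().getSimpleName()));
            }
        }
        return fields;
    }

    // Hypothetical example record and schemas.
    static final class Order {
        long id;
        String customer;
    }

    static final class OrderSchema implements ValueSchema<Order> {
        @Override
        public byte[] serialize(Order order) {
            return (order.id + "," + order.customer).getBytes(StandardCharsets.UTF_8);
        }
    }

    // After erasure, serialize takes Object here, so the type must be declared explicitly.
    static final class GenericSchema<T> implements ValueSchema<T>, DeclaresType {
        private final Class<T> type;

        GenericSchema(Class<T> type) { this.type = type; }

        @Override
        public byte[] serialize(T element) {
            return String.valueOf(element).getBytes(StandardCharsets.UTF_8);
        }

        @Override
        public Class<?> producedType() { return type; }
    }

    public static void main(String[] args) {
        Class<?> type = resolveType(new OrderSchema()).orElseThrow(IllegalStateException::new);
        System.out.println(type.getSimpleName() + " -> " + toSchemaFields(type));
        System.out.println(resolveType(new GenericSchema<>(String.class)).get().getSimpleName());
    }
}
```

`GenericSchema` shows why the declared-type branch is useful: its `serialize` parameter erases to `Object`, so the signature fallback alone could not recover `String`. The sketch only lists top-level fields; nested records (as in typical Avro or Protobuf types) would presumably need recursive handling in a real listener.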