fapaul commented on a change in pull request #18612: URL: https://github.com/apache/flink/pull/18612#discussion_r798722386
########## File path: flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaSink.java ########## @@ -86,45 +85,48 @@ return new KafkaSinkBuilder<>(); } + @Internal @Override - public SinkWriter<IN, KafkaCommittable, KafkaWriterState> createWriter( - InitContext context, List<KafkaWriterState> states) throws IOException { - final Supplier<MetricGroup> metricGroupSupplier = - () -> context.metricGroup().addGroup("user"); - return new KafkaWriter<>( - deliveryGuarantee, - kafkaProducerConfig, - transactionalIdPrefix, - context, - recordSerializer, - new InitContextInitializationContextAdapter( - context.getUserCodeClassLoader(), metricGroupSupplier), - states); + public Committer<KafkaCommittable> createCommitter() throws IOException { + return new KafkaCommitter(kafkaProducerConfig); } + @Internal @Override - public Optional<Committer<KafkaCommittable>> createCommitter() throws IOException { - return Optional.of(new KafkaCommitter(kafkaProducerConfig)); + public SimpleVersionedSerializer<KafkaCommittable> getCommittableSerializer() { + return new KafkaCommittableSerializer(); Review comment: Yes because only the Flink runtime is supposed to call it. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org