fapaul commented on a change in pull request #18612:
URL: https://github.com/apache/flink/pull/18612#discussion_r798722386



##########
File path: 
flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaSink.java
##########
@@ -86,45 +85,48 @@
         return new KafkaSinkBuilder<>();
     }
 
+    @Internal
     @Override
-    public SinkWriter<IN, KafkaCommittable, KafkaWriterState> createWriter(
-            InitContext context, List<KafkaWriterState> states) throws 
IOException {
-        final Supplier<MetricGroup> metricGroupSupplier =
-                () -> context.metricGroup().addGroup("user");
-        return new KafkaWriter<>(
-                deliveryGuarantee,
-                kafkaProducerConfig,
-                transactionalIdPrefix,
-                context,
-                recordSerializer,
-                new InitContextInitializationContextAdapter(
-                        context.getUserCodeClassLoader(), metricGroupSupplier),
-                states);
+    public Committer<KafkaCommittable> createCommitter() throws IOException {
+        return new KafkaCommitter(kafkaProducerConfig);
     }
 
+    @Internal
     @Override
-    public Optional<Committer<KafkaCommittable>> createCommitter() throws 
IOException {
-        return Optional.of(new KafkaCommitter(kafkaProducerConfig));
+    public SimpleVersionedSerializer<KafkaCommittable> 
getCommittableSerializer() {
+        return new KafkaCommittableSerializer();

Review comment:
       Yes because only the Flink runtime is supposed to call it.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to