[ https://issues.apache.org/jira/browse/BEAM-13310?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Work on BEAM-13310 started by John Casey.
-----------------------------------------
> KafkaIO SDF does not commit offsets but KafkaIO UnboundedSource does
> --------------------------------------------------------------------
>
>                 Key: BEAM-13310
>                 URL: https://issues.apache.org/jira/browse/BEAM-13310
>             Project: Beam
>          Issue Type: Bug
>          Components: io-java-kafka
>            Reporter: Luke Cwik
>            Assignee: John Casey
>            Priority: P2
>
> When run using SDF the pipeline does not commit offsets but when run using
> the SDF UnboundedSourceWrapper via *use_deprecated_read* experiment the
> pipeline does. This implies that the UnboundedSource version is able to
> correctly commit offsets but the pure SDF does not.
> Sample code:
> {code:java}
> final Pipeline p = Pipeline.create(options);
> p.apply(
>     KafkaIO.<Long, String>read()
>         .withBootstrapServers(options.getKafkaBroker())
>         .withTopic(options.getTopic())
>         .withConsumerConfigUpdates(
>             Map.of(
>                 ConsumerConfig.GROUP_ID_CONFIG,
>                 options.getConsumerGroup(),
>                 CommonClientConfigs.SECURITY_PROTOCOL_CONFIG,
>                 "SASL_SSL",
>                 SaslConfigs.SASL_KERBEROS_SERVICE_NAME, "kafka",
>                 SaslConfigs.SASL_JAAS_CONFIG,
>                 "com.sun.security.auth.module.GssLoginModule required initiate=true;"))
>         .withKeyDeserializer(LongDeserializer.class)
>         .withValueDeserializer(StringDeserializer.class)
>         .commitOffsetsInFinalize()
>         .withoutMetadata());
> {code}

--
This message was sent by Atlassian Jira
(v8.20.1#820001)