Arseniy Tashoyan created FLINK-24850:
----------------------------------------
Summary: KafkaSinkBuilder: NullPointerException if
bootstrap.servers are defined via properties
Key: FLINK-24850
URL: https://issues.apache.org/jira/browse/FLINK-24850
Project: Flink
Issue Type: Bug
Components: Connectors / Kafka
Affects Versions: 1.14.0
Reporter: Arseniy Tashoyan
Let's create KafkaSink:
{code:scala}
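import java.util.Properties
import org.apache.flink.connector.kafka.sink.KafkaSink

val props = new Properties()
props.put("bootstrap.servers", "localhost:9092")
props.put("group.id", "group1")
// bootstrap.servers is supplied only through the properties
val kafkaSink = KafkaSink.builder[String]()
  .setKafkaProducerConfig(props)
  ... // other settings
  .build()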
{code}
It fails:
{code:none}
java.lang.NullPointerException
  at org.apache.flink.util.Preconditions.checkNotNull(Preconditions.java:59)
  at org.apache.flink.connector.kafka.sink.KafkaSinkBuilder.build(KafkaSinkBuilder.java:192)
{code}
_KafkaSinkBuilder.build()_ fails because it expects the bootstrap servers to be
set via _KafkaSinkBuilder.setBootstrapServers()_. It ignores bootstrap servers
defined in the properties.
Meanwhile, _KafkaSourceBuilder_ allows setting bootstrap servers via properties,
not necessarily by calling _KafkaSourceBuilder.setBootstrapServers()_. It
would be good to enable the same contract for _KafkaSinkBuilder_.
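Until the builder honors the properties, a possible workaround is to copy the value into the explicit setter. This is only a minimal sketch: the helper name _bootstrapServersFrom_ is hypothetical, and the builder calls are left as comments with the other settings elided, as in the example above.
{code:scala}
import java.util.Properties

// Hypothetical helper: returns the brokers configured in the properties, if any.
def bootstrapServersFrom(props: Properties): Option[String] =
  Option(props.getProperty("bootstrap.servers")).map(_.trim).filter(_.nonEmpty)

val props = new Properties()
props.put("bootstrap.servers", "localhost:9092")

// Fail with a descriptive message instead of a bare NullPointerException.
val servers = bootstrapServersFrom(props).getOrElse(
  throw new IllegalArgumentException("bootstrap.servers is not set"))

// Workaround: pass the value explicitly in addition to the properties.
// val kafkaSink = KafkaSink.builder[String]()
//   .setBootstrapServers(servers)
//   .setKafkaProducerConfig(props)
//   ... // other settings
//   .build()
{code}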
It is convenient to keep all Kafka-specific settings in a config file:
{code:none}
settings {
"bootstrap.servers" = "broker1:9092,broker2:9092"
"group.id" = "retail-streaming"
"auto.offset.reset" = "latest"
}
{code}
Such settings could then be passed as-is to
_KafkaSinkBuilder.setKafkaProducerConfig()_, with no special treatment for
_"bootstrap.servers"_.
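As an illustration of passing the settings through unchanged, here is a rough sketch. The _settings_ map is a stand-in for the parsed config file (how the file is parsed is not specified in this report), and _toProperties_ is a hypothetical helper.
{code:scala}
import java.util.Properties

// Stand-in for the parsed "settings" block from the config file.
val settings: Map[String, String] = Map(
  "bootstrap.servers" -> "broker1:9092,broker2:9092",
  "group.id" -> "retail-streaming",
  "auto.offset.reset" -> "latest"
)

// Copy every entry unchanged; no key gets special handling.
def toProperties(settings: Map[String, String]): Properties = {
  val props = new Properties()
  settings.foreach { case (key, value) => props.setProperty(key, value) }
  props
}

val props = toProperties(settings)

// Desired usage once the builder reads bootstrap.servers from the properties:
// val kafkaSink = KafkaSink.builder[String]()
//   .setKafkaProducerConfig(props)
//   ... // other settings
//   .build()
{code}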
--
This message was sent by Atlassian Jira
(v8.20.1#820001)