Hello,
After upgrading to Flink 1.14 I ran into a compile error with the Kafka sink
builder when setting the delivery guarantee:

value setDeliveryGuarantee is not a member of
org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchemaBuilder[....]


The code compiles with the default value (i.e. without calling
setDeliveryGuarantee), but fails with the compile error above once the call
is included.

Is it better to leave it at the default and let the application cluster
config define this?

I believe I build the KafkaSink according to the docs:

import java.nio.charset.StandardCharsets

import org.apache.flink.connector.base.DeliveryGuarantee
import org.apache.flink.connector.kafka.sink.{KafkaRecordSerializationSchema, KafkaSink}
import org.apache.flink.connector.kafka.source.KafkaSource
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer

    val kafkaSink: KafkaSink[SomePBStuff] = KafkaSink.builder[SomePBStuff]()
      .setBootstrapServers("...")
      .setRecordSerializer(
        KafkaRecordSerializationSchema
          .builder[SomePBStuff]()
          .setTopic("mytopic")
          .setKeySerializationSchema((v: SomePBStuff) => v.key.getBytes(StandardCharsets.UTF_8))
          .setValueSerializationSchema((v: SomePBStuff) => v.toByteArray)
          .setDeliveryGuarantee(DeliveryGuarantee.AT_LEAST_ONCE)
          .build()
      )
      .build()


In build.sbt I have:

ThisBuild / scalaVersion := "2.12.13"
val flinkVersion = "1.14.0"

val flinkDependencies = Seq(
  "org.apache.flink" % "flink-runtime" % flinkVersion % Test,

  "org.apache.flink" %% "flink-scala" % flinkVersion % "provided",
  "org.apache.flink" %% "flink-streaming-scala" % flinkVersion % "provided",
  "org.apache.flink" %% "flink-streaming-java" % flinkVersion % "provided",
  "org.apache.flink" %% "flink-clients" % flinkVersion % "provided",

  "org.apache.flink" %% "flink-connector-kafka" % flinkVersion,
  "org.apache.flink" %% "flink-gelly-scala" % flinkVersion,
  "org.apache.flink" %% "flink-test-utils" % flinkVersion % Test,
)
