Hi Lars,

Unfortunately, there is currently a small bug in our documentation [1]. The DeliveryGuarantee is set on the KafkaSink builder, not on the KafkaRecordSerializationSchema builder. Sorry for the inconvenience.

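
A minimal sketch of the sink with the guarantee moved to the outer KafkaSink builder. It uses String records and SimpleStringSchema as stand-ins for SomePBStuff, and "localhost:9092" as a placeholder address. It also assumes that in 1.14 the method on the KafkaSink builder is spelled setDeliverGuarantee (without the "y"); as far as I know, later releases also accept setDeliveryGuarantee, so please check the Javadoc for your version.

```scala
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.connector.base.DeliveryGuarantee
import org.apache.flink.connector.kafka.sink.{KafkaRecordSerializationSchema, KafkaSink}

object KafkaSinkSketch {
  // String and SimpleStringSchema stand in for SomePBStuff and its serializers.
  val kafkaSink: KafkaSink[String] = KafkaSink.builder[String]()
    .setBootstrapServers("localhost:9092") // placeholder address
    .setRecordSerializer(
      // The record serializer only describes topic, key and value encoding.
      KafkaRecordSerializationSchema
        .builder[String]()
        .setTopic("mytopic")
        .setValueSerializationSchema(new SimpleStringSchema())
        .build()
    )
    // The guarantee belongs to the sink builder, not the serializer builder.
    // Assumed 1.14 spelling; newer versions may prefer setDeliveryGuarantee.
    .setDeliverGuarantee(DeliveryGuarantee.AT_LEAST_ONCE)
    .build()
}
```

The only change relative to your snippet is where the guarantee call sits: after setRecordSerializer(...) on the KafkaSink builder, instead of inside the KafkaRecordSerializationSchema builder chain.
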
Best,
Fabian

[1] https://github.com/apache/flink/pull/17971

On Thu, Dec 2, 2021 at 12:34 PM Lars Skjærven <lar...@gmail.com> wrote:

> Hello,
> upgrading to 1.14 I bumped into an issue with the kafka sink builder when
> defining delivery guarantee:
>
> value setDeliveryGuarantee is not a member of
> org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchemaBuilder[....]
>
> Seems to be working with the default value (i.e. without mentioning
> setDeliveryGuarantee), but compile error when including it.
>
> Is it better to leave it with the default, and let the application cluster
> config define this ?
>
> I believe I build the KafkaSink according to the docs:
>
> import org.apache.flink.connector.base.DeliveryGuarantee
> import org.apache.flink.connector.kafka.sink.{KafkaRecordSerializationSchema, KafkaSink}
> import org.apache.flink.connector.kafka.source.KafkaSource
> import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer
>
> val kafkaSink: KafkaSink[SomePBStuff] = KafkaSink.builder[SomePBStuff]()
>   .setBootstrapServers("...")
>   .setRecordSerializer(
>     KafkaRecordSerializationSchema
>       .builder[SomePBStuff]()
>       .setTopic("mytopic")
>       .setKeySerializationSchema((v: SomePBStuff) => v.key.getBytes(StandardCharsets.UTF_8))
>       .setValueSerializationSchema((v: SomePBStuff) => v.toByteArray)
>       .setDeliveryGuarantee(DeliveryGuarantee.AT_LEAST_ONCE)
>       .build()
>   )
>   .build()
>
> in build.sbt I have:
>
> ThisBuild / scalaVersion := "2.12.13"
> val flinkVersion = "1.14.0"
>
> val flinkDependencies = Seq(
>   "org.apache.flink" % "flink-runtime" % flinkVersion % Test,
>
>   "org.apache.flink" %% "flink-scala" % flinkVersion % "provided",
>   "org.apache.flink" %% "flink-streaming-scala" % flinkVersion % "provided",
>   "org.apache.flink" %% "flink-streaming-java" % flinkVersion % "provided",
>   "org.apache.flink" %% "flink-clients" % flinkVersion % "provided",
>
>   "org.apache.flink" %% "flink-connector-kafka" % flinkVersion,
>   "org.apache.flink" %% "flink-gelly-scala" % flinkVersion,
>   "org.apache.flink" %% "flink-test-utils" % flinkVersion % Test,
> )