Github user pnowojski commented on a diff in the pull request: https://github.com/apache/flink/pull/6387#discussion_r204348635 --- Diff: flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSink.java --- @@ -82,49 +129,97 @@ public KafkaTableSink( * * @param rowSchema the schema of the row to serialize. * @return Instance of serialization schema + * @deprecated Use the constructor to pass a serialization schema instead. */ - protected abstract SerializationSchema<Row> createSerializationSchema(RowTypeInfo rowSchema); + @Deprecated + protected SerializationSchema<Row> createSerializationSchema(RowTypeInfo rowSchema) { + throw new UnsupportedOperationException("This method only exists for backwards compatibility."); + } /** * Create a deep copy of this sink. * * @return Deep copy of this sink */ - protected abstract KafkaTableSink createCopy(); + @Deprecated + protected KafkaTableSink createCopy() { + throw new UnsupportedOperationException("This method only exists for backwards compatibility."); + } @Override public void emitDataStream(DataStream<Row> dataStream) { - FlinkKafkaProducerBase<Row> kafkaProducer = createKafkaProducer(topic, properties, serializationSchema, partitioner); - // always enable flush on checkpoint to achieve at-least-once if query runs with checkpointing enabled. - kafkaProducer.setFlushOnCheckpoint(true); + SinkFunction<Row> kafkaProducer = createKafkaProducer( + topic, + properties, + serializationSchema.orElseThrow(() -> new IllegalStateException("No serialization schema defined.")), + partitioner); dataStream.addSink(kafkaProducer).name(TableConnectorUtil.generateRuntimeName(this.getClass(), fieldNames)); } @Override public TypeInformation<Row> getOutputType() { - return new RowTypeInfo(getFieldTypes()); + return schema + .map(TableSchema::toRowType) + .orElseGet(() -> new RowTypeInfo(getFieldTypes())); } public String[] getFieldNames() { - return fieldNames; + return schema.map(TableSchema::getColumnNames).orElse(fieldNames); } @Override public TypeInformation<?>[] getFieldTypes() { - return fieldTypes; + return schema.map(TableSchema::getTypes).orElse(fieldTypes); } @Override public KafkaTableSink configure(String[] fieldNames, TypeInformation<?>[] fieldTypes) { + // a fixed schema is defined so reconfiguration is not supported --- End diff -- Move this comment to exception description.
---