Github user pnowojski commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6387#discussion_r204348635
  
    --- Diff: flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSink.java ---
    @@ -82,49 +129,97 @@ public KafkaTableSink(
         *
         * @param rowSchema the schema of the row to serialize.
         * @return Instance of serialization schema
    +    * @deprecated Use the constructor to pass a serialization schema instead.
         */
    -   protected abstract SerializationSchema<Row> createSerializationSchema(RowTypeInfo rowSchema);
    +   @Deprecated
    +   protected SerializationSchema<Row> createSerializationSchema(RowTypeInfo rowSchema) {
    +           throw new UnsupportedOperationException("This method only exists for backwards compatibility.");
    +   }
     
        /**
         * Create a deep copy of this sink.
         *
         * @return Deep copy of this sink
         */
    -   protected abstract KafkaTableSink createCopy();
    +   @Deprecated
    +   protected KafkaTableSink createCopy() {
    +           throw new UnsupportedOperationException("This method only exists for backwards compatibility.");
    +   }
     
        @Override
        public void emitDataStream(DataStream<Row> dataStream) {
    -           FlinkKafkaProducerBase<Row> kafkaProducer = createKafkaProducer(topic, properties, serializationSchema, partitioner);
    -           // always enable flush on checkpoint to achieve at-least-once if query runs with checkpointing enabled.
    -           kafkaProducer.setFlushOnCheckpoint(true);
    +           SinkFunction<Row> kafkaProducer = createKafkaProducer(
    +                   topic,
    +                   properties,
    +                   serializationSchema.orElseThrow(() -> new IllegalStateException("No serialization schema defined.")),
    +                   partitioner);
                dataStream.addSink(kafkaProducer).name(TableConnectorUtil.generateRuntimeName(this.getClass(), fieldNames));
        }
     
        @Override
        public TypeInformation<Row> getOutputType() {
    -           return new RowTypeInfo(getFieldTypes());
    +           return schema
    +                   .map(TableSchema::toRowType)
    +                   .orElseGet(() -> new RowTypeInfo(getFieldTypes()));
        }
     
        public String[] getFieldNames() {
    -           return fieldNames;
    +           return schema.map(TableSchema::getColumnNames).orElse(fieldNames);
        }
     
        @Override
        public TypeInformation<?>[] getFieldTypes() {
    -           return fieldTypes;
    +           return schema.map(TableSchema::getTypes).orElse(fieldTypes);
        }
     
        @Override
        public KafkaTableSink configure(String[] fieldNames, TypeInformation<?>[] fieldTypes) {
    +           // a fixed schema is defined so reconfiguration is not supported
    --- End diff --
    
    Move this comment into the exception message.
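    
    The suggestion could look roughly like the following sketch: the rationale that currently lives in the code comment becomes the message of the thrown exception, so callers see why reconfiguration fails. The class name, the exact message wording, and the use of `Class<?>[]` in place of the real `TypeInformation<?>[]` parameter are illustrative stand-ins, not the actual KafkaTableSink API:
    
    ```java
    // Hypothetical sketch: surface the "fixed schema" rationale in the
    // exception message instead of a code comment. Names are stand-ins
    // for the real KafkaTableSink#configure(String[], TypeInformation<?>[]).
    public class ConfigureSketch {
    
        // Stand-in for the configure(...) override discussed in the diff.
        public ConfigureSketch configure(String[] fieldNames, Class<?>[] fieldTypes) {
            throw new UnsupportedOperationException(
                "Reconfiguration of this sink is not supported, "
                    + "because a fixed schema has been defined.");
        }
    
        public static void main(String[] args) {
            ConfigureSketch sink = new ConfigureSketch();
            try {
                sink.configure(new String[]{"f0"}, new Class<?>[]{String.class});
            } catch (UnsupportedOperationException e) {
                // The reason for the failure is now visible to the caller.
                System.out.println(e.getMessage());
            }
        }
    }
    ```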

