PatrickRen commented on code in PR #25331: URL: https://github.com/apache/flink/pull/25331#discussion_r1778440599
########## pom.xml: ########## @@ -2420,6 +2420,10 @@ under the License. <exclude>org.apache.flink.api.java.typeutils.WritableTypeInfo</exclude> <!-- FLINK-36327 Remove dependency about flink-scala and flink-streaming-scala in table module in flink-2.0 --> <exclude>org.apache.flink.table.api.typeutils.*</exclude> + <!-- FLINK-36245 Remove SinkV1 API in flink-2.0. --> + <exclude>org.apache.flink.streaming.api.scala.DataStream#sinkTo()</exclude> Review Comment: The DataStream Scala API has been removed on master. ########## flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/sink/writer/AsyncSinkWriter.java: ########## @@ -258,17 +198,7 @@ protected void submitRequestEntries( * method should be called. */ protected void submitRequestEntries( - List<RequestEntryT> requestEntries, ResultHandler<RequestEntryT> resultHandler) { - submitRequestEntries( - requestEntries, - requestsToRetry -> { - if (requestsToRetry.isEmpty()) { - resultHandler.complete(); - } else { - resultHandler.retryForEntries(requestsToRetry); - } - }); - } + List<RequestEntryT> requestEntries, ResultHandler<RequestEntryT> resultHandler) {} Review Comment: This method should throw `UnsupportedOperationException` to keep the same behavior as the previous deprecated method did.
########## flink-end-to-end-tests/flink-confluent-schema-registry/src/main/java/org/apache/flink/schema/registry/test/TestAvroConsumerConfluent.java: ########## @@ -85,35 +81,39 @@ public static void main(String[] args) throws Exception { SingleOutputStreamOperator<String> mapToString = input.map((MapFunction<User, String>) SpecificRecordBase::toString); - KafkaSink<String> stringSink = - KafkaSink.<String>builder() - .setBootstrapServers(bootstrapServers) - .setRecordSerializer( - KafkaRecordSerializationSchema.builder() - .setValueSerializationSchema(new SimpleStringSchema()) - .setTopic(parameterTool.getRequired("output-string-topic")) - .build()) - .setKafkaProducerConfig(config) - .build(); - mapToString.sinkTo(stringSink); - - KafkaSink<User> avroSink = - KafkaSink.<User>builder() - .setBootstrapServers(bootstrapServers) - .setRecordSerializer( - KafkaRecordSerializationSchema.builder() - .setValueSerializationSchema( - ConfluentRegistryAvroSerializationSchema - .forSpecific( - User.class, - parameterTool.getRequired( - "output-subject"), - schemaRegistryUrl)) - .setTopic(parameterTool.getRequired("output-avro-topic")) - .build()) - .build(); - input.sinkTo(avroSink); - - env.execute("Kafka Confluent Schema Registry AVRO Example"); + // TODO: [FLINK-36245] Release comments after KafkaSink does not rely on the Depreciated API Review Comment: I don't get the point here. AFAIK the Kafka sink is already using the latest Sink V2 API. Why do we disable the code here? Also, please don't leave any dead (commented-out) code here. ########## flink-walkthroughs/flink-walkthrough-common/pom.xml: ########## @@ -43,4 +43,74 @@ under the License. <scope>provided</scope> </dependency> </dependencies> + + <build> Review Comment: What is this plugin used for?
########## flink-runtime/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java: ########## @@ -1014,8 +1014,8 @@ public DataStreamSink<T> printToErr(String sinkIdentifier) { * @param path The path pointing to the location the text file is written to. * @return The closed DataStream. * @deprecated Please use the {@link - * org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink} explicitly - * using the {@link #addSink(SinkFunction)} method. + * org.apache.flink.streaming.api.functions.sink.filesystem.legacy.StreamingFileSink} + * explicitly using the {@link #addSink(SinkFunction)} method. */ @Deprecated Review Comment: What about removing these methods with deprecation annotation directly, like `writeAsText`, `writeAsCsv` etc.? ########## flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/sink/writer/AsyncSinkWriter.java: ########## @@ -281,69 +211,10 @@ protected void submitRequestEntries( */ protected abstract long getSizeInBytes(RequestEntryT requestEntry); - /** - * This method is deprecated, please use the constructor that specifies the {@link - * AsyncSinkWriterConfiguration}. - */ - @Deprecated - public AsyncSinkWriter( - ElementConverter<InputT, RequestEntryT> elementConverter, - Sink.InitContext context, - int maxBatchSize, - int maxInFlightRequests, - int maxBufferedRequests, - long maxBatchSizeInBytes, - long maxTimeInBufferMS, - long maxRecordSizeInBytes) { - this( - elementConverter, - context, - maxBatchSize, - maxInFlightRequests, - maxBufferedRequests, - maxBatchSizeInBytes, - maxTimeInBufferMS, - maxRecordSizeInBytes, - Collections.emptyList()); - } - - /** - * This method is deprecated, please use the constructor that specifies the {@link - * AsyncSinkWriterConfiguration}. 
- */ - @Deprecated - public AsyncSinkWriter( - ElementConverter<InputT, RequestEntryT> elementConverter, - Sink.InitContext context, - int maxBatchSize, - int maxInFlightRequests, - int maxBufferedRequests, - long maxBatchSizeInBytes, - long maxTimeInBufferMS, - long maxRecordSizeInBytes, - Collection<BufferedRequestState<RequestEntryT>> states) { - this( - elementConverter, - context, - AsyncSinkWriterConfiguration.builder() - .setMaxBatchSize(maxBatchSize) - .setMaxBatchSizeInBytes(maxBatchSizeInBytes) - .setMaxInFlightRequests(maxInFlightRequests) - .setMaxBufferedRequests(maxBufferedRequests) - .setMaxTimeInBufferMS(maxTimeInBufferMS) - .setMaxRecordSizeInBytes(maxRecordSizeInBytes) - .build(), - states); - } - - /** - * Should be removed along {@link - * org.apache.flink.api.connector.sink2.StatefulSink.StatefulSinkWriter}. - */ - @Deprecated + /** Should be removed along {@link org.apache.flink.api.connector.sink2.StatefulSinkWriter}. */ Review Comment: This comment should be removed. 
########## flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/statemachine/KafkaEventsGeneratorJob.java: ########## @@ -61,24 +57,25 @@ public static void main(String[] args) throws Exception { Long.MAX_VALUE, RateLimiterStrategy.perSecond(recordsPerSecond), TypeInformation.of(Event.class)); - - env.fromSource( - eventGeneratorSource, - WatermarkStrategy.noWatermarks(), - "Events Generator Source") - .sinkTo( - KafkaSink.<Event>builder() - .setBootstrapServers(brokers) - .setRecordSerializer( - KafkaRecordSerializationSchema.builder() - .setValueSerializationSchema( - new EventDeSerializationSchema()) - .setTopic(kafkaTopic) - .build()) - .build()); + // TODO: [FLINK-36245] Release comments after KafkaSink does not rely on the Depreciated API Review Comment: Same issue as `TestAvroConsumerConfluent`. ########## flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/table/batch/BatchSink.java: ########## @@ -73,64 +51,4 @@ public static DataStreamSink<Row> createBatchNoCompactSink( sink.getTransformation().setParallelism(parallelism, parallelismConfigured); return sink; } - - public static <T> DataStreamSink<?> createBatchCompactSink( Review Comment: Why remove this method? It looks like this method was copy-pasted into `HiveTableSink`. How about keeping the original one to reduce the amount of change? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org