PatrickRen commented on code in PR #25331:
URL: https://github.com/apache/flink/pull/25331#discussion_r1778440599


##########
pom.xml:
##########
@@ -2420,6 +2420,10 @@ under the License.
                                                                <exclude>org.apache.flink.api.java.typeutils.WritableTypeInfo</exclude>
                                                                <!-- FLINK-36327 Remove dependency about flink-scala and flink-streaming-scala in table module in flink-2.0 -->
                                                                <exclude>org.apache.flink.table.api.typeutils.*</exclude>
+                                                               <!-- FLINK-36245 Remove SinkV1 API in flink-2.0. -->
+                                                               <exclude>org.apache.flink.streaming.api.scala.DataStream#sinkTo()</exclude>

Review Comment:
   The DataStream Scala API has already been removed on master, so this exclude entry should be unnecessary.



##########
flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/sink/writer/AsyncSinkWriter.java:
##########
@@ -258,17 +198,7 @@ protected void submitRequestEntries(
      *     method should be called.
      */
     protected void submitRequestEntries(
-            List<RequestEntryT> requestEntries, ResultHandler<RequestEntryT> resultHandler) {
-        submitRequestEntries(
-                requestEntries,
-                requestsToRetry -> {
-                    if (requestsToRetry.isEmpty()) {
-                        resultHandler.complete();
-                    } else {
-                        resultHandler.retryForEntries(requestsToRetry);
-                    }
-                });
-    }
+            List<RequestEntryT> requestEntries, ResultHandler<RequestEntryT> resultHandler) {}

Review Comment:
   This method should throw `UnsupportedOperationException` to keep the same behavior as the previous deprecated method did.
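   
   For illustration, a minimal sketch of the suggested change (the exception message wording is my assumption, not from this PR):
   
   ```java
   protected void submitRequestEntries(
           List<RequestEntryT> requestEntries, ResultHandler<RequestEntryT> resultHandler) {
       // Fail fast like the removed deprecated overload did, instead of silently doing nothing.
       throw new UnsupportedOperationException(
               "Sink writers must override submitRequestEntries(List, ResultHandler).");
   }
   ```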



##########
flink-end-to-end-tests/flink-confluent-schema-registry/src/main/java/org/apache/flink/schema/registry/test/TestAvroConsumerConfluent.java:
##########
@@ -85,35 +81,39 @@ public static void main(String[] args) throws Exception {
         SingleOutputStreamOperator<String> mapToString =
                 input.map((MapFunction<User, String>) SpecificRecordBase::toString);
 
-        KafkaSink<String> stringSink =
-                KafkaSink.<String>builder()
-                        .setBootstrapServers(bootstrapServers)
-                        .setRecordSerializer(
-                                KafkaRecordSerializationSchema.builder()
-                                        .setValueSerializationSchema(new SimpleStringSchema())
-                                        .setTopic(parameterTool.getRequired("output-string-topic"))
-                                        .build())
-                        .setKafkaProducerConfig(config)
-                        .build();
-        mapToString.sinkTo(stringSink);
-
-        KafkaSink<User> avroSink =
-                KafkaSink.<User>builder()
-                        .setBootstrapServers(bootstrapServers)
-                        .setRecordSerializer(
-                                KafkaRecordSerializationSchema.builder()
-                                        .setValueSerializationSchema(
-                                                ConfluentRegistryAvroSerializationSchema
-                                                        .forSpecific(
-                                                                User.class,
-                                                                parameterTool.getRequired(
-                                                                        "output-subject"),
-                                                                schemaRegistryUrl))
-                                        .setTopic(parameterTool.getRequired("output-avro-topic"))
-                                        .build())
-                        .build();
-        input.sinkTo(avroSink);
-
-        env.execute("Kafka Confluent Schema Registry AVRO Example");
+        // TODO: [FLINK-36245] Re-enable this code once KafkaSink no longer relies on the deprecated API

Review Comment:
   I don't get the point here. AFAIK the Kafka sink is already using the latest Sink V2 API, so why do we disable the code here?
   
   Also, please don't leave any dead (commented-out) code here.
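   
   For reference, re-enabling would simply restore the code removed above, e.g. for the string sink:
   
   ```java
   KafkaSink<String> stringSink =
           KafkaSink.<String>builder()
                   .setBootstrapServers(bootstrapServers)
                   .setRecordSerializer(
                           KafkaRecordSerializationSchema.builder()
                                   .setValueSerializationSchema(new SimpleStringSchema())
                                   .setTopic(parameterTool.getRequired("output-string-topic"))
                                   .build())
                   .setKafkaProducerConfig(config)
                   .build();
   mapToString.sinkTo(stringSink);
   ```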



##########
flink-walkthroughs/flink-walkthrough-common/pom.xml:
##########
@@ -43,4 +43,74 @@ under the License.
                        <scope>provided</scope>
                </dependency>
        </dependencies>
+
+       <build>

Review Comment:
   What is this plugin used for?



##########
flink-runtime/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java:
##########
@@ -1014,8 +1014,8 @@ public DataStreamSink<T> printToErr(String sinkIdentifier) {
      * @param path The path pointing to the location the text file is written to.
      * @return The closed DataStream.
      * @deprecated Please use the {@link
-     *     org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink} explicitly
-     *     using the {@link #addSink(SinkFunction)} method.
+     *     org.apache.flink.streaming.api.functions.sink.filesystem.legacy.StreamingFileSink}
+     *     explicitly using the {@link #addSink(SinkFunction)} method.
      */
     @Deprecated

Review Comment:
   What about directly removing these deprecated methods, like `writeAsText`, `writeAsCsv`, etc., instead of keeping them with the deprecation annotation?
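   
   Users of the removed methods could then migrate to the `FileSink`-based path; a minimal sketch, assuming the row-format `FileSink` builder from flink-connector-files (`stream` and `outputPath` are placeholders):
   
   ```java
   import org.apache.flink.api.common.serialization.SimpleStringEncoder;
   import org.apache.flink.connector.file.sink.FileSink;
   import org.apache.flink.core.fs.Path;
   
   // Rough replacement for stream.writeAsText(outputPath): write each record as a UTF-8 line.
   FileSink<String> textSink =
           FileSink.forRowFormat(new Path(outputPath), new SimpleStringEncoder<String>("UTF-8"))
                   .build();
   stream.sinkTo(textSink);
   ```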



##########
flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/sink/writer/AsyncSinkWriter.java:
##########
@@ -281,69 +211,10 @@ protected void submitRequestEntries(
      */
     protected abstract long getSizeInBytes(RequestEntryT requestEntry);
 
-    /**
-     * This method is deprecated, please use the constructor that specifies the {@link
-     * AsyncSinkWriterConfiguration}.
-     */
-    @Deprecated
-    public AsyncSinkWriter(
-            ElementConverter<InputT, RequestEntryT> elementConverter,
-            Sink.InitContext context,
-            int maxBatchSize,
-            int maxInFlightRequests,
-            int maxBufferedRequests,
-            long maxBatchSizeInBytes,
-            long maxTimeInBufferMS,
-            long maxRecordSizeInBytes) {
-        this(
-                elementConverter,
-                context,
-                maxBatchSize,
-                maxInFlightRequests,
-                maxBufferedRequests,
-                maxBatchSizeInBytes,
-                maxTimeInBufferMS,
-                maxRecordSizeInBytes,
-                Collections.emptyList());
-    }
-
-    /**
-     * This method is deprecated, please use the constructor that specifies the {@link
-     * AsyncSinkWriterConfiguration}.
-     */
-    @Deprecated
-    public AsyncSinkWriter(
-            ElementConverter<InputT, RequestEntryT> elementConverter,
-            Sink.InitContext context,
-            int maxBatchSize,
-            int maxInFlightRequests,
-            int maxBufferedRequests,
-            long maxBatchSizeInBytes,
-            long maxTimeInBufferMS,
-            long maxRecordSizeInBytes,
-            Collection<BufferedRequestState<RequestEntryT>> states) {
-        this(
-                elementConverter,
-                context,
-                AsyncSinkWriterConfiguration.builder()
-                        .setMaxBatchSize(maxBatchSize)
-                        .setMaxBatchSizeInBytes(maxBatchSizeInBytes)
-                        .setMaxInFlightRequests(maxInFlightRequests)
-                        .setMaxBufferedRequests(maxBufferedRequests)
-                        .setMaxTimeInBufferMS(maxTimeInBufferMS)
-                        .setMaxRecordSizeInBytes(maxRecordSizeInBytes)
-                        .build(),
-                states);
-    }
-
-    /**
-     * Should be removed along {@link
-     * org.apache.flink.api.connector.sink2.StatefulSink.StatefulSinkWriter}.
-     */
-    @Deprecated
+    /** Should be removed along {@link org.apache.flink.api.connector.sink2.StatefulSinkWriter}. */

Review Comment:
   This comment should be removed.



##########
flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/statemachine/KafkaEventsGeneratorJob.java:
##########
@@ -61,24 +57,25 @@ public static void main(String[] args) throws Exception {
                         Long.MAX_VALUE,
                         RateLimiterStrategy.perSecond(recordsPerSecond),
                         TypeInformation.of(Event.class));
-
-        env.fromSource(
-                        eventGeneratorSource,
-                        WatermarkStrategy.noWatermarks(),
-                        "Events Generator Source")
-                .sinkTo(
-                        KafkaSink.<Event>builder()
-                                .setBootstrapServers(brokers)
-                                .setRecordSerializer(
-                                        KafkaRecordSerializationSchema.builder()
-                                                .setValueSerializationSchema(
-                                                        new EventDeSerializationSchema())
-                                                .setTopic(kafkaTopic)
-                                                .build())
-                                .build());
+        // TODO: [FLINK-36245] Re-enable this code once KafkaSink no longer relies on the deprecated API

Review Comment:
   Same issue as `TestAvroConsumerConfluent`



##########
flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/table/batch/BatchSink.java:
##########
@@ -73,64 +51,4 @@ public static DataStreamSink<Row> createBatchNoCompactSink(
         sink.getTransformation().setParallelism(parallelism, parallelismConfigured);
         return sink;
     }
-
-    public static <T> DataStreamSink<?> createBatchCompactSink(

Review Comment:
   Why remove this method?
   
   It looks like this method was copy-pasted into `HiveTableSink`. What about keeping the original one here to reduce the amount of change?


