mjsax commented on code in PR #18778:
URL: https://github.com/apache/kafka/pull/18778#discussion_r1938398492


##########
streams/src/main/java/org/apache/kafka/streams/Topology.java:
##########
@@ -662,87 +418,30 @@ public synchronized Topology addSink(final String name,
     }
 
     /**
-     * Add a new sink that forwards records from upstream parent processor and/or source nodes to the named Kafka topic,
-     * using the supplied partitioner.
-     * The sink will use the {@link StreamsConfig#DEFAULT_KEY_SERDE_CLASS_CONFIG default key serializer} and
-     * {@link StreamsConfig#DEFAULT_VALUE_SERDE_CLASS_CONFIG default value serializer} specified in the
-     * {@link StreamsConfig stream configuration}.
-     * <p>
-     * The sink will also use the specified {@link StreamPartitioner} to determine how records are distributed among
-     * the named Kafka topic's partitions.
-     * Such control is often useful with topologies that use {@link #addStateStore(StoreBuilder, String...) state
-     * stores} in its processors.
-     * In most other cases, however, a partitioner needs not be specified and Kafka will automatically distribute
-     * records among partitions using Kafka's default partitioning logic.
-     *
-     * @param name the unique name of the sink
-     * @param topic the name of the Kafka topic to which this sink should write its records
-     * @param partitioner the function that should be used to determine the partition for each record processed by the sink
-     * @param parentNames the name of one or more source or processor nodes whose output records this sink should consume
-     * and write to its topic
-     * @return itself
-     * @throws TopologyException if parent processor is not added yet, or if this processor's name is equal to the parent's name
-     * @see #addSink(String, String, String...)
-     * @see #addSink(String, String, Serializer, Serializer, String...)
-     * @see #addSink(String, String, Serializer, Serializer, StreamPartitioner, String...)
+     * See {@link #addSink(String, String, String...)}.
      */
-    public synchronized <K, V> Topology addSink(final String name,
-                                                final String topic,
-                                                final StreamPartitioner<? super K, ? super V> partitioner,
-                                                final String... parentNames) {
+    public synchronized Topology addSink(final String name,
+                                         final String topic,
+                                         final StreamPartitioner<?, ?> partitioner,
+                                         final String... parentNames) {
         internalTopologyBuilder.addSink(name, topic, null, null, partitioner, parentNames);
         return this;
     }
 
     /**
-     * Add a new sink that forwards records from upstream parent processor and/or source nodes to the named Kafka topic.
-     * The sink will use the specified key and value serializers.
-     *
-     * @param name the unique name of the sink
-     * @param topic the name of the Kafka topic to which this sink should write its records
-     * @param keySerializer the {@link Serializer key serializer} used when consuming records; may be null if the sink
-     * should use the {@link StreamsConfig#DEFAULT_KEY_SERDE_CLASS_CONFIG default key serializer} specified in the
-     * {@link StreamsConfig stream configuration}
-     * @param valueSerializer the {@link Serializer value serializer} used when consuming records; may be null if the sink
-     * should use the {@link StreamsConfig#DEFAULT_VALUE_SERDE_CLASS_CONFIG default value serializer} specified in the
-     * {@link StreamsConfig stream configuration}
-     * @param parentNames the name of one or more source or processor nodes whose output records this sink should consume
-     * and write to its topic
-     * @return itself
-     * @throws TopologyException if parent processor is not added yet, or if this processor's name is equal to the parent's name
-     * @see #addSink(String, String, String...)
-     * @see #addSink(String, String, StreamPartitioner, String...)
-     * @see #addSink(String, String, Serializer, Serializer, StreamPartitioner, String...)
+     * See {@link #addSink(String, String, String...)}.
      */
-    public synchronized <K, V> Topology addSink(final String name,
-                                                final String topic,
-                                                final Serializer<K> keySerializer,
-                                                final Serializer<V> valueSerializer,
-                                                final String... parentNames) {
+    public synchronized Topology addSink(final String name,
+                                         final String topic,
+                                         final Serializer<?> keySerializer,
+                                         final Serializer<?> valueSerializer,

Review Comment:
   I'm not totally sure about the generic types here. In the end, we cannot verify any types in the PAPI, and above (in `addSource`) we also use `<?>`.
   
   On the other hand, when I try to remove the generics from `addProcessor()`, some code no longer compiles, and I am not sure why. Does anybody know? I'm curious to understand this.
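
   To make the first point concrete, here is a minimal sketch, not the actual Kafka code. It assumes a hypothetical stand-in `Serializer` interface and two hypothetical static methods, `addSinkGeneric` and `addSinkWildcard`, that mirror the old and new `addSink` signatures. Because `K` and `V` each appear only once in the generic signature, inference always succeeds and the compiler never checks them against anything, so both variants accept the same calls, including one with key and value serializers swapped.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

public class SinkGenericsSketch {

    // Hypothetical stand-in for Kafka's Serializer; not the real interface.
    interface Serializer<T> {
        byte[] serialize(T data);
    }

    // Mirrors the old signature: K and V each occur once, so they are
    // inferred freely and are not tied to the records reaching the sink.
    static <K, V> String addSinkGeneric(final String name,
                                        final Serializer<K> keySerializer,
                                        final Serializer<V> valueSerializer) {
        return name;
    }

    // Mirrors the new signature: wildcards accept the same arguments.
    static String addSinkWildcard(final String name,
                                  final Serializer<?> keySerializer,
                                  final Serializer<?> valueSerializer) {
        return name;
    }

    // Registers four sinks and returns how many calls were accepted.
    static int demo() {
        final Serializer<String> strings = s -> s.getBytes(StandardCharsets.UTF_8);
        final Serializer<Long> longs = l -> Long.toString(l).getBytes(StandardCharsets.UTF_8);

        final List<String> sinks = new ArrayList<>();
        sinks.add(addSinkGeneric("generic", strings, longs));
        sinks.add(addSinkWildcard("wildcard", strings, longs));
        // Swapping key and value serializers compiles in both variants,
        // which is why the type parameters give no extra safety here.
        sinks.add(addSinkGeneric("generic-swapped", longs, strings));
        sinks.add(addSinkWildcard("wildcard-swapped", longs, strings));
        return sinks.size();
    }

    public static void main(final String[] args) {
        System.out.println("sinks registered: " + demo());
    }
}
```

   This only covers parameters that take an already-typed object. It does not explain the `addProcessor()` compile errors, which may depend on how call sites pass lambdas or method references.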



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
