[ 
https://issues.apache.org/jira/browse/KAFKA-7250?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16575146#comment-16575146
 ] 

ASF GitHub Bot commented on KAFKA-7250:
---------------------------------------

guozhangwang closed pull request #5481: KAFKA-7250: switch scala transform to TransformerSupplier
URL: https://github.com/apache/kafka/pull/5481
 
 
   

This is a PR merged from a forked repository. As GitHub hides the original
diff on merge, it is reproduced below for the sake of provenance:

diff --git a/build.gradle b/build.gradle
index 83b169b0a51..7f5a1d9e83c 100644
--- a/build.gradle
+++ b/build.gradle
@@ -1015,6 +1015,7 @@ project(':streams:streams-scala') {
 
     testCompile libs.junit
     testCompile libs.scalatest
+    testCompile libs.easymock
 
     testRuntime libs.slf4jlog4j
   }
diff --git a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/KStream.scala b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/KStream.scala
index a8766bd3566..adc1850dc32 100644
--- a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/KStream.scala
+++ b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/KStream.scala
@@ -22,7 +22,7 @@ package kstream
 
 import org.apache.kafka.streams.KeyValue
 import org.apache.kafka.streams.kstream.{KStream => KStreamJ, _}
-import org.apache.kafka.streams.processor.{Processor, ProcessorSupplier, TopicNameExtractor}
+import org.apache.kafka.streams.processor.{Processor, ProcessorContext, ProcessorSupplier, TopicNameExtractor}
 import org.apache.kafka.streams.scala.ImplicitConversions._
 import org.apache.kafka.streams.scala.FunctionConversions._
 
@@ -31,8 +31,8 @@ import scala.collection.JavaConverters._
 /**
  * Wraps the Java class [[org.apache.kafka.streams.kstream.KStream]] and delegates method calls to the underlying Java object.
  *
- * @param [K] Type of keys
- * @param [V] Type of values
+ * @tparam K Type of keys
+ * @tparam V Type of values
  * @param inner The underlying Java abstraction for KStream
  *
  * @see `org.apache.kafka.streams.kstream.KStream`
@@ -167,7 +167,7 @@ class KStream[K, V](val inner: KStreamJ[K, V]) {
   def print(printed: Printed[K, V]): Unit = inner.print(printed)
 
   /**
-   * Perform an action on each record of 'KStream`
+   * Perform an action on each record of `KStream`
    *
    * @param action an action to perform on each record
    * @see `org.apache.kafka.streams.kstream.KStream#foreach`
@@ -176,14 +176,15 @@ class KStream[K, V](val inner: KStreamJ[K, V]) {
     inner.foreach((k: K, v: V) => action(k, v))
 
   /**
-   * Creates an array of {@code KStream} from this stream by branching the records in the original stream based on
+   * Creates an array of `KStream` from this stream by branching the records in the original stream based on
    * the supplied predicates.
    *
    * @param predicates the ordered list of functions that return a Boolean
    * @return multiple distinct substreams of this [[KStream]]
    * @see `org.apache.kafka.streams.kstream.KStream#branch`
    */
-  def branch(predicates: (K, V) => Boolean*): Array[KStream[K, V]] =
+  //noinspection ScalaUnnecessaryParentheses
+  def branch(predicates: ((K, V) => Boolean)*): Array[KStream[K, V]] =
     inner.branch(predicates.map(_.asPredicate): _*).map(kstream => wrapKStream(kstream))
 
   /**
@@ -211,7 +212,7 @@ class KStream[K, V](val inner: KStreamJ[K, V]) {
    * }}}
    *
    * @param topic the topic name
-   * @param (implicit) produced the instance of Produced that gives the serdes and `StreamPartitioner`
+   * @param produced the instance of Produced that gives the serdes and `StreamPartitioner`
   * @return a [[KStream]] that contains the exact same (and potentially repartitioned) records as this [[KStream]]
    * @see `org.apache.kafka.streams.kstream.KStream#through`
    */
@@ -243,7 +244,7 @@ class KStream[K, V](val inner: KStreamJ[K, V]) {
    * }}}
    *
    * @param topic the topic name
-   * @param (implicit) produced the instance of Produced that gives the serdes and `StreamPartitioner`
+   * @param produced the instance of Produced that gives the serdes and `StreamPartitioner`
    * @see `org.apache.kafka.streams.kstream.KStream#to`
    */
   def to(topic: String)(implicit produced: Produced[K, V]): Unit =
@@ -275,7 +276,7 @@ class KStream[K, V](val inner: KStreamJ[K, V]) {
    * }}}
    *
   * @param extractor the extractor to determine the name of the Kafka topic to write to for reach record
-   * @param (implicit) produced the instance of Produced that gives the serdes and `StreamPartitioner`
+   * @param produced the instance of Produced that gives the serdes and `StreamPartitioner`
    * @see `org.apache.kafka.streams.kstream.KStream#to`
    */
  def to(extractor: TopicNameExtractor[K, V])(implicit produced: Produced[K, V]): Unit =
@@ -295,9 +296,9 @@ class KStream[K, V](val inner: KStreamJ[K, V]) {
   * @return a [[KStream]] that contains more or less records with new key and value (possibly of different type)
    * @see `org.apache.kafka.streams.kstream.KStream#transform`
    */
-  def transform[K1, V1](transformerSupplier: () => Transformer[K, V, KeyValue[K1, V1]],
+  def transform[K1, V1](transformerSupplier: TransformerSupplier[K, V, KeyValue[K1, V1]],
                         stateStoreNames: String*): KStream[K1, V1] =
-    inner.transform(transformerSupplier.asTransformerSupplier, stateStoreNames: _*)
+    inner.transform(transformerSupplier, stateStoreNames: _*)
 
   /**
   * Transform the value of each input record into a new value (with possible new type) of the output record.
@@ -337,11 +338,12 @@ class KStream[K, V](val inner: KStreamJ[K, V]) {
    * In order to assign a state, the state must be created and registered
   * beforehand via stores added via `addStateStore` or `addGlobalStore` before they can be connected to the `Transformer`
   *
-   * @param processorSupplier a function that generates a [[org.apache.kafka.stream.Processor]]
+   * @param processorSupplier a function that generates a [[org.apache.kafka.streams.processor.Processor]]
   * @param stateStoreNames   the names of the state store used by the processor
    * @see `org.apache.kafka.streams.kstream.KStream#process`
    */
  def process(processorSupplier: () => Processor[K, V], stateStoreNames: String*): Unit = {
+    //noinspection ConvertExpressionToSAM // because of the 2.11 build
     val processorSupplierJ: ProcessorSupplier[K, V] = new ProcessorSupplier[K, V] {
       override def get(): Processor[K, V] = processorSupplier()
     }
@@ -374,7 +376,7 @@ class KStream[K, V](val inner: KStreamJ[K, V]) {
    * // to the groupByKey call
    * }}}
    *
-   * @param (implicit) serialized the instance of Serialized that gives the serdes
+   * @param serialized the instance of Serialized that gives the serdes
   * @return a [[KGroupedStream]] that contains the grouped records of the original [[KStream]]
    * @see `org.apache.kafka.streams.kstream.KStream#groupByKey`
    */
@@ -564,7 +566,7 @@ class KStream[K, V](val inner: KStreamJ[K, V]) {
   def merge(stream: KStream[K, V]): KStream[K, V] = inner.merge(stream.inner)
 
   /**
-   * Perform an action on each record of {@code KStream}.
+   * Perform an action on each record of `KStream`.
    * <p>
   * Peek is a non-terminal operation that triggers a side effect (such as logging or statistics collection)
    * and returns an unchanged stream.
diff --git a/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/TopologyTest.scala b/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/TopologyTest.scala
index 8a0eabb3af4..b596dd37fa6 100644
--- a/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/TopologyTest.scala
+++ b/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/TopologyTest.scala
@@ -21,19 +21,16 @@ package org.apache.kafka.streams.scala
 
 import java.util.regex.Pattern
 
-import org.scalatest.junit.JUnitSuite
-import org.junit.Assert._
-import org.junit._
-
+import org.apache.kafka.streams.kstream.{KGroupedStream => KGroupedStreamJ, KStream => KStreamJ, KTable => KTableJ, _}
+import org.apache.kafka.streams.processor.ProcessorContext
+import org.apache.kafka.streams.scala.ImplicitConversions._
 import org.apache.kafka.streams.scala.kstream._
-
-import ImplicitConversions._
-
 import org.apache.kafka.streams.{StreamsBuilder => StreamsBuilderJ, _}
-import org.apache.kafka.streams.kstream.{KTable => KTableJ, KStream => KStreamJ, KGroupedStream => KGroupedStreamJ, _}
-import org.apache.kafka.streams.processor.ProcessorContext
+import org.junit.Assert._
+import org.junit._
+import org.scalatest.junit.JUnitSuite
 
-import collection.JavaConverters._
+import _root_.scala.collection.JavaConverters._
 
 /**
  * Test suite that verifies that the topology built by the Java and Scala APIs match.
@@ -207,17 +204,20 @@ class TopologyTest extends JUnitSuite {
       val streamBuilder = new StreamsBuilder
       val textLines = streamBuilder.stream[String, String](inputTopic)
 
+      //noinspection ConvertExpressionToSAM due to 2.11 build
       val _: KTable[String, Long] =
         textLines
-          .transform(
-            () =>
+          .transform(new TransformerSupplier[String, String, KeyValue[String, String]] {
+            override def get(): Transformer[String, String, KeyValue[String, String]] =
               new Transformer[String, String, KeyValue[String, String]] {
                 override def init(context: ProcessorContext): Unit = Unit
+
                 override def transform(key: String, value: String): KeyValue[String, String] =
                   new KeyValue(key, value.toLowerCase)
+
                 override def close(): Unit = Unit
-            }
-          )
+              }
+          })
           .groupBy((k, v) => v)
           .count()
 

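For illustration, here is how a caller uses the fixed Scala API shown in the diff above: transform now takes a TransformerSupplier whose get() is invoked once per stream task, so each task gets its own Transformer. This is a minimal sketch, assuming Kafka 2.0-era imports; the topic name and the lowercasing logic are placeholders, not part of the PR:

import org.apache.kafka.streams.KeyValue
import org.apache.kafka.streams.kstream.{Transformer, TransformerSupplier}
import org.apache.kafka.streams.processor.ProcessorContext
import org.apache.kafka.streams.scala.ImplicitConversions._
import org.apache.kafka.streams.scala.Serdes._
import org.apache.kafka.streams.scala.StreamsBuilder
import org.apache.kafka.streams.scala.kstream.KStream

object TransformUsageSketch {
  val builder = new StreamsBuilder
  // "input-topic" is a hypothetical topic name.
  val lines: KStream[String, String] = builder.stream[String, String]("input-topic")

  val lowercased: KStream[String, String] =
    lines.transform(new TransformerSupplier[String, String, KeyValue[String, String]] {
      // get() is called once per stream task, so every task receives a fresh
      // Transformer and keeps its own ProcessorContext reference.
      override def get(): Transformer[String, String, KeyValue[String, String]] =
        new Transformer[String, String, KeyValue[String, String]] {
          private var context: ProcessorContext = _
          override def init(ctx: ProcessorContext): Unit = context = ctx
          override def transform(k: String, v: String): KeyValue[String, String] =
            new KeyValue(k, v.toLowerCase) // context.timestamp() is now safe to call here
          override def close(): Unit = ()
        }
    })
}

The explicit anonymous classes (rather than SAM-style lambdas) match the PR's own //noinspection ConvertExpressionToSAM workaround for the Scala 2.11 build.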

 


> Kafka-Streams-Scala DSL transform shares transformer instance
> -------------------------------------------------------------
>
>                 Key: KAFKA-7250
>                 URL: https://issues.apache.org/jira/browse/KAFKA-7250
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>    Affects Versions: 2.0.0
>            Reporter: Michal
>            Assignee: Michal Dziemianko
>            Priority: Major
>              Labels: scala
>             Fix For: 2.0.1, 2.1.0
>
>
> The new Kafka Streams Scala DSL provides a transform function with the
> following signature:
>
> def transform[K1, V1](transformer: Transformer[K, V, (K1, V1)], stateStoreNames: String*): KStream[K1, V1]
>
> The provided 'transformer' instance (referred to below as the
> scala-transformer) is then used to derive a Java Transformer instance and, in
> turn, a TransformerSupplier that is passed to the underlying Java DSL. However,
> this causes all tasks to share the same instance of the scala-transformer,
> which introduces all sorts of issues. The simplest way to reproduce the problem
> is to implement a minimal transformer of the following shape:
> .transform(new Transformer[String, String, (String, String)] {
>   var context: ProcessorContext = _
>   def init(pc: ProcessorContext) = { context = pc }
>   def transform(k: String, v: String): (String, String) = {
>     context.timestamp()
>     ...
>   }
> })
> The call to timestamp() will die with the exception "This should not happen as
> timestamp() should only be called while a record is processed" because the
> record context is not set. The record context update was actually performed,
> but due to the shared nature of the scala-transformer, the local reference to
> the processor context points to the context of the last initialized task
> rather than that of the current task.
> The solution is to accept a function in the following manner:
>
> def transform[K1, V1](getTransformer: () => Transformer[K, V, (K1, V1)], stateStoreNames: String*): KStream[K1, V1]
>
> or a TransformerSupplier, as the transformValues DSL function does.
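To make the failure mode described above concrete, the following sketch contrasts the two supplier shapes; the helper names are hypothetical, not from the issue or the PR:

import org.apache.kafka.streams.KeyValue
import org.apache.kafka.streams.kstream.{Transformer, TransformerSupplier}

object SupplierSketch {
  // Broken shape: the supplier captures one pre-built Transformer, so every
  // stream task receives the same instance and the ProcessorContext saved in
  // init() is overwritten by whichever task initialized last.
  def sharedInstanceSupplier[K, V, K1, V1](
      shared: Transformer[K, V, KeyValue[K1, V1]]): TransformerSupplier[K, V, KeyValue[K1, V1]] =
    new TransformerSupplier[K, V, KeyValue[K1, V1]] {
      override def get(): Transformer[K, V, KeyValue[K1, V1]] = shared // same object every call
    }

  // Fixed shape: construction is deferred behind a function, so get() builds a
  // fresh Transformer per task, which is exactly what the proposed
  // () => Transformer[...] signature (or a caller-provided TransformerSupplier) gives.
  def freshInstanceSupplier[K, V, K1, V1](
      mkTransformer: () => Transformer[K, V, KeyValue[K1, V1]]): TransformerSupplier[K, V, KeyValue[K1, V1]] =
    new TransformerSupplier[K, V, KeyValue[K1, V1]] {
      override def get(): Transformer[K, V, KeyValue[K1, V1]] = mkTransformer()
    }
}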


