[ https://issues.apache.org/jira/browse/KAFKA-7250?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16575146#comment-16575146 ]
ASF GitHub Bot commented on KAFKA-7250:
---------------------------------------

guozhangwang closed pull request #5481: KAFKA-7250: switch scala transform to TransformSupplier
URL: https://github.com/apache/kafka/pull/5481

This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance:

diff --git a/build.gradle b/build.gradle
index 83b169b0a51..7f5a1d9e83c 100644
--- a/build.gradle
+++ b/build.gradle
@@ -1015,6 +1015,7 @@ project(':streams:streams-scala') {
     testCompile libs.junit
     testCompile libs.scalatest
+    testCompile libs.easymock

     testRuntime libs.slf4jlog4j
   }
diff --git a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/KStream.scala b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/KStream.scala
index a8766bd3566..adc1850dc32 100644
--- a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/KStream.scala
+++ b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/KStream.scala
@@ -22,7 +22,7 @@ package kstream

 import org.apache.kafka.streams.KeyValue
 import org.apache.kafka.streams.kstream.{KStream => KStreamJ, _}
-import org.apache.kafka.streams.processor.{Processor, ProcessorSupplier, TopicNameExtractor}
+import org.apache.kafka.streams.processor.{Processor, ProcessorContext, ProcessorSupplier, TopicNameExtractor}
 import org.apache.kafka.streams.scala.ImplicitConversions._
 import org.apache.kafka.streams.scala.FunctionConversions._

@@ -31,8 +31,8 @@ import scala.collection.JavaConverters._
 /**
  * Wraps the Java class [[org.apache.kafka.streams.kstream.KStream]] and delegates method calls to the underlying Java object.
  *
- * @param [K] Type of keys
- * @param [V] Type of values
+ * @tparam K Type of keys
+ * @tparam V Type of values
  * @param inner The underlying Java abstraction for KStream
  *
  * @see `org.apache.kafka.streams.kstream.KStream`
@@ -167,7 +167,7 @@ class KStream[K, V](val inner: KStreamJ[K, V]) {
   def print(printed: Printed[K, V]): Unit = inner.print(printed)

   /**
-   * Perform an action on each record of 'KStream`
+   * Perform an action on each record of `KStream`
    *
    * @param action an action to perform on each record
    * @see `org.apache.kafka.streams.kstream.KStream#foreach`
@@ -176,14 +176,15 @@ class KStream[K, V](val inner: KStreamJ[K, V]) {
     inner.foreach((k: K, v: V) => action(k, v))

   /**
-   * Creates an array of {@code KStream} from this stream by branching the records in the original stream based on
+   * Creates an array of `KStream` from this stream by branching the records in the original stream based on
    * the supplied predicates.
    *
    * @param predicates the ordered list of functions that return a Boolean
    * @return multiple distinct substreams of this [[KStream]]
    * @see `org.apache.kafka.streams.kstream.KStream#branch`
    */
-  def branch(predicates: (K, V) => Boolean*): Array[KStream[K, V]] =
+  //noinspection ScalaUnnecessaryParentheses
+  def branch(predicates: ((K, V) => Boolean)*): Array[KStream[K, V]] =
     inner.branch(predicates.map(_.asPredicate): _*).map(kstream => wrapKStream(kstream))

   /**
@@ -211,7 +212,7 @@ class KStream[K, V](val inner: KStreamJ[K, V]) {
    * }}}
    *
    * @param topic the topic name
-   * @param (implicit) produced the instance of Produced that gives the serdes and `StreamPartitioner`
+   * @param produced the instance of Produced that gives the serdes and `StreamPartitioner`
    * @return a [[KStream]] that contains the exact same (and potentially repartitioned) records as this [[KStream]]
    * @see `org.apache.kafka.streams.kstream.KStream#through`
    */
@@ -243,7 +244,7 @@ class KStream[K, V](val inner: KStreamJ[K, V]) {
    * }}}
    *
    * @param topic the topic name
-   * @param (implicit) produced the instance of Produced that gives the serdes and `StreamPartitioner`
+   * @param produced the instance of Produced that gives the serdes and `StreamPartitioner`
    * @see `org.apache.kafka.streams.kstream.KStream#to`
    */
   def to(topic: String)(implicit produced: Produced[K, V]): Unit =
@@ -275,7 +276,7 @@ class KStream[K, V](val inner: KStreamJ[K, V]) {
    * }}}
    *
    * @param extractor the extractor to determine the name of the Kafka topic to write to for reach record
-   * @param (implicit) produced the instance of Produced that gives the serdes and `StreamPartitioner`
+   * @param produced the instance of Produced that gives the serdes and `StreamPartitioner`
    * @see `org.apache.kafka.streams.kstream.KStream#to`
    */
   def to(extractor: TopicNameExtractor[K, V])(implicit produced: Produced[K, V]): Unit =
@@ -295,9 +296,9 @@ class KStream[K, V](val inner: KStreamJ[K, V]) {
    * @return a [[KStream]] that contains more or less records with new key and value (possibly of different type)
    * @see `org.apache.kafka.streams.kstream.KStream#transform`
    */
-  def transform[K1, V1](transformerSupplier: () => Transformer[K, V, KeyValue[K1, V1]],
+  def transform[K1, V1](transformerSupplier: TransformerSupplier[K, V, KeyValue[K1, V1]],
                         stateStoreNames: String*): KStream[K1, V1] =
-    inner.transform(transformerSupplier.asTransformerSupplier, stateStoreNames: _*)
+    inner.transform(transformerSupplier, stateStoreNames: _*)

   /**
    * Transform the value of each input record into a new value (with possible new type) of the output record.
@@ -337,11 +338,12 @@ class KStream[K, V](val inner: KStreamJ[K, V]) {
    * In order to assign a state, the state must be created and registered
    * beforehand via stores added via `addStateStore` or `addGlobalStore` before they can be connected to the `Transformer`
    *
-   * @param processorSupplier a function that generates a [[org.apache.kafka.stream.Processor]]
+   * @param processorSupplier a function that generates a [[org.apache.kafka.streams.processor.Processor]]
    * @param stateStoreNames the names of the state store used by the processor
    * @see `org.apache.kafka.streams.kstream.KStream#process`
    */
   def process(processorSupplier: () => Processor[K, V], stateStoreNames: String*): Unit = {
+    //noinspection ConvertExpressionToSAM // because of the 2.11 build
     val processorSupplierJ: ProcessorSupplier[K, V] = new ProcessorSupplier[K, V] {
       override def get(): Processor[K, V] = processorSupplier()
     }
@@ -374,7 +376,7 @@ class KStream[K, V](val inner: KStreamJ[K, V]) {
    * // to the groupByKey call
    * }}}
    *
-   * @param (implicit) serialized the instance of Serialized that gives the serdes
+   * @param serialized the instance of Serialized that gives the serdes
    * @return a [[KGroupedStream]] that contains the grouped records of the original [[KStream]]
    * @see `org.apache.kafka.streams.kstream.KStream#groupByKey`
    */
@@ -564,7 +566,7 @@ class KStream[K, V](val inner: KStreamJ[K, V]) {
   def merge(stream: KStream[K, V]): KStream[K, V] = inner.merge(stream.inner)

   /**
-   * Perform an action on each record of {@code KStream}.
+   * Perform an action on each record of `KStream`.
    * <p>
    * Peek is a non-terminal operation that triggers a side effect (such as logging or statistics collection)
    * and returns an unchanged stream.
diff --git a/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/TopologyTest.scala b/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/TopologyTest.scala
index 8a0eabb3af4..b596dd37fa6 100644
--- a/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/TopologyTest.scala
+++ b/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/TopologyTest.scala
@@ -21,19 +21,16 @@ package org.apache.kafka.streams.scala

 import java.util.regex.Pattern

-import org.scalatest.junit.JUnitSuite
-import org.junit.Assert._
-import org.junit._
-
+import org.apache.kafka.streams.kstream.{KGroupedStream => KGroupedStreamJ, KStream => KStreamJ, KTable => KTableJ, _}
+import org.apache.kafka.streams.processor.ProcessorContext
+import org.apache.kafka.streams.scala.ImplicitConversions._
 import org.apache.kafka.streams.scala.kstream._
-
-import ImplicitConversions._
-
 import org.apache.kafka.streams.{StreamsBuilder => StreamsBuilderJ, _}
-import org.apache.kafka.streams.kstream.{KTable => KTableJ, KStream => KStreamJ, KGroupedStream => KGroupedStreamJ, _}
-import org.apache.kafka.streams.processor.ProcessorContext
+import org.junit.Assert._
+import org.junit._
+import org.scalatest.junit.JUnitSuite

-import collection.JavaConverters._
+import _root_.scala.collection.JavaConverters._

 /**
  * Test suite that verifies that the topology built by the Java and Scala APIs match.
@@ -207,17 +204,20 @@ class TopologyTest extends JUnitSuite {
     val streamBuilder = new StreamsBuilder
     val textLines = streamBuilder.stream[String, String](inputTopic)

+    //noinspection ConvertExpressionToSAM due to 2.11 build
     val _: KTable[String, Long] = textLines
-      .transform(
-        () =>
+      .transform(new TransformerSupplier[String, String, KeyValue[String, String]] {
+        override def get(): Transformer[String, String, KeyValue[String, String]] =
           new Transformer[String, String, KeyValue[String, String]] {
             override def init(context: ProcessorContext): Unit = Unit
+
             override def transform(key: String, value: String): KeyValue[String, String] =
               new KeyValue(key, value.toLowerCase)
+
             override def close(): Unit = Unit
-          }
-      )
+          }
+      })
       .groupBy((k, v) => v)
       .count()

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Kafka-Streams-Scala DSL transform shares transformer instance
> -------------------------------------------------------------
>
>                 Key: KAFKA-7250
>                 URL: https://issues.apache.org/jira/browse/KAFKA-7250
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>    Affects Versions: 2.0.0
>            Reporter: Michal
>            Assignee: Michal Dziemianko
>            Priority: Major
>              Labels: scala
>             Fix For: 2.0.1, 2.1.0
>
> The new Kafka Streams Scala DSL provides a transform function with the following signature:
>
>   def transform[K1, V1](transformer: Transformer[K, V, (K1, V1)], stateStoreNames: String*): KStream[K1, V1]
>
> The provided 'transformer' instance (referred to below as the scala-transformer) is then used to derive a Java Transformer instance, and in turn a TransformerSupplier that is passed to the underlying Java DSL. However, this causes all tasks to share the same scala-transformer instance, which introduces all sorts of issues. The simplest way to reproduce the problem is a transformer of the following shape:
>
>   .transform(new Transformer[String, String, (String, String)] {
>     var context: ProcessorContext = _
>     def init(pc: ProcessorContext) = { context = pc }
>     def transform(k: String, v: String): (String, String) = {
>       context.timestamp()
>       ...
>     }
>   })
>
> The call to timestamp() dies with the exception "This should not happen as timestamp() should only be called while a record is processed". The record context was in fact updated, but because the scala-transformer is shared, its local reference points to the processor context of the last initialized task rather than that of the current task.
> The solution is to accept a function in the following manner:
>
>   def transform[K1, V1](getTransformer: () => Transformer[K, V, (K1, V1)], stateStoreNames: String*): KStream[K1, V1]
>
> or a TransformerSupplier, like the transformValues DSL function does.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
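For illustration, here is a minimal sketch of the supplier-based pattern this fix enables: each task gets a fresh Transformer, and with it a private ProcessorContext reference, because TransformerSupplier.get() is invoked once per task. The topic names below are hypothetical, and the serde setup assumes the streams-scala implicits from Serdes and ImplicitConversions; this is a sketch, not code from the PR.

import org.apache.kafka.streams.KeyValue
import org.apache.kafka.streams.kstream.{Transformer, TransformerSupplier}
import org.apache.kafka.streams.processor.ProcessorContext
import org.apache.kafka.streams.scala.StreamsBuilder
import org.apache.kafka.streams.scala.Serdes._
import org.apache.kafka.streams.scala.ImplicitConversions._

object TransformSupplierSketch {
  val builder = new StreamsBuilder

  // get() is called once per task, so no two tasks share a Transformer and
  // the context reference below always belongs to the task that set it.
  val supplier = new TransformerSupplier[String, String, KeyValue[String, String]] {
    override def get(): Transformer[String, String, KeyValue[String, String]] =
      new Transformer[String, String, KeyValue[String, String]] {
        private var context: ProcessorContext = _

        override def init(pc: ProcessorContext): Unit = context = pc

        override def transform(k: String, v: String): KeyValue[String, String] =
          // Safe now: this context was initialized for the task processing this record.
          new KeyValue(k, s"${context.timestamp()}:$v")

        override def close(): Unit = ()
      }
  }

  builder
    .stream[String, String]("input-topic")   // hypothetical topic name
    .transform(supplier)
    .to("output-topic")                      // hypothetical topic name
}

With the pre-fix signature, the anonymous Transformer above would have been created once and handed to every task; wrapping the construction in get() defers it until each task initializes, which is exactly what the issue's proposed () => Transformer variant achieves as well.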