[ https://issues.apache.org/jira/browse/KAFKA-7250?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16570896#comment-16570896 ]
ASF GitHub Bot commented on KAFKA-7250:
---------------------------------------

mdziemianko opened a new pull request #5468: KAFKA-7250: fix transform function in scala DSL to accept TransformerSupplier
URL: https://github.com/apache/kafka/pull/5468

Restructures the Scala DSL transform function to accept a TransformerSupplier instead of a single Transformer instance that was shared across tasks. Updated the scaladoc. Added a unit test to ensure the created topology corresponds to the equivalent Java definition.

### Committer Checklist (excluded from commit message)
- [ ] Verify design and implementation
- [ ] Verify test coverage and CI build status
- [ ] Verify documentation (including upgrade notes)

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

> Kafka-Streams-Scala DSL transform shares transformer instance
> -------------------------------------------------------------
>
>                 Key: KAFKA-7250
>                 URL: https://issues.apache.org/jira/browse/KAFKA-7250
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>    Affects Versions: 2.0.0
>            Reporter: Michal
>            Assignee: Michal Dziemianko
>            Priority: Major
>              Labels: scala
>
> The new Kafka Streams Scala DSL provides a transform function with the following signature:
> {{def transform[K1, V1](transformer: Transformer[K, V, (K1, V1)], stateStoreNames: String*): KStream[K1, V1]}}
> The provided 'transformer' instance (referred to below as the scala-transformer) is then used to derive a Java Transformer instance and, in turn, a TransformerSupplier that is passed to the underlying Java DSL. However, this causes all tasks to share the same scala-transformer instance, which introduces all sorts of issues: any state the instance holds, such as a cached ProcessorContext, is shared between tasks.
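The sharing described above can be illustrated with a stdlib-only Scala sketch. It assumes, as the report describes, that the derived supplier effectively hands back the one captured instance on every call. `StatefulTransformer`, `SupplierIdentityDemo` and `initTasks` are hypothetical names, not Kafka Streams API, and `init(taskId)` merely stands in for `Transformer.init(ProcessorContext)`.

```scala
// Hypothetical stand-in for a stateful transformer (NOT the Kafka Transformer interface).
final class StatefulTransformer {
  // Remembers which task last called init, like a transformer caching its context.
  var initialisedBy: Option[Int] = None
  def init(taskId: Int): Unit = initialisedBy = Some(taskId)
}

object SupplierIdentityDemo {
  // Pre-fix pattern as described in the report: one user-supplied instance is
  // captured, so the derived "supplier" returns the same object to every task.
  val userInstance = new StatefulTransformer
  val sharedSupplier: () => StatefulTransformer = () => userInstance

  // Fixed pattern: the supplier constructs a new instance on every call.
  val freshSupplier: () => StatefulTransformer = () => new StatefulTransformer

  // Simulates three tasks, each obtaining a transformer and initialising it.
  def initTasks(supplier: () => StatefulTransformer): Seq[StatefulTransformer] =
    (0 until 3).map { id =>
      val t = supplier()
      t.init(id)
      t
    }

  val shared = initTasks(sharedSupplier)
  val fresh = initTasks(freshSupplier)
}
```

With the captured instance, all three tasks end up holding the same object, which only remembers the last task that initialised it; with a supplier that builds a new object per call, each task keeps its own state.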
> The simplest way to reproduce this is to implement the simplest possible transformer, of the following shape:
> {code:scala}
> .transform(new Transformer[String, String, (String, String)] {
>   var context: ProcessorContext = _
>   def init(pc: ProcessorContext) = { context = pc }
>   def transform(k: String, v: String): (String, String) = {
>     context.timestamp()
>     ...
>   }
>   def close(): Unit = ()
> })
> {code}
> The call to timestamp() will die with the exception "This should not happen as timestamp() should only be called while a record is processed" because the record context is not set on the context the transformer holds. The record context of the current task was in fact updated, but because the scala-transformer is shared, its local reference to the processor context points to the context of the last initialized task rather than the current task.
> The solution is to accept a function, in the following manner:
> {{def transform[K1, V1](getTransformer: () => Transformer[K, V, (K1, V1)], stateStoreNames: String*): KStream[K1, V1]}}
> or a TransformerSupplier, as the transformValues DSL function does.

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
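The reproduction and the proposed fix can be modelled without a Kafka dependency. In this sketch `FakeContext`, `FakeTransformer`, `TimestampTransformer`, `FakeTask` and `ContextDemo` are hypothetical stand-ins, not Kafka Streams classes. The assumed model runtime sets the record timestamp on a task's own context only while that task processes a record, and each task obtains its transformer from a `() => FakeTransformer` factory, mirroring the `getTransformer` parameter proposed in the report.

```scala
// Hypothetical stand-in for ProcessorContext: the timestamp is only available
// while the owning task is processing a record.
final class FakeContext {
  var recordTimestamp: Option[Long] = None
  def timestamp(): Long = recordTimestamp.getOrElse(throw new IllegalStateException(
    "This should not happen as timestamp() should only be called while a record is processed"))
}

// Hypothetical stand-in for the init/transform part of Transformer.
trait FakeTransformer {
  def init(ctx: FakeContext): Unit
  def transform(k: String, v: String): (String, String)
}

// Same shape as the reported transformer: it caches the context passed to init.
final class TimestampTransformer extends FakeTransformer {
  private var context: FakeContext = null
  def init(ctx: FakeContext): Unit = context = ctx
  def transform(k: String, v: String): (String, String) = (k, s"$v@${context.timestamp()}")
}

// One task: its own context plus whatever transformer the factory returns.
final class FakeTask(getTransformer: () => FakeTransformer) {
  private val context = new FakeContext
  private val transformer = getTransformer()
  transformer.init(context)

  def process(k: String, v: String, ts: Long): (String, String) = {
    context.recordTimestamp = Some(ts) // the runtime sets this task's record context
    try transformer.transform(k, v)
    finally context.recordTimestamp = None
  }
}

object ContextDemo {
  // Creates two tasks, then processes one record on the first task.
  def firstTaskResult(getTransformer: () => FakeTransformer): Either[String, (String, String)] = {
    val tasks = Seq(new FakeTask(getTransformer), new FakeTask(getTransformer))
    try Right(tasks.head.process("k", "v", 42L))
    catch { case e: IllegalStateException => Left(e.getMessage) }
  }

  // Shared instance: the second task's init overwrites the cached context.
  val shared = new TimestampTransformer
  val sharedResult = firstTaskResult(() => shared)

  // Factory, as proposed: each task gets its own transformer instance.
  val freshResult = firstTaskResult(() => new TimestampTransformer)
}
```

With the shared instance, the first task's record is processed while the transformer holds the second task's context, so `timestamp()` throws and `sharedResult` is a `Left`; with the factory, `freshResult` is `Right(("k", "v@42"))`. The factory only helps if it returns a new instance on every call: a supplier such as `() => shared` reproduces the original problem.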