[ https://issues.apache.org/jira/browse/KAFKA-7882?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16767006#comment-16767006 ]
Mateusz Owczarek commented on KAFKA-7882:
-----------------------------------------

[~mjsax] As I reported, I was using Kafka Streams 2.0.0 with the Scala DSL, whose transform method accepts a Transformer instance (not a TransformerSupplier) as a parameter:

{code}
def transform[K1, V1](transformer: Transformer[K, V, (K1, V1)], stateStoreNames: String*): KStream[K1, V1] = {
  val transformerSupplierJ: TransformerSupplier[K, V, KeyValue[K1, V1]] =
    new TransformerSupplier[K, V, KeyValue[K1, V1]] {
      override def get(): Transformer[K, V, KeyValue[K1, V1]] =
        new Transformer[K, V, KeyValue[K1, V1]] {
          override def transform(key: K, value: V): KeyValue[K1, V1] =
            transformer.transform(key, value) match {
              case (k1, v1) => KeyValue.pair(k1, v1)
              case _        => null
            }

          override def init(context: ProcessorContext): Unit = transformer.init(context)

          override def close(): Unit = transformer.close()
        }
    }
  inner.transform(transformerSupplierJ, stateStoreNames: _*)
}
{code}

I believe the implementation has changed in 2.1.0 and now actually accepts a TransformerSupplier.

> StateStores are frequently closed during the 'transform' method
> ---------------------------------------------------------------
>
>                 Key: KAFKA-7882
>                 URL: https://issues.apache.org/jira/browse/KAFKA-7882
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>    Affects Versions: 2.0.0
>            Reporter: Mateusz Owczarek
>          Priority: Major
>
> Hello, I have a problem with the state store being closed frequently while transforming incoming records.
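For context on why the Transformer-vs-TransformerSupplier distinction matters here: the 2.0.0 Scala DSL wraps the single Transformer instance passed by the caller, so every stream task's get() call returns a wrapper around that same instance. Concurrent tasks then share its mutable state, and each task's init() overwrites the context/store reference installed by the previous one. A minimal sketch of that hazard, using simplified stand-in types (StoreHandle, SimpleTransformer, DedupTransformer are illustrative names, not the real Kafka API):

```scala
// Simplified stand-ins for the Kafka Streams types (illustrative only).
final class StoreHandle(val taskId: Int)

trait SimpleTransformer {
  def init(store: StoreHandle): Unit
  def transform(): Int
}

// A stateful transformer that remembers the store given to init(),
// just as a real Transformer keeps its ProcessorContext / state store.
final class DedupTransformer extends SimpleTransformer {
  private var store: StoreHandle = _
  def init(s: StoreHandle): Unit = store = s
  def transform(): Int = store.taskId // which task's store are we touching?
}

object SupplierDemo {
  // 2.0.0-style Scala DSL: one shared instance behind the supplier,
  // so task 2's init() clobbers the store handle task 1 is still using.
  def sharedInstanceSees(): Int = {
    val shared = new DedupTransformer
    shared.init(new StoreHandle(1)) // task 1 initializes
    shared.init(new StoreHandle(2)) // task 2 overwrites the same field
    shared.transform()              // task 1 now operates on task 2's store
  }

  // 2.1.0-style: the supplier builds a fresh instance per task.
  def perTaskInstanceSees(): Int = {
    val supplier: () => SimpleTransformer = () => new DedupTransformer
    val task1 = supplier(); task1.init(new StoreHandle(1))
    val task2 = supplier(); task2.init(new StoreHandle(2))
    task1.transform() // task 1 keeps its own store handle
  }
}
```

Under this reading, the "Store ... is currently closed" errors appearing only with more than one thread or partition would be consistent with one task touching a store handle that belongs to (and may have been closed by) another task.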
> To ensure only one record of the same key and window reaches the aggregate, I created a custom Transformer (I know something similar is going to be introduced with the suppress method on KTable in the future, but my implementation is quite simple and imo should work correctly) with the following implementation:
> {code:java}
> override def transform(key: Windowed[K], value: V): (Windowed[K], V) = {
>   val partition = context.partition()
>   if (partition != -1) store.put(key.key(), (value, partition), key.window().start())
>   else logger.warn(s"-1 partition")
>   null // Ensuring no 1:1 forwarding; context.forward and commit logic is in the punctuator callback
> }
> {code}
> What I get is the following error:
> {code:java}
> Store MyStore is currently closed
> {code}
> This problem appears only when the number of streaming threads (or input topic partitions) is greater than 1, even if I'm just saving to the store and turn off the punctuation.
> If punctuation is present, however, I sometimes get -1 as the partition value in the transform method. I'm familiar with the basic docs, but I haven't found anything there that could help me.
> I build my state store like this:
> {code:java}
> val stateStore = Stores.windowStoreBuilder(
>   Stores.persistentWindowStore(
>     stateStoreName,
>     timeWindows.maintainMs() + timeWindows.sizeMs + TimeUnit.DAYS.toMillis(1),
>     timeWindows.segments,
>     timeWindows.sizeMs,
>     false
>   ),
>   serde[K],
>   serde[(V, Int)]
> )
> {code}
> and include it in the DSL API like this:
> {code:java}
> builder.addStateStore(stateStore)
> (...).transform(new MyTransformer(...), "MyStore")
> {code}
> INB4: I don't close any state stores manually, and I gave them retention time as long as possible for the debugging stage. I tried to hotfix this with a retry in the transform method; the state stores do reopen eventually and the `put` method works, but this approach is questionable and I am not convinced it actually works.
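The dedup pattern the description sketches, keeping only the latest value per (key, window start) in transform() and forwarding from the punctuator, can be modelled without the Kafka API as a map keyed by (key, windowStart). A hedged sketch; WindowedKey, DedupBuffer, put and flush are made-up names standing in for the real WindowStore/Punctuator machinery:

```scala
import scala.collection.mutable

// Illustrative stand-in for the windowed dedup store described above:
// keyed by (record key, window start), keeping only the latest value.
object WindowDedup {
  final case class WindowedKey[K](key: K, windowStart: Long)

  final class DedupBuffer[K, V] {
    private val buffer = mutable.Map.empty[WindowedKey[K], V]

    // transform(): overwrite any previous value for the same key + window,
    // forwarding nothing (the real transform returns null for the same reason).
    def put(key: K, windowStart: Long, value: V): Unit =
      buffer(WindowedKey(key, windowStart)) = value

    // punctuate(): emit exactly one record per key + window seen, then clear.
    def flush(): List[(WindowedKey[K], V)] = {
      val out = buffer.toList
      buffer.clear()
      out
    }
  }
}
```

With this shape, duplicate records for the same key and window simply overwrite each other, so the downstream aggregate sees at most one record per key per window on each flush.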
> Edit:
> May this be because the
> {code:java}StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG{code}
> is set to a low value? If I understand correctly, spilling to disk then happens more frequently; could that cause the store to be closed?

-- 
This message was sent by Atlassian JIRA (v7.6.3#76005)