Hi Burak, It seems the variable you have in the unit test has incorrect type or it is not fully compatible:
this variable --> stateDesc found : org.apache.flink.api.common.state.ListStateDescriptor[(String, Long)] required: org.apache.flink.api.common.state.StateDescriptor[ _ <: org.apache.flink.api.common.state.AppendingState[ (String, Long), Iterable[(String, Long)] ], _ ] // this is parent type in the hierarchy Perhaps try full type inference, without explicit type tags: val stateDesc = new ListStateDescriptor( "window-contents", TypeInformation.of(new TypeHint[(String, Long)] {} ).createSerializer(new ExecutionConfig)) It is hard to see mismatch in your loooong code, smaller example would be helpful to spot the issue. Maybe look at the example here: https://github.com/flink-extended/flink-scala-api/blob/master/modules/examples/src/test/scala/org/example/CustomTriggerTests.scala#L54 . Alexey On Wed, Nov 13, 2024 at 3:36 PM Burak Dursunlar < burak.dursun...@trendyol.com> wrote: > Hi, > > I have a very basic window function that finds minimum and maximum values > on a keyed stream. > > ``` > package org.example > > import org.apache.flink.streaming.api.scala.function.ProcessWindowFunction > import org.apache.flink.streaming.api.windowing.windows.TimeWindow > import org.apache.flink.util.Collector > > class ScalaSessionProcessWindowFunction extends > ProcessWindowFunction[(String, Long), (String, Long, Long), String, > TimeWindow]{ > override def process(key: String, context: Context, elements: > Iterable[(String, Long)], out: Collector[(String, Long, Long)]): Unit = { > > val maxId = elements.maxBy(_._2)._2 > val minId = elements.minBy(_._2)._2 > > out.collect((key, minId, maxId)) > } > } > ``` > > Firstly, by using JUnit, I have written a test case in Java, and It worked > fine: > > ``` > package org.example; > > import org.apache.flink.api.common.ExecutionConfig; > import org.apache.flink.api.common.state.ListState; > import org.apache.flink.api.common.state.ListStateDescriptor; > import org.apache.flink.api.common.state.StateDescriptor; > import org.apache.flink.api.common.typeinfo.BasicTypeInfo; > import org.apache.flink.api.common.typeinfo.TypeHint; > import org.apache.flink.api.common.typeinfo.TypeInformation; > import > org.apache.flink.streaming.api.scala.function.util.ScalaProcessWindowFunctionWrapper > ; > import > org.apache.flink.streaming.api.windowing.assigners.EventTimeSessionWindows > ; > import org.apache.flink.streaming.api.windowing.time.Time; > import org.apache.flink.streaming.api.windowing.triggers.EventTimeTrigger; > import org.apache.flink.streaming.api.windowing.windows.TimeWindow; > import > org.apache.flink.streaming.runtime.operators.windowing.WindowOperator; > import > org.apache.flink.streaming.runtime.operators.windowing.functions.InternalIterableProcessWindowFunction > ; > import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; > import > org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness; > import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness; > import org.junit.Test; > import scala.Tuple2; > import scala.Tuple3; > > import java.util.List; > > import static org.junit.jupiter.api.Assertions.assertEquals; > > public class ScalaSessionProcessWindowFunctionJavaTest { > > @Test > public void testWindow() throws Exception { > > > StateDescriptor<ListState<Tuple2<String, Long>>, List<Tuple2<String, Long>>> > stateDesc = new ListStateDescriptor<>( > "window-contents", > TypeInformation.of(new TypeHint<Tuple2<String, Long>>() { > }).createSerializer(new ExecutionConfig())); > > WindowOperator<String, Tuple2<String, Long>, Iterable<Tuple2<String, > Long>>, Tuple3<String, Long, Long>, TimeWindow> operator = > new WindowOperator<>( > EventTimeSessionWindows.withGap(Time.milliseconds(2000)), > new TimeWindow.Serializer(), > (input) -> input._1, > BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()), > stateDesc, > new InternalIterableProcessWindowFunction<>(new > ScalaProcessWindowFunctionWrapper(new > ScalaSessionProcessWindowFunction())), > EventTimeTrigger.create(), > 0, > null > ); > > OneInputStreamOperatorTestHarness<Tuple2<String, Long>, Tuple3<String, > Long, Long>> testHarness = new > KeyedOneInputStreamOperatorTestHarness<>(operator, (input) -> input._1, > BasicTypeInfo.STRING_TYPE_INFO); > > testHarness.open(); > > testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 1L), 0 > )); > testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 2L), 10 > )); > testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 3L), 25 > )); > > testHarness.processWatermark(3000); > testHarness.setProcessingTime(3000); > > > List<Tuple3<String, Long, Long>> outputValues = > testHarness.extractOutputValues(); > > assertEquals(new Tuple3("key1", 1L, 3L), outputValues.get(0)); > > } > > } > ``` > > Then because we write our Flink projects in Scala, I’ve tried to convert > above test case to Scala: > > ``` > package org.example > > import org.apache.flink.api.common.ExecutionConfig > import org.apache.flink.api.common.state.ListStateDescriptor > import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeHint, > TypeInformation} > import > org.apache.flink.streaming.api.scala.function.util.ScalaProcessWindowFunctionWrapper > import > org.apache.flink.streaming.api.windowing.assigners.EventTimeSessionWindows > import org.apache.flink.streaming.api.windowing.time.Time > import org.apache.flink.streaming.api.windowing.triggers.EventTimeTrigger > import org.apache.flink.streaming.api.windowing.windows.TimeWindow > import > org.apache.flink.streaming.runtime.operators.windowing.WindowOperator > import > org.apache.flink.streaming.runtime.operators.windowing.functions.InternalIterableProcessWindowFunction > import org.apache.flink.streaming.runtime.streamrecord.StreamRecord > import > org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness > import org.scalatest.flatspec.AnyFlatSpec > import org.scalatest.matchers.should.Matchers > > class ScalaSessionProcessWindowFunctionTest extends AnyFlatSpec with Matchers > { > > "ScalaSessionProcessWindowFunction" should "testWindow" in { > val stateDesc = new ListStateDescriptor[(String, Long)]("window-contents", > TypeInformation.of(new TypeHint[(String, Long)] {}).createSerializer(new > ExecutionConfig)) > > val operator = new WindowOperator[String, (String, Long), Iterable[(String, > Long)], (String, Long, Long), TimeWindow]( > EventTimeSessionWindows.withGap(Time.milliseconds(2000)), > new TimeWindow.Serializer, > _._1, > BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig), > stateDesc, > new InternalIterableProcessWindowFunction(new > ScalaProcessWindowFunctionWrapper(new > ScalaSessionProcessWindowFunction())), > EventTimeTrigger.create(), > 0, > null > ) > > val testHarness = new KeyedOneInputStreamOperatorTestHarness[String, > (String, Long), (String, Long, Long)](operator, _._1, BasicTypeInfo. > STRING_TYPE_INFO) > > testHarness.open() > > testHarness.processElement(new StreamRecord[(String, Long)](("key1", 1L), > 0)) > testHarness.processElement(new StreamRecord[(String, Long)](("key1", 2L), > 10)) > testHarness.processElement(new StreamRecord[(String, Long)](("key1", 3L), > 25)) > > testHarness.processWatermark(3000) > testHarness.setProcessingTime(3000) > > val outputValues = testHarness.extractOutputValues().toArray > > > outputValues(0) shouldBe("key1", 1L, 3L) > } > } > ``` > > > However the Scala test does not compile: > > ``` > [error] > /Users/burak.dursunlar/projects/flink-playground/src/test/scala/org/example/ScalaSessionProcessWindowFunctionTest.scala:28:7: > type mismatch; > [error] found : > org.apache.flink.api.common.state.ListStateDescriptor[(String, Long)] > [error] required: org.apache.flink.api.common.state.StateDescriptor[_ <: > org.apache.flink.api.common.state.AppendingState[(String, > Long),Iterable[(String, Long)]], _] > [error] stateDesc, > [error] ^ > [error] > /Users/burak.dursunlar/projects/flink-playground/src/test/scala/org/example/ScalaSessionProcessWindowFunctionTest.scala:29:7: > type mismatch; > [error] found : > org.apache.flink.streaming.runtime.operators.windowing.functions.InternalIterableProcessWindowFunction[(String, > Long),(String, Long, > Long),String,org.apache.flink.streaming.api.windowing.windows.TimeWindow] > [error] required: > org.apache.flink.streaming.runtime.operators.windowing.functions.InternalWindowFunction[Iterable[(String, > Long)],(String, Long, > Long),String,org.apache.flink.streaming.api.windowing.windows.TimeWindow] > [error] new InternalIterableProcessWindowFunction(new > ScalaProcessWindowFunctionWrapper(new ScalaSessionProcessWindowFunction())), > [error] ^ > [error] two errors found > [error] (Test / compileIncremental) Compilation failed > [error] Total time: 30 s, completed Nov 13, 2024 5:30:02 PM > > ``` > > Any ideas on how can fix it, thanks in advance. > > Btw, please ignore disclaimer message (if exist) located below. > > Regards, Burak > > > YASAL UYARI: > Bu e-posta, ekleri de dahil olmak üzere, yalnızca gönderildiği kişi veya > kuruluşa özel olup gizli bilgi ve kişisel veri içerebilir. Bu e-postanın > tarafınıza gönderim amacı ile sınırlı ve orantılı olarak kullanılması yasal > bir zorunluluktur. E-posta içeriğinin gönderim amacı dışında kullanılması, > ifşa edilmesi, kopyalanması, arşivlenmesi veya dağıtımının yapılması > yasaktır. Ayrıca, işbu e-posta içeriğinde yer alan gizli bilgi ve kişisel > verilerin güvenliğinin sağlanması sorumluluğu tarafınıza ait olup herhangi > bir ihlal halinde şirketimizin sorumluluğu bulunmamaktadır. İşbu e-postanın > muhatabı olmamanıza rağmen size ulaşmış olması halinde, e-postayı derhal > imha ederek bu durumu gecikmeksizin tarafımıza bildirmenizi rica ederiz. > > LEGAL DISCLAIMER: > This email, including any attachments, may contain confidential > information and personal data intended solely for the use of the person or > entity to whom it is addressed. You are under a legal obligation to use > this email and any attachments only for their intended purpose. Any > disclosure, duplication, misuse, archiving, copying, distribution of this > email by and to others is strictly prohibited. In addition, the > responsibility for ensuring the security of confidential information and > personal data contained in the content of this email lies with you, and our > company is not responsible in case of any violation. If this email has > reached you despite not being the intended recipient, please promptly > delete the email and notify us without undue delay. >