Hi, I have a very basic window function that finds minimum and maximum values on a keyed stream.
``` package org.example import org.apache.flink.streaming.api.scala.function.ProcessWindowFunction import org.apache.flink.streaming.api.windowing.windows.TimeWindow import org.apache.flink.util.Collector class ScalaSessionProcessWindowFunction extends ProcessWindowFunction[(String, Long), (String, Long, Long), String, TimeWindow]{ override def process(key: String, context: Context, elements: Iterable[(String, Long)], out: Collector[(String, Long, Long)]): Unit = { val maxId = elements.maxBy(_._2)._2 val minId = elements.minBy(_._2)._2 out.collect((key, minId, maxId)) } } ``` Firstly, by using JUnit, I have written a test case in Java, and It worked fine: ``` package org.example; import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.api.common.state.ListState; import org.apache.flink.api.common.state.ListStateDescriptor; import org.apache.flink.api.common.state.StateDescriptor; import org.apache.flink.api.common.typeinfo.BasicTypeInfo; import org.apache.flink.api.common.typeinfo.TypeHint; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.streaming.api.scala.function.util.ScalaProcessWindowFunctionWrapper; import org.apache.flink.streaming.api.windowing.assigners.EventTimeSessionWindows; import org.apache.flink.streaming.api.windowing.time.Time; import org.apache.flink.streaming.api.windowing.triggers.EventTimeTrigger; import org.apache.flink.streaming.api.windowing.windows.TimeWindow; import org.apache.flink.streaming.runtime.operators.windowing.WindowOperator; import org.apache.flink.streaming.runtime.operators.windowing.functions.InternalIterableProcessWindowFunction; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness; import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness; import org.junit.Test; import scala.Tuple2; import scala.Tuple3; import java.util.List; import static org.junit.jupiter.api.Assertions.assertEquals; public class ScalaSessionProcessWindowFunctionJavaTest { @Test public void testWindow() throws Exception { StateDescriptor<ListState<Tuple2<String, Long>>, List<Tuple2<String, Long>>> stateDesc = new ListStateDescriptor<>( "window-contents", TypeInformation.of(new TypeHint<Tuple2<String, Long>>() { }).createSerializer(new ExecutionConfig())); WindowOperator<String, Tuple2<String, Long>, Iterable<Tuple2<String, Long>>, Tuple3<String, Long, Long>, TimeWindow> operator = new WindowOperator<>( EventTimeSessionWindows.withGap(Time.milliseconds(2000)), new TimeWindow.Serializer(), (input) -> input._1, BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()), stateDesc, new InternalIterableProcessWindowFunction<>(new ScalaProcessWindowFunctionWrapper(new ScalaSessionProcessWindowFunction())), EventTimeTrigger.create(), 0, null ); OneInputStreamOperatorTestHarness<Tuple2<String, Long>, Tuple3<String, Long, Long>> testHarness = new KeyedOneInputStreamOperatorTestHarness<>(operator, (input) -> input._1, BasicTypeInfo.STRING_TYPE_INFO); testHarness.open(); testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 1L), 0)); testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 2L), 10)); testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 3L), 25)); testHarness.processWatermark(3000); testHarness.setProcessingTime(3000); List<Tuple3<String, Long, Long>> outputValues = testHarness.extractOutputValues(); assertEquals(new Tuple3("key1", 1L, 3L), outputValues.get(0)); } } ``` Then because we write our Flink projects in Scala, I’ve tried to convert above test case to Scala: ``` package org.example import org.apache.flink.api.common.ExecutionConfig import org.apache.flink.api.common.state.ListStateDescriptor import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeHint, TypeInformation} import org.apache.flink.streaming.api.scala.function.util.ScalaProcessWindowFunctionWrapper import org.apache.flink.streaming.api.windowing.assigners.EventTimeSessionWindows import org.apache.flink.streaming.api.windowing.time.Time import org.apache.flink.streaming.api.windowing.triggers.EventTimeTrigger import org.apache.flink.streaming.api.windowing.windows.TimeWindow import org.apache.flink.streaming.runtime.operators.windowing.WindowOperator import org.apache.flink.streaming.runtime.operators.windowing.functions.InternalIterableProcessWindowFunction import org.apache.flink.streaming.runtime.streamrecord.StreamRecord import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness import org.scalatest.flatspec.AnyFlatSpec import org.scalatest.matchers.should.Matchers class ScalaSessionProcessWindowFunctionTest extends AnyFlatSpec with Matchers { "ScalaSessionProcessWindowFunction" should "testWindow" in { val stateDesc = new ListStateDescriptor[(String, Long)]("window-contents", TypeInformation.of(new TypeHint[(String, Long)] {}).createSerializer(new ExecutionConfig)) val operator = new WindowOperator[String, (String, Long), Iterable[(String, Long)], (String, Long, Long), TimeWindow]( EventTimeSessionWindows.withGap(Time.milliseconds(2000)), new TimeWindow.Serializer, _._1, BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig), stateDesc, new InternalIterableProcessWindowFunction(new ScalaProcessWindowFunctionWrapper(new ScalaSessionProcessWindowFunction())), EventTimeTrigger.create(), 0, null ) val testHarness = new KeyedOneInputStreamOperatorTestHarness[String, (String, Long), (String, Long, Long)](operator, _._1, BasicTypeInfo.STRING_TYPE_INFO) testHarness.open() testHarness.processElement(new StreamRecord[(String, Long)](("key1", 1L), 0)) testHarness.processElement(new StreamRecord[(String, Long)](("key1", 2L), 10)) testHarness.processElement(new StreamRecord[(String, Long)](("key1", 3L), 25)) testHarness.processWatermark(3000) testHarness.setProcessingTime(3000) val outputValues = testHarness.extractOutputValues().toArray outputValues(0) shouldBe("key1", 1L, 3L) } } ``` However the Scala test does not compile: ``` [error] /Users/burak.dursunlar/projects/flink-playground/src/test/scala/org/example/ScalaSessionProcessWindowFunctionTest.scala:28:7: type mismatch; [error] found : org.apache.flink.api.common.state.ListStateDescriptor[(String, Long)] [error] required: org.apache.flink.api.common.state.StateDescriptor[_ <: org.apache.flink.api.common.state.AppendingState[(String, Long),Iterable[(String, Long)]], _] [error] stateDesc, [error] ^ [error] /Users/burak.dursunlar/projects/flink-playground/src/test/scala/org/example/ScalaSessionProcessWindowFunctionTest.scala:29:7: type mismatch; [error] found : org.apache.flink.streaming.runtime.operators.windowing.functions.InternalIterableProcessWindowFunction[(String, Long),(String, Long, Long),String,org.apache.flink.streaming.api.windowing.windows.TimeWindow] [error] required: org.apache.flink.streaming.runtime.operators.windowing.functions.InternalWindowFunction[Iterable[(String, Long)],(String, Long, Long),String,org.apache.flink.streaming.api.windowing.windows.TimeWindow] [error] new InternalIterableProcessWindowFunction(new ScalaProcessWindowFunctionWrapper(new ScalaSessionProcessWindowFunction())), [error] ^ [error] two errors found [error] (Test / compileIncremental) Compilation failed [error] Total time: 30 s, completed Nov 13, 2024 5:30:02 PM ``` Any ideas on how can fix it, thanks in advance. Btw, please ignore disclaimer message (if exist) located below. Regards, Burak YASAL UYARI: Bu e-posta, ekleri de dahil olmak üzere, yalnızca gönderildiği kişi veya kuruluşa özel olup gizli bilgi ve kişisel veri içerebilir. Bu e-postanın tarafınıza gönderim amacı ile sınırlı ve orantılı olarak kullanılması yasal bir zorunluluktur. E-posta içeriğinin gönderim amacı dışında kullanılması, ifşa edilmesi, kopyalanması, arşivlenmesi veya dağıtımının yapılması yasaktır. Ayrıca, işbu e-posta içeriğinde yer alan gizli bilgi ve kişisel verilerin güvenliğinin sağlanması sorumluluğu tarafınıza ait olup herhangi bir ihlal halinde şirketimizin sorumluluğu bulunmamaktadır. İşbu e-postanın muhatabı olmamanıza rağmen size ulaşmış olması halinde, e-postayı derhal imha ederek bu durumu gecikmeksizin tarafımıza bildirmenizi rica ederiz. LEGAL DISCLAIMER: This email, including any attachments, may contain confidential information and personal data intended solely for the use of the person or entity to whom it is addressed. You are under a legal obligation to use this email and any attachments only for their intended purpose. Any disclosure, duplication, misuse, archiving, copying, distribution of this email by and to others is strictly prohibited. In addition, the responsibility for ensuring the security of confidential information and personal data contained in the content of this email lies with you, and our company is not responsible in case of any violation. If this email has reached you despite not being the intended recipient, please promptly delete the email and notify us without undue delay.