Hi Burak,

It seems the variable stateDesc in your unit test has an incorrect type, or
one that is not fully compatible with the parameter it is passed to:

```
found   : org.apache.flink.api.common.state.ListStateDescriptor[(String, Long)]
required: org.apache.flink.api.common.state.StateDescriptor[
            _ <: org.apache.flink.api.common.state.AppendingState[(String, Long), Iterable[(String, Long)]],
            _
          ] // this is the parent type in the hierarchy
```
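One detail worth checking: in Flink, ListState<T> is (via MergingState) an AppendingState<T, java.lang.Iterable<T>>, while a bare Iterable in Scala source means scala.collection.Iterable. That would likely explain why the Java test compiles and the Scala one does not. Below is a Flink-free sketch that reproduces the shape of the mismatch in plain Scala. The traits and classes are hypothetical stand-ins that only mimic Flink's hierarchy (MergingState is skipped); they are not Flink's API.

```scala
// Hypothetical stand-ins mimicking the SHAPE of Flink's classic state types.
// These are not Flink classes; MergingState is omitted for brevity.
object IterableMismatch {
  trait State
  trait AppendingState[IN, OUT] extends State
  // In Flink, ListState<T> is an AppendingState<T, java.lang.Iterable<T>>.
  trait ListState[T] extends AppendingState[T, java.lang.Iterable[T]]

  class StateDescriptor[S <: State, T]
  class ListStateDescriptor[T] extends StateDescriptor[ListState[T], java.util.List[T]]

  // Same parameter shape as WindowOperator's state-descriptor argument.
  def accepts[IN, ACC](desc: StateDescriptor[_ <: AppendingState[IN, ACC], _]): String = "ok"

  def main(args: Array[String]): Unit = {
    val desc = new ListStateDescriptor[(String, Long)]

    // Compiles: ACC is java.lang.Iterable, which ListState appends into.
    println(accepts[(String, Long), java.lang.Iterable[(String, Long)]](desc))

    // Does NOT compile if uncommented: a bare `Iterable` here is
    // scala.collection.Iterable, so ListState[(String, Long)] does not conform.
    // println(accepts[(String, Long), Iterable[(String, Long)]](desc))
  }
}
```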

Perhaps try relying on full type inference, without explicit type arguments:

```
val stateDesc = new ListStateDescriptor(
  "window-contents",
  TypeInformation.of(new TypeHint[(String, Long)] {}).createSerializer(new ExecutionConfig))
```
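If inference alone does not resolve it (the inferred descriptor type would still be ListStateDescriptor[(String, Long)]), here is a sketch of the same harness with the WindowOperator accumulator type written as java.lang.Iterable, which is what Iterable means in the Java test. InternalIterableProcessWindowFunction implements InternalWindowFunction[java.lang.Iterable[IN], ...], so the same change should address the second error as well. Assumptions: the Flink 1.x Scala API and the streaming test-harness artifacts are on the test classpath, and ScalaSessionProcessWindowFunction from your message is in org.example. SessionHarnessSketch and the In/Out/Acc aliases are made-up names, and I have not compiled this against your exact Flink version.

```scala
package org.example

import org.apache.flink.api.common.ExecutionConfig
import org.apache.flink.api.common.state.ListStateDescriptor
import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeHint, TypeInformation}
import org.apache.flink.streaming.api.scala.function.util.ScalaProcessWindowFunctionWrapper
import org.apache.flink.streaming.api.windowing.assigners.EventTimeSessionWindows
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.triggers.EventTimeTrigger
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.streaming.runtime.operators.windowing.WindowOperator
import org.apache.flink.streaming.runtime.operators.windowing.functions.InternalIterableProcessWindowFunction
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord
import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness

// Hypothetical helper; assumes ScalaSessionProcessWindowFunction exists in org.example.
object SessionHarnessSketch {
  type In = (String, Long)
  type Out = (String, Long, Long)
  // ListState[In] appends into a java.lang.Iterable[In], so ACC must be the Java type.
  type Acc = java.lang.Iterable[In]

  def run(): java.util.List[Out] = {
    val stateDesc = new ListStateDescriptor[In](
      "window-contents",
      TypeInformation.of(new TypeHint[In] {}).createSerializer(new ExecutionConfig))

    val operator = new WindowOperator[String, In, Acc, Out, TimeWindow](
      EventTimeSessionWindows.withGap(Time.milliseconds(2000)),
      new TimeWindow.Serializer,
      (in: In) => in._1, // SAM-converted to KeySelector[In, String]
      BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig),
      stateDesc, // conforms now: ListState[In] <: AppendingState[In, Acc]
      new InternalIterableProcessWindowFunction(
        new ScalaProcessWindowFunctionWrapper(new ScalaSessionProcessWindowFunction)),
      EventTimeTrigger.create(),
      0L,
      null)

    val harness = new KeyedOneInputStreamOperatorTestHarness[String, In, Out](
      operator, (in: In) => in._1, BasicTypeInfo.STRING_TYPE_INFO)

    harness.open()
    harness.processElement(new StreamRecord[In](("key1", 1L), 0L))
    harness.processElement(new StreamRecord[In](("key1", 2L), 10L))
    harness.processElement(new StreamRecord[In](("key1", 3L), 25L))
    // The merged session window [0, 2025) fires once the watermark passes its end.
    harness.processWatermark(3000L)

    val out = harness.extractOutputValues()
    harness.close()
    out
  }
}
```

The only substantive change from your Scala test is the third type argument of WindowOperator; everything else mirrors your original code.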

It is hard to spot the mismatch in such a long snippet; a smaller example would
make the issue easier to find. You could also look at the example here:
https://github.com/flink-extended/flink-scala-api/blob/master/modules/examples/src/test/scala/org/example/CustomTriggerTests.scala#L54

Alexey


On Wed, Nov 13, 2024 at 3:36 PM Burak Dursunlar <burak.dursun...@trendyol.com> wrote:

> Hi,
>
> I have a very basic window function that finds minimum and maximum values
> on a keyed stream.
>
> ```
> package org.example
>
> import org.apache.flink.streaming.api.scala.function.ProcessWindowFunction
> import org.apache.flink.streaming.api.windowing.windows.TimeWindow
> import org.apache.flink.util.Collector
>
> class ScalaSessionProcessWindowFunction
>     extends ProcessWindowFunction[(String, Long), (String, Long, Long), String, TimeWindow] {
>
>   override def process(
>       key: String,
>       context: Context,
>       elements: Iterable[(String, Long)],
>       out: Collector[(String, Long, Long)]): Unit = {
>
>     val maxId = elements.maxBy(_._2)._2
>     val minId = elements.minBy(_._2)._2
>
>     out.collect((key, minId, maxId))
>   }
> }
> ```
>
> First, I wrote a test case in Java using JUnit, and it worked fine:
>
> ```
> package org.example;
>
> import org.apache.flink.api.common.ExecutionConfig;
> import org.apache.flink.api.common.state.ListState;
> import org.apache.flink.api.common.state.ListStateDescriptor;
> import org.apache.flink.api.common.state.StateDescriptor;
> import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
> import org.apache.flink.api.common.typeinfo.TypeHint;
> import org.apache.flink.api.common.typeinfo.TypeInformation;
> import org.apache.flink.streaming.api.scala.function.util.ScalaProcessWindowFunctionWrapper;
> import org.apache.flink.streaming.api.windowing.assigners.EventTimeSessionWindows;
> import org.apache.flink.streaming.api.windowing.time.Time;
> import org.apache.flink.streaming.api.windowing.triggers.EventTimeTrigger;
> import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
> import org.apache.flink.streaming.runtime.operators.windowing.WindowOperator;
> import org.apache.flink.streaming.runtime.operators.windowing.functions.InternalIterableProcessWindowFunction;
> import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
> import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness;
> import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
> import org.junit.Test;
> import scala.Tuple2;
> import scala.Tuple3;
>
> import java.util.List;
>
> import static org.junit.jupiter.api.Assertions.assertEquals;
>
> public class ScalaSessionProcessWindowFunctionJavaTest {
>
>     @Test
>     public void testWindow() throws Exception {
>
>         StateDescriptor<ListState<Tuple2<String, Long>>, List<Tuple2<String, Long>>> stateDesc =
>                 new ListStateDescriptor<>(
>                         "window-contents",
>                         TypeInformation.of(new TypeHint<Tuple2<String, Long>>() {})
>                                 .createSerializer(new ExecutionConfig()));
>
>         WindowOperator<String, Tuple2<String, Long>, Iterable<Tuple2<String, Long>>,
>                 Tuple3<String, Long, Long>, TimeWindow> operator =
>                 new WindowOperator<>(
>                         EventTimeSessionWindows.withGap(Time.milliseconds(2000)),
>                         new TimeWindow.Serializer(),
>                         (input) -> input._1,
>                         BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()),
>                         stateDesc,
>                         new InternalIterableProcessWindowFunction<>(
>                                 new ScalaProcessWindowFunctionWrapper(
>                                         new ScalaSessionProcessWindowFunction())),
>                         EventTimeTrigger.create(),
>                         0,
>                         null);
>
>         OneInputStreamOperatorTestHarness<Tuple2<String, Long>, Tuple3<String, Long, Long>> testHarness =
>                 new KeyedOneInputStreamOperatorTestHarness<>(
>                         operator, (input) -> input._1, BasicTypeInfo.STRING_TYPE_INFO);
>
>         testHarness.open();
>
>         testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 1L), 0));
>         testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 2L), 10));
>         testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 3L), 25));
>
>         testHarness.processWatermark(3000);
>         testHarness.setProcessingTime(3000);
>
>         List<Tuple3<String, Long, Long>> outputValues = testHarness.extractOutputValues();
>
>         assertEquals(new Tuple3("key1", 1L, 3L), outputValues.get(0));
>     }
> }
> ```
>
> Since we write our Flink projects in Scala, I then tried to convert the above
> test case to Scala:
>
> ```
> package org.example
>
> import org.apache.flink.api.common.ExecutionConfig
> import org.apache.flink.api.common.state.ListStateDescriptor
> import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeHint, TypeInformation}
> import org.apache.flink.streaming.api.scala.function.util.ScalaProcessWindowFunctionWrapper
> import org.apache.flink.streaming.api.windowing.assigners.EventTimeSessionWindows
> import org.apache.flink.streaming.api.windowing.time.Time
> import org.apache.flink.streaming.api.windowing.triggers.EventTimeTrigger
> import org.apache.flink.streaming.api.windowing.windows.TimeWindow
> import org.apache.flink.streaming.runtime.operators.windowing.WindowOperator
> import org.apache.flink.streaming.runtime.operators.windowing.functions.InternalIterableProcessWindowFunction
> import org.apache.flink.streaming.runtime.streamrecord.StreamRecord
> import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness
> import org.scalatest.flatspec.AnyFlatSpec
> import org.scalatest.matchers.should.Matchers
>
> class ScalaSessionProcessWindowFunctionTest extends AnyFlatSpec with Matchers {
>
>   "ScalaSessionProcessWindowFunction" should "testWindow" in {
>     val stateDesc = new ListStateDescriptor[(String, Long)](
>       "window-contents",
>       TypeInformation.of(new TypeHint[(String, Long)] {}).createSerializer(new ExecutionConfig))
>
>     val operator = new WindowOperator[String, (String, Long), Iterable[(String, Long)], (String, Long, Long), TimeWindow](
>       EventTimeSessionWindows.withGap(Time.milliseconds(2000)),
>       new TimeWindow.Serializer,
>       _._1,
>       BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig),
>       stateDesc,
>       new InternalIterableProcessWindowFunction(
>         new ScalaProcessWindowFunctionWrapper(new ScalaSessionProcessWindowFunction())),
>       EventTimeTrigger.create(),
>       0,
>       null
>     )
>
>     val testHarness = new KeyedOneInputStreamOperatorTestHarness[String, (String, Long), (String, Long, Long)](
>       operator, _._1, BasicTypeInfo.STRING_TYPE_INFO)
>
>     testHarness.open()
>
>     testHarness.processElement(new StreamRecord[(String, Long)](("key1", 1L), 0))
>     testHarness.processElement(new StreamRecord[(String, Long)](("key1", 2L), 10))
>     testHarness.processElement(new StreamRecord[(String, Long)](("key1", 3L), 25))
>
>     testHarness.processWatermark(3000)
>     testHarness.setProcessingTime(3000)
>
>     val outputValues = testHarness.extractOutputValues().toArray
>
>     outputValues(0) shouldBe("key1", 1L, 3L)
>   }
> }
> ```
>
>
> However, the Scala test does not compile:
>
> ```
> [error] /Users/burak.dursunlar/projects/flink-playground/src/test/scala/org/example/ScalaSessionProcessWindowFunctionTest.scala:28:7: type mismatch;
> [error]  found   : org.apache.flink.api.common.state.ListStateDescriptor[(String, Long)]
> [error]  required: org.apache.flink.api.common.state.StateDescriptor[_ <: org.apache.flink.api.common.state.AppendingState[(String, Long),Iterable[(String, Long)]], _]
> [error]       stateDesc,
> [error]       ^
> [error] /Users/burak.dursunlar/projects/flink-playground/src/test/scala/org/example/ScalaSessionProcessWindowFunctionTest.scala:29:7: type mismatch;
> [error]  found   : org.apache.flink.streaming.runtime.operators.windowing.functions.InternalIterableProcessWindowFunction[(String, Long),(String, Long, Long),String,org.apache.flink.streaming.api.windowing.windows.TimeWindow]
> [error]  required: org.apache.flink.streaming.runtime.operators.windowing.functions.InternalWindowFunction[Iterable[(String, Long)],(String, Long, Long),String,org.apache.flink.streaming.api.windowing.windows.TimeWindow]
> [error]       new InternalIterableProcessWindowFunction(new ScalaProcessWindowFunctionWrapper(new ScalaSessionProcessWindowFunction())),
> [error]       ^
> [error] two errors found
> [error] (Test / compileIncremental) Compilation failed
> [error] Total time: 30 s, completed Nov 13, 2024 5:30:02 PM
>
> ```
>
> Any ideas on how I can fix it? Thanks in advance.
>
> Regards, Burak
>