Hi,

I have a very basic window function that finds minimum and maximum values on a 
keyed stream.

```
package org.example

import org.apache.flink.streaming.api.scala.function.ProcessWindowFunction
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collector

class ScalaSessionProcessWindowFunction
    extends ProcessWindowFunction[(String, Long), (String, Long, Long), String, TimeWindow] {

  override def process(
      key: String,
      context: Context,
      elements: Iterable[(String, Long)],
      out: Collector[(String, Long, Long)]): Unit = {

    val maxId = elements.maxBy(_._2)._2
    val minId = elements.minBy(_._2)._2

    out.collect((key, minId, maxId))
  }
}
```
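For context, the function is meant to be applied to a keyed stream with event-time session windows, roughly as in the sketch below. This is only a minimal, hypothetical wiring (the `SessionMinMaxJob` object name, the `fromElements` source, and the `print` sink are illustrative, not our real pipeline); an actual job would read from a proper source and assign a watermark strategy so that the event-time windows fire.

```
package org.example

import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.assigners.EventTimeSessionWindows
import org.apache.flink.streaming.api.windowing.time.Time

// Hypothetical driver, only to illustrate how the window function is applied.
object SessionMinMaxJob {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // In a real job the (key, id) pairs come from an actual source with a
    // watermark strategy; fromElements is just for illustration and will not
    // advance event time on its own.
    val events: DataStream[(String, Long)] =
      env.fromElements(("key1", 1L), ("key1", 2L), ("key1", 3L))

    events
      .keyBy(_._1)
      .window(EventTimeSessionWindows.withGap(Time.milliseconds(2000)))
      .process(new ScalaSessionProcessWindowFunction)
      .print()

    env.execute("session-min-max")
  }
}
```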

First, I wrote a JUnit test case in Java, and it worked fine:

```
package org.example;

import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.state.StateDescriptor;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.streaming.api.scala.function.util.ScalaProcessWindowFunctionWrapper;
import org.apache.flink.streaming.api.windowing.assigners.EventTimeSessionWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.triggers.EventTimeTrigger;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.streaming.runtime.operators.windowing.WindowOperator;
import org.apache.flink.streaming.runtime.operators.windowing.functions.InternalIterableProcessWindowFunction;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness;
import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
import org.junit.Test;
import scala.Tuple2;
import scala.Tuple3;

import java.util.List;

import static org.junit.jupiter.api.Assertions.assertEquals;

public class ScalaSessionProcessWindowFunctionJavaTest {

    @Test
    public void testWindow() throws Exception {

        StateDescriptor<ListState<Tuple2<String, Long>>, List<Tuple2<String, Long>>> stateDesc =
                new ListStateDescriptor<>(
                        "window-contents",
                        TypeInformation.of(new TypeHint<Tuple2<String, Long>>() {})
                                .createSerializer(new ExecutionConfig()));

        WindowOperator<String, Tuple2<String, Long>, Iterable<Tuple2<String, Long>>, Tuple3<String, Long, Long>, TimeWindow> operator =
                new WindowOperator<>(
                        EventTimeSessionWindows.withGap(Time.milliseconds(2000)),
                        new TimeWindow.Serializer(),
                        (input) -> input._1,
                        BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()),
                        stateDesc,
                        new InternalIterableProcessWindowFunction<>(
                                new ScalaProcessWindowFunctionWrapper(new ScalaSessionProcessWindowFunction())),
                        EventTimeTrigger.create(),
                        0,
                        null);

        OneInputStreamOperatorTestHarness<Tuple2<String, Long>, Tuple3<String, Long, Long>> testHarness =
                new KeyedOneInputStreamOperatorTestHarness<>(operator, (input) -> input._1, BasicTypeInfo.STRING_TYPE_INFO);

        testHarness.open();

        testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 1L), 0));
        testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 2L), 10));
        testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 3L), 25));

        testHarness.processWatermark(3000);
        testHarness.setProcessingTime(3000);

        List<Tuple3<String, Long, Long>> outputValues = testHarness.extractOutputValues();

        assertEquals(new Tuple3<>("key1", 1L, 3L), outputValues.get(0));
    }
}
```

Then, because we write our Flink projects in Scala, I tried to convert the above test case to Scala:

```
package org.example

import org.apache.flink.api.common.ExecutionConfig
import org.apache.flink.api.common.state.ListStateDescriptor
import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeHint, TypeInformation}
import org.apache.flink.streaming.api.scala.function.util.ScalaProcessWindowFunctionWrapper
import org.apache.flink.streaming.api.windowing.assigners.EventTimeSessionWindows
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.triggers.EventTimeTrigger
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.streaming.runtime.operators.windowing.WindowOperator
import org.apache.flink.streaming.runtime.operators.windowing.functions.InternalIterableProcessWindowFunction
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord
import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness
import org.scalatest.flatspec.AnyFlatSpec
import org.scalatest.matchers.should.Matchers

class ScalaSessionProcessWindowFunctionTest extends AnyFlatSpec with Matchers {

  "ScalaSessionProcessWindowFunction" should "testWindow" in {
    val stateDesc = new ListStateDescriptor[(String, Long)](
      "window-contents",
      TypeInformation.of(new TypeHint[(String, Long)] {}).createSerializer(new ExecutionConfig))

    val operator = new WindowOperator[String, (String, Long), Iterable[(String, Long)], (String, Long, Long), TimeWindow](
      EventTimeSessionWindows.withGap(Time.milliseconds(2000)),
      new TimeWindow.Serializer,
      _._1,
      BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig),
      stateDesc,
      new InternalIterableProcessWindowFunction(
        new ScalaProcessWindowFunctionWrapper(new ScalaSessionProcessWindowFunction())),
      EventTimeTrigger.create(),
      0,
      null
    )

    val testHarness = new KeyedOneInputStreamOperatorTestHarness[String, (String, Long), (String, Long, Long)](
      operator, _._1, BasicTypeInfo.STRING_TYPE_INFO)

    testHarness.open()

    testHarness.processElement(new StreamRecord[(String, Long)](("key1", 1L), 0))
    testHarness.processElement(new StreamRecord[(String, Long)](("key1", 2L), 10))
    testHarness.processElement(new StreamRecord[(String, Long)](("key1", 3L), 25))

    testHarness.processWatermark(3000)
    testHarness.setProcessingTime(3000)

    val outputValues = testHarness.extractOutputValues().toArray

    outputValues(0) shouldBe (("key1", 1L, 3L))
  }
}
```


However, the Scala test does not compile:

```
[error] /Users/burak.dursunlar/projects/flink-playground/src/test/scala/org/example/ScalaSessionProcessWindowFunctionTest.scala:28:7: type mismatch;
[error]  found   : org.apache.flink.api.common.state.ListStateDescriptor[(String, Long)]
[error]  required: org.apache.flink.api.common.state.StateDescriptor[_ <: org.apache.flink.api.common.state.AppendingState[(String, Long),Iterable[(String, Long)]], _]
[error]       stateDesc,
[error]       ^
[error] /Users/burak.dursunlar/projects/flink-playground/src/test/scala/org/example/ScalaSessionProcessWindowFunctionTest.scala:29:7: type mismatch;
[error]  found   : org.apache.flink.streaming.runtime.operators.windowing.functions.InternalIterableProcessWindowFunction[(String, Long),(String, Long, Long),String,org.apache.flink.streaming.api.windowing.windows.TimeWindow]
[error]  required: org.apache.flink.streaming.runtime.operators.windowing.functions.InternalWindowFunction[Iterable[(String, Long)],(String, Long, Long),String,org.apache.flink.streaming.api.windowing.windows.TimeWindow]
[error]       new InternalIterableProcessWindowFunction(new ScalaProcessWindowFunctionWrapper(new ScalaSessionProcessWindowFunction())),
[error]       ^
[error] two errors found
[error] (Test / compileIncremental) Compilation failed
[error] Total time: 30 s, completed Nov 13, 2024 5:30:02 PM

```

Any ideas on how I can fix it? Thanks in advance.


Regards, Burak
