Hi,
I have a very basic window function that finds the minimum and maximum values on a keyed stream.
```
package org.example

import org.apache.flink.streaming.api.scala.function.ProcessWindowFunction
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collector

class ScalaSessionProcessWindowFunction
    extends ProcessWindowFunction[(String, Long), (String, Long, Long), String, TimeWindow] {

  override def process(key: String,
                       context: Context,
                       elements: Iterable[(String, Long)],
                       out: Collector[(String, Long, Long)]): Unit = {
    val maxId = elements.maxBy(_._2)._2
    val minId = elements.minBy(_._2)._2
    out.collect((key, minId, maxId))
  }
}
```
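For context, the function is meant to sit on a keyed stream with event-time session windows, roughly like the sketch below (the object name and in-memory source are just placeholders, and the real pipeline's timestamp/watermark assignment is omitted for brevity):
```
package org.example

import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.assigners.EventTimeSessionWindows
import org.apache.flink.streaming.api.windowing.time.Time

object SessionJobSketch {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // Placeholder source; a real job would read from an external system and
    // assign event-time timestamps/watermarks, which is omitted here.
    val input: DataStream[(String, Long)] =
      env.fromElements(("key1", 1L), ("key1", 2L), ("key1", 3L))

    input
      .keyBy(_._1)
      .window(EventTimeSessionWindows.withGap(Time.milliseconds(2000)))
      .process(new ScalaSessionProcessWindowFunction)
      .print()

    env.execute("session window sketch")
  }
}
```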
First, I wrote a test case in Java using JUnit, and it worked fine:
```
package org.example;

import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.state.StateDescriptor;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.streaming.api.scala.function.util.ScalaProcessWindowFunctionWrapper;
import org.apache.flink.streaming.api.windowing.assigners.EventTimeSessionWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.triggers.EventTimeTrigger;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.streaming.runtime.operators.windowing.WindowOperator;
import org.apache.flink.streaming.runtime.operators.windowing.functions.InternalIterableProcessWindowFunction;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness;
import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
import org.junit.Test;
import scala.Tuple2;
import scala.Tuple3;

import java.util.List;

import static org.junit.jupiter.api.Assertions.assertEquals;

public class ScalaSessionProcessWindowFunctionJavaTest {

    @Test
    public void testWindow() throws Exception {
        StateDescriptor<ListState<Tuple2<String, Long>>, List<Tuple2<String, Long>>> stateDesc =
                new ListStateDescriptor<>(
                        "window-contents",
                        TypeInformation.of(new TypeHint<Tuple2<String, Long>>() {}).createSerializer(new ExecutionConfig()));

        WindowOperator<String, Tuple2<String, Long>, Iterable<Tuple2<String, Long>>, Tuple3<String, Long, Long>, TimeWindow> operator =
                new WindowOperator<>(
                        EventTimeSessionWindows.withGap(Time.milliseconds(2000)),
                        new TimeWindow.Serializer(),
                        (input) -> input._1,
                        BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()),
                        stateDesc,
                        new InternalIterableProcessWindowFunction<>(
                                new ScalaProcessWindowFunctionWrapper(new ScalaSessionProcessWindowFunction())),
                        EventTimeTrigger.create(),
                        0,
                        null);

        OneInputStreamOperatorTestHarness<Tuple2<String, Long>, Tuple3<String, Long, Long>> testHarness =
                new KeyedOneInputStreamOperatorTestHarness<>(operator, (input) -> input._1, BasicTypeInfo.STRING_TYPE_INFO);
        testHarness.open();

        testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 1L), 0));
        testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 2L), 10));
        testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 3L), 25));

        testHarness.processWatermark(3000);
        testHarness.setProcessingTime(3000);

        List<Tuple3<String, Long, Long>> outputValues = testHarness.extractOutputValues();
        assertEquals(new Tuple3("key1", 1L, 3L), outputValues.get(0));
    }
}
```
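The single expected result follows from the session semantics as I understand them: the three timestamps (0, 10, 25) are each within the 2000 ms gap of the previous one, so they merge into one session ending at 25 + 2000 = 2025, and the watermark at 3000 is past that end, so the window fires exactly once with min 1 and max 3. A throwaway check of that arithmetic (my own reasoning, not taken from the Flink docs):
```
object SessionArithmeticCheck extends App {
  val gap = 2000L
  val timestamps = Seq(0L, 10L, 25L)

  // Every consecutive pair is closer than the gap, so the records merge into one session.
  val singleSession = timestamps.sliding(2).forall { case Seq(a, b) => b - a < gap }

  // The merged session ends at lastTimestamp + gap = 2025, which the 3000 ms watermark passes.
  val sessionEnd = timestamps.max + gap

  println(s"singleSession=$singleSession, sessionEnd=$sessionEnd, firesAt3000=${3000L >= sessionEnd}")
}
```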
Then, because we write our Flink projects in Scala, I tried to convert the above test case to Scala:
```
package org.example

import org.apache.flink.api.common.ExecutionConfig
import org.apache.flink.api.common.state.ListStateDescriptor
import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeHint, TypeInformation}
import org.apache.flink.streaming.api.scala.function.util.ScalaProcessWindowFunctionWrapper
import org.apache.flink.streaming.api.windowing.assigners.EventTimeSessionWindows
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.triggers.EventTimeTrigger
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.streaming.runtime.operators.windowing.WindowOperator
import org.apache.flink.streaming.runtime.operators.windowing.functions.InternalIterableProcessWindowFunction
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord
import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness
import org.scalatest.flatspec.AnyFlatSpec
import org.scalatest.matchers.should.Matchers

class ScalaSessionProcessWindowFunctionTest extends AnyFlatSpec with Matchers {

  "ScalaSessionProcessWindowFunction" should "testWindow" in {
    val stateDesc = new ListStateDescriptor[(String, Long)](
      "window-contents",
      TypeInformation.of(new TypeHint[(String, Long)] {}).createSerializer(new ExecutionConfig))

    val operator = new WindowOperator[String, (String, Long), Iterable[(String, Long)], (String, Long, Long), TimeWindow](
      EventTimeSessionWindows.withGap(Time.milliseconds(2000)),
      new TimeWindow.Serializer,
      _._1,
      BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig),
      stateDesc,
      new InternalIterableProcessWindowFunction(new ScalaProcessWindowFunctionWrapper(new ScalaSessionProcessWindowFunction())),
      EventTimeTrigger.create(),
      0,
      null
    )

    val testHarness = new KeyedOneInputStreamOperatorTestHarness[String, (String, Long), (String, Long, Long)](
      operator, _._1, BasicTypeInfo.STRING_TYPE_INFO)
    testHarness.open()

    testHarness.processElement(new StreamRecord[(String, Long)](("key1", 1L), 0))
    testHarness.processElement(new StreamRecord[(String, Long)](("key1", 2L), 10))
    testHarness.processElement(new StreamRecord[(String, Long)](("key1", 3L), 25))

    testHarness.processWatermark(3000)
    testHarness.setProcessingTime(3000)

    val outputValues = testHarness.extractOutputValues().toArray
    outputValues(0) shouldBe ("key1", 1L, 3L)
  }
}
```
However, the Scala test does not compile:
```
[error] /Users/burak.dursunlar/projects/flink-playground/src/test/scala/org/example/ScalaSessionProcessWindowFunctionTest.scala:28:7: type mismatch;
[error]  found   : org.apache.flink.api.common.state.ListStateDescriptor[(String, Long)]
[error]  required: org.apache.flink.api.common.state.StateDescriptor[_ <: org.apache.flink.api.common.state.AppendingState[(String, Long),Iterable[(String, Long)]], _]
[error]       stateDesc,
[error]       ^
[error] /Users/burak.dursunlar/projects/flink-playground/src/test/scala/org/example/ScalaSessionProcessWindowFunctionTest.scala:29:7: type mismatch;
[error]  found   : org.apache.flink.streaming.runtime.operators.windowing.functions.InternalIterableProcessWindowFunction[(String, Long),(String, Long, Long),String,org.apache.flink.streaming.api.windowing.windows.TimeWindow]
[error]  required: org.apache.flink.streaming.runtime.operators.windowing.functions.InternalWindowFunction[Iterable[(String, Long)],(String, Long, Long),String,org.apache.flink.streaming.api.windowing.windows.TimeWindow]
[error]       new InternalIterableProcessWindowFunction(new ScalaProcessWindowFunctionWrapper(new ScalaSessionProcessWindowFunction())),
[error]       ^
[error] two errors found
[error] (Test / compileIncremental) Compilation failed
[error] Total time: 30 s, completed Nov 13, 2024 5:30:02 PM
```
Any ideas on how I can fix it? Thanks in advance.
Regards, Burak