Hi Oleksii, it can also be caused by having two different classloaders: Flink creates a separate classloader for every running pipeline. This is especially relevant when you run the pipeline in Session Mode.
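For intuition (this is not Flink code): the same failure mode can be mimicked in plain Python by building two module objects from identical source. The class names match, but the class objects differ, so an instance of one is not an instance of the other, much like a cast failing across two JVM classloaders that each defined pemja.core.object.PyIterator. The module names below are made up for the sketch.

```python
import importlib.util
import sys

SRC = "class PyIterator:\n    pass\n"

def load_copy(name: str):
    # Each call builds an independent module object from the same source,
    # playing the role of a separate classloader defining the "same" class.
    spec = importlib.util.spec_from_loader(name, loader=None)
    mod = importlib.util.module_from_spec(spec)
    exec(SRC, mod.__dict__)
    sys.modules[name] = mod
    return mod

a = load_copy("pemja_copy_a")
b = load_copy("pemja_copy_b")

obj = a.PyIterator()
same_name = a.PyIterator.__name__ == b.PyIterator.__name__  # names agree
castable = isinstance(obj, b.PyIterator)                    # but the classes differ
print(same_name, castable)
```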
Not sure whether it helps or not, but you can try the following configuration option:

    classloader.parent-first-patterns.additional: "pemja.core;"

On Mon, 2025-01-13 at 11:30 +0000, Nikola Milutinovic wrote:

Hi Oleksii.

The core error is (as you have seen):

    Caused by: java.lang.ClassCastException: pemja.core.object.PyIterator cannot be cast to pemja.core.object.PyIterator

Now, since the name of the class is the same, the only way a cast can fail is if the versions of that class are different. How that is possible, you will need to investigate.

Nix.

From: Oleksii Sh <olex...@outlook.com>
Date: Friday, January 10, 2025 at 11:26 PM
To: user@flink.apache.org <user@flink.apache.org>
Subject: Using data classes in pyflink

Hi,

I'm trying to understand how to use Python classes in Flink DataStream pipelines. I'm using Python 3.11 and Flink 1.19. I've tried running a few simple programs and need some guidance. Here's the first example:

    from dataclasses import dataclass

    from pyflink.common import Configuration
    from pyflink.datastream import StreamExecutionEnvironment

    # from page import Page  # importing the same class defined in a separate module works;
    # defining the class in the same file as the stream fails


    @dataclass
    class Page:
        text: str
        number: int


    if __name__ == '__main__':
        config = Configuration()
        # fails whether this line is commented or uncommented, if Page is defined in this file
        # config.set_string("python.execution-mode", "thread")
        env = StreamExecutionEnvironment.get_execution_environment(config)
        # write all the data to one file
        env.set_parallelism(1)
        data_stream = env.from_collection(
            [Page(p[1], p[0]) for p in [
                (1, 'hi'), (2, 'hello'), (3, 'hi'), (4, 'hello'),
                (5, 'hi'), (6, 'hello'), (6, 'hello')]] * 10000
        )
        ds = data_stream.map(lambda p: Page(p.text, p.number + 1))
        ds.print()
        env.execute()

This consistently fails in both process and thread execution modes.
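A plausible contributing factor, sketched with only the standard library: to ship objects between processes, pickle stores a class by reference (module name plus class name), not by value, so a class that lives only in the entry script's namespace cannot be resolved on the receiving side. The sketch below builds a throwaway module named "page" at runtime to stand in for page.py; PyFlink's actual serialization machinery is more involved, so treat this as an analogy, not a diagnosis.

```python
import importlib.util
import pickle
import sys

# Build a throwaway module "page" at runtime to stand in for page.py.
PAGE_SRC = (
    "from dataclasses import dataclass\n"
    "@dataclass\n"
    "class Page:\n"
    "    text: str\n"
    "    number: int\n"
)
spec = importlib.util.spec_from_loader("page", loader=None)
page = importlib.util.module_from_spec(spec)
exec(PAGE_SRC, page.__dict__)
sys.modules["page"] = page

# Pickle records the reference "page.Page", not the class body...
blob = pickle.dumps(page.Page("hi", 1))
restored = pickle.loads(blob)  # works here: "page" is importable

# ...so when the defining module is not importable on the receiving side
# (as with classes defined only in the entry script), unpickling fails.
del sys.modules["page"]
try:
    pickle.loads(blob)
    err = None
except ImportError as e:
    err = type(e).__name__
print(restored, err)
```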
To make it work, I have to define the same Page class in a separate module and import it; then it works in both modes.

Here's a second example, involving a tumbling count window:

    from typing import Iterable

    from pyflink.common import Configuration, Types
    from pyflink.datastream import StreamExecutionEnvironment, WindowFunction
    from pyflink.datastream.window import CountWindow

    from page import Page


    class SumWindowFunction(WindowFunction[Page, Page, str, CountWindow]):
        def apply(self, key: str, window: CountWindow, inputs: Iterable[Page]):
            result = Page('', 0)
            for i in inputs:
                result.text += i.text
                result.number += i.number
            return [result]


    if __name__ == '__main__':
        config = Configuration()
        config.set_string("python.execution-mode", "thread")
        env = StreamExecutionEnvironment.get_execution_environment(config)
        # write all the data to one file
        env.set_parallelism(1)
        # define the source
        data_stream = env.from_collection(
            [Page(p[1], p[0]) for p in [
                (1, 'hi'), (2, 'hello'), (3, 'hi'), (4, 'hello'),
                (5, 'hi'), (6, 'hello'), (6, 'hello')]] * 10000
        )
        ds = data_stream.key_by(lambda x: x.text, key_type=Types.STRING()) \
            .count_window(2) \
            .apply(SumWindowFunction())
        ds.print()
        # submit for execution
        env.execute()

This works in process mode, but fails in thread mode:

    Caused by: java.lang.ClassCastException: pemja.core.object.PyIterator cannot be cast to pemja.core.object.PyIterator
        at org.apache.flink.streaming.api.operators.python.embedded.AbstractOneInputEmbeddedPythonFunctionOperator.processElement(AbstractOneInputEmbeddedPythonFunctionOperator.java:156)
        at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:75)
        at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:50)
        at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:29)
        at org.apache.flink.streaming.api.operators.StreamSourceContexts$ManualWatermarkContext.processAndCollect(StreamSourceContexts.java:425)
        at org.apache.flink.streaming.api.operators.StreamSourceContexts$WatermarkContext.collect(StreamSourceContexts.java:520)
        at org.apache.flink.streaming.api.operators.StreamSourceContexts$SwitchingOnClose.collect(StreamSourceContexts.java:110)
        at org.apache.flink.streaming.api.functions.source.InputFormatSourceFunction.run(InputFormatSourceFunction.java:100)
        at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:113)
        at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:71)
        at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:338)

I'd appreciate it if you could point out what I'm doing wrong, or direct me to documentation that explains how to do this properly.

Best regards,
Olex
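As an aside, the accumulation logic inside SumWindowFunction.apply can be checked in isolation, without a cluster or PyFlink at all. A minimal stdlib sketch of the same fold, reusing the Page shape from the examples above (the helper name sum_window is made up here):

```python
from dataclasses import dataclass
from typing import Iterable, List


@dataclass
class Page:
    text: str
    number: int


def sum_window(inputs: Iterable[Page]) -> List[Page]:
    # Same fold as SumWindowFunction.apply: concatenate texts, sum numbers.
    result = Page('', 0)
    for i in inputs:
        result.text += i.text
        result.number += i.number
    return [result]


# A count window of 2 over the 'hi' key would see, e.g., these two elements:
out = sum_window([Page('hi', 1), Page('hi', 3)])
print(out)  # [Page(text='hihi', number=4)]
```

If the logic itself is sound, the thread-mode failure points at the pemja/classloader layer rather than the window function.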