Hi Oleksii.

The core error is (as you have seen):

Caused by: java.lang.ClassCastException: pemja.core.object.PyIterator cannot be cast to pemja.core.object.PyIterator

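An error of the form "X cannot be cast to X" generally means that two distinct class objects share one name. As a rough Python analogy only (on the JVM this involves class loaders, and the sketch below does not reproduce Pemja or Flink behaviour; the helper name `load_class` is made up for illustration):

```python
def load_class():
    """Build a brand-new class object on every call (stand-in for a class loader)."""
    class PyIterator:
        pass
    return PyIterator


# Two separate "loads" of a class with the same name.
FirstPyIterator = load_class()
SecondPyIterator = load_class()

obj = FirstPyIterator()

# The names match, but the class objects do not.
same_name = FirstPyIterator.__qualname__ == SecondPyIterator.__qualname__
same_class = FirstPyIterator is SecondPyIterator
is_instance_of_second = isinstance(obj, SecondPyIterator)

print(same_name, same_class, is_instance_of_second)  # True False False
```

The object is a PyIterator by name, yet it is not an instance of the second PyIterator, which is the same shape of failure the exception reports.
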
Now, since the name of the class is the same, the only way the cast can fail is if the two classes are in fact different at runtime, for example because the class was loaded twice (by different class loaders, or from different versions of the jar). How that is possible is something you will need to investigate.

Nix.

From: Oleksii Sh <olex...@outlook.com>
Date: Friday, January 10, 2025 at 11:26 PM
To: user@flink.apache.org <user@flink.apache.org>
Subject: Using data classes in pyflink

Hi,

I'm trying to understand how to use Python classes in Flink DataStream pipelines. I'm using Python 3.11 and Flink 1.19. I've tried running a few simple programs and need some guidance.

Here's the first example:

    from dataclasses import dataclass

    from pyflink.common import Configuration
    from pyflink.datastream import StreamExecutionEnvironment

    # from page import Page  # importing the same class defined in a separate module works


    # defining the class in the same file as the stream fails
    @dataclass
    class Page:
        text: str
        number: int


    if __name__ == '__main__':
        config = Configuration()
        # fails with this line commented or uncommented if Page is defined in this file
        # config.set_string("python.execution-mode", "thread")
        env = StreamExecutionEnvironment.get_execution_environment(config)

        # write all the data to one file
        env.set_parallelism(1)

        data_stream = env.from_collection([
            Page(p[1], p[0]) for p in [(1, 'hi'), (2, 'hello'), (3, 'hi'), (4, 'hello'), (5, 'hi'), (6, 'hello'), (6, 'hello')]] * 10000
        )

        ds = data_stream.map(lambda p: Page(p.text, p.number + 1))
        ds.print()

        env.execute()

This consistently fails in both process and thread execution modes. To make it work, I need to define the same Page class in a separate module and import it; then it works in both modes.

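A minimal sketch of that separate module, assuming it is saved as page.py so that it matches the `from page import Page` import used in the examples. The `increment` helper is hypothetical and only mirrors the map lambda from the first example, so the class can be checked without a Flink environment:

```python
# page.py (assumed file name, matching `from page import Page`)
from dataclasses import dataclass


@dataclass
class Page:
    text: str
    number: int


def increment(page: Page) -> Page:
    """Hypothetical helper: same logic as the map lambda in the first example."""
    return Page(page.text, page.number + 1)


if __name__ == '__main__':
    # Quick local check of the class, without any Flink dependency.
    print(increment(Page('hi', 1)))  # Page(text='hi', number=2)
```

The pipeline script would then import the class with `from page import Page` instead of defining it inline.
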
Here's a second example, involving a tumbling count window:

    from typing import Iterable

    from pyflink.common import Configuration, Types
    from pyflink.datastream import StreamExecutionEnvironment, WindowFunction
    from pyflink.datastream.window import CountWindow

    from page import Page


    class SumWindowFunction(WindowFunction[Page, Page, str, CountWindow]):
        def apply(self, key: str, window: CountWindow, inputs: Iterable[Page]):
            result = Page('', 0)
            for i in inputs:
                result.text += i.text
                result.number += i.number
            return [result]


    if __name__ == '__main__':
        config = Configuration()
        config.set_string("python.execution-mode", "thread")
        env = StreamExecutionEnvironment.get_execution_environment(config)

        # write all the data to one file
        env.set_parallelism(1)

        # define the source
        data_stream = env.from_collection(
            [Page(p[1], p[0]) for p in [
                (1, 'hi'), (2, 'hello'), (3, 'hi'), (4, 'hello'), (5, 'hi'), (6, 'hello'), (6, 'hello')]] * 10000
        )

        ds = data_stream.key_by(lambda x: x.text, key_type=Types.STRING()) \
            .count_window(2) \
            .apply(SumWindowFunction())

        ds.print()

        # submit for execution
        env.execute()

This works in process mode, but fails in thread mode:

Caused by: java.lang.ClassCastException: pemja.core.object.PyIterator cannot be cast to pemja.core.object.PyIterator
    at org.apache.flink.streaming.api.operators.python.embedded.AbstractOneInputEmbeddedPythonFunctionOperator.processElement(AbstractOneInputEmbeddedPythonFunctionOperator.java:156)
    at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:75)
    at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:50)
    at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:29)
    at org.apache.flink.streaming.api.operators.StreamSourceContexts$ManualWatermarkContext.processAndCollect(StreamSourceContexts.java:425)
    at org.apache.flink.streaming.api.operators.StreamSourceContexts$WatermarkContext.collect(StreamSourceContexts.java:520)
    at org.apache.flink.streaming.api.operators.StreamSourceContexts$SwitchingOnClose.collect(StreamSourceContexts.java:110)
    at org.apache.flink.streaming.api.functions.source.InputFormatSourceFunction.run(InputFormatSourceFunction.java:100)
    at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:113)
    at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:71)
    at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:338)

I'd appreciate it if you could point out what I'm doing wrong, or direct me towards documentation that explains how to do this properly.

Best regards,
Olex