Re problem 1: Could you share an example which could be run by others? It depends on `dataclass` which seems like a private class.
Re problem 2: Seems like a classloader issue. Could you share how to reproduce it? Since you could run the first example in thread mode, it seems only happening in certain cases? Regards, Dian On Mon, Jan 13, 2025 at 7:30 PM Nikola Milutinovic <n.milutino...@levi9.com> wrote: > > Hi Oleksii. > > > > The core error is (as you have seen): Caused by: > java.lang.ClassCastException: pemja.core.object.PyIterator cannot be cast to > pemja.core.object.PyIterator > > > > Now, since the name of the class is the same, the only way a cast can fail is > if the versions of that class are different. How is that possible, you will > need to investigate. > > > > Nix. > > > > From: Oleksii Sh <olex...@outlook.com> > Date: Friday, January 10, 2025 at 11:26 PM > To: user@flink.apache.org <user@flink.apache.org> > Subject: Using data classes in pyflink > > Hi, > > > > I'm trying to understand how to use python classes in flink DataStream > pipelines. I'm using python 3.11 and flink 1.19. > > I've tried running a few simple programs and require some guidance. > > > > Here's the first example: > > > > from dataclasses import dataclass > > from pyflink.common import Configuration > from pyflink.datastream import StreamExecutionEnvironment > > # from page import Page # importing the same class defined in a separate > module works > > # defining the class in the same file as the stream fails > @dataclass > class Page: > text: str > number: int > > > if __name__ == '__main__': > config = Configuration() > > # fails with this line commented or uncommented if Page is defined in > this file > # config.set_string("python.execution-mode", "thread") > env = StreamExecutionEnvironment.get_execution_environment(config) > > # write all the data to one file > env.set_parallelism(1) > > data_stream = env.from_collection([ > Page(p[1], p[0]) for p in > [(1, 'hi'), (2, 'hello'), (3, > 'hi'), (4, 'hello'), (5, 'hi'), (6, 'hello'), > (6, 'hello')]] * 10000 > ) > > ds = data_stream.map(lambda p: Page(p.text, p.number + 1)) > > ds.print() > > env.execute() > > > > This consistently fails in both process and thread execution modes. > > To make it work I need to define the same Page class in a separate module and > import it, then it works in both cases. > > > > > > Here's a second example, involving a tumbling count window: > > > > from typing import Iterable > > from pyflink.common import Configuration, Types > from pyflink.datastream import StreamExecutionEnvironment, WindowFunction > from pyflink.datastream.window import CountWindow > > from page import Page > > > class SumWindowFunction(WindowFunction[Page, Page, str, CountWindow]): > def apply(self, key: str, window: CountWindow, inputs: Iterable[Page]): > result = Page('', 0) > for i in inputs: > result.text += i.text > result.number += i.number > return [result] > > > if __name__ == '__main__': > config = Configuration() > config.set_string("python.execution-mode", "thread") > env = StreamExecutionEnvironment.get_execution_environment(config) > > # write all the data to one file > env.set_parallelism(1) > > # define the source > data_stream = env.from_collection( > [Page(p[1], p[0]) for p in [ > (1, 'hi'), (2, 'hello'), (3, 'hi'), (4, 'hello'), (5, 'hi'), (6, > 'hello'), (6, 'hello')]] * 10000 > ) > > ds = data_stream.key_by(lambda x: x.text, key_type=Types.STRING()) \ > .count_window(2) \ > .apply(SumWindowFunction()) > > ds.print() > > # submit for execution > env.execute() > > > > This works in process mode, but fails in thread mode: > > Caused by: java.lang.ClassCastException: pemja.core.object.PyIterator cannot > be cast to pemja.core.object.PyIterator > > at > org.apache.flink.streaming.api.operators.python.embedded.AbstractOneInputEmbeddedPythonFunctionOperator.processElement(AbstractOneInputEmbeddedPythonFunctionOperator.java:156) > > at > org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:75) > > at > org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:50) > > at > org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:29) > > at > org.apache.flink.streaming.api.operators.StreamSourceContexts$ManualWatermarkContext.processAndCollect(StreamSourceContexts.java:425) > > at > org.apache.flink.streaming.api.operators.StreamSourceContexts$WatermarkContext.collect(StreamSourceContexts.java:520) > > at > org.apache.flink.streaming.api.operators.StreamSourceContexts$SwitchingOnClose.collect(StreamSourceContexts.java:110) > > at > org.apache.flink.streaming.api.functions.source.InputFormatSourceFunction.run(InputFormatSourceFunction.java:100) > > at > org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:113) > > at > org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:71) > > at > org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:338) > > > > > > I'd appreciate if you could point out what I'm doing wrong, or direct me > towards documentation that would explain how to do this properly. > > > > Best regards, > > Olex