Hi Oleksii,
This may also be caused by having two different classloaders, because Flink
creates a separate classloader for every running pipeline.
This is especially relevant when you run the pipeline in Session Mode.
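
As a rough analogy only (this is plain Python, not the JVM or Flink), the
sketch below shows how two classes with the same name can still be different
types when the same definition is loaded twice in isolation. The names
`pemja_like` and `load_module` are made up for the illustration.

```python
import types

# The same class definition, loaded twice below.
SOURCE = """
class PyIterator:
    pass
"""


def load_module(name: str) -> types.ModuleType:
    """Execute SOURCE in a fresh module object (hypothetical helper)."""
    module = types.ModuleType(name)
    exec(SOURCE, module.__dict__)
    return module


first = load_module("pemja_like")
second = load_module("pemja_like")

obj = first.PyIterator()

# Same name, but two distinct class objects.
print(first.PyIterator.__qualname__ == second.PyIterator.__qualname__)  # True
print(isinstance(obj, first.PyIterator))   # True
print(isinstance(obj, second.PyIterator))  # False
```

A JVM cast between two copies of a class defined by different classloaders
fails for a comparable reason, even though the class names are identical.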

Not sure whether it helps or not, but you can try the following configuration
option:

classloader.parent-first-patterns.additional: "pemja.core;"
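
A minimal sketch of how this option might be passed from a PyFlink program,
assuming the job runs locally so that the Configuration given to
get_execution_environment is actually used. On a session cluster the option
most likely has to go into the cluster's Flink configuration file instead.
The helper names `parent_first_option` and `build_environment` are
hypothetical.

```python
# Key and pattern come from the suggestion above; the helpers are assumptions.
PARENT_FIRST_KEY = "classloader.parent-first-patterns.additional"


def parent_first_option(patterns):
    """Return the (key, value) pair for extra parent-first patterns."""
    # Flink list options are written as a semicolon-separated string.
    return PARENT_FIRST_KEY, ";".join(patterns)


def build_environment():
    """Create an execution environment with the option applied."""
    # Imported lazily so parent_first_option can be used without PyFlink.
    from pyflink.common import Configuration
    from pyflink.datastream import StreamExecutionEnvironment

    config = Configuration()
    key, value = parent_first_option(["pemja.core"])
    config.set_string(key, value)
    return StreamExecutionEnvironment.get_execution_environment(config)
```

With a single pattern the value is "pemja.core", without the trailing
semicolon shown above; both forms should describe the same one-element list.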


On Mon, 2025-01-13 at 11:30 +0000, Nikola Milutinovic wrote:
Hi Oleksii.

The core error is (as you have seen):

Caused by: java.lang.ClassCastException: pemja.core.object.PyIterator cannot be cast to pemja.core.object.PyIterator

Now, since the name of the class is the same, the only way a cast can fail is
if the versions of that class are different. How that is possible is something
you will need to investigate.

Nix.

From: Oleksii Sh <olex...@outlook.com>
Date: Friday, January 10, 2025 at 11:26 PM
To: user@flink.apache.org <user@flink.apache.org>
Subject: Using data classes in pyflink

Hi,

I'm trying to understand how to use python classes in flink DataStream 
pipelines. I'm using python 3.11 and flink 1.19.
I've tried running a few simple programs and require some guidance.

Here's the first example:


from dataclasses import dataclass

from pyflink.common import Configuration
from pyflink.datastream import StreamExecutionEnvironment

# Importing the same class from a separate module works:
# from page import Page

# defining the class in the same file as the stream fails
@dataclass
class Page:
    text: str
    number: int


if __name__ == '__main__':
    config = Configuration()

    # Fails with the next line commented or uncommented if Page is defined in
    # this file.
    # config.set_string("python.execution-mode", "thread")
    env = StreamExecutionEnvironment.get_execution_environment(config)

    # write all the data to one file
    env.set_parallelism(1)

    data_stream = env.from_collection(
        [Page(p[1], p[0]) for p in [
            (1, 'hi'), (2, 'hello'), (3, 'hi'), (4, 'hello'), (5, 'hi'),
            (6, 'hello'), (6, 'hello')]] * 10000
    )

    ds = data_stream.map(lambda p: Page(p.text, p.number + 1))

    ds.print()

    env.execute()

This consistently fails in both process and thread execution modes.
To make it work, I need to define the same Page class in a separate module and
import it; then it works in both modes.
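
For reference, a minimal sketch of the separate module described above. The
file name page.py is an assumption inferred from the `from page import Page`
import in the examples; the fields mirror the Page class shown in the first
example.

```python
# page.py (assumed file name, matching "from page import Page")
from dataclasses import dataclass


@dataclass
class Page:
    text: str
    number: int
```

The pipeline script would then import the class with `from page import Page`
instead of defining it inline.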


Here's a second example, involving a tumbling count window:


from typing import Iterable

from pyflink.common import Configuration, Types
from pyflink.datastream import StreamExecutionEnvironment, WindowFunction
from pyflink.datastream.window import CountWindow

from page import Page


class SumWindowFunction(WindowFunction[Page, Page, str, CountWindow]):
    def apply(self, key: str, window: CountWindow, inputs: Iterable[Page]):
        result = Page('', 0)
        for i in inputs:
            result.text += i.text
            result.number += i.number
        return [result]


if __name__ == '__main__':
    config = Configuration()
    config.set_string("python.execution-mode", "thread")
    env = StreamExecutionEnvironment.get_execution_environment(config)

    # write all the data to one file
    env.set_parallelism(1)

    # define the source
    data_stream = env.from_collection(
        [Page(p[1], p[0]) for p in [
            (1, 'hi'), (2, 'hello'), (3, 'hi'), (4, 'hello'), (5, 'hi'),
            (6, 'hello'), (6, 'hello')]] * 10000
    )

    ds = data_stream.key_by(lambda x: x.text, key_type=Types.STRING()) \
        .count_window(2) \
        .apply(SumWindowFunction())

    ds.print()

    # submit for execution
    env.execute()

This works in process mode, but fails in thread mode:
Caused by: java.lang.ClassCastException: pemja.core.object.PyIterator cannot be cast to pemja.core.object.PyIterator
      at org.apache.flink.streaming.api.operators.python.embedded.AbstractOneInputEmbeddedPythonFunctionOperator.processElement(AbstractOneInputEmbeddedPythonFunctionOperator.java:156)
      at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:75)
      at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:50)
      at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:29)
      at org.apache.flink.streaming.api.operators.StreamSourceContexts$ManualWatermarkContext.processAndCollect(StreamSourceContexts.java:425)
      at org.apache.flink.streaming.api.operators.StreamSourceContexts$WatermarkContext.collect(StreamSourceContexts.java:520)
      at org.apache.flink.streaming.api.operators.StreamSourceContexts$SwitchingOnClose.collect(StreamSourceContexts.java:110)
      at org.apache.flink.streaming.api.functions.source.InputFormatSourceFunction.run(InputFormatSourceFunction.java:100)
      at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:113)
      at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:71)
      at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:338)


I'd appreciate it if you could point out what I'm doing wrong, or direct me
towards documentation that explains how to do this properly.

Best regards,
Olex
