Hi Oleksii.

The core error is (as you have seen): Caused by: java.lang.ClassCastException: 
pemja.core.object.PyIterator cannot be cast to pemja.core.object.PyIterator

Now, since the name of the class is the same, the only way a cast can fail is 
if the versions of that class are different. How is that possible, you will 
need to investigate.

Nix.

From: Oleksii Sh <olex...@outlook.com>
Date: Friday, January 10, 2025 at 11:26 PM
To: user@flink.apache.org <user@flink.apache.org>
Subject: Using data classes in pyflink
Hi,

I'm trying to understand how to use python classes in flink DataStream 
pipelines. I'm using python 3.11 and flink 1.19.
I've tried running a few simple programs and require some guidance.

Here's the first example:


from dataclasses import dataclass

from pyflink.common import Configuration
from pyflink.datastream import StreamExecutionEnvironment

# from page import Page # importing the same class defined in a separate module 
works

# defining the class in the same file as the stream fails
@dataclass
class Page:
    text: str
    number: int


if __name__ == '__main__':
    config = Configuration()

    # fails  with this line commented or uncommented if Page is defined in this 
file
    # config.set_string("python.execution-mode", "thread")
    env = StreamExecutionEnvironment.get_execution_environment(config)

    # write all the data to one file
    env.set_parallelism(1)

    data_stream = env.from_collection([
                                          Page(p[1], p[0]) for p in
                                          [(1, 'hi'), (2, 'hello'), (3, 'hi'), 
(4, 'hello'), (5, 'hi'), (6, 'hello'),
                                           (6, 'hello')]] * 10000
                                      )

    ds = data_stream.map(lambda p: Page(p.text, p.number + 1))

    ds.print()

    env.execute()

This consistently fails in both process and thread execution modes.
To make it work I need to define the same Page class in a separate module and 
import it, then it works in both cases.


Here's a second example, involving a tumbling count window:


from typing import Iterable

from pyflink.common import Configuration, Types
from pyflink.datastream import StreamExecutionEnvironment, WindowFunction
from pyflink.datastream.window import CountWindow

from page import Page


class SumWindowFunction(WindowFunction[Page, Page, str, CountWindow]):
    def apply(self, key: str, window: CountWindow, inputs: Iterable[Page]):
        result = Page('', 0)
        for i in inputs:
            result.text += i.text
            result.number += i.number
        return [result]


if __name__ == '__main__':
    config = Configuration()
    config.set_string("python.execution-mode", "thread")
    env = StreamExecutionEnvironment.get_execution_environment(config)

    # write all the data to one file
    env.set_parallelism(1)

    # define the source
    data_stream = env.from_collection(
        [Page(p[1], p[0]) for p in [
            (1, 'hi'), (2, 'hello'), (3, 'hi'), (4, 'hello'), (5, 'hi'), (6, 
'hello'), (6, 'hello')]] * 10000
    )

    ds = data_stream.key_by(lambda x: x.text, key_type=Types.STRING()) \
    .count_window(2) \
    .apply(SumWindowFunction())

    ds.print()

    # submit for execution
    env.execute()

This works in process mode, but fails in thread mode:
Caused by: java.lang.ClassCastException: pemja.core.object.PyIterator cannot be 
cast to pemja.core.object.PyIterator
      at 
org.apache.flink.streaming.api.operators.python.embedded.AbstractOneInputEmbeddedPythonFunctionOperator.processElement(AbstractOneInputEmbeddedPythonFunctionOperator.java:156)
      at 
org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:75)
      at 
org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:50)
      at 
org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:29)
      at 
org.apache.flink.streaming.api.operators.StreamSourceContexts$ManualWatermarkContext.processAndCollect(StreamSourceContexts.java:425)
      at 
org.apache.flink.streaming.api.operators.StreamSourceContexts$WatermarkContext.collect(StreamSourceContexts.java:520)
      at 
org.apache.flink.streaming.api.operators.StreamSourceContexts$SwitchingOnClose.collect(StreamSourceContexts.java:110)
      at 
org.apache.flink.streaming.api.functions.source.InputFormatSourceFunction.run(InputFormatSourceFunction.java:100)
      at 
org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:113)
      at 
org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:71)
      at 
org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:338)


I'd appreciate if you could point out what I'm doing wrong, or direct me 
towards documentation that would explain how to do this properly.

Best regards,
Olex

Reply via email to