Wouter Zorgdrager created FLINK-35290: -----------------------------------------
Summary: Wrong Instant type conversion TableAPI to Datastream in thread mode
Key: FLINK-35290
URL: https://issues.apache.org/jira/browse/FLINK-35290
Project: Flink
Issue Type: Bug
Components: API / Python
Affects Versions: 1.18.1
Reporter: Wouter Zorgdrager

In PyFlink, if you convert a table with a `TIMESTAMP_LTZ(3)` column into a DataStream, the field arrives as a `pyflink.common.time.Instant`. First of all, I'm wondering whether this is expected behavior, since in the Table API `TIMESTAMP_LTZ` maps to a Python `datetime`. Couldn't the same be done for the DataStream API?

Regardless, if we switch the execution mode from `process` to `thread`, the `TIMESTAMP_LTZ(3)` field is mapped to a `pemja.PyJObject` (which wraps a `java.time.Instant`) rather than to a `pyflink.common.time.Instant`.

Note that if I use only the DataStream API and read `Types.INSTANT()` directly, the conversion seems to work fine in both `thread` and `process` mode.

Below is a minimal example exposing the bug:

```
from datetime import datetime

from pyflink.common import Configuration, Row, Types
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.table import DataTypes, StreamTableEnvironment

EXECUTION_MODE = "thread"  # or "process"

config = Configuration()
config.set_string("python.execution-mode", EXECUTION_MODE)

env = StreamExecutionEnvironment.get_execution_environment()
t_env = StreamTableEnvironment.create(env)
t_env.get_config().set("parallelism.default", "1")
t_env.get_config().set("python.fn-execution.bundle.size", "1")
t_env.get_config().set("python.execution-mode", EXECUTION_MODE)


def to_epoch_ms(row: Row):
    # Prints pyflink.common.time.Instant in "process" mode,
    # but pemja.PyJObject in "thread" mode.
    print(type(row[1]))
    return row[1].to_epoch_milli()


t_env.to_data_stream(
    t_env.from_elements(
        [
            (1, datetime(year=2024, day=10, month=9, hour=9)),
            (2, datetime(year=2024, day=10, month=9, hour=12)),
            (3, datetime(year=2024, day=22, month=11, hour=12)),
        ],
        DataTypes.ROW(
            [
                DataTypes.FIELD("id", DataTypes.INT()),
                DataTypes.FIELD("timestamp", DataTypes.TIMESTAMP_LTZ(3)),
            ]
        ),
    )
).map(to_epoch_ms, output_type=Types.LONG()).print()

env.execute()
```
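Until the conversion is consistent, a user-side workaround is to normalize the field inside the `map` function regardless of which representation arrives. The sketch below is a hypothetical helper (`to_epoch_ms_any` is not part of PyFlink) that dispatches by duck typing: it uses `to_epoch_milli()` when the value is a `pyflink.common.time.Instant`, handles a Python `datetime` (treating naive values as UTC, which is an assumption), and, as an unverified assumption, calls `toEpochMilli()` in case the thread-mode `pemja.PyJObject` forwards Java method calls to the wrapped `java.time.Instant`.

```python
from datetime import datetime, timedelta, timezone

_EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


def to_epoch_ms_any(value) -> int:
    """Hypothetical helper: convert a TIMESTAMP_LTZ value to epoch milliseconds."""
    # pyflink.common.time.Instant (observed in "process" mode).
    if hasattr(value, "to_epoch_milli"):
        return int(value.to_epoch_milli())
    # Python datetime, the type the Table API uses for TIMESTAMP_LTZ.
    if isinstance(value, datetime):
        if value.tzinfo is None:
            # Assumption: naive datetimes are interpreted as UTC.
            value = value.replace(tzinfo=timezone.utc)
        # timedelta // timedelta yields an exact integer count of milliseconds.
        return (value - _EPOCH) // timedelta(milliseconds=1)
    # Assumption (unverified): the pemja.PyJObject seen in "thread" mode
    # exposes the wrapped java.time.Instant's toEpochMilli() method.
    if hasattr(value, "toEpochMilli"):
        return int(value.toEpochMilli())
    raise TypeError(f"unsupported timestamp type: {type(value)!r}")
```

In the reproduction, the `map` function would then return `to_epoch_ms_any(row[1])` instead of calling `row[1].to_epoch_milli()` directly. This only masks the inconsistency on the user side; it does not fix the type mapping in thread mode.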