[ https://issues.apache.org/jira/browse/FLINK-35290?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Wouter Zorgdrager closed FLINK-35290.
-------------------------------------
    Resolution: Duplicate

> Wrong Instant type conversion TableAPI to Datastream in thread mode
> -------------------------------------------------------------------
>
>                 Key: FLINK-35290
>                 URL: https://issues.apache.org/jira/browse/FLINK-35290
>             Project: Flink
>          Issue Type: Bug
>          Components: API / Python
>    Affects Versions: 1.18.1
>            Reporter: Wouter Zorgdrager
>            Priority: Major
>
> In PyFlink, if you convert a table with a `TIMESTAMP_LTZ(3)` column into a
> DataStream, the field arrives as a `pyflink.common.time.Instant`. First of all,
> I'm wondering whether this is expected behavior, since in the Table API
> `TIMESTAMP_LTZ` maps to a Python `datetime`. Can't the same be done for the
> DataStream API?
> Regardless, if we switch the execution mode from `process` to `thread`, the
> `TIMESTAMP_LTZ(3)` field gets mapped to `pemja.PyJObject` (which wraps a
> `java.time.Instant`) rather than to `pyflink.common.time.Instant`. Note that if
> I only use the DataStream API and read `Types.INSTANT()` directly, the
> conversion seems to work fine in both `thread` and `process` mode.
>
> Below is a minimal example exposing the bug:
> ```
> from datetime import datetime
>
> from pyflink.common import Configuration, Row, Types
> from pyflink.datastream import StreamExecutionEnvironment
> from pyflink.table import DataTypes, StreamTableEnvironment
>
> EXECUTION_MODE = "thread"  # or "process"
>
> config = Configuration()
> config.set_string("python.execution-mode", EXECUTION_MODE)
> env = StreamExecutionEnvironment.get_execution_environment(config)
> t_env = StreamTableEnvironment.create(env)
> t_env.get_config().set("parallelism.default", "1")
> t_env.get_config().set("python.fn-execution.bundle.size", "1")
> t_env.get_config().set("python.execution-mode", EXECUTION_MODE)
>
> def to_epoch_ms(row: Row):
>     # Per this report: pyflink.common.time.Instant in "process" mode,
>     # pemja.PyJObject (wrapping java.time.Instant) in "thread" mode.
>     print(type(row[1]))
>     return row[1].to_epoch_milli()
>
> t_env.to_data_stream(
>     t_env.from_elements(
>         [
>             (1, datetime(year=2024, day=10, month=9, hour=9)),
>             (2, datetime(year=2024, day=10, month=9, hour=12)),
>             (3, datetime(year=2024, day=22, month=11, hour=12)),
>         ],
>         DataTypes.ROW(
>             [
>                 DataTypes.FIELD("id", DataTypes.INT()),
>                 DataTypes.FIELD("timestamp", DataTypes.TIMESTAMP_LTZ(3)),
>             ]
>         ),
>     )
> ).map(to_epoch_ms, output_type=Types.LONG()).print()
>
> env.execute()
> ```
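A minimal workaround sketch, not taken from the report: a mode-agnostic version of the `to_epoch_ms` map function from the reproduction, which duck-types the incoming value instead of assuming `pyflink.common.time.Instant`. It assumes (unverified) that the `pemja.PyJObject` seen in `thread` mode forwards attribute access to the wrapped `java.time.Instant`, so that the Java method `toEpochMilli()` is callable from Python. The helper name `instant_to_epoch_ms` is hypothetical.

```python
from datetime import datetime, timedelta, timezone

_EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


def instant_to_epoch_ms(value) -> int:
    """Hypothetical helper: convert a TIMESTAMP_LTZ value to epoch milliseconds."""
    # "process" mode: pyflink.common.time.Instant exposes to_epoch_milli().
    if hasattr(value, "to_epoch_milli"):
        return value.to_epoch_milli()
    # "thread" mode (assumption): a pemja-wrapped java.time.Instant that
    # exposes the Java method toEpochMilli().
    if hasattr(value, "toEpochMilli"):
        return value.toEpochMilli()
    # Table API style: a Python datetime. Treating naive values as UTC is an
    # assumption of this sketch.
    if isinstance(value, datetime):
        if value.tzinfo is None:
            value = value.replace(tzinfo=timezone.utc)
        # timedelta // timedelta yields an int, avoiding float rounding.
        return (value - _EPOCH) // timedelta(milliseconds=1)
    raise TypeError(f"unsupported timestamp value: {type(value)!r}")


def to_epoch_ms(row) -> int:
    # Drop-in replacement for the map function in the reproduction.
    return instant_to_epoch_ms(row[1])
```

With this version, `.map(to_epoch_ms, output_type=Types.LONG())` stays unchanged. Duck typing avoids importing `pemja` just to run an `isinstance` check, at the cost of relying on the wrapper's attribute forwarding.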