Hi Till,

From the version I am using (1.12.0), getFieldNames is not available in Row ... See
https://github.com/apache/flink/blob/release-1.12/flink-core/src/main/java/org/apache/flink/types/Row.java .
Is there any workaround for this in version 1.12.0? Thanks.

Best,
Yik San

On Wed, Mar 31, 2021 at 12:17 AM Till Rohrmann <trohrm...@apache.org> wrote:

> There is a method Row.getFieldNames.
>
> Cheers,
> Till
>
> On Tue, Mar 30, 2021 at 6:06 PM Yik San Chan <evan.chanyik...@gmail.com>
> wrote:
>
>> Hi Till,
>>
>> I looked inside the Row class; it does contain a member `private final
>> Object[] fields;`, but I wonder how to get column names out of that
>> member?
>>
>> Thanks!
>>
>> Best,
>> Yik San
>>
>> On Tue, Mar 30, 2021 at 11:45 PM Till Rohrmann <trohrm...@apache.org>
>> wrote:
>>
>>> Hi Yik San,
>>>
>>> by converting the rows to a Tuple3 you effectively lose the information
>>> about the column names. You could also call `toRetractStream[Row]`, which
>>> will give you a `DataStream[Row]` where you keep the column names.
>>>
>>> Cheers,
>>> Till
>>>
>>> On Tue, Mar 30, 2021 at 3:52 PM Yik San Chan <evan.chanyik...@gmail.com>
>>> wrote:
>>>
>>>> The question is cross-posted on Stack Overflow:
>>>> https://stackoverflow.com/questions/66872184/flink-table-to-datastream-how-to-access-column-name
>>>>
>>>> I want to consume a Kafka topic into a table using Flink SQL, then
>>>> convert it back to a DataStream.
>>>>
>>>> Here is the `SOURCE_DDL`:
>>>>
>>>> ```
>>>> CREATE TABLE kafka_source (
>>>>     user_id BIGINT,
>>>>     datetime TIMESTAMP(3),
>>>>     last_5_clicks STRING
>>>> ) WITH (
>>>>     'connector' = 'kafka',
>>>>     'topic' = 'aiinfra.fct.userfeature.0',
>>>>     'properties.bootstrap.servers' = 'localhost:9092',
>>>>     'properties.group.id' = 'test-group',
>>>>     'format' = 'json'
>>>> )
>>>> ```
>>>>
>>>> With Flink, I execute the DDL.
>>>>
>>>> ```scala
>>>> val settings = EnvironmentSettings.newInstance.build
>>>> val streamEnv = StreamExecutionEnvironment.getExecutionEnvironment
>>>> val tableEnv = StreamTableEnvironment.create(streamEnv, settings)
>>>> tableEnv.executeSql(SOURCE_DDL)
>>>> val table = tableEnv.from("kafka_source")
>>>> ```
>>>>
>>>> Then I convert it into a DataStream, and do the downstream logic in
>>>> the `map(e => ...)` part.
>>>>
>>>> ```scala
>>>> tableEnv.toRetractStream[(Long, java.sql.Timestamp, String)](table)
>>>>   .map(e => ...)
>>>> ```
>>>>
>>>> Inside the `map(e => ...)` part, I would like to access the column
>>>> name, in this case `last_5_clicks`. Why? Because I may have different
>>>> sources with different column names (such as `last_10min_page_view`),
>>>> but I would like to reuse the code in `map(e => ...)`.
>>>>
>>>> Is there a way to do this? Thanks.
>>>>
>>>> Best,
>>>> Yik San
>>>
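[Editor's note] One possible workaround for 1.12, offered here as an untested sketch rather than an answer from the thread: even though `Row` itself has no `getFieldNames` in 1.12, the `Table`'s schema still carries the column names via `table.getSchema.getFieldNames`, so they can be captured once on the driver and zipped with each row's values inside `map(e => ...)`. The helper below contains only the pure zipping logic in plain Scala; the commented lines indicate where the Flink 1.12 calls would go, and the `rowToMap` name is hypothetical.

```scala
// In Flink 1.12, Row has no getFieldNames, but the Table's schema does:
//   val fieldNames: Array[String] = table.getSchema.getFieldNames
// Capture fieldNames BEFORE converting to a stream, then zip them with
// each row's values inside map(). The pure zipping logic:

def rowToMap(fieldNames: Array[String], values: Seq[AnyRef]): Map[String, AnyRef] =
  fieldNames.zip(values).toMap

// Hypothetical usage inside the Flink job (sketch, not compiled here):
//   val fieldNames = table.getSchema.getFieldNames
//   tableEnv.toRetractStream[Row](table).map { case (_, row) =>
//     val byName = rowToMap(fieldNames, (0 until row.getArity).map(row.getField))
//     byName("last_5_clicks")   // or any other column, looked up by name
//   }
```

This would keep the `map` logic source-agnostic: the same `byName(...)` lookup works whether the column happens to be `last_5_clicks` or `last_10min_page_view`.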