There is a method `Row.getFieldNames`.

Cheers,
Till
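
For the use case quoted below (reusing the `map(e => ...)` logic across sources whose column names differ), here is a minimal sketch of one possible approach. It does not rely on the rows carrying names: it resolves the column position once from the table schema and reads that position from each `Row`. It assumes the Flink 1.12-era Scala API (`Table.getSchema`, `toRetractStream[Row]`); later releases may prefer a different schema accessor. `ColumnByName`, `columnIndex` and `columnAsString` are hypothetical names made up for illustration, not part of Flink.

```scala
import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.api.Table
import org.apache.flink.table.api.bridge.scala._
import org.apache.flink.types.Row

// Hypothetical helper object, not part of Flink.
object ColumnByName {

  // Resolve a column's position from the schema's field names.
  // Fails fast if the column is not present.
  def columnIndex(fieldNames: Array[String], column: String): Int = {
    val idx = fieldNames.indexOf(column)
    require(idx >= 0, s"Column '$column' not found in: ${fieldNames.mkString(", ")}")
    idx
  }

  // Convert the table to a retract stream of Rows and extract one column,
  // selected by name, as a String.
  def columnAsString(
      tableEnv: StreamTableEnvironment,
      table: Table,
      column: String): DataStream[String] = {
    // Looked up once while building the job, so the map function
    // below only captures an Int.
    val idx = columnIndex(table.getSchema.getFieldNames, column)

    tableEnv
      .toRetractStream[Row](table) // DataStream[(Boolean, Row)]
      // e._1 is the add/retract flag, e._2 is the Row itself.
      .map(e => String.valueOf(e._2.getField(idx)))
  }
}
```

With the `kafka_source` table from the quoted message, this could be called as `ColumnByName.columnAsString(tableEnv, table, "last_5_clicks")`, and with `"last_10min_page_view"` for a different source, without changing the map logic.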

On Tue, Mar 30, 2021 at 6:06 PM Yik San Chan <evan.chanyik...@gmail.com> wrote:

> Hi Till,
>
> I looked inside the Row class; it does contain a member `private final
> Object[] fields;`, though I wonder how to get the column names out of
> that member?
>
> Thanks!
>
> Best,
> Yik San
>
> On Tue, Mar 30, 2021 at 11:45 PM Till Rohrmann <trohrm...@apache.org>
> wrote:
>
>> Hi Yik San,
>>
>> by converting the rows to a Tuple3 you effectively lose the information
>> about the column names. You could also call `toRetractStream[Row]`, which
>> will give you a `DataStream[(Boolean, Row)]` where you keep the column
>> names.
>>
>> Cheers,
>> Till
>>
>> On Tue, Mar 30, 2021 at 3:52 PM Yik San Chan <evan.chanyik...@gmail.com>
>> wrote:
>>
>>> The question is cross-posted on Stack Overflow:
>>> https://stackoverflow.com/questions/66872184/flink-table-to-datastream-how-to-access-column-name
>>>
>>> I want to consume a Kafka topic into a table using Flink SQL, then
>>> convert it back to a DataStream.
>>>
>>> Here is the `SOURCE_DDL`:
>>>
>>> ```
>>> CREATE TABLE kafka_source (
>>>   user_id BIGINT,
>>>   datetime TIMESTAMP(3),
>>>   last_5_clicks STRING
>>> ) WITH (
>>>   'connector' = 'kafka',
>>>   'topic' = 'aiinfra.fct.userfeature.0',
>>>   'properties.bootstrap.servers' = 'localhost:9092',
>>>   'properties.group.id' = 'test-group',
>>>   'format' = 'json'
>>> )
>>> ```
>>>
>>> With Flink, I execute the DDL.
>>>
>>> ```scala
>>> val settings = EnvironmentSettings.newInstance.build
>>> val streamEnv = StreamExecutionEnvironment.getExecutionEnvironment
>>> val tableEnv = StreamTableEnvironment.create(streamEnv, settings)
>>> tableEnv.executeSql(SOURCE_DDL)
>>> val table = tableEnv.from("kafka_source")
>>> ```
>>>
>>> Then, I convert it into a DataStream and do the downstream logic in the
>>> `map(e => ...)` part.
>>>
>>> ```scala
>>> tableEnv.toRetractStream[(Long, java.sql.Timestamp,
>>> String)](table).map(e => ...)
>>> ```
>>>
>>> Inside the `map(e => ...)` part, I would like to access the column name,
>>> in this case `last_5_clicks`. Why? Because I may have different sources
>>> with different column names (such as `last_10min_page_view`), but I would
>>> like to reuse the code in `map(e => ...)`.
>>>
>>> Is there a way to do this? Thanks.
>>>
>>> Best,
>>> Yik San