There is a method Row.getFieldNames.
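
A minimal sketch of how that could look. It assumes a Flink version in which `Row.getFieldNames(boolean)`, `Row.getField(String)` and `Row.withNames()` are available (as far as I know, 1.13 and later). `namedFields` and `RowFieldNamesSketch` are hypothetical names used for illustration only, not part of Flink. Whether rows coming out of `toRetractStream[Row]` actually carry names depends on the Flink version and conversion path, so the null case is handled explicitly.

```scala
import scala.collection.JavaConverters._
import org.apache.flink.types.Row

object RowFieldNamesSketch {

  // Hypothetical helper (not a Flink API): pairs each field name with its value.
  // Row.getFieldNames(true) returns null when the row carries no name
  // information, so the result is wrapped in Option.
  def namedFields(row: Row): Option[Map[String, AnyRef]] =
    Option(row.getFieldNames(true)).map { names =>
      names.asScala.map(name => name -> row.getField(name)).toMap
    }

  def main(args: Array[String]): Unit = {
    // A name-based row built by hand, standing in for a row from the table.
    val row = Row.withNames()
    row.setField("user_id", Long.box(1L))
    row.setField("last_5_clicks", "a,b,c")

    // Look up the columns by name instead of by position.
    namedFields(row).foreach { fields =>
      fields.foreach { case (name, value) => println(s"$name = $value") }
    }
  }
}
```

Inside `tableEnv.toRetractStream[Row](table).map { case (_, row) => ... }` the same helper could be called on `row`. If it returns `None`, the rows carry no names and the column names would have to come from the table schema instead.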

Cheers,
Till

On Tue, Mar 30, 2021 at 6:06 PM Yik San Chan <evan.chanyik...@gmail.com>
wrote:

> Hi Till,
>
> I looked inside the Row class. It does contain a member `private final
> Object[] fields;`, but how do I get the column names out of that
> member?
>
> Thanks!
>
> Best,
> Yik San
>
> On Tue, Mar 30, 2021 at 11:45 PM Till Rohrmann <trohrm...@apache.org>
> wrote:
>
>> Hi Yik San,
>>
>> by converting the rows to a Tuple3 you effectively lose the information
>> about the column names. You could also call `toRetractStream[Row]` which
>> will give you a `DataStream[Row]` where you keep the column names.
>>
>> Cheers,
>> Till
>>
>> On Tue, Mar 30, 2021 at 3:52 PM Yik San Chan <evan.chanyik...@gmail.com>
>> wrote:
>>
>>> The question is cross-posted on Stack Overflow:
>>> https://stackoverflow.com/questions/66872184/flink-table-to-datastream-how-to-access-column-name
>>>
>>> I want to consume a Kafka topic into a table using Flink SQL, then
>>> convert it back to a DataStream.
>>>
>>> Here is the `SOURCE_DDL`:
>>>
>>> ```
>>> CREATE TABLE kafka_source (
>>>     user_id BIGINT,
>>>     datetime TIMESTAMP(3),
>>>     last_5_clicks STRING
>>> ) WITH (
>>>     'connector' = 'kafka',
>>>     'topic' = 'aiinfra.fct.userfeature.0',
>>>     'properties.bootstrap.servers' = 'localhost:9092',
>>>     'properties.group.id' = 'test-group',
>>>     'format' = 'json'
>>> )
>>> ```
>>>
>>> With Flink, I execute the DDL.
>>>
>>> ```scala
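>>> import org.apache.flink.streaming.api.scala._
>>> import org.apache.flink.table.api.EnvironmentSettings
>>> import org.apache.flink.table.api.bridge.scala._
>>>
>>> val settings = EnvironmentSettings.newInstance.build
>>> val streamEnv = StreamExecutionEnvironment.getExecutionEnvironment
>>> val tableEnv = StreamTableEnvironment.create(streamEnv, settings)
>>> // Register the Kafka-backed table, then obtain a Table handle for it.
>>> tableEnv.executeSql(SOURCE_DDL)
>>> val table = tableEnv.from("kafka_source")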
>>> ```
>>>
>>> Then, I convert it into a DataStream and do the downstream logic in the
>>> `map(e => ...)` part.
>>>
>>> ```scala
>>> tableEnv.toRetractStream[(Long, java.sql.Timestamp, String)](table).map(e => ...)
>>> ```
>>>
>>> Inside the `map(e => ...)` part, I would like to access the column name,
>>> in this case, `last_5_clicks`. Why? Because I may have different sources
>>> with different column names (such as `last_10min_page_view`), but I would
>>> like to reuse the code in `map(e => ...)`.
>>>
>>> Is there a way to do this? Thanks.
>>>
>>> Best,
>>> Yik San
>>>
>>
