Hi Till,

I looked inside the Row class, and it does contain a member
`private final Object[] fields;`. However, I wonder how to get the column
names out of that member?
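
For what it is worth, here is a rough sketch of what I am hoping to end up with: look up each value by its column name rather than by its position. It is plain Scala and does not call Flink at all. `namedFields` is a hypothetical helper name I made up, and the sample values are invented. My assumption, which I have not verified, is that the names could come from something like `table.getSchema.getFieldNames` and the values from `row.getField(i)`, in the same order.

```scala
// Hypothetical helper (not part of Flink): pair column names with the
// values of a single row so that downstream code can look fields up by name.
object NamedFields {
  def namedFields(names: Seq[String], values: Seq[Any]): Map[String, Any] = {
    // Fail fast if the row does not match the expected schema width.
    require(names.length == values.length,
      s"expected ${names.length} values but got ${values.length}")
    names.zip(values).toMap
  }

  def main(args: Array[String]): Unit = {
    // Stand-ins for what I would expect to read from Flink (assumptions):
    //   names  <- table.getSchema.getFieldNames
    //   values <- (0 until row.getArity).map(i => row.getField(i))
    val names  = Seq("user_id", "datetime", "last_5_clicks")
    val values = Seq[Any](
      1L,
      java.sql.Timestamp.valueOf("2021-03-30 23:45:00"),
      "a,b,c" // made-up sample value
    )

    val byName = namedFields(names, values)
    // The column name could be passed in as a parameter, so the same code
    // would work for a source that has `last_10min_page_view` instead.
    println(byName("last_5_clicks"))
  }
}
```

If something like this is possible, the `map(e => ...)` logic would only need the column name as a parameter.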

Thanks!

Best,
Yik San

On Tue, Mar 30, 2021 at 11:45 PM Till Rohrmann <trohrm...@apache.org> wrote:

> Hi Yik San,
>
> by converting the rows to a Tuple3 you effectively lose the information
> about the column names. You could also call `toRetractStream[Row]` which
> will give you a `DataStream[Row]` where you keep the column names.
>
> Cheers,
> Till
>
> On Tue, Mar 30, 2021 at 3:52 PM Yik San Chan <evan.chanyik...@gmail.com>
> wrote:
>
>> The question is cross-posted on Stack Overflow:
>> https://stackoverflow.com/questions/66872184/flink-table-to-datastream-how-to-access-column-name
>>
>> I want to consume a Kafka topic into a table using Flink SQL, then
>> convert it back to a DataStream.
>>
>> Here is the `SOURCE_DDL`:
>>
>> ```
>> CREATE TABLE kafka_source (
>>     user_id BIGINT,
>>     datetime TIMESTAMP(3),
>>     last_5_clicks STRING
>> ) WITH (
>>     'connector' = 'kafka',
>>     'topic' = 'aiinfra.fct.userfeature.0',
>>     'properties.bootstrap.servers' = 'localhost:9092',
>>     'properties.group.id' = 'test-group',
>>     'format' = 'json'
>> )
>> ```
>>
>> With Flink, I execute the DDL.
>>
>> ```scala
>> val settings = EnvironmentSettings.newInstance.build
>> val streamEnv = StreamExecutionEnvironment.getExecutionEnvironment
>> val tableEnv = StreamTableEnvironment.create(streamEnv, settings)
>> tableEnv.executeSql(SOURCE_DDL)
>> val table = tableEnv.from("kafka_source")
>> ```
>>
>> Then, I convert it into a DataStream and do the downstream logic in the
>> `map(e => ...)` part.
>>
>> ```scala
>> tableEnv
>>   .toRetractStream[(Long, java.sql.Timestamp, String)](table)
>>   .map(e => ...)
>> ```
>>
>> Inside the `map(e => ...)` part, I would like to access the column name,
>> in this case `last_5_clicks`. Why? Because I may have different sources
>> with different column names (such as `last_10min_page_view`), but I would
>> like to reuse the code in `map(e => ...)`.
>>
>> Is there a way to do this? Thanks.
>>
>> Best,
>> Yik San
>>
>
