Re: Temporal join on rolling aggregate

2024-03-26 Thread Matthias Broecheler
Hey Sebastien et al, have you tried rewriting the rolling aggregate as a window-over query? A window-over aggregation creates an append-only stream which should preserve the timestamp/watermark of the source. You can then add a deduplication

Preserve rowtime through join

2022-10-10 Thread Matthias Broecheler
Hey Flinksters, I was wondering if you had any ideas for how to preserve the rowtime across an INNER equi join so that the output can be used in a temporal join. I've attached an example based on the TemporalJoinTest where I'm creating two views by deduplicating underlying streams (to rates_pk an

Re: How to flatten ARRAY in Table API

2022-02-20 Thread Matthias Broecheler
stomerid, productid, quantity, ... > FROM > orders > CROSS JOIN UNNEST(entries) AS items (productid, quantity, unit_price, > discount); > > [1] > https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/dev/table/sql/queries/joins/#array-expansion > > Regards, > Da

How to flatten ARRAY in Table API

2022-02-19 Thread Matthias Broecheler
Hey Flinksters, I'm reading a nested JSON object into a table and would like to access the nested rows inside an array. Is there a way to flatten them so that I get a table with the nested rows? So far, I've only been able to figure out how to access a specific element inside the array using the

Re: DataStream to Table API

2021-08-23 Thread Matthias Broecheler
Row.of("Name"+i, i)); -> > > DataStream rows = integers.map(i -> Row.of("Name"+i, i), new > RowTypeInfo(Types.STRING, Types.INT)); > > Best, > JING ZHANG > > > > Matthias Broecheler 于2021年8月21日周六 上午12:40写道: > >> Thank you, Caizhi, for lookin

Re: DataStream to Table API

2021-08-20 Thread Matthias Broecheler
tor#getMapReturnTypes are not dealing with row >> types (see that method and also TypeExtractor#privateGetForClass). You >> might want to open a JIRA ticket for this. >> >> Matthias Broecheler 于2021年8月20日周五 上午7:01写道: >> >>> Hey Flinkers, >>> >>

DataStream to Table API

2021-08-19 Thread Matthias Broecheler
Hey Flinkers, I am trying to follow the docs to convert a DataStream to a Table. Specifically, I have a DataStream of Row and want the columns of the row to become the columns of the resulting table. Tha

Re: Periodic output at end of stream

2021-08-18 Thread Matthias Broecheler
t; troubleshooting. > > Best wishes, > JING ZHANG > > Matthias Broecheler 于2021年8月14日周六 上午3:44写道: > >> Hey guys, >> >> I have a KeyedProcessFunction that gathers statistics on the events that >> flow through and emits it periodically (every few seconds) to a

Periodic output at end of stream

2021-08-13 Thread Matthias Broecheler
Hey guys, I have a KeyedProcessFunction that gathers statistics on the events that flow through and emits it periodically (every few seconds) to a SideOutput. However, at the end of stream the last set of statistics don't get emitted. I read on the mailing list that processing time timers that are

Re: StreamFileSink not closing file

2021-08-13 Thread Matthias Broecheler
nded dataset with 2pc sink. However, > we are expected to fix this issue in the upcoming 1.14 version [1]. > > Best, > Yun > > > [1] https://issues.apache.org/jira/browse/FLINK-2491 > > --Original Mail -- > *Sender:*Matthias Br

StreamFileSink not closing file

2021-08-06 Thread Matthias Broecheler
Hey guys, I wrote a simple DataStream that counts up some numbers into a SideOutput which I am trying to sink into a StreamFileSink so that I can write the results to disk and read them from there. I'm running my little test locally and I can see that the data is being written to hidden "inproress"