Re: Using SQL with dynamic tables where rows are updated

2018-01-10 Thread Ghassan Yammine
Hello, No, I did not use Hequn’s work. I already had an implementation that pre-processed the stream before feeding it into the (now modified) SQL expression. Basically, instead of computing the AVG, the SQL just computes the COUNT and SUM. I do the average computation on the resulting stream.
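The final step described above, deriving the average from COUNT and SUM on the resulting stream, could look roughly like the sketch below. This is plain Python rather than Flink code, and the function name `average_from_count_sum`, the row layout, and the sample rows are all hypothetical; the thread does not show the actual implementation.

```python
from typing import Iterable, Iterator, Tuple


def average_from_count_sum(
    rows: Iterable[Tuple[str, int, float]]
) -> Iterator[Tuple[str, float]]:
    """Turn (product_id, count, total) rows into (product_id, average) rows."""
    for product_id, count, total in rows:
        if count == 0:
            # No ratings for this product, so there is no average to emit.
            continue
        yield product_id, total / count


# Hypothetical result rows, standing in for the output of the COUNT/SUM query.
sample_rows = [("product-1", 2, 7.0), ("product-2", 4, 10.0)]
averages = list(average_from_count_sum(sample_rows))
```

Keeping COUNT and SUM separate until the last step is what makes this workable: both can be adjusted when a record changes, whereas an already-computed average cannot.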

Re: Using SQL with dynamic tables where rows are updated

2018-01-10 Thread yinhua
Hi Ghassan, I have the same issue as you. How did you solve your problem in the end? Did you use Hequn's workaround?

Re: Using SQL with dynamic tables where rows are updated

2018-01-05 Thread Ghassan Yammine
Thanks very much! Regards, Ghassan > On Jan 5, 2018, at 7:55 AM, Hequn Cheng wrote: > > hi Ghassan, > > TableSource in Flink doesn't support primary key now, but you can achieve > it by doing a group by manually. > > Such as: > val resultTable = tableEnv.sql( > " > SELECT Prod

Re: Using SQL with dynamic tables where rows are updated

2018-01-05 Thread Fabian Hueske
Also, if you expect your key update records to be out of order, you might want to add a ProcessFunction on a keyed stream that filters records with smaller timestamps than the highest observed timestamp. This would prevent a record from being overridden by an earlier version with a smaller timestamp
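The filtering idea in this message can be illustrated with a small plain-Python simulation. It is not a Flink ProcessFunction: the dictionary stands in for per-key state, and the function name `drop_stale_updates` and the record layout `(key, timestamp, value)` are assumptions made for the sketch.

```python
from typing import Any, Dict, Hashable, Iterable, Iterator, Tuple

Record = Tuple[Hashable, int, Any]  # (key, timestamp, value), a hypothetical layout


def drop_stale_updates(records: Iterable[Record]) -> Iterator[Record]:
    """Pass through only records that are not older than the newest one seen for their key."""
    highest_seen: Dict[Hashable, int] = {}  # stands in for keyed state
    for key, timestamp, value in records:
        if key in highest_seen and timestamp < highest_seen[key]:
            # An earlier version arrived late; drop it so it cannot override newer data.
            continue
        highest_seen[key] = timestamp
        yield key, timestamp, value


# Hypothetical input: the second record for "review-1" arrives out of order.
incoming = [
    ("review-1", 10, 4),
    ("review-1", 5, 2),
    ("review-2", 7, 3),
    ("review-1", 12, 5),
]
kept = list(drop_stale_updates(incoming))
```

Records with a timestamp equal to the highest one seen are let through here; whether ties should be kept or dropped is a design choice the message does not settle.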

Re: Using SQL with dynamic tables where rows are updated

2018-01-05 Thread Fabian Hueske
Ha, that's a neat workaround! Thanks for sharing, Hequn! When doing this, you should, however, ensure that all records with the same key arrive from the same input task to avoid inconsistent behavior due to records arriving out of order. This would be the case if you ingest the table directly from a

Re: Using SQL with dynamic tables where rows are updated

2018-01-05 Thread Hequn Cheng
hi Ghassan, TableSource in Flink doesn't support primary key now, but you can achieve it by doing a group by manually. Such as: val resultTable = tableEnv.sql( " SELECT ProductID, AVG(Rating) FROM ( SELECT ReviewID, LAST_VALUE(ProductID), LAST_VALUE(Approved) , L
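The query in this message is cut off, but its visible shape is a two-level aggregation: an inner grouping by ReviewID that keeps the last value per review, and an outer grouping by ProductID that averages the ratings. The plain-Python sketch below imitates that shape under stated assumptions: the event layout `(review_id, product_id, rating)`, the function name `average_latest_ratings`, and the sample events are hypothetical, and the `Approved` column from the query is left out because its role is not visible in the snippet.

```python
from collections import defaultdict
from typing import Dict, Iterable, List, Tuple

Event = Tuple[str, str, float]  # (review_id, product_id, rating), a hypothetical layout


def average_latest_ratings(events: Iterable[Event]) -> Dict[str, float]:
    """Average ratings per product, counting only the latest version of each review."""
    # Inner grouping: keep the last seen (product_id, rating) for every review.
    latest: Dict[str, Tuple[str, float]] = {}
    for review_id, product_id, rating in events:
        latest[review_id] = (product_id, rating)

    # Outer grouping: accumulate count and sum per product over the latest versions.
    totals: Dict[str, List[float]] = defaultdict(lambda: [0, 0.0])
    for product_id, rating in latest.values():
        totals[product_id][0] += 1
        totals[product_id][1] += rating

    return {product: total / count for product, (count, total) in totals.items()}


# Hypothetical events: "review-1" is later updated from a rating of 5 to 2.
events = [
    ("review-1", "product-1", 5.0),
    ("review-2", "product-1", 3.0),
    ("review-1", "product-1", 2.0),
]
result = average_latest_ratings(events)
```

With these sample events the updated review counts once with its latest rating, so the product averages over two reviews rather than over three appended rows.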

Re: Using SQL with dynamic tables where rows are updated

2018-01-05 Thread Fabian Hueske
Hi Ghassan, Flink's Table API / SQL does not yet support an upsert ingestion mode (updating table rows by key); it only supports append mode, i.e., each event of the data stream is appended to the table. Hence, it is not possible to implement your use case using SQL. An upsert ingestion mode will be added

Re: Using SQL with dynamic tables where rows are updated

2017-12-20 Thread Ghassan Yammine
Hi Timo, Thanks for your quick reply. I understand your statement about the SQL expression: It is grouped by ProductID and performs the average computation over all the records it sees. There is no concept of primary key and INSERT/UPDATE. Is there a way to do what I want using SQL without,

Re: Using SQL with dynamic tables where rows are updated

2017-12-20 Thread Timo Walther
Hi Ghassan, in your example the result 3.5 is correct. The query is executed with standard SQL semantics. You only group by ProductID and since it is the same for all elements, the average is 3.5. The second "review-3" does not replace anything. In general, the replacement would happen in th

Using SQL with dynamic tables where rows are updated

2017-12-19 Thread Ghassan Yammine
Hello, I’m new to Flink and I need some help. I’d like to use the SQL API for processing an incoming stream that has the following characteristics: * Each stream record has a key * The record can be updated * The record is of the form: reviewId -> (productId, rating) For the above