Hello,
No, I did not use Hequn’s work. I already had an implementation that
pre-processed the stream before feeding it into the (now modified) SQL
expression.
Basically the SQL, instead of computing the AVG, just computes the COUNT and
SUM. I do the average computation on the resulting stream.
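In case it helps other readers, the pre-processing described above might look roughly like this. This is a sketch only: the table name, field names, and types are illustrative assumptions, not taken from the actual code.

```scala
// Sketch of the COUNT/SUM workaround; "Reviews", "ProductID" and "Rating"
// are illustrative names. Instead of AVG, the SQL computes COUNT and SUM.
val aggTable = tableEnv.sql(
  """
    |SELECT ProductID, COUNT(Rating) AS cnt, SUM(Rating) AS total
    |FROM Reviews
    |GROUP BY ProductID
  """.stripMargin)

// Finish the average outside of SQL, on the resulting retract stream,
// so each update carries a correct running average per product.
val averages = tableEnv
  .toRetractStream[(String, Long, Int)](aggTable)
  .map { case (accumulate, (productId, cnt, total)) =>
    (accumulate, productId, total.toDouble / cnt)
  }
```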
Hi Ghassan,
I have the same issue as you; I am not sure how you solved your problem in
the end. Did you use Hequn's workaround?
--
Sent from: http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/
Thanks very much!
Regards,
Ghassan
> On Jan 5, 2018, at 7:55 AM, Hequn Cheng wrote:
>
> hi Ghassan,
>
> TableSource in Flink doesn't support primary key now, but you can achieve
> it by doing a group by manually.
>
> Such as:
> val resultTable = tableEnv.sql(
> "
> SELECT Prod
Also, if you expect the key's update records to arrive out of order, you
might want to add a ProcessFunction on a keyed stream that filters out records
with smaller timestamps than the highest timestamp observed so far.
This would prevent a record from being overwritten by an earlier version with
a smaller timestamp.
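Such a filter could be sketched as follows. The record type and field names are illustrative assumptions, not from the thread; the idea is just keyed state holding the highest timestamp seen per key.

```scala
import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.ProcessFunction
import org.apache.flink.util.Collector

// Illustrative record type; field names are assumptions.
case class Review(reviewId: String, productId: String, rating: Int, ts: Long)

// Drops any record whose timestamp is not newer than the highest
// timestamp seen so far for its key.
class LatestVersionFilter extends ProcessFunction[Review, Review] {

  private var maxTs: ValueState[java.lang.Long] = _

  override def open(parameters: Configuration): Unit = {
    maxTs = getRuntimeContext.getState(
      new ValueStateDescriptor[java.lang.Long]("maxTs", classOf[java.lang.Long]))
  }

  override def processElement(r: Review,
                              ctx: ProcessFunction[Review, Review]#Context,
                              out: Collector[Review]): Unit = {
    val seen = maxTs.value()
    if (seen == null || r.ts > seen) {
      maxTs.update(r.ts)
      out.collect(r) // newer version: forward it
    }
    // else: older, out-of-order version: drop it
  }
}

// Usage (key by the record key so state is scoped per reviewId):
// reviews.keyBy(_.reviewId).process(new LatestVersionFilter)
```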
Ha, that's a neat workaround! Thanks for sharing Hequn!
When doing this, however, you should ensure that all records with the same
key arrive from the same input task, to avoid inconsistent behavior due to
records arriving out of order.
This would be the case if you ingest the table directly from a
hi Ghassan,
TableSource in Flink doesn't support primary key now, but you can achieve
it by doing a group by manually.
Such as:
val resultTable = tableEnv.sql(
"
SELECT ProductID, AVG(Rating) FROM
( SELECT ReviewID, LAST_VALUE(ProductID),
LAST_VALUE(Approved) , L
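The truncated query above appears to follow a dedupe-then-aggregate pattern: an inner GROUP BY ReviewID keeps the last value of each field, and the outer query averages over the deduplicated rows. A hedged reconstruction of the general shape only (assuming a LAST_VALUE aggregate is available, e.g. as a user-defined aggregate function; table and column names are illustrative, and this is not the exact query from the message):

```scala
// Inner query: keep the latest version of each review (one row per ReviewID).
// Outer query: average over the deduplicated rows per product.
val resultTable = tableEnv.sql(
  """
    |SELECT ProductID, AVG(Rating)
    |FROM (
    |  SELECT ReviewID,
    |         LAST_VALUE(ProductID) AS ProductID,
    |         LAST_VALUE(Rating)    AS Rating
    |  FROM Reviews
    |  GROUP BY ReviewID
    |) t
    |GROUP BY ProductID
  """.stripMargin)
```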
Hi Ghassan,
Flink's Table API / SQL does not yet support the upsert ingestion mode
(updating table rows by key), only the append mode, i.e., each event of
the data stream is appended to the table.
Hence, it is not possible to implement your use case using SQL.
An upsert ingestion mode will be added
Hi Timo,
Thanks for your quick reply.
I understand your statement about the SQL expression: It is grouped by
ProductID and performs the average computation over all the records it sees.
There is no concept of primary key and INSERT/UPDATE.
Is there a way to do what I want using SQL without,
Hi Ghassan,
in your example the result 3.5 is correct. The query is executed with
standard SQL semantics. You only group by ProductID and since it is the
same for all elements, the average is 3.5.
The second "review-3" does not replace anything. In general, the
replacement would happen in th
Hello,
I’m new to Flink and I need some help. I’d like to use the SQL API for
processing an incoming stream that has the following characteristics:
* Each stream record has a key
* The record can be updated
* The record is of the form: reviewId -> (productId, rating)
For the above
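For readers skimming the thread, the upsert semantics being asked for can be sketched with plain Scala collections; all names and values here are illustrative, not from the original message.

```scala
// Upsert semantics: a later record with the same reviewId replaces the
// earlier one, and aggregates should reflect only the latest versions.
val events = Seq(
  "review-1" -> ("product-A", 4),
  "review-2" -> ("product-A", 3),
  "review-2" -> ("product-A", 5) // update: replaces the earlier review-2
)

// Keep only the latest version of each review.
val latest = events.foldLeft(Map.empty[String, (String, Int)])(_ + _)

// Average rating per product over the latest versions only.
val avgByProduct = latest.values
  .groupBy(_._1)
  .map { case (product, rs) => product -> rs.map(_._2.toDouble).sum / rs.size }

println(avgByProduct) // Map(product-A -> 4.5)
```

With append semantics (what Flink SQL does here), the second review-2 record would not replace the first, and the average would be computed over all three rows instead.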