Hi,

*> Could you give an example of a query that has a unique key?*
Consider the following SQL:

SELECT a, SUM(b) AS d
FROM Orders
GROUP BY a

The result table has a unique key on a. The documentation on Streaming Concepts [1] may be helpful.

*> What is the mechanism by which Flink infers which field(s) form the unique key?*

Currently (Flink 1.6.0), Flink SQL derives unique keys only from GROUP BY, and this unique key information can be propagated to downstream operators, for example a SELECT applied on top of the aggregation.

*Implement a RetractStreamTableSink*

The full outer joins in your query produce an updating table without a unique key, so an upsert sink cannot be used. Instead, you can use a RetractStreamTableSink to write the results, because it encodes every update as a retraction of the old row followed by an insertion of the new row. The documentation explains how to define a TableSink [2].

Best,
Hequn

[1] https://ci.apache.org/projects/flink/flink-docs-master/dev/table/streaming.html#streaming-concepts
[2] https://ci.apache.org/projects/flink/flink-docs-master/dev/table/sourceSinks.html#define-a-tablesink

On Sat, Aug 11, 2018 at 6:02 AM, 徐涛 <happydexu...@gmail.com> wrote:
> Hi Fabian,
> Could you give an example of a query that has a unique key?
> What is the mechanism by which Flink infers which field(s) form the unique key?
> Thanks a lot!
>
> Best, Henry
>
> On Aug 11, 2018, at 5:21 AM, Fabian Hueske <fhue...@gmail.com> wrote:
>
> Hi Henry,
>
> The problem is that the table resulting from the query does not have a
> unique key.
> You can only use an upsert sink if the table has a (composite) unique key.
> Since this is not the case, you cannot use an upsert sink.
> However, you can implement a RetractStreamTableSink, which allows you to
> write any kind of Table (append-only/update, keyed/non-keyed) to an
> external system.
>
> Best, Fabian
>
> 2018-08-10 17:06 GMT+02:00 徐涛 <happydexu...@gmail.com>:
>
>> Hi All,
>> I am using Flink 1.6 to build some real-time programs. I want to write
>> the output to a table sink; the code is below. At first I used an append
>> table sink, but the error message told me that I should use an upsert
>> table sink, so I wrote one.
>> But another error still comes up and blocks me:
>> "Caused by: org.apache.flink.table.api.TableException: UpsertStreamTableSink
>> requires that Table has a full primary keys if it is updated."
>> My question is: how can I change the table's keys in this scenario? I also
>> checked the exception stack and found that the system infers the key fields
>> with
>> val tableKeys: Option[Array[String]] =
>>   UpdatingPlanChecker.getUniqueKeyFields(optimizedPlan)
>> How can I make this function return a value?
>> Thanks a lot!!!
>>
>> val praise = tableEnv.sqlQuery(
>>   "SELECT article_id, hll(uid) AS PU FROM praise " +
>>   "GROUP BY HOP(updated_time, INTERVAL '1' MINUTE, INTERVAL '3' DAY), article_id")
>> tableEnv.registerTable("praiseAggr", praise)
>>
>> val comment = tableEnv.sqlQuery(
>>   "SELECT article_id, hll(from_uid) AS CU FROM comment " +
>>   "GROUP BY HOP(updated_time, INTERVAL '1' MINUTE, INTERVAL '3' DAY), article_id")
>> tableEnv.registerTable("commentAggr", comment)
>>
>> val reader = tableEnv.sqlQuery(
>>   "SELECT article_id, hll(uid) AS RU FROM reader " +
>>   "GROUP BY HOP(updated_time, INTERVAL '1' MINUTE, INTERVAL '3' DAY), article_id")
>> tableEnv.registerTable("readerAggr", reader)
>>
>> // sqlUpdate returns Unit, so its result is not assigned to a variable
>> tableEnv.sqlUpdate(
>>   s"INSERT INTO $sinkTableName " +
>>   "SELECT p.article_id, p.PU, c.CU, r.RU FROM praiseAggr p " +
>>   "FULL OUTER JOIN commentAggr c ON p.article_id = c.article_id " +
>>   "FULL OUTER JOIN readerAggr r ON c.article_id = r.article_id")
>>
>> Thanks,
>> Henry Xu
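This Flink-free Scala sketch illustrates why a retract sink can accept the joined table when an upsert sink cannot. It does not use Flink's RetractStreamTableSink API. Instead, it simulates what such a sink does with the (flag, row) messages it receives: true adds a row and false retracts one earlier copy of that row. Every update arrives as "retract the old row, then add the new row", so the sink never needs a unique key. The object name RetractSinkSketch, the Row type (article_id, PU, CU, RU, with a missing join side as None) and the in-memory multiset standing in for the external table are all hypothetical.

```scala
import scala.collection.mutable

// Hypothetical simulation of a retract-stream consumer; NOT Flink's sink API.
object RetractSinkSketch {
  // Assumed row shape of the joined result: (article_id, PU, CU, RU).
  // A side that has not matched yet in the full outer join is None.
  type Row = (Long, Option[Long], Option[Long], Option[Long])

  // Applies (flag, row) messages to an in-memory multiset (row -> count),
  // which stands in for the external table. No key is needed: a retraction
  // simply removes one copy of an identical row.
  def applyRetractStream(messages: Seq[(Boolean, Row)]): Map[Row, Int] = {
    val table = mutable.Map.empty[Row, Int]
    for ((isAdd, row) <- messages) {
      val count = table.getOrElse(row, 0)
      if (isAdd) table(row) = count + 1            // add one copy
      else if (count > 1) table(row) = count - 1   // retract one of several copies
      else if (count == 1) table -= row            // retract the last copy
      else throw new IllegalStateException(s"retraction of a row that was never added: $row")
    }
    table.toMap
  }

  def main(args: Array[String]): Unit = {
    // Article 1 first appears only in the praise aggregate. When the comment
    // aggregate catches up, the padded row is retracted and the joined row added.
    val messages: Seq[(Boolean, Row)] = Seq(
      (true, (1L, Some(5L), None, None)),
      (false, (1L, Some(5L), None, None)),
      (true, (1L, Some(5L), Some(2L), None))
    )
    println(applyRetractStream(messages))
  }
}
```

After these three messages, only the joined row for article 1 remains. A real sink would instead perform a delete of one matching row for each false message and an insert for each true message against the external system.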