Hi All,
        I am using flink 1.6 to generate some realtime programs. I want to 
write the output to table sink, the code is as below. At first I use append 
table sink, which error message tells me that I should use upsert table sink, 
so I write one. But still another error “Caused by: 
org.apache.flink.table.api.TableException: UpsertStreamTableSink requires that 
Table has a full primary keys if it is updated.” comes out,which blocks me. My 
questions is how to modify a table keys in this scenario? I also check the 
exception stack, and found that the system infer the keys field by 
val tableKeys: Option[Array[String]] = 
UpdatingPlanChecker.getUniqueKeyFields(optimizedPlan), I wonder how to make the 
function return value ?
Thanks a lot !!!
    var praise = tableEnv.sqlQuery(s"SELECT article_id,hll(uid) as PU FROM 
praise GROUP BY HOP(updated_time, INTERVAL '1' MINUTE,INTERVAL '3' 
DAY),article_id" )
    tableEnv.registerTable("praiseAggr", praise)

    var comment = tableEnv.sqlQuery(s"SELECT article_id,hll(from_uid) as CU 
FROM comment GROUP BY HOP(updated_time, INTERVAL '1' MINUTE,INTERVAL '3' 
DAY),article_id" )
    tableEnv.registerTable("commentAggr", comment)

    var reader = tableEnv.sqlQuery(s"SELECT article_id,hll(uid) as RU FROM 
reader GROUP BY HOP(updated_time, INTERVAL '1' MINUTE,INTERVAL '3' 
DAY),article_id" )
    tableEnv.registerTable("readerAggr", reader)

    var finalTable = tableEnv.sqlUpdate(s"insert into "+ sinkTableName + " " +  
" SELECT p.article_id,p.PU,c.CU,r.RU from praiseAggr p FULL OUTER JOIN 
commentAggr c on p.article_id=c.article_id FULL OUTER JOIN readerAggr r on 
c.article_id=r.article_id")

        

        

Thank,
Henry Xu

Reply via email to