A typical use case that generates updates (that is, a query that is not append-only) is a non-windowed group-by aggregation, such as "select user, count(url) from clicks group by user".
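To make the difference concrete, here is a small plain-Python simulation (not Flink code) of the changelog each kind of query would produce. The function names `project`, `count_by_user` and `is_append_only`, and the sample click rows, are made up for illustration. Note that Flink makes this decision statically from the query plan, whereas this sketch only inspects the emitted rows.

```python
from collections import defaultdict


def project(clicks):
    """Mimics 'select user, url from clicks': one INSERT per input row."""
    return [("INSERT", user, url) for user, url in clicks]


def count_by_user(clicks):
    """Mimics 'select user, count(url) from clicks group by user'.

    The first row for a user is an INSERT; every later row for the same
    user changes a previously emitted result, so it is an UPDATE.
    """
    counts = defaultdict(int)
    changelog = []
    for user, _url in clicks:
        counts[user] += 1
        kind = "INSERT" if counts[user] == 1 else "UPDATE"
        changelog.append((kind, user, counts[user]))
    return changelog


def is_append_only(changelog):
    """True when no previously emitted result is ever modified."""
    return all(kind == "INSERT" for kind, *_ in changelog)


# Hypothetical sample data: (user, url) pairs arriving in order.
clicks = [("Mary", "./home"), ("Bob", "./cart"), ("Mary", "./prod?id=1")]

print(is_append_only(project(clicks)))        # plain projection
print(is_append_only(count_by_user(clicks)))  # non-windowed aggregation
```

The second click from the same user is what turns the aggregation into an updating query: the count already emitted for that user has to be replaced, which is why an upsert sink needs a key for such a query.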
You can refer to the Flink documentation at https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/table/streaming/dynamic_tables.html

Polarisary <polaris...@gmail.com> wrote on Thu, Nov 14, 2019 at 3:35 PM:

> My SQL is a regular insert like "insert into sink_table select c1,c2,c3 from
> source_table".
> I want to know in which cases it is judged to be append-only. Is there
> documentation for this?
>
> Many thanks!
>
> On Nov 14, 2019, at 10:05 AM, 张万新 <kevinzwx1...@gmail.com> wrote:
>
> Yes, it's related to your SQL. Flink checks the plan of your SQL to judge
> whether your job is append-only or has updates. If your job is append-only,
> that means no result needs to be updated.
>
> If you still have problems, please post your SQL and the complete error
> message to help people understand your use case.
>
> Polarisary <polaris...@gmail.com> wrote on Wed, Nov 13, 2019 at 6:43 PM:
>
>> Hi
>> When I use flink-jdbc JDBCUpsertTableSink to sink to MySQL,
>> isAppendOnly is set to true and keyFields is set to null by
>> StreamExecSink, but I want to upsert.
>> Is this related to the SQL?
>>
>> The stack is as follows:
>> at sink.JDBCUpsertTableSink.setIsAppendOnly(JDBCUpsertTableSink.java:105)
>> at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.scala:98)
>> at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.scala:50)
>> at org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:54)
>> at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlan(StreamExecSink.scala:50)
>> at org.apache.flink.table.planner.delegation.StreamPlanner$$anonfun$translateToPlan$1.apply(StreamPlanner.scala:61)
>> at org.apache.flink.table.planner.delegation.StreamPlanner$$anonfun$translateToPlan$1.apply(StreamPlanner.scala:60)
>> at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>> at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>> at scala.collection.Iterator$class.foreach(Iterator.scala:891)
>> at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
>> at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
>> at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
>> at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
>> at scala.collection.AbstractTraversable.map(Traversable.scala:104)
>> at org.apache.flink.table.planner.delegation.StreamPlanner.translateToPlan(StreamPlanner.scala:60)
>> at org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:149)
>> at org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:439)
>> at org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlUpdate(TableEnvironmentImpl.java:348)
>> at task.Device.main(Device.java:77)
>> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>> at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>> at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>> at java.lang.reflect.Method.invoke(Method.java:498)
>> at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:576)
>>
>> Hope to reply!
>> many thanks