A typical use case that will generate updates (meaning the result is not
append-only) is a non-windowed group-by aggregation, like "select user,
count(url) from clicks group by user".

You can refer to the Flink documentation on dynamic tables at
https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/table/streaming/dynamic_tables.html
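
To make the difference concrete, here is a rough sketch in plain Python. It
is only a simulation and does not use any Flink API; the function names, the
"insert"/"update" labels, and the sample clicks are made up for illustration.
It compares a plain projection with a non-windowed group-by count over the
same rows:

```python
from collections import defaultdict


def select_changelog(rows):
    """Simulate 'select user, url from clicks': one insert per input row."""
    return [("insert", (user, url)) for user, url in rows]


def group_count_changelog(rows):
    """Simulate 'select user, count(url) from clicks group by user'.

    The first row for a user inserts a result; every later row for the
    same user changes a result that was already emitted.
    """
    counts = defaultdict(int)
    changelog = []
    for user, _url in rows:
        kind = "update" if counts[user] > 0 else "insert"
        counts[user] += 1
        changelog.append((kind, (user, counts[user])))
    return changelog


def is_append_only(changelog):
    """A result is append-only when it never revises an earlier row."""
    return all(kind == "insert" for kind, _ in changelog)


# Hypothetical sample input, not taken from the original job.
clicks = [("Mary", "./home"), ("Bob", "./cart"), ("Mary", "./prod?id=1")]

print(is_append_only(select_changelog(clicks)))
print(is_append_only(group_count_changelog(clicks)))
```

The projection never touches a row it has already emitted, so an upsert sink
has nothing to update. The group-by count has to revise Mary's row when her
second click arrives, and that kind of query is what produces updates.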


On Thu, Nov 14, 2019 at 3:35 PM, Polarisary <polaris...@gmail.com> wrote:

> My SQL is a regular insert like “insert into sink_table select c1,c2,c3 from
> source_table”.
> I want to know in which cases the job is judged to be append-only. Is there
> any documentation for this?
>
> Many thanks!
>
>
>
>
>
> On Nov 14, 2019, at 10:05 AM, 张万新 <kevinzwx1...@gmail.com> wrote:
>
> Yes, it's related to your SQL. Flink checks the plan of your SQL to judge
> whether your job is append-only or has updates. If your job is append-only,
> that means no result needs to be updated.
>
> If you still have problems, please post your SQL and the complete error
> message to help people understand your use case.
>
> On Wed, Nov 13, 2019 at 6:43 PM, Polarisary <polaris...@gmail.com> wrote:
>
>> Hi,
>> When I use the flink-jdbc JDBCUpsertTableSink to sink to MySQL,
>> isAppendOnly is set to true and keyFields is set to null by
>> StreamExecSink, but I want to upsert.
>> Is this related to my SQL?
>>
>> The stack is as follows:
>> at sink.JDBCUpsertTableSink.setIsAppendOnly(JDBCUpsertTableSink.java:105)
>> at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.scala:98)
>> at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.scala:50)
>> at org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:54)
>> at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlan(StreamExecSink.scala:50)
>> at org.apache.flink.table.planner.delegation.StreamPlanner$$anonfun$translateToPlan$1.apply(StreamPlanner.scala:61)
>> at org.apache.flink.table.planner.delegation.StreamPlanner$$anonfun$translateToPlan$1.apply(StreamPlanner.scala:60)
>> at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>> at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>> at scala.collection.Iterator$class.foreach(Iterator.scala:891)
>> at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
>> at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
>> at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
>> at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
>> at scala.collection.AbstractTraversable.map(Traversable.scala:104)
>> at org.apache.flink.table.planner.delegation.StreamPlanner.translateToPlan(StreamPlanner.scala:60)
>> at org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:149)
>> at org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:439)
>> at org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlUpdate(TableEnvironmentImpl.java:348)
>> at task.Device.main(Device.java:77)
>> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>> at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>> at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>> at java.lang.reflect.Method.invoke(Method.java:498)
>> at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:576)
>>
>> I hope to hear back from you!
>> Many thanks
>>
>>
>
