Awesome, thanks!

On Thu, Sep 26, 2019 at 5:50 PM Terry Wang <zjuwa...@gmail.com> wrote:
> Hi Srikanth~
>
> In your code,
>
>     DataStream<String> outStreamAgg =
>         tableEnv.toRetractStream(resultTable, Row.class).map(t -> {});
>
> has already converted resultTable into a DataStream that is no longer
> tied to the Table API. The following `outStreamAgg.addSink(…)` call is
> just a normal DataStream write to a FlinkKafka sink function.
> Your program is a mixture of Table API and DataStream programming, not
> pure Table API.
>
> Best,
> Terry Wang
>
>
> On Sep 26, 2019, at 5:47 PM, srikanth flink <flink.d...@gmail.com> wrote:
>
> Hi Terry Wang,
>
> Thanks for the quick reply.
>
> I would like to understand more about your line "If the target table is
> a type of Kafka which implements AppendStreamTableSink, the update-mode
> will be append only".
> If your statement means that retract mode cannot be used for Kafka sinks
> because they implement AppendStreamTableSink, then why does the code
> below work for me, dumping data to Kafka?
>
>     DataStream<String> outStreamAgg =
>         tableEnv.toRetractStream(resultTable, Row.class).map(t -> {
>             Row r = t.f1;
>             ObjectNode node = mapper.createObjectNode();
>             node.put("source.ip", r.getField(0).toString());
>             node.put("destination.ip", r.getField(1).toString());
>             node.put("cnt", Long.parseLong(r.getField(2).toString()));
>             return node.toString();
>         });
>     Properties kafkaProducerProperties = new Properties();
>     kafkaProducerProperties.setProperty(
>         ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "host:9092");
>     kafkaProducerProperties.setProperty(ProducerConfig.ACKS_CONFIG, "1");
>     kafkaProducerProperties.setProperty(ProducerConfig.RETRIES_CONFIG, "3");
>
>     outStreamAgg.addSink(new FlinkKafkaProducer<String>(
>         "reconMultiAttempFail", new SimpleStringSchema(),
>         kafkaProducerProperties));
>
> Is it that the above functionality works only with the Table API and
> not with SQL?
> Please explain.
>
> Thanks
> Srikanth
>
>
> On Thu, Sep 26, 2019 at 1:57 PM Terry Wang <zjuwa...@gmail.com> wrote:
>
>> Hi srikanth~
>>
>> The Flink SQL update-mode is inferred from the target table type.
>> For now, there are three StreamTableSink types: `AppendStreamTableSink`,
>> `UpsertStreamTableSink` and `RetractStreamTableSink`.
>> If the target table is a type of Kafka which implements
>> AppendStreamTableSink, the update-mode will be append only.
>> So if you want to enable retract mode, you need to insert into some
>> kind of RetractStreamTableSink.
>> Hope it helps you ~
>>
>> Best,
>> Terry Wang
>>
>>
>> On Sep 26, 2019, at 2:50 PM, srikanth flink <flink.d...@gmail.com> wrote:
>>
>> How can I configure the environment file for Flink SQL with
>> update-mode: retract?
>>
>> I have this for append:
>>
>>     properties:
>>       - key: zookeeper.connect
>>         value: localhost:2181
>>       - key: bootstrap.servers
>>         value: localhost:9092
>>       - key: group.id
>>         value: reconMultiAttempFail
>>     format:
>>       type: json
>>       fail-on-missing-field: false
>>       json-schema: >
>>         {
>>           type: 'object',
>>           properties: {
>>             'a': {
>>               type: 'string'
>>             },
>>             'b': {
>>               type: 'string'
>>             },
>>             'cnt': {
>>               type: 'string'
>>             }
>>           }
>>         }
>>       derive-schema: false
>>
>>     schema:
>>       - name: 'a'
>>         type: VARCHAR
>>       - name: 'b'
>>         type: VARCHAR
>>       - name: 'cnt'
>>         type: BIGINT
>>
>> I couldn't find any documentation for this.
>>
>> Could someone help me with the syntax?
>>
>> Thanks
>> Srikanth
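
To make Terry's point about the mixed Table API / DataStream program concrete, here is a minimal plain-Java sketch with no Flink dependency. It assumes that a retract stream carries a boolean flag per record (true = add, false = retract), as `toRetractStream` does with its `Tuple2<Boolean, Row>` elements, and that a running count going from 1 to 2 is emitted as add(1), retract(1), add(2). The `Change` class, the method names and the sample IPs are hypothetical stand-ins for illustration, not Flink APIs.

```java
import java.util.ArrayList;
import java.util.List;

class RetractStreamSketch {

    // Hypothetical stand-in for Flink's Tuple2<Boolean, Row>:
    // isAdd == true is an add message, isAdd == false is a retract message.
    static final class Change {
        final boolean isAdd;
        final String sourceIp;
        final String destinationIp;
        final long cnt;

        Change(boolean isAdd, String sourceIp, String destinationIp, long cnt) {
            this.isAdd = isAdd;
            this.sourceIp = sourceIp;
            this.destinationIp = destinationIp;
            this.cnt = cnt;
        }
    }

    // Builds the same JSON shape as the map() function in the thread.
    static String toJson(Change c) {
        return "{\"source.ip\":\"" + c.sourceIp
                + "\",\"destination.ip\":\"" + c.destinationIp
                + "\",\"cnt\":" + c.cnt + "}";
    }

    // Mirrors map(t -> ... t.f1 ...): the flag is ignored, so add and
    // retract messages both become ordinary appended records.
    static List<String> dropFlag(List<Change> changes) {
        List<String> out = new ArrayList<>();
        for (Change c : changes) {
            out.add(toJson(c));
        }
        return out;
    }

    // Alternative: forward only add messages, so the output is append-only
    // by construction and retractions never reach the sink.
    static List<String> addsOnly(List<Change> changes) {
        List<String> out = new ArrayList<>();
        for (Change c : changes) {
            if (c.isAdd) {
                out.add(toJson(c));
            }
        }
        return out;
    }

    // Assumed changelog for one key whose count goes from 1 to 2.
    static List<Change> sampleChangelog() {
        List<Change> changes = new ArrayList<>();
        changes.add(new Change(true, "10.0.0.1", "10.0.0.2", 1));
        changes.add(new Change(false, "10.0.0.1", "10.0.0.2", 1));
        changes.add(new Change(true, "10.0.0.1", "10.0.0.2", 2));
        return changes;
    }

    public static void main(String[] args) {
        System.out.println("flag dropped: " + dropFlag(sampleChangelog()));
        System.out.println("adds only:    " + addsOnly(sampleChangelog()));
    }
}
```

With the flag dropped, the sink receives three plain records and a consumer cannot tell the retraction of cnt=1 from a new result. This is why the DataStream code in the thread "works" with Kafka: the Kafka producer only ever sees appends. Filtering on the flag is one possible way to keep the topic meaningful, assuming the consumer treats the latest record per key as the current count.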