Hi, Srikanth~

In your code,
DataStream<String> outStreamAgg = tableEnv.toRetractStream(resultTable, 
Row.class).map(t -> {});
converts the resultTable into a DataStream that is no longer tied to the 
Table API.
The following `outStreamAgg.addSink(…)` is just a normal DataStream write 
to a FlinkKafka sink function, so the update-mode of a table sink does not 
apply to it.
Your program is a mixture of Table API and DataStream programming, not 
pure Table API.
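
To make the difference concrete, here is a minimal plain-Java sketch. It has 
no Flink dependency, and the `Change` class and the helper methods are made up 
for illustration only. It models what toRetractStream emits for a running 
count (a flag plus a row, like Tuple2<Boolean, Row>) and what an append-only 
sink receives when the map function reads only `t.f1` and ignores the flag.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Plain-Java model of a retract stream. None of these types are Flink classes.
class RetractStreamSketch {

    // Hypothetical stand-in for Tuple2<Boolean, Row>:
    // isAdd == true adds a row, isAdd == false retracts a previously emitted row.
    static final class Change {
        final boolean isAdd;
        final String key;
        final long cnt;

        Change(boolean isAdd, String key, long cnt) {
            this.isAdd = isAdd;
            this.key = key;
            this.cnt = cnt;
        }
    }

    // Models a continuously updated count per key: every update to an
    // existing key first retracts the old count, then adds the new one.
    static List<Change> countPerKey(List<String> keys) {
        Map<String, Long> counts = new HashMap<>();
        List<Change> out = new ArrayList<>();
        for (String key : keys) {
            Long previous = counts.get(key);
            if (previous != null) {
                out.add(new Change(false, key, previous));
            }
            long updated = (previous == null) ? 1L : previous + 1L;
            counts.put(key, updated);
            out.add(new Change(true, key, updated));
        }
        return out;
    }

    // Mirrors map(t -> t.f1): the flag is dropped, so retractions are
    // written to the sink as if they were ordinary records.
    static List<String> dropFlag(List<Change> changes) {
        List<String> records = new ArrayList<>();
        for (Change c : changes) {
            records.add(c.key + "=" + c.cnt);
        }
        return records;
    }

    // One possible alternative: forward only the add messages.
    static List<String> addsOnly(List<Change> changes) {
        List<String> records = new ArrayList<>();
        for (Change c : changes) {
            if (c.isAdd) {
                records.add(c.key + "=" + c.cnt);
            }
        }
        return records;
    }

    public static void main(String[] args) {
        List<Change> changes = countPerKey(Arrays.asList("a", "a", "b"));
        System.out.println("flag dropped: " + dropFlag(changes));
        System.out.println("adds only:    " + addsOnly(changes));
    }
}
```

With the input a, a, b the model emits four changes. Dropping the flag writes 
all four as plain records, including the retracted count for a, and a consumer 
of the topic cannot tell that record apart from a real result. Filtering on 
the flag writes three records.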
Best,
Terry Wang



> On Sep 26, 2019, at 5:47 PM, srikanth flink <flink.d...@gmail.com> wrote:
> 
> Hi Terry Wang,
> 
> Thanks for the quick reply.
> 
> I would like to understand more about your statement: "If the target table 
> is a Kafka table, which implements AppendStreamTableSink, the update-mode 
> will be append only". 
> If that means retract mode cannot be used for Kafka sinks because they 
> implement AppendStreamTableSink, then why does the code below work for me, 
> writing data to Kafka?
> DataStream<String> outStreamAgg = tableEnv.toRetractStream(resultTable, Row.class)
>     .map(t -> {
>         Row r = t.f1;
>         ObjectNode node = mapper.createObjectNode();
>         node.put("source.ip", r.getField(0).toString());
>         node.put("destination.ip", r.getField(1).toString());
>         node.put("cnt", Long.parseLong(r.getField(2).toString()));
>         return node.toString();
>     });
> Properties kafkaProducerProperties = new Properties();
> kafkaProducerProperties.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "host:9092");
> kafkaProducerProperties.setProperty(ProducerConfig.ACKS_CONFIG, "1");
> kafkaProducerProperties.setProperty(ProducerConfig.RETRIES_CONFIG, "3");
> 
> outStreamAgg.addSink(new FlinkKafkaProducer<String>(
>     "reconMultiAttempFail", new SimpleStringSchema(), kafkaProducerProperties));
> 
> Is it that the above functionality works only with Table API and not with SQL?
> Please explain.
> 
> Thanks
> Srikanth
> 
> 
> 
> On Thu, Sep 26, 2019 at 1:57 PM Terry Wang <zjuwa...@gmail.com> wrote:
> Hi srikanth~
> 
> The Flink SQL update-mode is inferred from the target table type.
> For now, there are three StreamTableSink types: `AppendStreamTableSink`, 
> `UpsertStreamTableSink` and `RetractStreamTableSink`. 
> If the target table is a Kafka table, which implements AppendStreamTableSink, 
> the update-mode will be append only. 
> So if you want to enable retract mode, you need to insert into a table 
> backed by a RetractStreamTableSink.
> Hope it helps~
> 
> 
> 
> Best,
> Terry Wang
> 
> 
> 
>> On Sep 26, 2019, at 2:50 PM, srikanth flink <flink.d...@gmail.com> wrote:
>> 
>> How can I configure the environment file for Flink SQL with update-mode: retract?
>> 
>> I have this for append:
>> properties:         
>>         - key: zookeeper.connect
>>           value: localhost:2181
>>         - key: bootstrap.servers
>>           value: localhost:9092
>>         - key: group.id
>>           value: reconMultiAttempFail
>>     format:
>>       type: json
>>       fail-on-missing-field: false
>>       json-schema: >
>>         {
>>           type: 'object',
>>           properties: {
>>             'a': {
>>                type: 'string'
>>             },
>>             'b': {
>>                type: 'string'
>>             },
>>             'cnt': {
>>                type: 'string'
>>             }
>>           }
>>         }
>>       derive-schema: false
>> 
>>     schema:
>>       - name: 'a'
>>         type: VARCHAR
>>       - name: 'b'
>>         type: VARCHAR
>>       - name: 'cnt'
>>         type: BIGINT
>> 
>> I couldn't find any documentation for this. 
>> 
>> Could someone help me with the syntax?
>> 
>> Thanks
>> Srikanth
>> 
> 
