Awesome, thanks!

On Thu, Sep 26, 2019 at 5:50 PM Terry Wang <zjuwa...@gmail.com> wrote:

> Hi, Srikanth~
>
> In your code,
> DataStream<String> outStreamAgg = tableEnv.toRetractStream(resultTable,
> Row.class).map(t -> {});
> converts resultTable into a DataStream, which is no longer tied to the
> Table API. The following `outStreamAgg.addSink(…)` is then just a normal
> DataStream write to a Flink Kafka sink function.
> Your program is a mixture of the Table API and DataStream programming,
> not pure Table API.
>
> Best,
> Terry Wang
>
>
>
> On Sep 26, 2019, at 5:47 PM, srikanth flink <flink.d...@gmail.com> wrote:
>
> Hi Terry Wang,
>
> Thanks for the quick reply.
>
> I would like to understand more about your line "If the target table is a
> type of Kafka which implements AppendStreamTableSink, the update-mode will
> be append only".
> If your statement means that retract mode cannot be used for Kafka sinks
> because they implement AppendStreamTableSink, then why does the code below
> work for me, dumping data to Kafka?
> DataStream<String> outStreamAgg = tableEnv.toRetractStream(resultTable,
> Row.class).map(t -> {
>     // t is a Tuple2<Boolean, Row>: f0 is the add/retract flag, f1 the row.
>     // The flag is not checked here, so retract messages are written too.
>     Row r = t.f1;
>     ObjectNode node = mapper.createObjectNode();
>     node.put("source.ip", r.getField(0).toString());
>     node.put("destination.ip", r.getField(1).toString());
>     node.put("cnt", Long.parseLong(r.getField(2).toString()));
>     return node.toString();
> });
> Properties kafkaProducerProperties = new Properties();
> kafkaProducerProperties.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
> "host:9092");
> kafkaProducerProperties.setProperty(ProducerConfig.ACKS_CONFIG, "1");
> kafkaProducerProperties.setProperty(ProducerConfig.RETRIES_CONFIG, "3");
>
> outStreamAgg.addSink(new
> FlinkKafkaProducer<String>("reconMultiAttempFail", new SimpleStringSchema(),
> kafkaProducerProperties));
>
> Does the above functionality work only with the Table API and not with
> SQL?
> Please explain.
>
> Thanks
> Srikanth
>
>
>
> On Thu, Sep 26, 2019 at 1:57 PM Terry Wang <zjuwa...@gmail.com> wrote:
>
>> Hi srikanth~
>>
>> The Flink SQL update-mode is inferred from the target table type.
>> For now, there are three StreamTableSink types: `AppendStreamTableSink`,
>> `UpsertStreamTableSink`, and `RetractStreamTableSink`.
>> If the target table is a Kafka table, which implements
>> AppendStreamTableSink, the update-mode will be append only.
>> So if you want to enable retract mode, you need to insert into a table
>> backed by a RetractStreamTableSink.
>> Hope this helps ~
>>
>>
>>
>> Best,
>> Terry Wang
>>
>>
>>
>> On Sep 26, 2019, at 2:50 PM, srikanth flink <flink.d...@gmail.com> wrote:
>>
>> How can I configure the environment file for Flink SQL with update-mode:
>> retract?
>>
>> I have this for append:
>> properties:
>>         - key: zookeeper.connect
>>           value: localhost:2181
>>         - key: bootstrap.servers
>>           value: localhost:9092
>>         - key: group.id
>>           value: reconMultiAttempFail
>>     format:
>>       type: json
>>       fail-on-missing-field: false
>>       json-schema: >
>>         {
>>           type: 'object',
>>           properties: {
>>             'a': {
>>                type: 'string'
>>             },
>>             'b': {
>>                type: 'string'
>>             },
>>             'cnt': {
>>                type: 'string'
>>             }
>>           }
>>         }
>>       derive-schema: false
>>
>>     schema:
>>       - name: 'a'
>>         type: VARCHAR
>>       - name: 'b'
>>         type: VARCHAR
>>       - name: 'cnt'
>>         type: BIGINT
>>
>> I couldn't find any documentation for this.
>>
>> Could someone help me with the syntax?
>>
>> Thanks
>> Srikanth
>>
>>
>>
>
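
The distinction discussed in this thread can be illustrated without Flink. The following is a minimal sketch in plain Java, assuming a retract stream behaves as a sequence of (flag, row) messages where an update to an aggregate arrives as a retraction of the old row followed by an add of the new one. `Change`, `appendEverything`, `appendAddsOnly`, `materialize`, and the sample data are hypothetical names made up for this illustration and are not part of the Flink API.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class RetractStreamSketch {

    /** Hypothetical stand-in for Tuple2<Boolean, Row>: a flag plus a payload. */
    static final class Change {
        final boolean isAdd; // true = add (accumulate), false = retract
        final String key;    // e.g. "source.ip->destination.ip"
        final long cnt;

        Change(boolean isAdd, String key, long cnt) {
            this.isAdd = isAdd;
            this.key = key;
            this.cnt = cnt;
        }
    }

    /** Like a map() that ignores the flag: every message is written out. */
    static List<String> appendEverything(List<Change> changes) {
        List<String> out = new ArrayList<>();
        for (Change c : changes) {
            out.add(c.key + "=" + c.cnt);
        }
        return out;
    }

    /** Writes only add messages and silently drops retractions. */
    static List<String> appendAddsOnly(List<Change> changes) {
        List<String> out = new ArrayList<>();
        for (Change c : changes) {
            if (c.isAdd) {
                out.add(c.key + "=" + c.cnt);
            }
        }
        return out;
    }

    /** Applies adds and retractions to get the current result per key. */
    static Map<String, Long> materialize(List<Change> changes) {
        Map<String, Long> state = new LinkedHashMap<>();
        for (Change c : changes) {
            if (c.isAdd) {
                state.put(c.key, c.cnt);
            } else {
                state.remove(c.key, c.cnt); // remove only if the value matches
            }
        }
        return state;
    }

    /** Made-up sample: the count for pair A goes from 1 to 2, pair B appears once. */
    static List<Change> sample() {
        List<Change> changes = new ArrayList<>();
        changes.add(new Change(true, "A", 1));
        changes.add(new Change(false, "A", 1));
        changes.add(new Change(true, "A", 2));
        changes.add(new Change(true, "B", 1));
        return changes;
    }

    public static void main(String[] args) {
        List<Change> changes = sample();
        System.out.println(appendEverything(changes)); // [A=1, A=1, A=2, B=1]
        System.out.println(appendAddsOnly(changes));   // [A=1, A=2, B=1]
        System.out.println(materialize(changes));      // {A=2, B=1}
    }
}
```

In this sketch an append-only target never sees a retraction as such: it receives either every message as a plain record or only the adds, and a consumer has to reconstruct the latest value per key on its own. Only `materialize` reflects what a sink that understands retractions would hold.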
