Hi, aj

> I was confused before, as I was thinking the sink builder is called only once,
> but it gets called for every batch request. Correct me if my understanding is
> wrong.

You’re right: the sink builder should be called only once, not once per batch
request. Could you post the piece of code where you build and use the sink?
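
For reference, here is roughly what I would expect the setup to look like with the
Elasticsearch 7 DataStream connector (just a sketch: the host, the MyEvent type and
the DailyIndexSinkFunction sketched in my quoted mail below are placeholders, and
imports are omitted):

    List<HttpHost> httpHosts =
            Collections.singletonList(new HttpHost("localhost", 9200, "http"));

    // The Builder is created and configured once, while the job graph is assembled.
    ElasticsearchSink.Builder<MyEvent> esSinkBuilder =
            new ElasticsearchSink.Builder<>(httpHosts, new DailyIndexSinkFunction());
    esSinkBuilder.setBulkFlushMaxActions(1000);   // emit a bulk request every 1000 actions

    DataStream<MyEvent> events = ...;             // the stream you already read from Kafka

    // build() and addSink() are also called exactly once; only the sink function
    // inside runs per record.
    events.addSink(esSinkBuilder.build());

If new ElasticsearchSink.Builder<>(...) or build() shows up inside a loop or inside
another operator in your code, that would explain what you are seeing.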

Best,
Leonard Xu



> On Fri, May 29, 2020 at 9:08 AM Leonard Xu <xbjt...@gmail.com> wrote:
> Hi, aj
> 
> In the implementation of ElasticsearchSink, the sink won't create the index
> itself; it only starts an Elasticsearch client for sending requests to the
> Elasticsearch cluster. You can simply derive the index name (the date value, in
> your case) from your timestamp field and set it on an IndexRequest [1];
> ElasticsearchSink will send the IndexRequests to the cluster, and Elasticsearch
> will create the corresponding index and write the records.
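> 
> For your per-day index, the sink function could look roughly like the sketch
> below (the MyEvent type, its accessors and the "events-" prefix are made up;
> [1] shows a complete example):
> 
>     import java.time.Instant;
>     import java.time.ZoneOffset;
>     import java.time.format.DateTimeFormatter;
>     import java.util.HashMap;
>     import java.util.Map;
> 
>     import org.apache.flink.api.common.functions.RuntimeContext;
>     import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction;
>     import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer;
>     import org.elasticsearch.client.Requests;
> 
>     // Builds one IndexRequest per record and derives the index name from the
>     // record's own timestamp, so a new daily index appears automatically as
>     // soon as the first event of that day arrives.
>     public class DailyIndexSinkFunction implements ElasticsearchSinkFunction<MyEvent> {
> 
>         private static final DateTimeFormatter DAY =
>                 DateTimeFormatter.ofPattern("yyyy-MM-dd").withZone(ZoneOffset.UTC);
> 
>         @Override
>         public void process(MyEvent event, RuntimeContext ctx, RequestIndexer indexer) {
>             // e.g. "events-2020-05-29" for an event stamped on that day
>             String index = "events-" + DAY.format(Instant.ofEpochMilli(event.getTimestampMillis()));
> 
>             Map<String, Object> source = new HashMap<>();
>             source.put("event_name", event.getName());
>             source.put("event_ts", event.getTimestampMillis());
> 
>             indexer.add(Requests.indexRequest()
>                     .index(index)
>                     .source(source));
>         }
>     }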
> 
> BTW, if you're using Flink SQL, you can use a dynamic index in the Elasticsearch
> SQL connector [2]: simply configure 'connector.index' =
> 'myindex_{ts_field|yyyy-MM-dd}' to achieve your goal.
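> 
> As a rough sketch (the table name, schema, host and the exact property set are
> made up and depend on your Flink and connector version), registering such a
> table from Java could look like:
> 
>     // tableEnv is an existing StreamTableEnvironment / TableEnvironment
>     tableEnv.sqlUpdate(
>         "CREATE TABLE es_events (" +
>         "  event_name STRING," +
>         "  event_ts TIMESTAMP(3)" +
>         ") WITH (" +
>         "  'connector.type' = 'elasticsearch'," +
>         "  'connector.version' = '7'," +
>         "  'connector.hosts' = 'http://localhost:9200'," +
>         "  'connector.index' = 'myindex_{event_ts|yyyy-MM-dd}'," +   // per-day index
>         "  'connector.document-type' = '_doc'," +
>         "  'update-mode' = 'append'," +
>         "  'format.type' = 'json'" +
>         ")");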
> 
> Best,
> Leonard Xu
> [1] https://github.com/apache/flink/blob/master/flink-end-to-end-tests/flink-elasticsearch7-test/src/main/java/org/apache/flink/streaming/tests/Elasticsearch7SinkExample.java#L119
> [2] https://ci.apache.org/projects/flink/flink-docs-master/dev/table/connect.html#elasticsearch-connector
> 
> 
> 
>> On May 29, 2020, at 02:43, aj <ajainje...@gmail.com> wrote:
>> 
>> Hello All,
>> 
>> I am getting many events in Kafka, and I have written a Flink job that sinks
>> the Avro records from Kafka to S3 in Parquet format.
>> 
>> Now, I want to sink these records into Elasticsearch, but the challenge is
>> that I want to write the records to time-based indices. Basically, in
>> Elasticsearch I want to create a per-day index with the date as the suffix.
>> So, if I create an ES sink in the Flink streaming job, how do I make the sink
>> start writing to a new index when the first event of the day arrives?
>> 
>> Thanks,
>> Anuj. 
>> 
>> 
> 
> 
> -- 
> Thanks & Regards,
> Anuj Jain
> Mob. : +91- 8588817877 
> Skype : anuj.jain07
