Hi aj,

ElasticsearchSink does not create indices itself; it only starts an 
Elasticsearch client that sends requests to the Elasticsearch cluster. 
You can simply derive the index name (the date value in your case) 
from your timestamp field and set it on the IndexRequest [1]. 
ElasticsearchSink will send the IndexRequests to the cluster, and the 
cluster will create the corresponding index and flush the records.
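
As a rough sketch of the index-name part only (plain JDK, no Flink or 
Elasticsearch dependency), something like the following could be called from 
your ElasticsearchSinkFunction. The class name DailyIndex, the "events" prefix, 
the "-" separator and the choice of UTC are my assumptions, not anything from 
your job:

```java
import java.time.Instant;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;

// Hypothetical helper: builds a per-day index name from an event timestamp.
final class DailyIndex {
    // Assumption: days are cut at UTC midnight; use another zone if your days differ.
    private static final DateTimeFormatter DAY =
            DateTimeFormatter.ofPattern("yyyy-MM-dd").withZone(ZoneOffset.UTC);

    // Returns "<prefix>-<yyyy-MM-dd>" for the given epoch-millisecond timestamp.
    static String indexFor(String prefix, long epochMillis) {
        return prefix + "-" + DAY.format(Instant.ofEpochMilli(epochMillis));
    }

    public static void main(String[] args) {
        // 1590720180000L is 2020-05-29T02:43:00Z
        System.out.println(indexFor("events", 1590720180000L));
        // Inside the sink function you would pass the result to the request,
        // e.g. Requests.indexRequest().index(indexFor("events", ts)).source(json)
    }
}
```

Because the name is computed per record, the first event of a new day 
carries the new index name without any change to the sink itself.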

BTW, if you're using Flink SQL, you can use the dynamic index feature of the 
Elasticsearch SQL connector [2]: simply configure 'connector.index' = 
'myindex_{ts_field|yyyy-MM-dd}' to achieve your goal.
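
For illustration, the DDL could look roughly like the string below, kept as a 
Java constant that you would hand to your TableEnvironment. The table name, the 
columns, the host and the remaining 'connector.*' values are placeholders I 
made up; only the 'connector.index' pattern is the point here, so please check 
the other options against [2] for your Flink version:

```java
// Hypothetical holder for the sink DDL; everything except the
// 'connector.index' pattern is an illustrative placeholder.
final class DynamicIndexDdl {
    static final String CREATE_SINK =
            "CREATE TABLE es_sink (\n"
          + "  user_id STRING,\n"
          + "  ts_field TIMESTAMP(3)\n"
          + ") WITH (\n"
          + "  'connector.type' = 'elasticsearch',\n"
          + "  'connector.version' = '7',\n"
          + "  'connector.hosts' = 'http://localhost:9200',\n"
          // One index per day, derived from ts_field of each row
          + "  'connector.index' = 'myindex_{ts_field|yyyy-MM-dd}',\n"
          + "  'connector.document-type' = '_doc',\n"
          + "  'update-mode' = 'append',\n"
          + "  'format.type' = 'json'\n"
          + ")";

    public static void main(String[] args) {
        // Print the statement; in a real job it would be passed to the table environment.
        System.out.println(CREATE_SINK);
    }
}
```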

Best,
Leonard Xu
[1] https://github.com/apache/flink/blob/master/flink-end-to-end-tests/flink-elasticsearch7-test/src/main/java/org/apache/flink/streaming/tests/Elasticsearch7SinkExample.java#L119
[2] https://ci.apache.org/projects/flink/flink-docs-master/dev/table/connect.html#elasticsearch-connector



> On May 29, 2020, at 02:43, aj <ajainje...@gmail.com> wrote:
> 
> Hello All,
> 
> I am receiving many events in Kafka, and I have written a Flink job that sinks 
> the Avro records from Kafka to S3 in Parquet format. 
> 
> Now I want to sink these records into Elasticsearch. The only challenge 
> is that I want to sink the records into time-based indices. Basically, in 
> Elasticsearch, I want to create a per-day index with the date as the suffix. 
> So, in the Flink streaming job, if I create an ES sink, how do I make the sink 
> start writing to a new index when the first event of the day arrives?
> 
> Thanks,
> Anuj. 
