Hi, aj > I was confused before as I was thinking the sink builder is called only once > but it gets called for every batch request, correct me if my understanding is > wrong.
You’re right that sink builder should be called only once rather than every batch requests, could you post some code piece of using the sink? Best, Leonard Xu > On Fri, May 29, 2020 at 9:08 AM Leonard Xu <xbjt...@gmail.com > <mailto:xbjt...@gmail.com>> wrote: > Hi,aj > > In the implementation of ElasticsearchSink, ElasticsearchSink won't create > index and only start a Elastic client for sending requests to > the Elastic cluster. You can simply extract the index(date value in your > case) from your timestamp field and then put it to an IndexRequest[2], > ElasticsearchSink will send the IndexRequests to the Elastic cluster, Elastic > cluster will create corresponding index and flush the records. > > BTW, If you’re using Flink SQL you can use dynamic index in Elasticsearch sql > connector [2], you can simply config 'connector.index' = > ‘myindex_{ts_field|yyyy-MM-dd}’ to achieve your goals. > > Best, > Leoanrd Xu > [1] > https://github.com/apache/flink/blob/master/flink-end-to-end-tests/flink-elasticsearch7-test/src/main/java/org/apache/flink/streaming/tests/Elasticsearch7SinkExample.java#L119 > > <https://github.com/apache/flink/blob/master/flink-end-to-end-tests/flink-elasticsearch7-test/src/main/java/org/apache/flink/streaming/tests/Elasticsearch7SinkExample.java#L119> > > [2] > https://ci.apache.org/projects/flink/flink-docs-master/dev/table/connect.html#elasticsearch-connector > > <https://ci.apache.org/projects/flink/flink-docs-master/dev/table/connect.html#elasticsearch-connector> > > > > >> 在 2020年5月29日,02:43,aj <ajainje...@gmail.com <mailto:ajainje...@gmail.com>> >> 写道: >> >> Hello All, >> >> I am getting many events in Kafka and I have written a link job that sinks >> that Avro records from Kafka to S3 in parquet format. >> >> Now, I want to sink these records into elastic search. but the only >> challenge is that I want to sink record on time indices. Basically, In >> Elastic, I want to create a per day index with the date as the suffix. >> So in Flink stream job if I create an es sink how will I change the sink to >> start writing in a new index when the first event of the day arrives >> >> Thanks, >> Anuj. >> >> >> <http://www.oracle.com/> >> >> >> <http://www.cse.iitm.ac.in/%7Eanujjain/> > > > -- > Thanks & Regards, > Anuj Jain > Mob. : +91- 8588817877 > Skype : anuj.jain07 > <http://www.oracle.com/> > > > <http://www.cse.iitm.ac.in/%7Eanujjain/>