Hi, sorry, I am a beginner here. I am not sure how to build the dynamic indices here, i.e. .index("test-ddmmyy")-style daily indices. I have set the watermark on my Kafka table source, but I am not sure how that carries over to the Elasticsearch sink.
Pasted my sample code below:

tableEnv.connect(new Kafka()
        .version("0.11")
        .topic(params.getRequired("write-topic"))
        .property("bootstrap.servers", "localhost:9092")
        .sinkPartitionerRoundRobin())
    .withSchema(new Schema()
        .field("sid", Types.STRING())
        .field("ip", Types.STRING())
        .field("family", Types.STRING())
        .field("total_hits", Types.LONG())
        .field("tumbleStart", Types.SQL_TIMESTAMP())
        .field("tumbleEnd", Types.SQL_TIMESTAMP()))
    .withFormat(new Json().deriveSchema())
    .inAppendMode()
    .registerTableSink("sinkTopic");

new Elasticsearch()
    .version("6")
    .host("localhost", 9200, "http")
    .index("test")   <---- How do I pass a dynamic index here, based on the record
                           received by the table sink? (A rough sketch of what I am
                           imagining is at the bottom of this mail, below the quoted thread.)
    .documentType("user")
    .failureHandlerRetryRejected()
    .failureHandlerIgnore()
    .bulkFlushMaxSize("20 mb")
    .bulkFlushInterval(100000L)
    .bulkFlushBackoffMaxRetries(3)
    .connectionMaxRetryTimeout(3)
    .connectionPathPrefix("/v1")

Thanks again !!

On Thu, Jan 10, 2019 at 2:55 PM miki haiat <miko5...@gmail.com> wrote:

> You can use Flink to manipulate the data by using
> TimeCharacteristic.EventTime [1] and setting a watermark.
> Then, if there is a lag or some other issue, the data will be inserted into
> the correct indices in Elasticsearch.
> A more specific way to implement it with Kafka is described in [2].
>
> 1. https://ci.apache.org/projects/flink/flink-docs-stable/dev/event_timestamps_watermarks.html#assigning-timestamps
> 2. https://ci.apache.org/projects/flink/flink-docs-release-1.7/dev/connectors/kafka.html#kafka-consumers-and-timestamp-extractionwatermark-emission
>
> On Thu, Jan 10, 2019 at 11:10 AM Ramya Ramamurthy <hair...@gmail.com> wrote:
>
> > Hi Dawid,
> >
> > Thanks for the quick reply.
> > I did try that, but I am not sure how to push into rolling indices here.
> > For example, I maintain daily indices on ES. Based on the event time,
> > I would like to route the records to the appropriate indices. If there
> > was some lag in the source Kafka topic and I received yesterday's data
> > (say at 00:05 or so), I am not sure which index to pick.
> > Is there a way around this?
> >
> > Regards,
> > ~Ramya.
> >
> > On Thu, Jan 10, 2019 at 2:04 PM Dawid Wysakowicz <dwysakow...@apache.org> wrote:
> >
> > > Hi Ramya,
> > >
> > > Have you tried writing to ES directly from the Table API? You can check
> > > the ES connector for the Table API here:
> > >
> > > https://ci.apache.org/projects/flink/flink-docs-release-1.7/dev/table/connect.html#elasticsearch-connector
> > >
> > > Best,
> > > Dawid
> > >
> > > On 10/01/2019 09:21, Ramya Ramamurthy wrote:
> > > > Hi,
> > > >
> > > > I am learning Flink. With Flink 1.7.1, I am trying to read from Kafka
> > > > and insert into Elasticsearch. I have a Kafka connector that converts
> > > > the data into a Flink table. In order to insert into Elasticsearch, I
> > > > converted this table to a DataStream so that I could use the
> > > > ElasticsearchSink. But the Rows returned by the stream have lost the
> > > > schema. How do I convert them to JSON before calling the Elasticsearch
> > > > sink connector? Any help or suggestions would be appreciated.
> > > >
> > > > Thanks.
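P.S. Would something like the sketch below work, i.e. skipping the Table API sink and
using the DataStream Elasticsearch sink instead, where the index name can be computed
per record inside an ElasticsearchSinkFunction? This is only a rough sketch of what I am
imagining, not something I know to be the recommended approach: the resultTable name,
the field positions and the "test-" + ddMMyy pattern are placeholders from my example,
and it assumes the flink-connector-elasticsearch6 dependency is on the classpath.

import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction;
import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer;
import org.apache.flink.streaming.connectors.elasticsearch6.ElasticsearchSink;
import org.apache.flink.types.Row;
import org.apache.http.HttpHost;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.client.Requests;

import java.sql.Timestamp;
import java.time.format.DateTimeFormatter;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

List<HttpHost> httpHosts = new ArrayList<>();
httpHosts.add(new HttpHost("localhost", 9200, "http"));

ElasticsearchSink.Builder<Row> esSinkBuilder = new ElasticsearchSink.Builder<>(
        httpHosts,
        new ElasticsearchSinkFunction<Row>() {
            @Override
            public void process(Row row, RuntimeContext ctx, RequestIndexer indexer) {
                // Field positions follow the schema above:
                // 0=sid, 1=ip, 2=family, 3=total_hits, 4=tumbleStart, 5=tumbleEnd
                Map<String, Object> json = new HashMap<>();
                json.put("sid", row.getField(0));
                json.put("ip", row.getField(1));
                json.put("family", row.getField(2));
                json.put("total_hits", row.getField(3));
                json.put("tumbleStart", row.getField(4).toString());
                json.put("tumbleEnd", row.getField(5).toString());

                // Derive the daily index from the window end (event time), not from
                // the wall clock, so late data still lands in the index of the day
                // it belongs to.
                Timestamp windowEnd = (Timestamp) row.getField(5);
                String index = "test-" + windowEnd.toLocalDateTime()
                        .format(DateTimeFormatter.ofPattern("ddMMyy"));

                IndexRequest request = Requests.indexRequest()
                        .index(index)
                        .type("user")
                        .source(json);
                indexer.add(request);
            }
        });

// Small flush size just for trying things out; the bulk settings would need tuning.
esSinkBuilder.setBulkFlushMaxActions(1);

// resultTable is whatever Table holds the windowed result (placeholder name);
// toAppendStream gives Rows in the same field order as the table schema.
tableEnv.toAppendStream(resultTable, Row.class)
        .addSink(esSinkBuilder.build());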