Hi,

Sorry, I am a beginner here. I am not sure how to build dynamic indices,
i.e. the .index("test-ddmmyy") kind of indices, here.
I have set the watermark for my Kafka table source, but I am not sure how
this works with the Elasticsearch sink.

I have pasted my sample code below:

tableEnv.connect(new Kafka()
      .version("0.11")
      .topic(params.getRequired("write-topic"))
      .property("bootstrap.servers", "localhost:9092")
      .sinkPartitionerRoundRobin())
      .withSchema(new Schema()
            .field("sid", Types.STRING())
            .field("ip", Types.STRING())
            .field("family", Types.STRING())
            .field("total_hits", Types.LONG())
            .field("tumbleStart", Types.SQL_TIMESTAMP())
            .field("tumbleEnd", Types.SQL_TIMESTAMP())
      )
      .withFormat(new Json().deriveSchema())
      .inAppendMode()
      .registerTableSink("sinkTopic");
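
For context, my source table with the event-time attribute is registered
roughly like this (the topic name, the raw timestamp field "ts", and the
watermark delay are placeholders):

tableEnv.connect(new Kafka()
      .version("0.11")
      .topic(params.getRequired("read-topic"))
      .property("bootstrap.servers", "localhost:9092")
      .startFromLatest())
      .withSchema(new Schema()
            .field("sid", Types.STRING())
            .field("ip", Types.STRING())
            .field("family", Types.STRING())
            .field("rowtime", Types.SQL_TIMESTAMP())
                  .rowtime(new Rowtime()
                        .timestampsFromField("ts")
                        .watermarksPeriodicBounded(60000)))
      .withFormat(new Json().deriveSchema())
      .inAppendMode()
      .registerTableSource("sourceTopic");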


new Elasticsearch()
      .version("6")
      .host("localhost", 9200, "http")
      // How can I pass a dynamic index here, based on the record that is
      // written by the table sink?
      .index("test")
      .documentType("user")
      .failureHandlerRetryRejected()
      .failureHandlerIgnore()

      .bulkFlushMaxSize("20 mb")
      .bulkFlushInterval(100000L)
      .bulkFlushBackoffMaxRetries(3)

      .connectionMaxRetryTimeout(3)
      .connectionPathPrefix("/v1")
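
In case the descriptor's .index() only accepts a static name, one fallback I
am considering is to convert the result table back to a DataStream and build
the index name per record in an ElasticsearchSinkFunction (from
flink-connector-elasticsearch6; HttpHost comes in via the Elasticsearch REST
client). This is only a rough sketch: the table/variable names, the field
positions, and the "ddMMyy" date pattern are my assumptions.

DataStream<Row> resultStream = tableEnv.toAppendStream(resultTable, Row.class);

List<HttpHost> hosts = new ArrayList<>();
hosts.add(new HttpHost("localhost", 9200, "http"));

ElasticsearchSink.Builder<Row> esSink = new ElasticsearchSink.Builder<>(
      hosts,
      new ElasticsearchSinkFunction<Row>() {
            @Override
            public void process(Row row, RuntimeContext ctx, RequestIndexer indexer) {
                  // tumbleEnd is field 5 in the sink schema above; the
                  // "ddMMyy" pattern is an assumption.
                  Timestamp tumbleEnd = (Timestamp) row.getField(5);
                  String index = "test-" + new SimpleDateFormat("ddMMyy").format(tumbleEnd);

                  Map<String, Object> json = new HashMap<>();
                  json.put("sid", row.getField(0));
                  json.put("ip", row.getField(1));
                  json.put("family", row.getField(2));
                  json.put("total_hits", row.getField(3));
                  json.put("tumbleStart", row.getField(4).toString());
                  json.put("tumbleEnd", tumbleEnd.toString());

                  indexer.add(Requests.indexRequest()
                        .index(index)
                        .type("user")
                        .source(json));
            }
      });

resultStream.addSink(esSink.build());

Since the index name is derived from the window end rather than the wall
clock, a late record that belongs to yesterday's window would still land in
yesterday's index. Is this the right way to do it, or is there a way to do
it directly from the descriptor?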


Thanks again !!


On Thu, Jan 10, 2019 at 2:55 PM miki haiat <miko5...@gmail.com> wrote:

> You can use Flink to manipulate the data by using
> TimeCharacteristic.EventTime [1] and setting a watermark.
> Then, if you have a lag or some other issue, the data will be inserted into
> the correct indexes in Elastic.
> A more specific way to implement it with Kafka is described in [2].
>
>
>
>
> 1.
>
> https://ci.apache.org/projects/flink/flink-docs-stable/dev/event_timestamps_watermarks.html#assigning-timestamps
> 2.
>
> https://ci.apache.org/projects/flink/flink-docs-release-1.7/dev/connectors/kafka.html#kafka-consumers-and-timestamp-extractionwatermark-emission
>
>
> On Thu, Jan 10, 2019 at 11:10 AM Ramya Ramamurthy <hair...@gmail.com> wrote:
>
> > Hi Dawid,
> >
> > Thanks for the quick reply.
> > I did try that. I am not sure how to push into rolling indices here.
> > For example, I would maintain daily indices on ES. Based on the event
> > time, I would like to classify the packets into the appropriate indices.
> > If there was some lag in the source Kafka and I receive yesterday's data
> > [say at 00:05 or so], I am not sure how to pick the right index here.
> > Is there a way to get around this?
> >
> > Regards,
> > ~Ramya.
> >
> > On Thu, Jan 10, 2019 at 2:04 PM Dawid Wysakowicz <dwysakow...@apache.org> wrote:
> >
> > > Hi Ramya,
> > >
> > > Have you tried writing to ES directly from the Table API? You can check
> > > the ES connector for the Table API here:
> > >
> > > https://ci.apache.org/projects/flink/flink-docs-release-1.7/dev/table/connect.html#elasticsearch-connector
> > >
> > > Best,
> > >
> > > Dawid
> > >
> > > On 10/01/2019 09:21, Ramya Ramamurthy wrote:
> > > > Hi,
> > > >
> > > > I am learning Flink. With Flink 1.7.1, I am trying to read from Kafka
> > > > and insert into Elasticsearch. I have a Kafka connector that converts
> > > > the data into a Flink table. In order to insert into Elasticsearch, I
> > > > have converted this table to a DataStream, so that I can use the
> > > > ElasticsearchSink.
> > > > But the Row returned by the stream has lost the schema. How do I
> > > > convert this to JSON before calling the Elasticsearch sink connector?
> > > > Any help or suggestions would be appreciated.
> > > >
> > > > Thanks.
> > > >
> > >
> > >
> >
>
