Hi Fernando,

Thanks for reporting back. From my point of view, this is a shortcoming of the current Elasticsearch connector, i.e. it doesn't work out of the box. I created FLINK-16495 [1] to improve this by providing a default flush interval.

Best,
Jark

[1]: https://issues.apache.org/jira/browse/FLINK-16495
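For readers hitting the same symptom, here is a minimal, self-contained sketch of where the flush settings Fernando confirms below fit into the Flink 1.10 Table API descriptor. Only the two calls reported in the thread are shown; the object name, host, index, schema and threshold values are placeholders to be adjusted for a real job, which would also insert into the table and call env.execute().

    import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
    import org.apache.flink.table.api.DataTypes
    import org.apache.flink.table.api.scala.StreamTableEnvironment
    import org.apache.flink.table.descriptors.{Elasticsearch, Json, Schema}

    object FlushSettingsSketch {
      def main(args: Array[String]): Unit = {
        val env = StreamExecutionEnvironment.getExecutionEnvironment
        val tEnv = StreamTableEnvironment.create(env)

        tEnv.connect(
          new Elasticsearch()
            .version("7")
            .host("localhost", 9200, "http")   // placeholder host
            .index("my-index")                 // placeholder index
            .documentType("_doc")
            .keyNullLiteral("n/a")
            // Without these, records sit in the sink's bulk buffer and a
            // low-volume test stream may never hit a flush threshold.
            .bulkFlushMaxActions(2)            // flush after this many buffered actions
            .bulkFlushInterval(1000L)          // ...or at least once per second
        )
          .withFormat(new Json().jsonSchema(
            "{type: 'object', properties: {curr_careUnit: {type: 'string'}, sum: {type: 'number'}}}"))
          .withSchema(new Schema()
            .field("curr_careUnit", DataTypes.STRING())
            .field("sum", DataTypes.DOUBLE()))
          .inUpsertMode()
          .createTemporaryTable("transfersSum")

        // A real job would now write a Table into "transfersSum" and call env.execute().
      }
    }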
On Sat, 7 Mar 2020 at 00:20, Castro, Fernando C. <fernando.cas...@leidos.com> wrote:

> Arvid, thank you, that was it!
>
> After setting these properties on my Elasticsearch connector, I was able to see the records upserting into ES!
>
>     .bulkFlushMaxActions(2)
>     .bulkFlushInterval(1000L)
>
> Thank you,
> Fernando
>
>
> From: Arvid Heise <ar...@ververica.com>
> Date: Thursday, March 5, 2020 at 2:27 AM
> To: "Castro, Fernando C. [US-US]" <fernando.cas...@leidos.com>
> Cc: Jark Wu <imj...@gmail.com>, John Smith <java.dev....@gmail.com>, "user@flink.apache.org" <user@flink.apache.org>
> Subject: Re: EXTERNAL: Re: Should I use a Sink or Connector? Or Both?
>
> Hi Fernando,
>
> How much data are you trying to write? If you just use single messages for testing, it could be that the default bulk settings are not working well.
>
> If so, could you please adjust the following settings and report back?
>
>     public enum SinkOption {
>         BULK_FLUSH_MAX_ACTIONS,
>         BULK_FLUSH_MAX_SIZE,
>         BULK_FLUSH_INTERVAL
>     }
>
> On Wed, Mar 4, 2020 at 3:05 PM Castro, Fernando C. <fernando.cas...@leidos.com> wrote:
>
> Thank you guys. So I have no idea why data is not being pushed to Elasticsearch… ☹
>
> My complete code is at https://stackoverflow.com/questions/60512064/flink-is-not-adding-any-data-to-elasticsearch-but-no-errors
>
> Btw, for some reason I still need to pass .documentType to the Elasticsearch connection descriptor (getting it from org.apache.flink.table.descriptors.Elasticsearch), when Elasticsearch 7 doesn't do types anymore.
>
> In case you can't access stackoverflow for some reason, here is the code below too:
>
> /*
>  * This Scala source file was generated by the Gradle 'init' task.
>  */
> package flinkNamePull
>
> import java.time.LocalDateTime
> import java.util.Properties
>
> import org.apache.flink.api.common.serialization.SimpleStringSchema
> import org.apache.flink.streaming.api.scala._
> import org.apache.flink.streaming.connectors.kafka.{FlinkKafkaConsumer010, FlinkKafkaProducer010}
> import org.apache.flink.api.common.functions.RichMapFunction
> import org.apache.flink.configuration.Configuration
> import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
> import org.apache.flink.table.api.{DataTypes, Table}
> import org.apache.flink.table.api.scala.StreamTableEnvironment
> import org.apache.flink.table.descriptors.{Elasticsearch, Json, Schema}
> import org.apache.flink.types.Row
>
> object Demo {
>
>   /**
>    * MapFunction to generate Transfers POJOs from parsed CSV data.
>    */
>   class TransfersMapper extends RichMapFunction[String, Transfers] {
>     private var formatter = null
>
>     @throws[Exception]
>     override def open(parameters: Configuration): Unit = {
>       super.open(parameters)
>       //formatter = DateTimeFormat.forPattern("yyyy-MM-dd HH:mm:ss")
>     }
>
>     @throws[Exception]
>     override def map(csvLine: String): Transfers = {
>       //var splitCsv = csvLine.stripLineEnd.split("\n")(1).split(",")
>       var splitCsv = csvLine.stripLineEnd.split(",")
>
>       val arrLength = splitCsv.length
>       val i = 0
>       if (arrLength != 13) {
>         for (i <- arrLength + 1 to 13) {
>           if (i == 13) {
>             splitCsv = splitCsv :+ "0.0"
>           } else {
>             splitCsv = splitCsv :+ ""
>           }
>         }
>       }
>       var trans = new Transfers()
>       trans.rowId = splitCsv(0)
>       trans.subjectId = splitCsv(1)
>       trans.hadmId = splitCsv(2)
>       trans.icuStayId = splitCsv(3)
>       trans.dbSource = splitCsv(4)
>       trans.eventType = splitCsv(5)
>       trans.prev_careUnit = splitCsv(6)
>       trans.curr_careUnit = splitCsv(7)
>       trans.prev_wardId = splitCsv(8)
>       trans.curr_wardId = splitCsv(9)
>       trans.inTime = splitCsv(10)
>       trans.outTime = splitCsv(11)
>       trans.los = splitCsv(12).toDouble
>
>       return trans
>     }
>   }
>
>   def main(args: Array[String]) {
>     // Create streaming execution environment
>     val env = StreamExecutionEnvironment.getExecutionEnvironment
>     env.setParallelism(1)
>
>     // Set properties per KafkaConsumer API
>     val properties = new Properties()
>     properties.setProperty("bootstrap.servers", "kafka.kafka:9092")
>     properties.setProperty("group.id", "test")
>
>     // Add Kafka source to environment
>     val myKConsumer = new FlinkKafkaConsumer010[String]("raw.data3", new SimpleStringSchema(), properties)
>     // Read from beginning of topic
>     myKConsumer.setStartFromEarliest()
>
>     val streamSource = env
>       .addSource(myKConsumer)
>
>     // Transform CSV into a Transfers object
>     val streamTransfers = streamSource.map(new TransfersMapper())
>
>     // create a TableEnvironment
>     val tEnv = StreamTableEnvironment.create(env)
>
>     // register a Table
>     val tblTransfers: Table = tEnv.fromDataStream(streamTransfers)
>     tEnv.createTemporaryView("transfers", tblTransfers)
>
>     tEnv.connect(
>       new Elasticsearch()
>         .version("7")
>         .host("elasticsearch-elasticsearch-coordinating-only.default.svc.cluster.local", 9200, "http") // required: one or more Elasticsearch hosts to connect to
>         .index("transfers-sum")
>         .documentType("_doc") // not sure why this is still needed for ES7
>         .keyNullLiteral("n/a")
>     )
>       .withFormat(new Json().jsonSchema("{type: 'object', properties: {curr_careUnit: {type: 'string'}, sum: {type: 'number'}}}"))
>       .withSchema(new Schema()
>         .field("curr_careUnit", DataTypes.STRING())
>         .field("sum", DataTypes.DOUBLE())
>       )
>       .inUpsertMode()
>       .createTemporaryTable("transfersSum")
>
>     val result = tEnv.sqlQuery(
>       """
>         |SELECT curr_careUnit, sum(los)
>         |FROM transfers
>         |GROUP BY curr_careUnit
>         |""".stripMargin)
>
>     result.insertInto("transfersSum")
>
>     tEnv.toRetractStream[Row](result).print() // Just to see if something is actually happening (and it is)
>
>     env.execute("Flink Streaming Demo Dump to Elasticsearch")
>   }
> }
>
> Thank you,
> Fernando
>
>
> From: Jark Wu <imj...@gmail.com>
> Date: Tuesday, March 3, 2020 at 8:51 PM
> To: John Smith <java.dev....@gmail.com>
> Cc: "Castro, Fernando C. [US-US]" <fernando.cas...@leidos.com>, "user@flink.apache.org" <user@flink.apache.org>
> Subject: EXTERNAL: Re: Should I use a Sink or Connector? Or Both?
[US-US]" <fernando.cas...@leidos.com>, " > user@flink.apache.org" <user@flink.apache.org> > *Subject: *EXTERNAL: Re: Should I use a Sink or Connector? Or Both? > > > > John is right. > > > > Could you provide more detailed code? So that we can help to investigate. > > > > Best, > > Jark > > > > On Wed, 4 Mar 2020 at 06:20, John Smith <java.dev....@gmail.com> wrote: > > The sink if for Streaming API, it looks like you are using SQL and tables. > So you can use the connector to output the table result to Elastic. Unless > you want to convert from table to stream first. > > > > On Tue, 3 Mar 2020 at 16:25, Castro, Fernando C. < > fernando.cas...@leidos.com> wrote: > > Hello folks! I’m new to Flink and data streaming in general, just initial > FYI ;) > > > > I’m currently doing this successfully: > > 1 - streaming data from Kafka in Flink > > 2 - aggregating the data with Flink’s sqlQuery API > > 3 - outputting the result of #2 into STDOUT via toRetreatStream() > > > > My objective is to change #3 so I’m upserting into an Elasticsearch index > (see > https://stackoverflow.com/questions/60512064/flink-is-not-adding-any-data-to-elasticsearch-but-no-errors > for my complete code) > > > > I’ve been using the template for the Elasticsearch connector > https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/connect.html#elasticsearch-connector > > tableEnvironment > > .connect(...) > > .withFormat(...) > > .withSchema(...) > > .inAppendMode() > > .createTemporaryTable("MyTable") > > > > By I’m confused from seeing some old examples online. Should I be using > the Elasticsearch Sink ( > https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/connectors/elasticsearch.html#elasticsearch-sink) > instead? Or both? > > > > I’m having trouble with the current implementation where no data is > outputting to Elasticsearch, but no error is being displayed in Flink (job > status is RUNNING). > > > > Hoping somebody could clarify what I’m missing? Thank you in advance! > > > > Note: Running on Scala 2.11, Elasticsearch 7, Flink 1.10 > >