Thank you, guys. I still have no idea why data is not being pushed to Elasticsearch… ☹
My complete code is at https://stackoverflow.com/questions/60512064/flink-is-not-adding-any-data-to-elasticsearch-but-no-errors

Btw, for some reason I still need to pass .documentType to the Elasticsearch connection descriptor (getting it from org.apache.flink.table.descriptors.Elasticsearch), even though Elasticsearch 7 doesn’t do types anymore.

In case you can’t access stackoverflow for some reason, here is the code below too:

/*
 * This Scala source file was generated by the Gradle 'init' task.
 */
package flinkNamePull

import java.time.LocalDateTime
import java.util.Properties

import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka.{FlinkKafkaConsumer010, FlinkKafkaProducer010}
import org.apache.flink.api.common.functions.RichMapFunction
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api.{DataTypes, Table}
import org.apache.flink.table.api.scala.StreamTableEnvironment
import org.apache.flink.table.descriptors.{Elasticsearch, Json, Schema}
import org.apache.flink.types.Row

object Demo {
  /**
   * MapFunction to generate Transfers POJOs from parsed CSV data.
   */
  class TransfersMapper extends RichMapFunction[String, Transfers] {
    private var formatter = null

    @throws[Exception]
    override def open(parameters: Configuration): Unit = {
      super.open(parameters)
      //formatter = DateTimeFormat.forPattern("yyyy-MM-dd HH:mm:ss")
    }

    @throws[Exception]
    override def map(csvLine: String): Transfers = {
      //var splitCsv = csvLine.stripLineEnd.split("\n")(1).split(",")
      var splitCsv = csvLine.stripLineEnd.split(",")

      val arrLength = splitCsv.length
      val i = 0
      if (arrLength != 13) {
        for (i <- arrLength + 1 to 13) {
          if (i == 13) {
            splitCsv = splitCsv :+ "0.0"
          } else {
            splitCsv = splitCsv :+ ""
          }
        }
      }
      var trans = new Transfers()
      trans.rowId = splitCsv(0)
      trans.subjectId = splitCsv(1)
      trans.hadmId = splitCsv(2)
      trans.icuStayId = splitCsv(3)
      trans.dbSource = splitCsv(4)
      trans.eventType = splitCsv(5)
      trans.prev_careUnit = splitCsv(6)
      trans.curr_careUnit = splitCsv(7)
      trans.prev_wardId = splitCsv(8)
      trans.curr_wardId = splitCsv(9)
      trans.inTime = splitCsv(10)
      trans.outTime = splitCsv(11)
      trans.los = splitCsv(12).toDouble

      return trans
    }
  }

  def main(args: Array[String]) {
    // Create streaming execution environment
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)

    // Set properties per KafkaConsumer API
    val properties = new Properties()
    properties.setProperty("bootstrap.servers", "kafka.kafka:9092")
    properties.setProperty("group.id", "test")

    // Add Kafka source to environment
    val myKConsumer = new FlinkKafkaConsumer010[String]("raw.data3", new SimpleStringSchema(), properties)
    // Read from beginning of topic
    myKConsumer.setStartFromEarliest()

    val streamSource = env
      .addSource(myKConsumer)

    // Transform CSV into a Transfers object
    val streamTransfers = streamSource.map(new TransfersMapper())

    // create a TableEnvironment
    val tEnv = StreamTableEnvironment.create(env)

    // register a Table
    val tblTransfers: Table = tEnv.fromDataStream(streamTransfers)
    tEnv.createTemporaryView("transfers", tblTransfers)

    tEnv.connect(
      new Elasticsearch()
        .version("7")
        .host("elasticsearch-elasticsearch-coordinating-only.default.svc.cluster.local", 9200, "http") // required: one or more Elasticsearch hosts to connect to
        .index("transfers-sum")
        .documentType("_doc") // not sure why this is still needed for ES7
        .keyNullLiteral("n/a")
    )
      .withFormat(new Json().jsonSchema("{type: 'object', properties: {curr_careUnit: {type: 'string'}, sum: {type: 'number'}}}"))
      .withSchema(new Schema()
        .field("curr_careUnit", DataTypes.STRING())
        .field("sum", DataTypes.DOUBLE())
      )
      .inUpsertMode()
      .createTemporaryTable("transfersSum")

    val result = tEnv.sqlQuery(
      """
        |SELECT curr_careUnit, sum(los)
        |FROM transfers
        |GROUP BY curr_careUnit
        |""".stripMargin)

    result.insertInto("transfersSum")

    tEnv.toRetractStream[Row](result).print() //Just to see if something is actually happening (and it is)

    env.execute("Flink Streaming Demo Dump to Elasticsearch")
  }
}

Thank you,
Fernando

From: Jark Wu <imj...@gmail.com>
Date: Tuesday, March 3, 2020 at 8:51 PM
To: John Smith <java.dev....@gmail.com>
Cc: "Castro, Fernando C. [US-US]" <fernando.cas...@leidos.com>, "user@flink.apache.org" <user@flink.apache.org>
Subject: EXTERNAL: Re: Should I use a Sink or Connector? Or Both?

John is right. Could you provide more detailed code so that we can help investigate?

Best,
Jark

On Wed, 4 Mar 2020 at 06:20, John Smith <java.dev....@gmail.com> wrote:

The sink is for the Streaming API; it looks like you are using SQL and tables. So you can use the connector to output the table result to Elastic, unless you want to convert from table to stream first.

On Tue, 3 Mar 2020 at 16:25, Castro, Fernando C. <fernando.cas...@leidos.com> wrote:

Hello folks!
I’m new to Flink and data streaming in general, just initial FYI ;)

I’m currently doing this successfully:
1 - streaming data from Kafka in Flink
2 - aggregating the data with Flink’s sqlQuery API
3 - outputting the result of #2 into STDOUT via toRetractStream()

My objective is to change #3 so I’m upserting into an Elasticsearch index (see https://stackoverflow.com/questions/60512064/flink-is-not-adding-any-data-to-elasticsearch-but-no-errors for my complete code)

I’ve been using the template for the Elasticsearch connector https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/connect.html#elasticsearch-connector

tableEnvironment
  .connect(...)
  .withFormat(...)
  .withSchema(...)
  .inAppendMode()
  .createTemporaryTable("MyTable")

But I’m confused from seeing some old examples online. Should I be using the Elasticsearch Sink (https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/connectors/elasticsearch.html#elasticsearch-sink) instead? Or both?

I’m having trouble with the current implementation where no data is outputting to Elasticsearch, but no error is being displayed in Flink (job status is RUNNING).

Hoping somebody could clarify what I’m missing? Thank you in advance!

Note: Running on Scala 2.11, Elasticsearch 7, Flink 1.10
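
For readers following steps 2 and 3 above: a grouped aggregate such as SUM ... GROUP BY produces an updated total every time a new record arrives for a key, which is why its result is written to a keyed sink as an upsert rather than as an append. The sketch below illustrates that idea in plain Scala only. It does not use any Flink classes, and the names UpsertSketch, Transfer, and runningSums are hypothetical, chosen just for this illustration.

```scala
// Plain-Scala illustration only: no Flink or Elasticsearch classes are used.
object UpsertSketch {
  // Hypothetical input record: the grouping key and the value being summed.
  final case class Transfer(careUnit: String, los: Double)

  // Fold the records into one running total per key. Each step overwrites
  // (upserts) the previous total stored under that key.
  def runningSums(records: Seq[Transfer]): Map[String, Double] =
    records.foldLeft(Map.empty[String, Double]) { (acc, t) =>
      acc.updated(t.careUnit, acc.getOrElse(t.careUnit, 0.0) + t.los)
    }

  def main(args: Array[String]): Unit = {
    val sums = runningSums(Seq(
      Transfer("MICU", 1.5),
      Transfer("SICU", 2.0),
      Transfer("MICU", 0.5) // second MICU record replaces the earlier MICU total
    ))
    sums.toSeq.sortBy(_._1).foreach { case (unit, total) => println(s"$unit -> $total") }
  }
}
```

In this sketch the map plays the role of the keyed index: one entry per curr_careUnit, replaced in place as totals change.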