Thank you guys. I still have no idea why data is not being pushed to 
Elasticsearch… ☹

My complete code is at 
https://stackoverflow.com/questions/60512064/flink-is-not-adding-any-data-to-elasticsearch-but-no-errors
Btw, for some reason I still need to pass .documentType to the Elasticsearch 
connection descriptor (getting it from 
org.apache.flink.table.descriptors.Elasticsearch), when Elasticsearch 7 doesn’t 
do types anymore.

In case you can’t access stackoverflow for some reason, here is the code below 
too:
/*
* This Scala source file was generated by the Gradle 'init' task.
*/
package flinkNamePull

import java.time.LocalDateTime
import java.util.Properties

import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka.{FlinkKafkaConsumer010, FlinkKafkaProducer010}
import org.apache.flink.api.common.functions.RichMapFunction
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api.{DataTypes, Table}
import org.apache.flink.table.api.scala.StreamTableEnvironment
import org.apache.flink.table.descriptors.{Elasticsearch, Json, Schema}
import org.apache.flink.types.Row

object Demo {
  /**
   * MapFunction to generate Transfers POJOs from parsed CSV data.
   */
  class TransfersMapper extends RichMapFunction[String, Transfers] {
    private var formatter = null

    @throws[Exception]
    override def open(parameters: Configuration): Unit = {
      super.open(parameters)
      //formatter = DateTimeFormat.forPattern("yyyy-MM-dd HH:mm:ss")
    }

    @throws[Exception]
    override def map(csvLine: String): Transfers = {
      //var splitCsv = csvLine.stripLineEnd.split("\n")(1).split(",")
      var splitCsv = csvLine.stripLineEnd.split(",")

      // Pad short rows to 13 fields: "" for missing text fields, "0.0" for los
      val arrLength = splitCsv.length
      if (arrLength != 13) {
        for (i <- arrLength + 1 to 13) {
          if (i == 13) {
            splitCsv = splitCsv :+ "0.0"
          } else {
            splitCsv = splitCsv :+ ""
          }
        }
      }
      val trans = new Transfers()
      trans.rowId = splitCsv(0)
      trans.subjectId = splitCsv(1)
      trans.hadmId = splitCsv(2)
      trans.icuStayId = splitCsv(3)
      trans.dbSource = splitCsv(4)
      trans.eventType = splitCsv(5)
      trans.prev_careUnit = splitCsv(6)
      trans.curr_careUnit = splitCsv(7)
      trans.prev_wardId = splitCsv(8)
      trans.curr_wardId = splitCsv(9)
      trans.inTime = splitCsv(10)
      trans.outTime = splitCsv(11)
      trans.los = splitCsv(12).toDouble

      trans
    }
  }

  def main(args: Array[String]): Unit = {
    // Create streaming execution environment
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)

    // Set properties per KafkaConsumer API
    val properties = new Properties()
    properties.setProperty("bootstrap.servers", "kafka.kafka:9092")
    properties.setProperty("group.id", "test")

    // Add Kafka source to environment
    val myKConsumer = new FlinkKafkaConsumer010[String](
      "raw.data3", new SimpleStringSchema(), properties)
    // Read from beginning of topic
    myKConsumer.setStartFromEarliest()

    val streamSource = env
      .addSource(myKConsumer)

    // Transform CSV into a Transfers object
    val streamTransfers = streamSource.map(new TransfersMapper())

    // create a TableEnvironment
    val tEnv = StreamTableEnvironment.create(env)

    // register a Table
    val tblTransfers: Table = tEnv.fromDataStream(streamTransfers)
    tEnv.createTemporaryView("transfers", tblTransfers)

    tEnv.connect(
      new Elasticsearch()
        .version("7")
        // required: one or more Elasticsearch hosts to connect to
        .host("elasticsearch-elasticsearch-coordinating-only.default.svc.cluster.local", 9200, "http")
        .index("transfers-sum")
        .documentType("_doc") // not sure why this is still needed for ES7
        .keyNullLiteral("n/a")
    )
      .withFormat(new Json().jsonSchema(
        "{type: 'object', properties: {curr_careUnit: {type: 'string'}, sum: {type: 'number'}}}"))
      .withSchema(new Schema()
        .field("curr_careUnit", DataTypes.STRING())
        .field("sum", DataTypes.DOUBLE())
      )
      .inUpsertMode()
      .createTemporaryTable("transfersSum")

    val result = tEnv.sqlQuery(
      """
        |SELECT curr_careUnit, sum(los)
        |FROM transfers
        |GROUP BY curr_careUnit
        |""".stripMargin)

    result.insertInto("transfersSum")

    // Just to see if something is actually happening (and it is)
    tEnv.toRetractStream[Row](result).print()

    env.execute("Flink Streaming Demo Dump to Elasticsearch")
  }
}
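
A side note on the padding loop in TransfersMapper: String.split(",") drops trailing empty fields, which is why rows can come back shorter than 13 columns. A minimal sketch of the same idea as a pure function, assuming 13 columns with the last one being los; the names CsvPadding and splitAndPad are made up for illustration and are not part of the job above:

```scala
object CsvPadding {
  // Assumption: every row is expected to carry 13 columns, the last one numeric (los)
  val ExpectedFields = 13

  def splitAndPad(csvLine: String): Array[String] = {
    // A limit of -1 keeps trailing empty fields instead of silently dropping them
    val fields = csvLine.stripLineEnd.split(",", -1)
    // Fill any missing columns with empty strings
    val padded = fields.padTo(ExpectedFields, "")
    // Default an empty last column to "0.0" so that .toDouble does not throw
    if (padded(ExpectedFields - 1).isEmpty) padded.updated(ExpectedFields - 1, "0.0")
    else padded
  }
}
```

Unlike the original loop, this also defaults los when the row has all 13 columns but the last one is empty.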


Thank you,
Fernando


From: Jark Wu <imj...@gmail.com>
Date: Tuesday, March 3, 2020 at 8:51 PM
To: John Smith <java.dev....@gmail.com>
Cc: "Castro, Fernando C. [US-US]" <fernando.cas...@leidos.com>, 
"user@flink.apache.org" <user@flink.apache.org>
Subject: EXTERNAL: Re: Should I use a Sink or Connector? Or Both?

John is right.

Could you provide more detailed code? So that we can help to investigate.

Best,
Jark

On Wed, 4 Mar 2020 at 06:20, John Smith <java.dev....@gmail.com> wrote:
The sink is for the Streaming API. It looks like you are using SQL and tables, 
so you can use the connector to output the table result to Elastic, unless you 
want to convert from table to stream first.
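
For reference, the "convert from table to stream first" route could look roughly like the sketch below. It assumes the flink-connector-elasticsearch7 dependency is on the classpath and that the retract stream carries (curr_careUnit, sum) rows; the object name, the helper name, the host and the index are placeholders, not taken from the actual job.

```scala
import org.apache.flink.api.common.functions.RuntimeContext
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.elasticsearch.{ElasticsearchSinkFunction, RequestIndexer}
import org.apache.flink.streaming.connectors.elasticsearch7.ElasticsearchSink
import org.apache.flink.types.Row
import org.apache.http.HttpHost
import org.elasticsearch.client.Requests

object RetractStreamToEs {
  // Hypothetical helper: attaches an Elasticsearch 7 sink to a retract stream
  def attachSink(retractStream: DataStream[(Boolean, Row)]): Unit = {
    val httpHosts = new java.util.ArrayList[HttpHost]()
    httpHosts.add(new HttpHost("localhost", 9200, "http")) // placeholder host

    val builder = new ElasticsearchSink.Builder[(Boolean, Row)](
      httpHosts,
      new ElasticsearchSinkFunction[(Boolean, Row)] {
        override def process(element: (Boolean, Row), ctx: RuntimeContext,
                             indexer: RequestIndexer): Unit = {
          val row = element._2
          val json = new java.util.HashMap[String, AnyRef]()
          json.put("curr_careUnit", row.getField(0))
          json.put("sum", row.getField(1))
          // Using the group key as document id makes each write overwrite the previous sum
          indexer.add(Requests.indexRequest()
            .index("transfers-sum")
            .id(String.valueOf(row.getField(0)))
            .source(json))
        }
      })

    // Send every request immediately instead of waiting for a bulk to fill up
    builder.setBulkFlushMaxActions(1)

    // Keep only the "add" messages; each one overwrites the document for its key
    retractStream.filter(_._1).addSink(builder.build())
  }
}
```

It would be called with the result of tEnv.toRetractStream[Row](result). Dropping the retract messages is only reasonable here because every update for a key is followed by a new add message for the same key.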

On Tue, 3 Mar 2020 at 16:25, Castro, Fernando C. <fernando.cas...@leidos.com> wrote:
Hello folks! I’m new to Flink and data streaming in general, just initial FYI ;)

I’m currently doing this successfully:
1 - streaming data from Kafka in Flink
2 - aggregating the data with Flink’s sqlQuery API
3 - outputting the result of #2 into STDOUT via toRetractStream()

My objective is to change #3 so I’m upserting into an Elasticsearch index (see 
https://stackoverflow.com/questions/60512064/flink-is-not-adding-any-data-to-elasticsearch-but-no-errors
 for my complete code)

I’ve been using the template for the Elasticsearch connector 
https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/connect.html#elasticsearch-connector
tableEnvironment
  .connect(...)
  .withFormat(...)
  .withSchema(...)
  .inAppendMode()
  .createTemporaryTable("MyTable")

But I’m confused after seeing some old examples online. Should I be using the 
Elasticsearch Sink 
(https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/connectors/elasticsearch.html#elasticsearch-sink)
 instead? Or both?

I’m having trouble with the current implementation: no data is being written 
to Elasticsearch, but no error is displayed in Flink (job status is 
RUNNING).
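
One thing that may be worth ruling out for this symptom: the Elasticsearch connector buffers index requests and sends them in bulk, so with a small amount of input and no flush settings the documents can sit in the buffer while the job looks healthy. This is only a guess at the cause, not a confirmed diagnosis. A sketch of the descriptor with explicit flush settings, using a placeholder host and made-up values (the object and method names are also just for illustration):

```scala
import org.apache.flink.table.descriptors.Elasticsearch

object EsDescriptorSketch {
  // Same kind of connection settings as in the job, plus explicit flush settings
  def descriptor(): Elasticsearch =
    new Elasticsearch()
      .version("7")
      .host("localhost", 9200, "http") // placeholder host
      .index("transfers-sum")
      .documentType("_doc")
      .keyNullLiteral("n/a")
      .bulkFlushMaxActions(1)   // flush after every buffered action
      .bulkFlushInterval(1000L) // and at least once per second (milliseconds)
}
```

A max-actions value of 1 is convenient for debugging but is probably too chatty for real traffic.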

Hoping somebody could clarify what I’m missing? Thank you in advance!

Note: Running on Scala 2.11, Elasticsearch 7, Flink 1.10
