Hi Fernando,

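How much data are you trying to write? If you are testing with only a few
messages, the default bulk settings may be the problem: the sink batches
writes into bulk requests, so a handful of records can sit in the buffer
without ever being flushed to Elasticsearch.

If so, could you please lower the following settings and report back?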

public enum SinkOption {
   BULK_FLUSH_MAX_ACTIONS,
   BULK_FLUSH_MAX_SIZE,
   BULK_FLUSH_INTERVAL
}
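
A minimal sketch of how these might be set on the descriptor from your quoted code, assuming the Flink 1.10 `Elasticsearch` table descriptor exposes them as the builder methods `bulkFlushMaxActions`, `bulkFlushMaxSize` and `bulkFlushInterval`. The object and method names (`BulkFlushSketch`, `eagerFlushDescriptor`) are hypothetical, and the values are illustrative test settings, not recommendations:

```scala
import org.apache.flink.table.descriptors.Elasticsearch

object BulkFlushSketch {

  // Hypothetical helper: host, index and document type are copied from the
  // quoted code; only the three bulk-flush calls are new.
  def eagerFlushDescriptor(): Elasticsearch =
    new Elasticsearch()
      .version("7")
      .host("elasticsearch-elasticsearch-coordinating-only.default.svc.cluster.local", 9200, "http")
      .index("transfers-sum")
      .documentType("_doc")
      .keyNullLiteral("n/a")
      .bulkFlushMaxActions(1)   // BULK_FLUSH_MAX_ACTIONS: flush after every buffered action (testing only)
      .bulkFlushMaxSize("1 mb") // BULK_FLUSH_MAX_SIZE: flush once 1 MB of requests is buffered
      .bulkFlushInterval(1000L) // BULK_FLUSH_INTERVAL: flush at least once per second (milliseconds)
}
```

The result would be passed to `tEnv.connect(...)` in place of the existing `new Elasticsearch()...` chain. If documents start to appear in the index with these values, buffering was most likely the cause, and the thresholds can be raised again for real workloads.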


On Wed, Mar 4, 2020 at 3:05 PM Castro, Fernando C. <
fernando.cas...@leidos.com> wrote:

> Thank you guys. So I have no idea why data is not being pushed to
> Elasticsearch… ☹
>
>
>
> My complete code is at
> https://stackoverflow.com/questions/60512064/flink-is-not-adding-any-data-to-elasticsearch-but-no-errors
>
> Btw, for some reason I still need to pass .documentType to the
> Elasticsearch connection descriptor (getting it from
> org.apache.flink.table.descriptors.Elasticsearch), when Elasticsearch 7
> doesn’t do types anymore.
>
>
>
> In case you can’t access stackoverflow for some reason, here is the code
> below too:
>
>
>
>
> /*
>  * This Scala source file was generated by the Gradle 'init' task.
>  */
> package flinkNamePull
>
> import java.time.LocalDateTime
> import java.util.Properties
>
> import org.apache.flink.api.common.serialization.SimpleStringSchema
> import org.apache.flink.streaming.api.scala._
> import org.apache.flink.streaming.connectors.kafka.{FlinkKafkaConsumer010, FlinkKafkaProducer010}
> import org.apache.flink.api.common.functions.RichMapFunction
> import org.apache.flink.configuration.Configuration
> import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
> import org.apache.flink.table.api.{DataTypes, Table}
> import org.apache.flink.table.api.scala.StreamTableEnvironment
> import org.apache.flink.table.descriptors.{Elasticsearch, Json, Schema}
> import org.apache.flink.types.Row
>
> object Demo {
>
>   /**
>    * MapFunction to generate Transfers POJOs from parsed CSV data.
>    */
>   class TransfersMapper extends RichMapFunction[String, Transfers] {
>     private var formatter = null
>
>     @throws[Exception]
>     override def open(parameters: Configuration): Unit = {
>       super.open(parameters)
>       //formatter = DateTimeFormat.forPattern("yyyy-MM-dd HH:mm:ss")
>     }
>
>     @throws[Exception]
>     override def map(csvLine: String): Transfers = {
>       //var splitCsv = csvLine.stripLineEnd.split("\n")(1).split(",")
>       var splitCsv = csvLine.stripLineEnd.split(",")
>
>       val arrLength = splitCsv.length
>       val i = 0
>       if (arrLength != 13) {
>         for (i <- arrLength + 1 to 13) {
>           if (i == 13) {
>             splitCsv = splitCsv :+ "0.0"
>           } else {
>             splitCsv = splitCsv :+ ""
>           }
>         }
>       }
>       var trans = new Transfers()
>       trans.rowId = splitCsv(0)
>       trans.subjectId = splitCsv(1)
>       trans.hadmId = splitCsv(2)
>       trans.icuStayId = splitCsv(3)
>       trans.dbSource = splitCsv(4)
>       trans.eventType = splitCsv(5)
>       trans.prev_careUnit = splitCsv(6)
>       trans.curr_careUnit = splitCsv(7)
>       trans.prev_wardId = splitCsv(8)
>       trans.curr_wardId = splitCsv(9)
>       trans.inTime = splitCsv(10)
>       trans.outTime = splitCsv(11)
>       trans.los = splitCsv(12).toDouble
>
>       return trans
>     }
>   }
>
>   def main(args: Array[String]) {
>     // Create streaming execution environment
>     val env = StreamExecutionEnvironment.getExecutionEnvironment
>     env.setParallelism(1)
>
>     // Set properties per KafkaConsumer API
>     val properties = new Properties()
>     properties.setProperty("bootstrap.servers", "kafka.kafka:9092")
>     properties.setProperty("group.id", "test")
>
>     // Add Kafka source to environment
>     val myKConsumer = new FlinkKafkaConsumer010[String]("raw.data3", new SimpleStringSchema(), properties)
>     // Read from beginning of topic
>     myKConsumer.setStartFromEarliest()
>
>     val streamSource = env
>       .addSource(myKConsumer)
>
>     // Transform CSV into a Transfers object
>     val streamTransfers = streamSource.map(new TransfersMapper())
>
>     // create a TableEnvironment
>     val tEnv = StreamTableEnvironment.create(env)
>
>     // register a Table
>     val tblTransfers: Table = tEnv.fromDataStream(streamTransfers)
>     tEnv.createTemporaryView("transfers", tblTransfers)
>
>     tEnv.connect(
>       new Elasticsearch()
>         .version("7")
>         .host("elasticsearch-elasticsearch-coordinating-only.default.svc.cluster.local", 9200, "http") // required: one or more Elasticsearch hosts to connect to
>         .index("transfers-sum")
>         .documentType("_doc") // not sure why this is still needed for ES7
>         .keyNullLiteral("n/a")
>     )
>       .withFormat(new Json().jsonSchema("{type: 'object', properties: {curr_careUnit: {type: 'string'}, sum: {type: 'number'}}}"))
>       .withSchema(new Schema()
>         .field("curr_careUnit", DataTypes.STRING())
>         .field("sum", DataTypes.DOUBLE())
>       )
>       .inUpsertMode()
>       .createTemporaryTable("transfersSum")
>
>     val result = tEnv.sqlQuery(
>       """
>         |SELECT curr_careUnit, sum(los)
>         |FROM transfers
>         |GROUP BY curr_careUnit
>         |""".stripMargin)
>
>     result.insertInto("transfersSum")
>
>     tEnv.toRetractStream[Row](result).print() //Just to see if something is actually happening (and it is)
>
>     env.execute("Flink Streaming Demo Dump to Elasticsearch")
>   }
> }
>
>
>
>
>
> Thank you,
>
> Fernando
>
>
>
>
>
> *From: *Jark Wu <imj...@gmail.com>
> *Date: *Tuesday, March 3, 2020 at 8:51 PM
> *To: *John Smith <java.dev....@gmail.com>
> *Cc: *"Castro, Fernando C. [US-US]" <fernando.cas...@leidos.com>, "
> user@flink.apache.org" <user@flink.apache.org>
> *Subject: *EXTERNAL: Re: Should I use a Sink or Connector? Or Both?
>
>
>
> John is right.
>
>
>
> Could you provide more detailed code? So that we can help to investigate.
>
>
>
> Best,
>
> Jark
>
>
>
> On Wed, 4 Mar 2020 at 06:20, John Smith <java.dev....@gmail.com> wrote:
>
> The sink is for the Streaming API; it looks like you are using SQL and tables.
> So you can use the connector to output the table result to Elasticsearch, unless
> you want to convert from table to stream first.
>
>
>
> On Tue, 3 Mar 2020 at 16:25, Castro, Fernando C. <
> fernando.cas...@leidos.com> wrote:
>
> Hello folks! I’m new to Flink and data streaming in general, just initial
> FYI ;)
>
>
>
> I’m currently doing this successfully:
>
> 1 - streaming data from Kafka in Flink
>
> 2 - aggregating the data with Flink’s sqlQuery API
>
> 3 - outputting the result of #2 into STDOUT via toRetractStream()
>
>
>
> My objective is to change #3 so I’m upserting into an Elasticsearch index
> (see
> https://stackoverflow.com/questions/60512064/flink-is-not-adding-any-data-to-elasticsearch-but-no-errors
> for my complete code)
>
>
>
> I’ve been using the template for the Elasticsearch connector
> https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/connect.html#elasticsearch-connector
>
> tableEnvironment
>   .connect(...)
>   .withFormat(...)
>   .withSchema(...)
>   .inAppendMode()
>   .createTemporaryTable("MyTable")
>
>
>
> But I’m confused after seeing some old examples online. Should I be using
> the Elasticsearch Sink (
> https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/connectors/elasticsearch.html#elasticsearch-sink)
> instead? Or both?
>
>
>
> I’m having trouble with the current implementation where no data is
> outputting to Elasticsearch, but no error is being displayed in Flink (job
> status is RUNNING).
>
>
>
> Hoping somebody could clarify what I’m missing? Thank you in advance!
>
>
>
> Note: Running on Scala 2.11, Elasticsearch 7, Flink 1.10
>
>
