Hi Fernando,

Thanks for reporting back.
From my point of view, this is a shortcoming of the current Elasticsearch
connector: it doesn't work out of the box.
I created FLINK-16495 [1] to improve this by giving the connector a default
flush interval.
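
To illustrate why a handful of test records may never show up in Elasticsearch, here is a minimal sketch of a bulk buffer in plain Scala. It is a toy model, not the connector's code: `BulkBuffer`, `add`, `tick` and `batches` are hypothetical names, and the "default" of 1000 actions with no time-based flush is an assumption modelled on Elasticsearch's BulkProcessor.

```scala
// Toy model of a bulk buffer: flushes when enough actions are pending,
// or (only if an interval is configured) when enough time has passed.
final class BulkBuffer(maxActions: Int, flushIntervalMs: Option[Long]) {
  private var pending = Vector.empty[String]
  private var lastFlushMs = 0L
  private var flushedBatches = Vector.empty[Vector[String]]

  // Buffer one document; flush if the action threshold is reached.
  def add(doc: String, nowMs: Long): Unit = {
    pending :+= doc
    if (pending.size >= maxActions) flush(nowMs)
  }

  // Called periodically; flushes only when an interval is configured and has elapsed.
  def tick(nowMs: Long): Unit =
    if (pending.nonEmpty && flushIntervalMs.exists(i => nowMs - lastFlushMs >= i)) flush(nowMs)

  private def flush(nowMs: Long): Unit = {
    flushedBatches :+= pending
    pending = Vector.empty
    lastFlushMs = nowMs
  }

  // Batches that would have been sent so far.
  def batches: Vector[Vector[String]] = flushedBatches
}

object BulkBufferDemo {
  def main(args: Array[String]): Unit = {
    // Assumed defaults: large action threshold, no interval. One record stays buffered.
    val defaults = new BulkBuffer(maxActions = 1000, flushIntervalMs = None)
    defaults.add("doc-1", nowMs = 0L)
    defaults.tick(nowMs = 60000L)
    println(defaults.batches.size)

    // With an interval, the same single record is flushed on the next tick.
    val tuned = new BulkBuffer(maxActions = 2, flushIntervalMs = Some(1000L))
    tuned.add("doc-1", nowMs = 0L)
    tuned.tick(nowMs = 1000L)
    println(tuned.batches.size)
  }
}
```

In this model the first buffer never sends anything for a single record, while the second sends one batch after a second, which matches the behaviour reported in this thread.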

Best,
Jark

[1]: https://issues.apache.org/jira/browse/FLINK-16495

On Sat, 7 Mar 2020 at 00:20, Castro, Fernando C. <fernando.cas...@leidos.com>
wrote:

> Arvid, thank you, that was it!
>
> After setting these properties on my Elasticsearch connector, I was able
> to see the records upserting into ES!
>
>
> .bulkFlushMaxActions(2)
> .bulkFlushInterval(1000L)
>
>
>
> Thank you,
>
> Fernando
>
>
>
>
>
> *From: *Arvid Heise <ar...@ververica.com>
> *Date: *Thursday, March 5, 2020 at 2:27 AM
> *To: *"Castro, Fernando C. [US-US]" <fernando.cas...@leidos.com>
> *Cc: *Jark Wu <imj...@gmail.com>, John Smith <java.dev....@gmail.com>, "
> user@flink.apache.org" <user@flink.apache.org>
> *Subject: *Re: EXTERNAL: Re: Should I use a Sink or Connector? Or Both?
>
>
>
> Hi Fernando,
>
>
>
> How much data are you trying to write? If you just use single messages for
> testing, it could be that the default bulk settings are not working well.
>
>
>
> If so, could you please adjust the following settings and report back?
>
> public enum SinkOption {
>    BULK_FLUSH_MAX_ACTIONS,
>    BULK_FLUSH_MAX_SIZE,
>    BULK_FLUSH_INTERVAL
> }
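
In the table descriptor API of Flink 1.10 these three options correspond, as far as I can tell, to `bulkFlushMaxActions`, `bulkFlushMaxSize` and `bulkFlushInterval` on `org.apache.flink.table.descriptors.Elasticsearch`. A minimal sketch of where they go; the host, index, table name and the deliberately small values are placeholders for testing, not recommendations:

```scala
import org.apache.flink.table.api.DataTypes
import org.apache.flink.table.api.scala.StreamTableEnvironment
import org.apache.flink.table.descriptors.{Elasticsearch, Json, Schema}

object FlushSettingsSketch {

  // Registers an upsert sink table whose bulk requests are flushed early and often.
  def registerSink(tEnv: StreamTableEnvironment): Unit = {
    tEnv.connect(
      new Elasticsearch()
        .version("7")
        .host("localhost", 9200, "http") // placeholder host
        .index("my-index")               // placeholder index
        .documentType("_doc")
        .bulkFlushMaxActions(2)          // flush after 2 buffered actions
        .bulkFlushMaxSize("1 mb")        // or after 1 MB of buffered data
        .bulkFlushInterval(1000L)        // or after 1000 ms, whichever comes first
    )
      .withFormat(new Json().jsonSchema(
        "{type: 'object', properties: {curr_careUnit: {type: 'string'}, sum: {type: 'number'}}}"))
      .withSchema(new Schema()
        .field("curr_careUnit", DataTypes.STRING())
        .field("sum", DataTypes.DOUBLE())
      )
      .inUpsertMode()
      .createTemporaryTable("mySink") // placeholder table name
  }
}
```

For a low-volume test the interval is the setting that matters most, since the size and action thresholds may never be reached.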
>
>
>
> On Wed, Mar 4, 2020 at 3:05 PM Castro, Fernando C. <
> fernando.cas...@leidos.com> wrote:
>
> Thank you guys. I still have no idea why data is not being pushed to
> Elasticsearch… ☹
>
>
>
> My complete code is at
> https://stackoverflow.com/questions/60512064/flink-is-not-adding-any-data-to-elasticsearch-but-no-errors
>
> Btw, for some reason I still need to pass .documentType to the
> Elasticsearch connection descriptor (getting it from
> org.apache.flink.table.descriptors.Elasticsearch), when Elasticsearch 7
> doesn’t do types anymore.
>
>
>
> In case you can’t access stackoverflow for some reason, here is the code
> below too:
>
>
>
>
> /*
>  * This Scala source file was generated by the Gradle 'init' task.
>  */
> package flinkNamePull
>
> import java.time.LocalDateTime
> import java.util.Properties
>
> import org.apache.flink.api.common.serialization.SimpleStringSchema
> import org.apache.flink.streaming.api.scala._
> import org.apache.flink.streaming.connectors.kafka.{FlinkKafkaConsumer010, FlinkKafkaProducer010}
> import org.apache.flink.api.common.functions.RichMapFunction
> import org.apache.flink.configuration.Configuration
> import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
> import org.apache.flink.table.api.{DataTypes, Table}
> import org.apache.flink.table.api.scala.StreamTableEnvironment
> import org.apache.flink.table.descriptors.{Elasticsearch, Json, Schema}
> import org.apache.flink.types.Row
>
> object Demo {
>
>   /**
>    * MapFunction to generate Transfers POJOs from parsed CSV data.
>    */
>   class TransfersMapper extends RichMapFunction[String, Transfers] {
>     private var formatter = null
>
>     @throws[Exception]
>     override def open(parameters: Configuration): Unit = {
>       super.open(parameters)
>       //formatter = DateTimeFormat.forPattern("yyyy-MM-dd HH:mm:ss")
>     }
>
>     @throws[Exception]
>     override def map(csvLine: String): Transfers = {
>       //var splitCsv = csvLine.stripLineEnd.split("\n")(1).split(",")
>       var splitCsv = csvLine.stripLineEnd.split(",")
>
>       val arrLength = splitCsv.length
>       val i = 0
>       if (arrLength != 13) {
>         for (i <- arrLength + 1 to 13) {
>           if (i == 13) {
>             splitCsv = splitCsv :+ "0.0"
>           } else {
>             splitCsv = splitCsv :+ ""
>           }
>         }
>       }
>       var trans = new Transfers()
>       trans.rowId = splitCsv(0)
>       trans.subjectId = splitCsv(1)
>       trans.hadmId = splitCsv(2)
>       trans.icuStayId = splitCsv(3)
>       trans.dbSource = splitCsv(4)
>       trans.eventType = splitCsv(5)
>       trans.prev_careUnit = splitCsv(6)
>       trans.curr_careUnit = splitCsv(7)
>       trans.prev_wardId = splitCsv(8)
>       trans.curr_wardId = splitCsv(9)
>       trans.inTime = splitCsv(10)
>       trans.outTime = splitCsv(11)
>       trans.los = splitCsv(12).toDouble
>
>       return trans
>     }
>   }
>
>   def main(args: Array[String]) {
>     // Create streaming execution environment
>     val env = StreamExecutionEnvironment.getExecutionEnvironment
>     env.setParallelism(1)
>
>     // Set properties per KafkaConsumer API
>     val properties = new Properties()
>     properties.setProperty("bootstrap.servers", "kafka.kafka:9092")
>     properties.setProperty("group.id", "test")
>
>     // Add Kafka source to environment
>     val myKConsumer = new FlinkKafkaConsumer010[String]("raw.data3", new SimpleStringSchema(), properties)
>     // Read from beginning of topic
>     myKConsumer.setStartFromEarliest()
>
>     val streamSource = env
>       .addSource(myKConsumer)
>
>     // Transform CSV into a Transfers object
>     val streamTransfers = streamSource.map(new TransfersMapper())
>
>     // create a TableEnvironment
>     val tEnv = StreamTableEnvironment.create(env)
>
>     // register a Table
>     val tblTransfers: Table = tEnv.fromDataStream(streamTransfers)
>     tEnv.createTemporaryView("transfers", tblTransfers)
>
>     tEnv.connect(
>       new Elasticsearch()
>         .version("7")
>         .host("elasticsearch-elasticsearch-coordinating-only.default.svc.cluster.local", 9200, "http") // required: one or more Elasticsearch hosts to connect to
>         .index("transfers-sum")
>         .documentType("_doc") // not sure why this is still needed for ES7
>         .keyNullLiteral("n/a")
>     )
>       .withFormat(new Json().jsonSchema("{type: 'object', properties: {curr_careUnit: {type: 'string'}, sum: {type: 'number'}}}"))
>       .withSchema(new Schema()
>         .field("curr_careUnit", DataTypes.STRING())
>         .field("sum", DataTypes.DOUBLE())
>       )
>       .inUpsertMode()
>       .createTemporaryTable("transfersSum")
>
>     val result = tEnv.sqlQuery(
>       """
>         |SELECT curr_careUnit, sum(los)
>         |FROM transfers
>         |GROUP BY curr_careUnit
>         |""".stripMargin)
>
>     result.insertInto("transfersSum")
>
>     tEnv.toRetractStream[Row](result).print() //Just to see if something is actually happening (and it is)
>
>     env.execute("Flink Streaming Demo Dump to Elasticsearch")
>   }
> }
>
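
As a side note on the mapper above, the padding loop in `map()` can be written without mutable state. The sketch below is only an illustration and not part of the original job; `CsvPadding` and `padRow` are hypothetical names, and it deliberately keeps the same behaviour (short rows are padded with empty strings, and the last column becomes "0.0").

```scala
object CsvPadding {
  val ExpectedColumns = 13

  // Pads a short CSV row to 13 columns: empty strings, then "0.0" in the last column.
  // Rows that already have 13 or more columns are returned unchanged.
  def padRow(csvLine: String): Array[String] = {
    // split(",") drops trailing empty fields, which is one way rows end up short.
    val fields = csvLine.stripLineEnd.split(",")
    if (fields.length >= ExpectedColumns) fields
    else fields.padTo(ExpectedColumns - 1, "") :+ "0.0"
  }
}
```

A row such as `"1,2,3"` comes back with 13 columns, the last of which is `"0.0"`, so the later `toDouble` call on column 12 does not fail on short input.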
>
>
>
>
> Thank you,
>
> Fernando
>
>
>
>
>
> *From: *Jark Wu <imj...@gmail.com>
> *Date: *Tuesday, March 3, 2020 at 8:51 PM
> *To: *John Smith <java.dev....@gmail.com>
> *Cc: *"Castro, Fernando C. [US-US]" <fernando.cas...@leidos.com>, "
> user@flink.apache.org" <user@flink.apache.org>
> *Subject: *EXTERNAL: Re: Should I use a Sink or Connector? Or Both?
>
>
>
> John is right.
>
>
>
> Could you provide more detailed code so that we can help investigate?
>
>
>
> Best,
>
> Jark
>
>
>
> On Wed, 4 Mar 2020 at 06:20, John Smith <java.dev....@gmail.com> wrote:
>
> The sink is for the Streaming API; it looks like you are using SQL and tables.
> So you can use the connector to output the table result to Elastic, unless
> you want to convert from table to stream first.
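
For completeness, the "convert from table to stream first" route could look roughly like the sketch below, based on the Elasticsearch 7 sink from the DataStream connector docs. It assumes the `flink-connector-elasticsearch7` dependency is on the classpath and that the stream comes from `tEnv.toRetractStream[Row](result)`; the host, the index name, the field positions and `attachSink` itself are placeholders.

```scala
import org.apache.flink.api.common.functions.RuntimeContext
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.elasticsearch.{ElasticsearchSinkFunction, RequestIndexer}
import org.apache.flink.streaming.connectors.elasticsearch7.ElasticsearchSink
import org.apache.flink.types.Row
import org.apache.http.HttpHost
import org.elasticsearch.client.Requests

object StreamSinkSketch {

  // Writes the "add" messages of a retract stream to Elasticsearch,
  // using the grouping key as document id so later sums overwrite earlier ones.
  def attachSink(changes: DataStream[(Boolean, Row)]): Unit = {
    val hosts = new java.util.ArrayList[HttpHost]()
    hosts.add(new HttpHost("localhost", 9200, "http")) // placeholder host

    val builder = new ElasticsearchSink.Builder[(Boolean, Row)](
      hosts,
      new ElasticsearchSinkFunction[(Boolean, Row)] {
        override def process(change: (Boolean, Row), ctx: RuntimeContext, indexer: RequestIndexer): Unit = {
          val row = change._2
          val doc = new java.util.HashMap[String, AnyRef]()
          doc.put("curr_careUnit", row.getField(0)) // assumes field 0 is the key
          doc.put("sum", row.getField(1))           // assumes field 1 is the sum
          indexer.add(
            Requests.indexRequest()
              .index("transfers-sum")
              .id(String.valueOf(row.getField(0)))
              .source(doc))
        }
      })

    // Emit after every element; without this the requests are buffered.
    builder.setBulkFlushMaxActions(1)

    // Keep only add messages; retractions are superseded by the following add.
    changes.filter(_._1).addSink(builder.build())
  }
}
```

This is more code than the table connector and handles deletes only implicitly, so the connector remains the simpler choice for a pure SQL pipeline.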
>
>
>
> On Tue, 3 Mar 2020 at 16:25, Castro, Fernando C. <
> fernando.cas...@leidos.com> wrote:
>
> Hello folks! I’m new to Flink and data streaming in general, just initial
> FYI ;)
>
>
>
> I’m currently doing this successfully:
>
> 1 - streaming data from Kafka in Flink
>
> 2 - aggregating the data with Flink’s sqlQuery API
>
> 3 - outputting the result of #2 into STDOUT via toRetractStream()
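
For readers new to retract streams, here is a rough illustration in plain Scala (no Flink dependency) of the change messages a grouped SUM emits, and of why an upsert sink only needs the latest value per key. `RetractDemo`, `Change`, `sumByKey` and `latestPerKey` are hypothetical names used only for this sketch.

```scala
object RetractDemo {
  // (isAdd, key, sum): true adds a result row, false retracts a previously emitted one.
  type Change = (Boolean, String, Double)

  // Emits the change messages a "SELECT key, SUM(value) ... GROUP BY key" would produce.
  def sumByKey(events: Seq[(String, Double)]): Vector[Change] = {
    val start = (Map.empty[String, Double], Vector.empty[Change])
    val (_, changes) = events.foldLeft(start) {
      case ((state, out), (key, value)) =>
        state.get(key) match {
          case Some(old) =>
            val updated = old + value
            // Retract the old sum, then add the new one.
            (state.updated(key, updated), out :+ ((false, key, old)) :+ ((true, key, updated)))
          case None =>
            (state.updated(key, value), out :+ ((true, key, value)))
        }
    }
    changes
  }

  // What a keyed (upsert) store ends up holding after applying all changes.
  def latestPerKey(changes: Seq[Change]): Map[String, Double] =
    changes.foldLeft(Map.empty[String, Double]) {
      case (store, (true, key, sum)) => store.updated(key, sum)
      case (store, (false, key, _))  => store - key
    }
}
```

Feeding `Seq(("MICU", 1.0), ("MICU", 2.5), ("SICU", 4.0))` through `sumByKey` yields one add for the first MICU row, a retract/add pair for the second, and one add for SICU; `latestPerKey` then holds a single document per care unit.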
>
>
>
> My objective is to change #3 so I’m upserting into an Elasticsearch index
> (see
> https://stackoverflow.com/questions/60512064/flink-is-not-adding-any-data-to-elasticsearch-but-no-errors
> for my complete code)
>
>
>
> I’ve been using the template for the Elasticsearch connector
> https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/connect.html#elasticsearch-connector
>
> tableEnvironment
>   .connect(...)
>   .withFormat(...)
>   .withSchema(...)
>   .inAppendMode()
>   .createTemporaryTable("MyTable")
>
>
>
> But I’m confused after seeing some old examples online. Should I be using
> the Elasticsearch Sink (
> https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/connectors/elasticsearch.html#elasticsearch-sink)
> instead? Or both?
>
>
>
> I’m having trouble with the current implementation where no data is
> outputting to Elasticsearch, but no error is being displayed in Flink (job
> status is RUNNING).
>
>
>
> Hoping somebody could clarify what I’m missing? Thank you in advance!
>
>
>
> Note: Running on Scala 2.11, Elasticsearch 7, Flink 1.10
>
>
