Hello folks! I’m new to Flink and data streaming in general, just an initial FYI ;)

I’m currently doing this successfully:
1 - streaming data from Kafka in Flink
2 - aggregating the data with Flink’s sqlQuery API
3 - outputting the result of #2 to STDOUT via toRetractStream() (sketched below)
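
Concretely, steps 1–3 look roughly like the snippet below. The topic, field, and server names are placeholders I made up for this email (my real code is at the Stack Overflow link further down):

import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.api.{DataTypes, EnvironmentSettings}
import org.apache.flink.table.api.scala._
import org.apache.flink.table.descriptors.{Json, Kafka, Schema}
import org.apache.flink.types.Row

val env = StreamExecutionEnvironment.getExecutionEnvironment
val settings = EnvironmentSettings.newInstance()
  .useBlinkPlanner().inStreamingMode().build()
val tableEnvironment = StreamTableEnvironment.create(env, settings)

// 1 - register the Kafka topic as a table
tableEnvironment
  .connect(new Kafka()
    .version("universal")
    .topic("my-topic")
    .property("bootstrap.servers", "localhost:9092"))
  .withFormat(new Json())
  .withSchema(new Schema()
    .field("user_id", DataTypes.STRING())
    .field("amount", DataTypes.DOUBLE()))
  .createTemporaryTable("events")

// 2 - aggregate with sqlQuery
val result = tableEnvironment.sqlQuery(
  "SELECT user_id, SUM(amount) AS total FROM events GROUP BY user_id")

// 3 - print the retract stream (tuples of (Boolean, Row)) to STDOUT
tableEnvironment.toRetractStream[Row](result).print()

env.execute("kafka-aggregation")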

My objective is to change #3 so I’m upserting into an Elasticsearch index (see 
https://stackoverflow.com/questions/60512064/flink-is-not-adding-any-data-to-elasticsearch-but-no-errors
 for my complete code)

I’ve been using the template from the Elasticsearch connector docs
(https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/connect.html#elasticsearch-connector):
tableEnvironment
  .connect(...)
  .withFormat(...)
  .withSchema(...)
  .inAppendMode()
  .createTemporaryTable("MyTable")
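
Filled in, my version looks roughly like this (reusing the imports and variables from my first snippet; host, index, and field names are placeholders). Note I swapped the template’s inAppendMode() for inUpsertMode(), since my GROUP BY result is an updating table, though I’m not sure that’s the right call:

import org.apache.flink.table.descriptors.Elasticsearch

tableEnvironment
  .connect(new Elasticsearch()
    .version("7")
    .host("localhost", 9200, "http")
    .index("my-index")
    .documentType("_doc"))  // unsure whether ES 7 still needs a type here
  .withFormat(new Json())
  .withSchema(new Schema()
    .field("user_id", DataTypes.STRING())
    .field("total", DataTypes.DOUBLE()))
  .inUpsertMode()  // my guess: GROUP BY updates rows in place, so upsert rather than append
  .createTemporaryTable("MyEsSink")

// write the aggregation result into the sink table and submit the job
tableEnvironment.insertInto("MyEsSink", result)
tableEnvironment.execute("es-upsert")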

But I’m confused by some of the older examples I’ve seen online. Should I be using the
Elasticsearch Sink 
(https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/connectors/elasticsearch.html#elasticsearch-sink)
 instead? Or both?
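
For reference, those older examples look roughly like the DataStream-level sink below. This is just my own reconstruction with placeholder names, again reusing result and tableEnvironment from above; it needs the flink-connector-elasticsearch7 dependency:

import java.util

import org.apache.flink.api.common.functions.RuntimeContext
import org.apache.flink.streaming.connectors.elasticsearch.{ElasticsearchSinkFunction, RequestIndexer}
import org.apache.flink.streaming.connectors.elasticsearch7.ElasticsearchSink
import org.apache.flink.types.Row
import org.apache.http.HttpHost
import org.elasticsearch.client.Requests

val httpHosts = new util.ArrayList[HttpHost]()
httpHosts.add(new HttpHost("localhost", 9200, "http"))

// toRetractStream gives (Boolean, Row); true = add/update, false = retraction
val esSink = new ElasticsearchSink.Builder[(Boolean, Row)](
  httpHosts,
  new ElasticsearchSinkFunction[(Boolean, Row)] {
    override def process(element: (Boolean, Row), ctx: RuntimeContext,
                         indexer: RequestIndexer): Unit = {
      // NOTE: a real job would presumably turn element._1 == false into a delete
      val json = new util.HashMap[String, AnyRef]()
      json.put("user_id", element._2.getField(0))
      json.put("total", element._2.getField(1))
      indexer.add(Requests.indexRequest().index("my-index").source(json))
    }
  }
).build()

tableEnvironment.toRetractStream[Row](result).addSink(esSink)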

I’m having trouble with my current implementation: no data arrives in
Elasticsearch, yet no error is displayed in Flink (the job status is
RUNNING).

Hoping somebody can clarify what I’m missing. Thank you in advance!

Note: Running on Scala 2.11, Elasticsearch 7, Flink 1.10
