Hello folks! I’m new to Flink and data streaming in general, just initial FYI ;)
I’m currently doing this successfully:

1 - streaming data from Kafka into Flink
2 - aggregating the data with Flink’s sqlQuery API
3 - outputting the result of #2 to STDOUT via toRetractStream()

My objective is to change #3 so that I’m upserting into an Elasticsearch index (see https://stackoverflow.com/questions/60512064/flink-is-not-adding-any-data-to-elasticsearch-but-no-errors for my complete code).

I’ve been using the template for the Elasticsearch connector (https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/connect.html#elasticsearch-connector):

    tableEnvironment
      .connect(...)
      .withFormat(...)
      .withSchema(...)
      .inAppendMode()
      .createTemporaryTable("MyTable")

But I’m confused after seeing some old examples online. Should I be using the Elasticsearch Sink (https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/connectors/elasticsearch.html#elasticsearch-sink) instead? Or both?

I’m having trouble with the current implementation: no data is being written to Elasticsearch, yet Flink displays no error (job status is RUNNING). Could somebody clarify what I’m missing?

Thank you in advance!

Note: Running on Scala 2.11, Elasticsearch 7, Flink 1.10