Hi Flavio

Maybe this is what you want: 
https://github.com/397090770/flink-elasticsearch2-connector. It can save a 
Flink DataSet to Elasticsearch.
import java.net.{InetAddress, InetSocketAddress}

import scala.collection.JavaConversions._

import org.apache.flink.api.common.functions.RuntimeContext
import org.apache.flink.api.scala._
import org.apache.flink.streaming.connectors.elasticsearch2.{ElasticsearchSinkFunction, RequestIndexer}
import org.elasticsearch.action.index.IndexRequest
import org.elasticsearch.client.Requests

// Bulk-flush and cluster settings passed to the Elasticsearch client.
val config = Map(
  "bulk.flush.max.actions" -> "1000",
  "cluster.name" -> "elasticsearch")

// Comma-separated host list, turned into transport addresses (port 9300).
val hosts = "www.iteblog.com"
val transports = hosts.split(",")
  .map(host => new InetSocketAddress(InetAddress.getByName(host), 9300))
  .toList

val data: DataSet[String] = ....

// ElasticSearchOutputFormat comes from the flink-elasticsearch2-connector
// project linked above; it wraps an ElasticsearchSinkFunction for DataSet use.
data.output(new ElasticSearchOutputFormat(config, transports,
  new ElasticsearchSinkFunction[String] {
    def createIndexRequest(element: String): IndexRequest = {
      Requests.indexRequest.index("iteblog").`type`("info").source(element)
    }

    override def process(element: String, ctx: RuntimeContext, indexer: RequestIndexer) {
      indexer.add(createIndexRequest(element))
    }
  }))


I hope this helps you.


On 9 May 2017 at 12:59, "Tzu-Li (Gordon) Tai" <tzuli...@apache.org> wrote:


Hi Flavio,


I don’t think there is a bridge class for this. At the moment you’ll have to 
implement your own OutputFormat.
The ElasticsearchSink is a SinkFunction, which is part of the DataStream API; 
generally speaking, the DataStream API has no bridge or unification with the 
DataSet API yet.
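The bridge Gordon describes amounts to an adapter: an OutputFormat whose writeRecord() forwards each record to a SinkFunction-style callback. Below is a minimal, self-contained sketch of that shape in Java (Flink's interfaces are defined in Java); MiniSinkFunction and MiniOutputFormat are simplified stand-ins for Flink's real interfaces, not the actual Flink API.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified stand-in for Flink's DataStream SinkFunction.
interface MiniSinkFunction<T> {
    void invoke(T value);
}

// Simplified stand-in for Flink's DataSet OutputFormat lifecycle.
interface MiniOutputFormat<T> {
    void open(int taskNumber, int numTasks);
    void writeRecord(T record);
    void close();
}

// The adapter: every record the DataSet runtime passes to writeRecord()
// is forwarded to the sink callback. In a real Elasticsearch bridge,
// open() would create the client and close() would flush and release it.
class SinkBridgeOutputFormat<T> implements MiniOutputFormat<T> {
    private final MiniSinkFunction<T> sink;

    SinkBridgeOutputFormat(MiniSinkFunction<T> sink) {
        this.sink = sink;
    }

    @Override public void open(int taskNumber, int numTasks) { /* set up client */ }
    @Override public void writeRecord(T record) { sink.invoke(record); }
    @Override public void close() { /* flush and release client */ }
}

public class BridgeDemo {
    public static void main(String[] args) {
        // Tiny demo: collect records into a list instead of indexing them.
        List<String> indexed = new ArrayList<>();
        MiniOutputFormat<String> format = new SinkBridgeOutputFormat<>(indexed::add);
        format.open(0, 1);
        format.writeRecord("{\"id\":1}");
        format.writeRecord("{\"id\":2}");
        format.close();
        System.out.println(indexed.size()); // prints 2
    }
}
```

This is essentially what the connector linked above does: it wraps an ElasticsearchSinkFunction inside an OutputFormat so the same indexing logic can run in a batch job.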


Cheers,
Gordon





On 3 May 2017 at 10:15:51 PM, Flavio Pompermaier (pomperma...@okkam.it) wrote:



Hi to all,
at the moment I have a Flink job that generates a DataSet<String>, which I 
write to a file that Logstash reads to index the data in ES.
I'd like to use the new ElasticsearchSink to index those JSON documents 
directly from Flink, but ElasticsearchSink only works with the streaming 
environment.



Is there any bridge class for this?


Best,
Flavio
