Thanks a lot for the support!

On 9 May 2017 07:53, "Tzu-Li (Gordon) Tai" <tzuli...@apache.org> wrote:
> Hi!
>
> Thanks for sharing that repo! I think that would be quite a useful
> contribution to Flink for the users, if you’re up to preparing a PR for it
> :)
>
> It also looks like you’ve adopted most of the current ElasticsearchSink
> APIs (RequestIndexer, ElasticsearchSinkFunction, etc.) for the
> ElasticsearchOutputFormat, which is nice to fit into the current code :-D
>
> Cheers,
> Gordon
>
>
> On 9 May 2017 at 1:05:14 PM, wyphao.2007 (wyphao.2...@163.com) wrote:
>
> Hi Flavio
>
> Maybe this is what you want:
> https://github.com/397090770/flink-elasticsearch2-connector
> It can save a Flink DataSet to Elasticsearch.
>
> import scala.collection.JavaConversions._
>
> val config = Map("bulk.flush.max.actions" -> "1000",
>   "cluster.name" -> "elasticsearch")
> val hosts = "www.iteblog.com"
> val transports = hosts.split(",").map(host =>
>   new InetSocketAddress(InetAddress.getByName(host), 9300)).toList
> val data : DataSet[String] = ....
> data.output(new ElasticSearchOutputFormat(config, transports,
>   new ElasticsearchSinkFunction[String] {
>     def createIndexRequest(element: String): IndexRequest = {
>       Requests.indexRequest.index("iteblog").`type`("info").source(element)
>     }
>     override def process(element: String, ctx: RuntimeContext,
>         indexer: RequestIndexer) {
>       indexer.add(createIndexRequest(element))
>     }
>   }))
>
> I hope this helps.
>
> On 9 May 2017 at 12:59, "Tzu-Li (Gordon) Tai" <tzuli...@apache.org> wrote:
>
>
> Hi Flavio,
>
> I don’t think there is a bridge class for this. At the moment you’ll have
> to implement your own OutputFormat.
> The ElasticsearchSink is a SinkFunction, which is part of the DataStream
> API, and at the moment the DataStream API has no bridge to or unification
> with the DataSet API.
>
> Cheers,
> Gordon
>
>
> On 3 May 2017 at 10:15:51 PM, Flavio Pompermaier (pomperma...@okkam.it)
> wrote:
>
>
> Hi to all,
> at the moment I have a Flink job that generates a DataSet<String> that I
> write to a file, which is then read by Logstash to index the data on ES.
> I'd like to use the new ElasticsearchSink to index those JSON documents
> directly from Flink, but ElasticsearchSink only works with the streaming
> environment.
>
> Is there any bridge class for this?
>
> Best,
> Flavio