Great! I was just thinking that, in principle, a streaming sink is an extension of a batch one. Am I wrong? This would avoid a lot of code duplication and improve overall maintainability.
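To make the idea concrete, something like the sketch below is roughly all a batch-only ES sink needs. This is a minimal sketch, not the actual PR: the class and parameter names (SimpleEsOutputFormat, toIndexRequest) are invented, a plain T => IndexRequest stands in for the shared ElasticsearchSinkFunction, and it assumes the ES 2.x TransportClient API used by the connector in this thread.

import java.net.InetSocketAddress

import org.apache.flink.api.common.io.RichOutputFormat
import org.apache.flink.configuration.Configuration
import org.elasticsearch.action.bulk.{BulkProcessor, BulkRequest, BulkResponse}
import org.elasticsearch.action.index.IndexRequest
import org.elasticsearch.client.transport.TransportClient
import org.elasticsearch.common.settings.Settings
import org.elasticsearch.common.transport.InetSocketTransportAddress

class SimpleEsOutputFormat[T](userConfig: Map[String, String],
                              transports: List[InetSocketAddress],
                              toIndexRequest: T => IndexRequest)
    extends RichOutputFormat[T] {

  // The client and bulk processor are not serializable, so they are created in open()
  @transient private var client: TransportClient = _
  @transient private var bulk: BulkProcessor = _

  override def configure(parameters: Configuration): Unit = {}

  override def open(taskNumber: Int, numTasks: Int): Unit = {
    // Naively forwards all user config to ES settings; a real version would
    // also honor the bulk.flush.* keys when configuring the BulkProcessor
    val settings = Settings.settingsBuilder()
    for ((k, v) <- userConfig) settings.put(k, v)
    client = TransportClient.builder().settings(settings.build()).build()
    for (t <- transports) client.addTransportAddress(new InetSocketTransportAddress(t))
    // Buffer index requests and send them in bulk, like the streaming sink does
    bulk = BulkProcessor.builder(client, new BulkProcessor.Listener {
      override def beforeBulk(id: Long, request: BulkRequest): Unit = {}
      override def afterBulk(id: Long, request: BulkRequest, response: BulkResponse): Unit = {}
      override def afterBulk(id: Long, request: BulkRequest, failure: Throwable): Unit = {}
    }).build()
  }

  override def writeRecord(record: T): Unit = bulk.add(toIndexRequest(record))

  override def close(): Unit = {
    if (bulk != null) bulk.close()      // sends any buffered requests
    if (client != null) client.close()
  }
}

A real version would reuse ElasticsearchSinkFunction and RequestIndexer, as in the snippet quoted below, so that the batch and streaming sinks share the same user-facing API.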
On Thu, May 11, 2017 at 4:35 AM, wyphao.2007 <wyphao.2...@163.com> wrote:

> Hi Flavio, I made a PR for this: https://github.com/apache/flink/pull/3869
> It also supports ActionRequestFailureHandler in the DataSet ElasticsearchSink.
>
> Best
>
> On May 9, 2017, at 15:30, "Flavio Pompermaier" <pomperma...@okkam.it> wrote:
>
> Just one note: I took a look at your connector and it doesn't provide any
> failure handling mechanism, which is very useful for us.
> Maybe it would be worth adding ActionRequestFailureHandler as provided now
> by the current ES streaming connector and introduced by commit
> https://github.com/apache/flink/commit/3743e898104d79a9813d444d38fa9f86617bb5ef
>
> Best,
> Flavio
>
> On Tue, May 9, 2017 at 8:17 AM, Flavio Pompermaier <pomperma...@okkam.it> wrote:
>
>> Thanks a lot for the support!
>>
>> On 9 May 2017 07:53, "Tzu-Li (Gordon) Tai" <tzuli...@apache.org> wrote:
>>
>>> Hi!
>>>
>>> Thanks for sharing that repo! I think that would be quite a useful
>>> contribution to Flink for the users, if you're up to preparing a PR for it :)
>>>
>>> It also looks like you've adopted most of the current ElasticsearchSink
>>> APIs (RequestIndexer, ElasticsearchSinkFunction, etc.) for the
>>> ElasticsearchOutputFormat, which fits nicely into the current code :-D
>>>
>>> Cheers,
>>> Gordon
>>>
>>> On 9 May 2017 at 1:05:14 PM, wyphao.2007 (wyphao.2...@163.com) wrote:
>>>
>>> Hi Flavio,
>>>
>>> Maybe this is what you want: https://github.com/397090770/flink-elasticsearch2-connector
>>> It can save a Flink DataSet to Elasticsearch:
>>>
>>> import scala.collection.JavaConversions._
>>>
>>> val config = Map("bulk.flush.max.actions" -> "1000", "cluster.name" -> "elasticsearch")
>>> val hosts = "www.iteblog.com"
>>> val transports = hosts.split(",")
>>>   .map(host => new InetSocketAddress(InetAddress.getByName(host), 9300)).toList
>>> val data: DataSet[String] = ....
>>>
>>> data.output(new ElasticSearchOutputFormat(config, transports,
>>>   new ElasticsearchSinkFunction[String] {
>>>     def createIndexRequest(element: String): IndexRequest = {
>>>       Requests.indexRequest.index("iteblog").`type`("info").source(element)
>>>     }
>>>
>>>     override def process(element: String, ctx: RuntimeContext, indexer: RequestIndexer) {
>>>       indexer.add(createIndexRequest(element))
>>>     }
>>>   }))
>>>
>>> I hope this could help you.
>>>
>>> On May 9, 2017, at 12:59, "Tzu-Li (Gordon) Tai" <tzuli...@apache.org> wrote:
>>>
>>> Hi Flavio,
>>>
>>> I don't think there is a bridge class for this. At the moment you'll
>>> have to implement your own OutputFormat.
>>> The ElasticsearchSink is a SinkFunction, which is part of the DataStream
>>> API; generally speaking, there is no bridge or unification with the
>>> DataSet API yet.
>>>
>>> Cheers,
>>> Gordon
>>>
>>> On 3 May 2017 at 10:15:51 PM, Flavio Pompermaier (pomperma...@okkam.it) wrote:
>>>
>>> Hi to all,
>>> at the moment I have a Flink job that generates a DataSet<String> that I
>>> write to a file, which is then read by Logstash to index the data on ES.
>>> I'd like to use the new ElasticsearchSink to index those JSON documents
>>> directly from Flink, but ElasticsearchSink only works with the streaming
>>> environment.
>>>
>>> Is there any bridge class for this?
>>>
>>> Best,
>>> Flavio

--
Flavio Pompermaier
Development Department

OKKAM S.r.l.
Tel. +(39) 0461 1823908
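For completeness, the ActionRequestFailureHandler discussed above boils down to a single callback, per the interface introduced by the commit linked in the thread (Flink 1.3). A minimal sketch follows; the class name RetryOnRejectionHandler is invented here, and it assumes the ES 5.x flavor of the connector, where ActionRequest takes no type parameters:

import org.apache.flink.streaming.connectors.elasticsearch.{ActionRequestFailureHandler, RequestIndexer}
import org.elasticsearch.action.ActionRequest
import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException

class RetryOnRejectionHandler extends ActionRequestFailureHandler {
  @throws[Throwable]
  override def onFailure(action: ActionRequest, failure: Throwable,
                         restStatusCode: Int, indexer: RequestIndexer): Unit =
    failure match {
      // The ES bulk queue was full: re-add the request so the sink retries it
      case _: EsRejectedExecutionException => indexer.add(action)
      // Anything else (e.g. a malformed document): rethrow and fail the sink
      case t => throw t
    }
}

Under the same assumptions, the handler would be passed to the sink alongside the ElasticsearchSinkFunction, e.g. new ElasticsearchSink(config, transports, sinkFunction, new RetryOnRejectionHandler()).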