Great! I was just thinking that, in principle, a streaming sink is an
extension of a batch one. Am I wrong?
This would avoid a lot of code duplication and would improve the overall
maintainability.
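
To make the idea concrete, here is a rough sketch of what I mean. All the
names in it (BulkBuffer, BatchEsOutput, StreamingEsSink, onCheckpoint) are
made up for illustration and are not Flink or Elasticsearch APIs, and `send`
is just a placeholder for the actual bulk request. I am also assuming that
the only streaming-specific part is an extra flush at checkpoint time, which
may well be an oversimplification:

```scala
import scala.collection.mutable.ArrayBuffer

// Hypothetical shared core: buffers requests and hands them to `send` in bulks.
// `send` stands in for whatever actually talks to Elasticsearch.
class BulkBuffer[T](maxActions: Int, send: Seq[T] => Unit) {
  private val pending = ArrayBuffer.empty[T]

  // Buffer one request; flush as soon as the bulk size is reached.
  def add(request: T): Unit = {
    pending += request
    if (pending.size >= maxActions) flush()
  }

  // Send whatever is buffered (no-op when the buffer is empty).
  def flush(): Unit = {
    if (pending.nonEmpty) {
      send(pending.toList)
      pending.clear()
    }
  }
}

// Batch side: write records and flush the remainder on close.
class BatchEsOutput[T](maxActions: Int, send: Seq[T] => Unit) {
  protected val buffer = new BulkBuffer[T](maxActions, send)

  def writeRecord(record: T): Unit = buffer.add(record)

  def close(): Unit = buffer.flush()
}

// Streaming side: same behaviour, plus a flush when a checkpoint is taken
// (assumed here to be the only streaming-specific addition).
class StreamingEsSink[T](maxActions: Int, send: Seq[T] => Unit)
    extends BatchEsOutput[T](maxActions, send) {

  def onCheckpoint(): Unit = buffer.flush()
}
```

With this shape, the batch output and the streaming sink would share the
buffering and flushing code, and only the checkpoint hook would live in the
streaming class.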

On Thu, May 11, 2017 at 4:35 AM, wyphao.2007 <wyphao.2...@163.com> wrote:

> Hi Flavio, I made a PR for this: https://github.com/apache/flink/pull/3869
> It also supports ActionRequestFailureHandler in the DataSet
> ElasticsearchSink.
>
> Best
>
> On May 9, 2017 at 15:30, "Flavio Pompermaier" <pomperma...@okkam.it> wrote:
>
>
> Just one note: I took a look at your connector and it doesn't provide any
> failure handling mechanism, which would be very useful for us.
> Maybe it would be worth adding ActionRequestFailureHandler, as provided now
> by the current ES streaming connector and introduced by commit
> https://github.com/apache/flink/commit/3743e898104d79a9813d444d38fa9f86617bb5ef
>
> Best,
> Flavio
>
> On Tue, May 9, 2017 at 8:17 AM, Flavio Pompermaier <pomperma...@okkam.it>
> wrote:
>
>> Thanks a lot for the support!
>>
>> On 9 May 2017 07:53, "Tzu-Li (Gordon) Tai" <tzuli...@apache.org> wrote:
>>
>>> Hi!
>>>
>>> Thanks for sharing that repo! I think that would be quite a useful
>>> contribution to Flink for the users, if you’re up to preparing a PR for it
>>> :)
>>>
>>> It also looks like you’ve adopted most of the current ElasticsearchSink
>>> APIs (RequestIndexer, ElasticsearchSinkFunction, etc.) for the
>>> ElasticsearchOutputFormat, which fits nicely into the current code :-D
>>>
>>> Cheers,
>>> Gordon
>>>
>>>
>>> On 9 May 2017 at 1:05:14 PM, wyphao.2007 (wyphao.2...@163.com) wrote:
>>>
>>> Hi Flavio
>>>
>>> Maybe this is what you want:
>>> https://github.com/397090770/flink-elasticsearch2-connector
>>> It can save a Flink DataSet to Elasticsearch.
>>>
>>> import java.net.{InetAddress, InetSocketAddress}
>>> import scala.collection.JavaConversions._
>>>
>>> val config = Map("bulk.flush.max.actions" -> "1000", "cluster.name" -> "elasticsearch")
>>> val hosts = "www.iteblog.com"
>>> // One transport address per comma-separated host, on port 9300
>>> val transports = hosts.split(",").map(host =>
>>>   new InetSocketAddress(InetAddress.getByName(host), 9300)).toList
>>>
>>> val data: DataSet[String] = ....
>>> data.output(new ElasticSearchOutputFormat(config, transports,
>>>   new ElasticsearchSinkFunction[String] {
>>>     // Build an index request for one JSON element
>>>     def createIndexRequest(element: String): IndexRequest = {
>>>       Requests.indexRequest.index("iteblog").`type`("info").source(element)
>>>     }
>>>
>>>     override def process(element: String, ctx: RuntimeContext,
>>>         indexer: RequestIndexer): Unit = {
>>>       indexer.add(createIndexRequest(element))
>>>     }
>>>   }))
>>>
>>>
>>> I hope this could help you
>>>
>>> On May 9, 2017 at 12:59, "Tzu-Li (Gordon) Tai" <tzuli...@apache.org> wrote:
>>>
>>>
>>> Hi Flavio,
>>>
>>> I don’t think there is a bridge class for this. At the moment you’ll
>>> have to implement your own OutputFormat.
>>> The ElasticsearchSink is a SinkFunction, which is part of the DataStream
>>> API; generally speaking, there is currently no bridge or unification
>>> between it and the DataSet API.
>>>
>>> Cheers,
>>> Gordon
>>>
>>>
>>> On 3 May 2017 at 10:15:51 PM, Flavio Pompermaier (pomperma...@okkam.it)
>>> wrote:
>>>
>>>
>>> Hi to all,
>>> at the moment I have a Flink job that generates a DataSet<String> that I
>>> write to a file, which is then read by Logstash to index the data into ES.
>>> I'd like to use the new ElasticsearchSink to index those JSON documents
>>> directly from Flink, but ElasticsearchSink only works in a streaming
>>> environment.
>>>
>>> Is there any bridge class for this?
>>>
>>> Best,
>>> Flavio
>>>
>>>
>
>
> --
> Flavio Pompermaier
> Development Department
>
> OKKAM S.r.l.
> Tel. +(39) 0461 1823908
>
>


-- 
Flavio Pompermaier
Development Department

OKKAM S.r.l.
Tel. +(39) 0461 1823908
