Just your luck I happened to be working on that very talk today :) Let me know how your experiences with Elasticsearch & Spark go :)
On Thu, Jun 26, 2014 at 3:17 PM, boci <boci.b...@gmail.com> wrote:

> Wow, thanks for your fast answer, it helps a lot...
>
> b0c1
>
> ----------------------------------------------------------------------------------------------------------------------------------
> Skype: boci13, Hangout: boci.b...@gmail.com
>
>
> On Thu, Jun 26, 2014 at 11:48 PM, Holden Karau <hol...@pigscanfly.ca> wrote:
>
>> Hi b0c1,
>>
>> I have an example of how to do this in the repo for my talk as well; the
>> specific example is at
>> https://github.com/holdenk/elasticsearchspark/blob/master/src/main/scala/com/holdenkarau/esspark/IndexTweetsLive.scala
>> . Since DStream doesn't have a saveAsHadoopDataset, we use foreachRDD and
>> then call saveAsHadoopDataset on the RDD that gets passed into the
>> function we provide to foreachRDD.
>>
>> e.g.
>>
>> stream.foreachRDD { (data, time) =>
>>   val jobConf = ...
>>   data.map(prepareDataForIndexing).saveAsHadoopDataset(jobConf)
>> }
>>
>> Hope that helps :)
>>
>> Cheers,
>>
>> Holden :)
>>
>>
>> On Thu, Jun 26, 2014 at 2:23 PM, boci <boci.b...@gmail.com> wrote:
>>
>>> Thanks. Without the local option I can connect to ES remotely. Now I only
>>> have one problem: how can I use elasticsearch-hadoop with Spark Streaming?
>>> I mean, DStream doesn't have a "saveAsHadoopFiles" method; my second problem
>>> is that the output index depends on the input data.
>>>
>>> Thanks
>>>
>>>
>>> On Thu, Jun 26, 2014 at 10:10 AM, Nick Pentreath <nick.pentre...@gmail.com> wrote:
>>>
>>>> You can just add elasticsearch-hadoop as a dependency to your project
>>>> to use the ESInputFormat and ESOutputFormat
>>>> (https://github.com/elasticsearch/elasticsearch-hadoop).
>>>> Some other basics are here:
>>>> http://www.elasticsearch.org/guide/en/elasticsearch/hadoop/current/spark.html
>>>>
>>>> For testing, yes, I think you will need to start ES in local mode (just
>>>> ./bin/elasticsearch) and use the default config (host = localhost, port =
>>>> 9200).
>>>>
>>>>
>>>> On Thu, Jun 26, 2014 at 9:04 AM, boci <boci.b...@gmail.com> wrote:
>>>>
>>>>> That's okay, but hadoop has ES integration. What happens if I run
>>>>> saveAsHadoopFile without hadoop? (Or must I pull up hadoop
>>>>> programmatically, if I can?)
>>>>>
>>>>> b0c1
>>>>>
>>>>>
>>>>> On Thu, Jun 26, 2014 at 1:20 AM, Holden Karau <hol...@pigscanfly.ca> wrote:
>>>>>
>>>>>> On Wed, Jun 25, 2014 at 4:16 PM, boci <boci.b...@gmail.com> wrote:
>>>>>>
>>>>>>> Hi guys, thanks for the direction. Now I have some problems/questions:
>>>>>>> - In local (test) mode I want to use ElasticClient.local to create the
>>>>>>> ES connection, but in production I want to use ElasticClient.remote. To do
>>>>>>> this I want to pass the ElasticClient to mapPartitions, or what is the best
>>>>>>> practice?
>>>>>>>
>>>>>> In this case you probably want to create the ElasticClient inside of
>>>>>> mapPartitions (since it isn't serializable), and if you want to use a
>>>>>> different client in local mode, just have a flag that controls what type of
>>>>>> client you create.
>>>>>>
>>>>>>> - My stream output is written to Elasticsearch. How can I test
>>>>>>> output.saveAsHadoopFile[ESOutputFormat]("-") in a local environment?
>>>>>>>
>>>>>>> - After storing the enriched data in ES, I want to generate
>>>>>>> aggregated data (EsInputFormat). How can I test that locally?
>>>>>>>
>>>>>> I think the simplest thing to do would be to use the same client in both
>>>>>> modes and just start a single-node Elasticsearch cluster.
>>>>>>
>>>>>>> Thanks guys
>>>>>>>
>>>>>>> b0c1
>>>>>>>
>>>>>>>
>>>>>>> On Wed, Jun 25, 2014 at 1:33 AM, Holden Karau <hol...@pigscanfly.ca> wrote:
>>>>>>>
>>>>>>>> So I'm giving a talk at the Spark Summit on using Spark &
>>>>>>>> ElasticSearch, but for now, if you want to see a simple demo which uses
>>>>>>>> elasticsearch for geo input, you can take a look at my quick & dirty
>>>>>>>> implementation with TopTweetsInALocation (
>>>>>>>> https://github.com/holdenk/elasticsearchspark/blob/master/src/main/scala/com/holdenkarau/esspark/TopTweetsInALocation.scala
>>>>>>>> ). This approach uses the ESInputFormat, which avoids the difficulty of
>>>>>>>> having to manually create ElasticSearch clients.
>>>>>>>>
>>>>>>>> This approach might not work for your data, e.g. if you need to
>>>>>>>> create a query for each record in your RDD. If this is the case, you could
>>>>>>>> instead look at using mapPartitions and setting up your Elasticsearch
>>>>>>>> connection inside of that, so you could then re-use the client for all of
>>>>>>>> the queries on each partition. This approach avoids having to serialize
>>>>>>>> the Elasticsearch connection, because it will be local to your function.
>>>>>>>>
>>>>>>>> Hope this helps!
>>>>>>>>
>>>>>>>> Cheers,
>>>>>>>>
>>>>>>>> Holden :)
>>>>>>>>
>>>>>>>>
>>>>>>>> On Tue, Jun 24, 2014 at 4:28 PM, Mayur Rustagi <mayur.rust...@gmail.com> wrote:
>>>>>>>>
>>>>>>>>> It's not used as the default serializer because of some issues with
>>>>>>>>> compatibility & the requirement to register the classes..
>>>>>>>>>
>>>>>>>>> Which part are you getting as non-serializable... You need to
>>>>>>>>> serialize that class if you are sending it to Spark workers inside a map,
>>>>>>>>> reduce, mapPartitions or any of the operations on an RDD.
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> Mayur Rustagi
>>>>>>>>> Ph: +1 (760) 203 3257
>>>>>>>>> http://www.sigmoidanalytics.com
>>>>>>>>> @mayur_rustagi <https://twitter.com/mayur_rustagi>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> On Wed, Jun 25, 2014 at 4:52 AM, Peng Cheng <pc...@uow.edu.au> wrote:
>>>>>>>>>
>>>>>>>>>> I'm afraid persisting a connection across two tasks is a dangerous
>>>>>>>>>> act, as they can't be guaranteed to be executed on the same machine.
>>>>>>>>>> Your ES server may think it's a man-in-the-middle attack!
>>>>>>>>>>
>>>>>>>>>> I think it's possible to invoke a static method that gives you a
>>>>>>>>>> connection from a local 'pool', so nothing will sneak into your closure,
>>>>>>>>>> but it's too complex and there should be a better option.
>>>>>>>>>>
>>>>>>>>>> I've never used Kryo before; if it's that good, perhaps we should use
>>>>>>>>>> it as the default serializer.
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> --
>>>>>>>>>> View this message in context:
>>>>>>>>>> http://apache-spark-user-list.1001560.n3.nabble.com/ElasticSearch-enrich-tp8209p8222.html
>>>>>>>>>> Sent from the Apache Spark User List mailing list archive at Nabble.com.


--
Cell : 425-233-8271
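The foreachRDD snippet in the thread leaves the JobConf elided. A rough sketch of what that configuration could look like with elasticsearch-hadoop's old-API EsOutputFormat follows; the "tweets/tweet" index/type, the Map[String, String] record shape, and the helper names are all hypothetical, and the exact settings vary by elasticsearch-hadoop version:

```scala
import org.apache.hadoop.io.{MapWritable, NullWritable, Text}
import org.apache.hadoop.mapred.JobConf
import org.apache.spark.SparkContext._ // pair-RDD implicits on older Spark versions
import org.apache.spark.streaming.dstream.DStream
import org.elasticsearch.hadoop.mr.EsOutputFormat

// JobConf pointing elasticsearch-hadoop at a default local ES node.
// "tweets/tweet" is a hypothetical index/type; adjust for your data.
def esJobConf(): JobConf = {
  val jobConf = new JobConf()
  jobConf.set("es.nodes", "localhost:9200")
  jobConf.set("es.resource", "tweets/tweet")
  jobConf.setOutputFormat(classOf[EsOutputFormat])
  jobConf
}

// The old Hadoop API writes (key, value) pairs; elasticsearch-hadoop
// ignores the key and indexes the MapWritable as a document.
def prepareDataForIndexing(fields: Map[String, String]): (NullWritable, MapWritable) = {
  val doc = new MapWritable()
  fields.foreach { case (k, v) => doc.put(new Text(k), new Text(v)) }
  (NullWritable.get, doc)
}

// DStream has no saveAsHadoopDataset, so drop to the RDD once per batch.
def indexStream(stream: DStream[Map[String, String]]): Unit =
  stream.foreachRDD { (rdd, time) =>
    rdd.map(prepareDataForIndexing).saveAsHadoopDataset(esJobConf())
  }
```

On the "output index depends on the input data" question: some elasticsearch-hadoop versions accept a per-document pattern in the resource setting (e.g. an index name containing a `{field}` placeholder), which is worth checking in the documentation for your version rather than routing records by hand.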
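The "create the client inside mapPartitions, with a flag for local vs. remote" advice can be sketched without a cluster. EsClient below is a stand-in for the real (non-serializable) elastic4s client, and the commented calls mark where ElasticClient.local / ElasticClient.remote would go:

```scala
// Stand-in for the real elastic4s client, which is NOT serializable and so
// must be built on the worker, never captured in a closure.
trait EsClient { def index(doc: String): Boolean }

// Flag-controlled factory: one code path, the flag picks local vs. remote.
// (The stubs always "succeed"; swap in the real client calls for actual use.)
def makeClient(localMode: Boolean): EsClient =
  if (localMode)
    new EsClient { def index(doc: String) = true } // e.g. ElasticClient.local
  else
    new EsClient { def index(doc: String) = true } // e.g. ElasticClient.remote(host, port)

// The function to hand to rdd.mapPartitions: the client is created inside,
// once per partition, and reused for every record in that partition.
def indexPartition(localMode: Boolean)(docs: Iterator[String]): Iterator[Boolean] = {
  val client = makeClient(localMode)
  docs.map(client.index)
}

// On a real RDD this would be: rdd.mapPartitions(indexPartition(localMode = false))
println(indexPartition(localMode = true)(Iterator("doc1", "doc2")).toList) // List(true, true)
```

Because the client is constructed inside the partition function, nothing non-serializable crosses the driver/worker boundary, and you still amortize connection setup over a whole partition rather than paying it per record.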
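The serialization worry raised at the end of the thread can be checked directly: Spark ships closures to workers with Java serialization, so anything the function object carries as a field must itself be serializable. A minimal, Spark-free sketch (FakeEsClient is hypothetical):

```scala
import java.io.{ByteArrayOutputStream, NotSerializableException, ObjectOutputStream}

// Stand-in for an ES client: it would hold live sockets, so (like the real
// client classes) it deliberately does NOT implement java.io.Serializable.
class FakeEsClient { def index(doc: String): Unit = () }

// Mimics what Spark does when it ships a closure to a worker.
def serializes(obj: AnyRef): Boolean =
  try {
    new ObjectOutputStream(new ByteArrayOutputStream()).writeObject(obj)
    true
  } catch {
    case _: NotSerializableException => false
  }

// A function that holds a pre-built client as a field: serializing it drags
// the client along and fails, which is the error the thread is discussing.
class CapturingFn(client: FakeEsClient) extends (String => Unit) with Serializable {
  def apply(doc: String): Unit = client.index(doc)
}

println(serializes("plain data"))                      // true: ordinary data ships fine
println(serializes(new CapturingFn(new FakeEsClient))) // false: the client is dragged along
```

Note that switching the data serializer to Kryo (spark.serializer = org.apache.spark.serializer.KryoSerializer, with classes registered via spark.kryo.registrator) does not change this: closures still go through Java serialization, so the fix remains building the client inside mapPartitions rather than capturing it.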