Try setting the master to local[4]
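For reference, a minimal sketch of what that change looks like in a scalatest-style setup like the one described below (the app name, batch interval, and the Kryo line are illustrative, not taken from the original test):

    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.{Seconds, StreamingContext}

    // "local" / local[1] gives the Kafka receiver the only core, so the
    // processing side of the stream never runs; local[4] leaves cores
    // free for the actual transformations.
    val conf = new SparkConf()
      .setMaster("local[4]")
      .setAppName("es-enrich-test")   // illustrative name
      .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")

    val ssc = new StreamingContext(conf, Seconds(5))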
On Fri, Jun 27, 2014 at 2:17 PM, boci <boci.b...@gmail.com> wrote:

This is a simple scalatest. I create a SparkConf, set the master to local (set the serializer, etc.), bring up the Kafka and ES connections, send a message to Kafka, and wait 30 seconds for it to be processed. It runs in IDEA, no magic tricks.

b0c1

On Fri, Jun 27, 2014 at 11:11 PM, Holden Karau <hol...@pigscanfly.ca> wrote:

So a few quick questions:

1) What cluster are you running this against? Is it just local? Have you tried local[4]?
2) When you say breakpoint, how are you setting this breakpoint? There is a good chance your breakpoint mechanism doesn't work in a distributed environment; could you instead cause a side effect (like writing to a file)?

Cheers,

Holden :)

On Fri, Jun 27, 2014 at 2:04 PM, boci <boci.b...@gmail.com> wrote:

Ok, I found dynamic resources, but I have a frustrating problem. This is the flow:

kafka -> enrich X -> enrich Y -> enrich Z -> foreachRDD -> save

My problem is: if I do this it doesn't work (the enrich functions are not called), but if I put a print in, it does. For example, if I do this:

kafka -> enrich X -> enrich Y -> print -> enrich Z -> foreachRDD

then enrich X and enrich Y are called, but enrich Z is not; if I put the print after enrich Z, it will be printed. How can I solve this? (What can I do to make the foreachRDD run? I put a breakpoint inside the map function, where I generate the writable, but it is never hit.)

Any idea?

b0c1

On Fri, Jun 27, 2014 at 4:53 PM, boci <boci.b...@gmail.com> wrote:

Another question: in the foreachRDD I will initialize the JobConf, but at that point how can I get information from the items? I have an identifier in the data which identifies the required ES index, so how can I set a dynamic index in the foreachRDD?

b0c1
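One way to get a per-record target index with elasticsearch-hadoop is its dynamic (multi) resource write support, where the resource pattern is filled in from a field of each document. A rough sketch of that inside foreachRDD, assuming a hypothetical indexName field, a toMapWritable conversion of your own, and `stream` standing for the Kafka DStream (check the exact config key names against your es-hadoop version):

    import org.apache.hadoop.io.NullWritable
    import org.apache.hadoop.mapred.JobConf
    import org.elasticsearch.hadoop.mr.EsOutputFormat

    stream.foreachRDD { (rdd, time) =>
      val jobConf = new JobConf()
      jobConf.setOutputFormat(classOf[EsOutputFormat])
      jobConf.set("es.nodes", "localhost:9200")
      // {indexName} is resolved per document from the field of that name,
      // so records carrying different identifiers land in different indices.
      jobConf.set("es.resource.write", "{indexName}/enriched")

      rdd.map(doc => (NullWritable.get(), toMapWritable(doc)))  // toMapWritable: your own MapWritable conversion
         .saveAsHadoopDataset(jobConf)
    }

The JobConf is still built once per batch on the driver; the resource pattern, not per-record code, decides which index each document goes to.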
On Fri, Jun 27, 2014 at 12:30 AM, Holden Karau <hol...@pigscanfly.ca> wrote:

Just your luck, I happened to be working on that very talk today :) Let me know how your experiences with Elasticsearch & Spark go :)

On Thu, Jun 26, 2014 at 3:17 PM, boci <boci.b...@gmail.com> wrote:

Wow, thanks for your fast answer, it helps a lot...

b0c1

On Thu, Jun 26, 2014 at 11:48 PM, Holden Karau <hol...@pigscanfly.ca> wrote:

Hi b0c1,

I have an example of how to do this in the repo for my talk as well; the specific example is at https://github.com/holdenk/elasticsearchspark/blob/master/src/main/scala/com/holdenkarau/esspark/IndexTweetsLive.scala .

Since DStream doesn't have a saveAsHadoopDataset, we use foreachRDD and then call saveAsHadoopDataset on the RDD that gets passed into the function we provide to foreachRDD, e.g.:

stream.foreachRDD { (data, time) =>
  val jobConf = ...
  data.map(prepareDataForIndexing).saveAsHadoopDataset(jobConf)
}

Hope that helps :)

Cheers,

Holden :)

On Thu, Jun 26, 2014 at 2:23 PM, boci <boci.b...@gmail.com> wrote:

Thanks. Without the local option I can connect to ES remotely. Now I only have one problem: how can I use elasticsearch-hadoop with Spark Streaming? I mean, DStream doesn't have a "saveAsHadoopFiles" method, and my second problem is that the output index depends on the input data.

Thanks

On Thu, Jun 26, 2014 at 10:10 AM, Nick Pentreath <nick.pentre...@gmail.com> wrote:

You can just add elasticsearch-hadoop as a dependency to your project to use the ESInputFormat and ESOutputFormat (https://github.com/elasticsearch/elasticsearch-hadoop). Some other basics here: http://www.elasticsearch.org/guide/en/elasticsearch/hadoop/current/spark.html

For testing, yes, I think you will need to start ES in local mode (just ./bin/elasticsearch) and use the default config (host = localhost, port = 9200).

On Thu, Jun 26, 2014 at 9:04 AM, boci <boci.b...@gmail.com> wrote:

That's okay, but Hadoop has ES integration. What happens if I run saveAsHadoopFile without Hadoop (or do I need to bring up Hadoop programmatically, if I even can)?

b0c1

On Thu, Jun 26, 2014 at 1:20 AM, Holden Karau <hol...@pigscanfly.ca> wrote:

On Wed, Jun 25, 2014 at 4:16 PM, boci <boci.b...@gmail.com> wrote:

> Hi guys, thanks for the directions. Now I have some problems/questions:
> - In local (test) mode I want to use ElasticClient.local to create the ES connection, but in production I want to use ElasticClient.remote. For this I want to pass the ElasticClient to mapPartitions, or what is the best practice?

In this case you probably want to create the ElasticClient inside of mapPartitions (since it isn't serializable), and if you want to use a different client in local mode, just have a flag that controls what type of client you create.
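A rough sketch of that pattern, assuming the elastic4s factory methods mentioned earlier in the thread (exact signatures depend on your elastic4s version), with `enriched` standing for the DStream or RDD of enriched records; the host, port, and flag are placeholders:

    import com.sksamuel.elastic4s.ElasticClient

    val useLocalClient = sys.props.contains("test.mode")   // placeholder "test vs production" flag

    enriched.mapPartitions { records =>
      // Build the (non-serializable) client on the worker, once per partition,
      // instead of capturing it in the closure from the driver.
      val client =
        if (useLocalClient) ElasticClient.local
        else ElasticClient.remote("es-host", 9300)          // placeholder host/port

      records.map { record =>
        // ... index or query `record` with `client` here ...
        record
      }
      // (client shutdown is elided; in practice close it once the iterator is consumed)
    }

Only the boolean flag is captured by the closure, and a boolean serializes without any trouble.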
> - My stream output is written into Elasticsearch. How can I test output.saveAsHadoopFile[ESOutputFormat]("-") in a local environment?
> - After storing the enriched data in ES, I want to generate aggregated data (EsInputFormat). How can I test that locally?

I think the simplest thing to do would be to use the same client in local mode as well and just start a single-node Elasticsearch cluster.

> Thanks guys
>
> b0c1

On Wed, Jun 25, 2014 at 1:33 AM, Holden Karau <hol...@pigscanfly.ca> wrote:

So I'm giving a talk at the Spark Summit on using Spark & ElasticSearch, but for now, if you want to see a simple demo which uses Elasticsearch for geo input, you can take a look at my quick & dirty implementation in TopTweetsInALocation (https://github.com/holdenk/elasticsearchspark/blob/master/src/main/scala/com/holdenkarau/esspark/TopTweetsInALocation.scala). This approach uses the ESInputFormat, which avoids the difficulty of having to manually create Elasticsearch clients.

This approach might not work for your data, e.g. if you need to create a query for each record in your RDD. If that is the case, you could instead look at using mapPartitions and setting up your Elasticsearch connection inside of that, so you can then re-use the client for all of the queries on each partition. This approach avoids having to serialize the Elasticsearch connection, because it will be local to your function.

Hope this helps!

Cheers,

Holden :)

On Tue, Jun 24, 2014 at 4:28 PM, Mayur Rustagi <mayur.rust...@gmail.com> wrote:

It's not used as the default serializer because of some compatibility issues and the requirement to register the classes.

Which part are you getting as non-serializable? You need to serialize that class if you are sending it to Spark workers inside a map, reduce, mapPartitions, or any of the other operations on an RDD.

Mayur Rustagi
Ph: +1 (760) 203 3257
http://www.sigmoidanalytics.com
@mayur_rustagi <https://twitter.com/mayur_rustagi>
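For what it's worth, if you do want to try Kryo in a Spark 1.x setup like this one, a minimal sketch looks roughly like the following (the registrator class and the registered record type are illustrative, not from the thread):

    import com.esotericsoftware.kryo.Kryo
    import org.apache.spark.SparkConf
    import org.apache.spark.serializer.KryoRegistrator

    case class Enriched(indexName: String, body: String)   // placeholder record type

    // Register the classes that actually travel to the workers.
    class MyRegistrator extends KryoRegistrator {
      override def registerClasses(kryo: Kryo): Unit = {
        kryo.register(classOf[Enriched])
      }
    }

    val conf = new SparkConf()
      .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      .set("spark.kryo.registrator", "MyRegistrator")       // use the fully qualified class name in a real project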
On Wed, Jun 25, 2014 at 4:52 AM, Peng Cheng <pc...@uow.edu.au> wrote:

I'm afraid persisting a connection across two tasks is a dangerous act, as they can't be guaranteed to be executed on the same machine. Your ES server may think it's a man-in-the-middle attack!

I think it's possible to invoke a static method that gives you a connection from a local 'pool', so nothing will sneak into your closure, but that's too complex and there should be a better option.

I've never used Kryo before; if it's that good, perhaps we should use it as the default serializer.
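A minimal sketch of that "local pool" idea: hold the client in a per-JVM singleton and look it up lazily on the workers, so nothing is captured in the closure. The client type, host, and the `enriched` DStream name are placeholders:

    import com.sksamuel.elastic4s.ElasticClient

    object EsConnection {
      // Created once per worker JVM on first use and re-used by every task
      // running in that JVM; the object itself is never serialized.
      lazy val client = ElasticClient.remote("es-host", 9300)   // placeholder host/port
    }

    enriched.foreachRDD { rdd =>
      rdd.foreachPartition { records =>
        val client = EsConnection.client
        records.foreach { record =>
          // ... index `record` with `client` here ...
        }
      }
    }

Because the lookup happens on the worker, tasks scheduled on different machines each get their own client, which sidesteps the cross-machine concern raised above.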