Hi Yuval - Here's a brief summary f what we are trying to do ..
At the library level we have this .. def buildExecutionGraph(): Unit def executeStreamingQueries(env: StreamExecutionEnvironment): JobExecutionResult = { buildExecutionGraph() env.execute(s"Executing $streamletRef") } and we do the following .. // note this ctx is created outside the Future val jobResult = Future(createLogic.executeStreamingQueries(ctx.env)) and at the application level we have something like this .. override def buildExecutionGraph = { val rides: DataStream[TaxiRide] = readStream(inTaxiRide) // reads from Kafka .filter { ride ⇒ ride.getIsStart().booleanValue } .keyBy("rideId") val fares: DataStream[TaxiFare] = readStream(inTaxiFare) .keyBy("rideId") val processed: DataStream[TaxiRideFare] = rides .connect(fares) .flatMap(new EnrichmentFunction) writeStream(out, processed) // writes to Kafka } It fails only when we use the Future, otherwise it works .. regards. On Thu, Sep 19, 2019 at 1:16 PM Yuval Itzchakov <yuva...@gmail.com> wrote: > Debshish, could you share an example of before and after of your classes > for future reference? > > On Thu, 19 Sep 2019, 10:42 Debasish Ghosh, <ghosh.debas...@gmail.com> > wrote: > >> We solved the problem of serialization by making some things transient >> which were being captured as part of the closure. So we no longer have >> serialization errors. Everything works properly without the future. >> >> I realize that because of statics concurrent job submission will be an >> issue. But we are submitting one job only - the difference is that it's >> through a Future. So there is no concurrent submission unless I am missing >> something. >> >> regards. >> >> On Thu, Sep 19, 2019 at 12:54 PM Biao Liu <mmyy1...@gmail.com> wrote: >> >>> Hi Debasish, >>> >>> I think there is something critical of your usage hided. It might help >>> if you could provide more details. >>> >>> It still confuses me how you solve the serialization issue. Why the >>> non-transient fields only affects serialization in a future? >>> >>> WRT this ProgramAbortException issue, do you submit jobs concurrently in >>> one process? >>> Currently job submission is not thread-safe. It relies on some static >>> variables which could be affected by other concurrent submissions in the >>> same process. >>> Asking this because usually job submission is not through >>> OptimizerPlanEnvironment which appears in your exception stack trace. >>> >>> Thanks, >>> Biao /'bɪ.aʊ/ >>> >>> >>> >>> On Thu, 19 Sep 2019 at 15:03, Debasish Ghosh <ghosh.debas...@gmail.com> >>> wrote: >>> >>>> I think what you are pointing at is asynchronous datastream operations. >>>> In our case we want to submit the entire job in a Future. Something like >>>> the following .. >>>> >>>> def execute(..) = { >>>> // this does all data stream manipulation, joins etc. >>>> buildComputationGraph() >>>> >>>> // submits for execution with StreamExecutionEnvironment >>>> env.execute(..) >>>> } >>>> >>>> and we want to do .. >>>> >>>> val jobExecutionResultFuture = Future(execute(..)) >>>> >>>> and this gives that exception. >>>> >>>> regards. >>>> >>>> On Thu, Sep 19, 2019 at 11:00 AM Rafi Aroch <rafi.ar...@gmail.com> >>>> wrote: >>>> >>>>> Hi Debasish, >>>>> >>>>> Have you taken a look at the AsyncIO API for running async operations? >>>>> I think this is the preferred way of doing it. [1] >>>>> So it would look something like this: >>>>> >>>>> class AsyncDatabaseRequest extends AsyncFunction[String, (String, >>>>> String)] { >>>>> >>>>> /** The database specific client that can issue concurrent requests >>>>> with callbacks */ >>>>> lazy val client: DatabaseClient = new DatabaseClient(host, post, >>>>> credentials) >>>>> >>>>> /** The context used for the future callbacks */ >>>>> implicit lazy val executor: ExecutionContext = >>>>> ExecutionContext.fromExecutor(Executors.directExecutor()) >>>>> >>>>> >>>>> override def asyncInvoke(str: String, resultFuture: >>>>> ResultFuture[(String, String)]): Unit = { >>>>> >>>>> // issue the asynchronous request, receive a future for the >>>>> result val resultFutureRequested: Future[String] = >>>>> client.query(str) >>>>> >>>>> // set the callback to be executed once the request by the client >>>>> is complete // the callback simply forwards the result to the >>>>> result future resultFutureRequested.onSuccess { >>>>> case result: String => resultFuture.complete(Iterable((str, >>>>> result))) >>>>> } >>>>> }} >>>>> >>>>> // create the original streamval stream: DataStream[String] = ... >>>>> // apply the async I/O transformationval resultStream: >>>>> DataStream[(String, String)] = >>>>> AsyncDataStream.unorderedWait(stream, new AsyncDatabaseRequest(), >>>>> 1000, TimeUnit.MILLISECONDS, 100) >>>>> >>>>> >>>>> Thanks, >>>>> Rafi >>>>> >>>>> [1] >>>>> https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/asyncio.html#async-io-api >>>>> >>>>> On Wed, Sep 18, 2019 at 8:26 PM Debasish Ghosh < >>>>> ghosh.debas...@gmail.com> wrote: >>>>> >>>>>> ok, the above problem was due to some serialization issues which we >>>>>> fixed by marking some of the things transient. This fixes the >>>>>> serialization >>>>>> issues .. But now when I try to execute in a Future I hit upon this .. >>>>>> >>>>>> >>>>>> *java.util.concurrent.ExecutionException: Boxed Error* at >>>>>> scala.concurrent.impl.Promise$.resolver(Promise.scala:87) >>>>>> at >>>>>> scala.concurrent.impl.Promise$.scala$concurrent$impl$Promise$$resolveTry(Promise.scala:79) >>>>>> at >>>>>> scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:284) >>>>>> at scala.concurrent.Promise.complete(Promise.scala:53) >>>>>> at scala.concurrent.Promise.complete$(Promise.scala:52) >>>>>> at >>>>>> scala.concurrent.impl.Promise$DefaultPromise.complete(Promise.scala:187) >>>>>> at >>>>>> scala.concurrent.impl.Promise.$anonfun$transform$1(Promise.scala:33) >>>>>> at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:64) >>>>>> at >>>>>> java.util.concurrent.ForkJoinTask$RunnableExecuteAction.exec(ForkJoinTask.java:1402) >>>>>> at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289) >>>>>> at >>>>>> java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056) >>>>>> at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692) >>>>>> at >>>>>> java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:157) >>>>>> >>>>>> *Caused by: >>>>>> org.apache.flink.client.program.OptimizerPlanEnvironment$ProgramAbortException* >>>>>> at >>>>>> org.apache.flink.streaming.api.environment.StreamPlanEnvironment.execute(StreamPlanEnvironment.java:66) >>>>>> at >>>>>> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1507) >>>>>> at >>>>>> org.apache.flink.streaming.api.scala.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.scala:654) >>>>>> at >>>>>> pipelines.flink.FlinkStreamletLogic.executeStreamingQueries(FlinkStreamlet.scala:280) >>>>>> at >>>>>> pipelines.flink.FlinkStreamlet.$anonfun$run$2(FlinkStreamlet.scala:149) >>>>>> at scala.concurrent.Future$.$anonfun$apply$1(Future.scala:658) >>>>>> at scala.util.Success.$anonfun$map$1(Try.scala:255) >>>>>> at scala.util.Success.map(Try.scala:213) >>>>>> at scala.concurrent.Future.$anonfun$map$1(Future.scala:292) >>>>>> at scala.concurrent.impl.Promise.liftedTree1$1(Promise.scala:33) >>>>>> ... 7 more >>>>>> >>>>>> I found this issue in JIRA >>>>>> https://issues.apache.org/jira/browse/FLINK-10381 which is still >>>>>> open and talks about a related issue. But we are not submitting multiple >>>>>> jobs - we are just submitting 1 job but async in a Future. I am not clear >>>>>> why this should create the problem that I see. >>>>>> >>>>>> Can anyone please help with an explanation ? >>>>>> >>>>>> regards. >>>>>> >>>>>> On Wed, Sep 18, 2019 at 12:22 AM Debasish Ghosh < >>>>>> ghosh.debas...@gmail.com> wrote: >>>>>> >>>>>>> I think the issue may not be linked with Future. What happens is >>>>>>> when this piece of code is executed .. >>>>>>> >>>>>>> val rides: DataStream[TaxiRide] = >>>>>>> readStream(inTaxiRide) >>>>>>> .filter { ride ⇒ ride.getIsStart().booleanValue } >>>>>>> .keyBy("rideId") >>>>>>> >>>>>>> val fares: DataStream[TaxiFare] = >>>>>>> readStream(inTaxiFare) >>>>>>> .keyBy("rideId") >>>>>>> >>>>>>> val processed: DataStream[TaxiRideFare] = >>>>>>> rides >>>>>>> .connect(fares) >>>>>>> .flatMap(new EnrichmentFunction) >>>>>>> >>>>>>> somehow the ClosureCleaner gets executed as evident from the >>>>>>> following which tries to serialize Avro data. Is there any way to pass >>>>>>> the >>>>>>> custom avro serializer that I am using ? >>>>>>> >>>>>>> org.apache.flink.api.common.InvalidProgramException: [rideId >>>>>>> type:LONG pos:0, isStart type:BOOLEAN pos:1, taxiId type:LONG pos:2, >>>>>>> passengerCnt type:INT pos:3, driverId type:LONG pos:4, startLon >>>>>>> type:FLOAT >>>>>>> pos:5, startLat type:FLOAT pos:6, endLon type:FLOAT pos:7, endLat >>>>>>> type:FLOAT pos:8, startTime type:LONG pos:9, endTime type:LONG pos:10] >>>>>>> is >>>>>>> not serializable. The object probably contains or references non >>>>>>> serializable fields. >>>>>>> at >>>>>>> org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:151) >>>>>>> at >>>>>>> org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:126) >>>>>>> at >>>>>>> org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:126) >>>>>>> at >>>>>>> org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:126) >>>>>>> at >>>>>>> org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:126) >>>>>>> at >>>>>>> org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:71) >>>>>>> at >>>>>>> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.clean(StreamExecutionEnvironment.java:1574) >>>>>>> at >>>>>>> org.apache.flink.streaming.api.datastream.DataStream.clean(DataStream.java:185) >>>>>>> at >>>>>>> org.apache.flink.streaming.api.datastream.ConnectedStreams.flatMap(ConnectedStreams.java:274) >>>>>>> at >>>>>>> org.apache.flink.streaming.api.scala.ConnectedStreams.flatMap(ConnectedStreams.scala:179) >>>>>>> at >>>>>>> pipelines.examples.processor.TaxiRideProcessor$$anon$1.buildExecutionGraph(TaxiRideProcessor.scala:47) >>>>>>> at >>>>>>> pipelines.flink.FlinkStreamletLogic.executeStreamingQueries(FlinkStreamlet.scala:278) >>>>>>> at >>>>>>> pipelines.flink.FlinkStreamlet.run(FlinkStreamlet.scala:149) >>>>>>> at >>>>>>> pipelines.runner.Runner$.$anonfun$run$3(Runner.scala:44) >>>>>>> at scala.util.Try$.apply(Try.scala:213) >>>>>>> at pipelines.runner.Runner$.run(Runner.scala:43) >>>>>>> at pipelines.runner.Runner$.main(Runner.scala:30) >>>>>>> at pipelines.runner.Runner.main(Runner.scala) >>>>>>> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native >>>>>>> Method) >>>>>>> at >>>>>>> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) >>>>>>> at >>>>>>> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) >>>>>>> at java.lang.reflect.Method.invoke(Method.java:498) >>>>>>> at >>>>>>> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:576) >>>>>>> at >>>>>>> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:438) >>>>>>> at >>>>>>> org.apache.flink.client.program.OptimizerPlanEnvironment.getOptimizedPlan(OptimizerPlanEnvironment.java:83) >>>>>>> at >>>>>>> org.apache.flink.client.program.PackagedProgramUtils.createJobGraph(PackagedProgramUtils.java:80) >>>>>>> at >>>>>>> org.apache.flink.runtime.webmonitor.handlers.utils.JarHandlerUtils$JarHandlerContext.toJobGraph(JarHandlerUtils.java:126) >>>>>>> at >>>>>>> org.apache.flink.runtime.webmonitor.handlers.JarRunHandler.lambda$getJobGraphAsync$6(JarRunHandler.java:142) >>>>>>> at >>>>>>> java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1590) >>>>>>> at >>>>>>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) >>>>>>> at >>>>>>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) >>>>>>> at java.lang.Thread.run(Thread.java:748) >>>>>>> Caused by: java.io.NotSerializableException: >>>>>>> org.apache.avro.Schema$Field >>>>>>> at >>>>>>> java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184) >>>>>>> at >>>>>>> java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348) >>>>>>> at java.util.ArrayList.writeObject(ArrayList.java:766) >>>>>>> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native >>>>>>> Method) >>>>>>> at >>>>>>> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) >>>>>>> at >>>>>>> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) >>>>>>> at java.lang.reflect.Method.invoke(Method.java:498) >>>>>>> at >>>>>>> java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:1140) >>>>>>> at >>>>>>> java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1496) >>>>>>> at >>>>>>> java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432) >>>>>>> at >>>>>>> java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178) >>>>>>> at >>>>>>> java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348) >>>>>>> at >>>>>>> org.apache.flink.util.InstantiationUtil.serializeObject(InstantiationUtil.java:586) >>>>>>> at >>>>>>> org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:133) >>>>>>> >>>>>>> >>>>>>> I also tried the following .. >>>>>>> >>>>>>> class EnrichmentFunction extends RichCoFlatMapFunction[TaxiRide, >>>>>>> TaxiFare, TaxiRideFare] { >>>>>>> >>>>>>> @transient var rideState: ValueState[TaxiRide] = null >>>>>>> @transient var fareState: ValueState[TaxiFare] = null >>>>>>> >>>>>>> override def open(params: Configuration): Unit = { >>>>>>> super.open(params) >>>>>>> rideState = getRuntimeContext.getState( >>>>>>> new ValueStateDescriptor[TaxiRide]("saved ride", >>>>>>> classOf[TaxiRide])) >>>>>>> fareState = getRuntimeContext.getState( >>>>>>> new ValueStateDescriptor[TaxiFare]("saved fare", >>>>>>> classOf[TaxiFare])) >>>>>>> } >>>>>>> >>>>>>> and moved the state initialization to open function. But still get >>>>>>> the same result. >>>>>>> >>>>>>> Help ? >>>>>>> >>>>>>> regards. >>>>>>> >>>>>>> >>>>>>> >>>>>>> On Tue, Sep 17, 2019 at 12:28 PM Biao Liu <mmyy1...@gmail.com> >>>>>>> wrote: >>>>>>> >>>>>>>> Hi Debasish, >>>>>>>> >>>>>>>> I guess the reason is something unexpectedly involved in >>>>>>>> serialization due to a reference from inner class (anonymous class or >>>>>>>> lambda expression). >>>>>>>> When Flink serializes this inner class instance, it would also >>>>>>>> serialize all referenced objects, for example, the outer class >>>>>>>> instance. If >>>>>>>> the outer class is not serializable, this error would happen. >>>>>>>> >>>>>>>> You could have a try to move the piece of codes to a named >>>>>>>> non-inner class. >>>>>>>> >>>>>>>> Thanks, >>>>>>>> Biao /'bɪ.aʊ/ >>>>>>>> >>>>>>>> >>>>>>>> >>>>>>>> On Tue, 17 Sep 2019 at 02:06, Debasish Ghosh < >>>>>>>> ghosh.debas...@gmail.com> wrote: >>>>>>>> >>>>>>>>> My main question is why serialisation kicks in when I try to >>>>>>>>> execute within a `Future` and not otherwise. >>>>>>>>> >>>>>>>>> regards. >>>>>>>>> >>>>>>>>> On Mon, 16 Sep 2019 at 4:46 PM, Debasish Ghosh < >>>>>>>>> ghosh.debas...@gmail.com> wrote: >>>>>>>>> >>>>>>>>>> Yes, they are generated from Avro Schema and implements >>>>>>>>>> Serializable .. >>>>>>>>>> >>>>>>>>>> On Mon, Sep 16, 2019 at 4:40 PM Deepak Sharma < >>>>>>>>>> deepakmc...@gmail.com> wrote: >>>>>>>>>> >>>>>>>>>>> Does TaxiRide or TaxiRideFare implements Serializable? >>>>>>>>>>> >>>>>>>>>>> On Mon, Sep 16, 2019 at 3:47 PM Debasish Ghosh < >>>>>>>>>>> ghosh.debas...@gmail.com> wrote: >>>>>>>>>>> >>>>>>>>>>>> Hello - >>>>>>>>>>>> >>>>>>>>>>>> The following piece of code is an example of a connected data >>>>>>>>>>>> streams .. >>>>>>>>>>>> >>>>>>>>>>>> val rides: DataStream[TaxiRide] = >>>>>>>>>>>> readStream(inTaxiRide) >>>>>>>>>>>> .filter { ride ⇒ ride.getIsStart().booleanValue } >>>>>>>>>>>> .keyBy("rideId") >>>>>>>>>>>> >>>>>>>>>>>> val fares: DataStream[TaxiFare] = >>>>>>>>>>>> readStream(inTaxiFare) >>>>>>>>>>>> .keyBy("rideId") >>>>>>>>>>>> >>>>>>>>>>>> val processed: DataStream[TaxiRideFare] = >>>>>>>>>>>> rides >>>>>>>>>>>> .connect(fares) >>>>>>>>>>>> .flatMap(new EnrichmentFunction) >>>>>>>>>>>> >>>>>>>>>>>> When I execute the above logic using >>>>>>>>>>>> StreamExecutionEnvironment.execute(..) it runs fine. >>>>>>>>>>>> But if I try to execute the above from within a >>>>>>>>>>>> scala.concurrent.Future, I get the following exception .. >>>>>>>>>>>> >>>>>>>>>>>> org.apache.flink.api.common.InvalidProgramException: [rideId >>>>>>>>>>>> type:LONG pos:0, isStart type:BOOLEAN pos:1, taxiId type:LONG >>>>>>>>>>>> pos:2, >>>>>>>>>>>> passengerCnt type:INT pos:3, driverId type:LONG pos:4, startLon >>>>>>>>>>>> type:FLOAT >>>>>>>>>>>> pos:5, startLat type:FLOAT pos:6, endLon type:FLOAT pos:7, endLat >>>>>>>>>>>> type:FLOAT pos:8, startTime type:LONG pos:9, endTime type:LONG >>>>>>>>>>>> pos:10] is >>>>>>>>>>>> not serializable. The object probably contains or references non >>>>>>>>>>>> serializable fields. >>>>>>>>>>>> at >>>>>>>>>>>> org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:151) >>>>>>>>>>>> at >>>>>>>>>>>> org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:126) >>>>>>>>>>>> at >>>>>>>>>>>> org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:126) >>>>>>>>>>>> at >>>>>>>>>>>> org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:126) >>>>>>>>>>>> at >>>>>>>>>>>> org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:126) >>>>>>>>>>>> at >>>>>>>>>>>> org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:71) >>>>>>>>>>>> at >>>>>>>>>>>> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.clean(StreamExecutionEnvironment.java:1574) >>>>>>>>>>>> at >>>>>>>>>>>> org.apache.flink.streaming.api.datastream.DataStream.clean(DataStream.java:185) >>>>>>>>>>>> at >>>>>>>>>>>> org.apache.flink.streaming.api.datastream.ConnectedStreams.flatMap(ConnectedStreams.java:274) >>>>>>>>>>>> at >>>>>>>>>>>> org.apache.flink.streaming.api.scala.ConnectedStreams.flatMap(ConnectedStreams.scala:179) >>>>>>>>>>>> ... >>>>>>>>>>>> >>>>>>>>>>>> Caused by: java.io.NotSerializableException: >>>>>>>>>>>> org.apache.avro.Schema$Field >>>>>>>>>>>> at >>>>>>>>>>>> java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184) >>>>>>>>>>>> >>>>>>>>>>>> Any thoughts why this may happen ? >>>>>>>>>>>> >>>>>>>>>>>> regards. >>>>>>>>>>>> >>>>>>>>>>>> -- >>>>>>>>>>>> Debasish Ghosh >>>>>>>>>>>> http://manning.com/ghosh2 >>>>>>>>>>>> http://manning.com/ghosh >>>>>>>>>>>> >>>>>>>>>>>> Twttr: @debasishg >>>>>>>>>>>> Blog: http://debasishg.blogspot.com >>>>>>>>>>>> Code: http://github.com/debasishg >>>>>>>>>>>> >>>>>>>>>>> >>>>>>>>>>> >>>>>>>>>>> -- >>>>>>>>>>> Thanks >>>>>>>>>>> Deepak >>>>>>>>>>> www.bigdatabig.com >>>>>>>>>>> www.keosha.net >>>>>>>>>>> >>>>>>>>>> >>>>>>>>>> >>>>>>>>>> -- >>>>>>>>>> Debasish Ghosh >>>>>>>>>> http://manning.com/ghosh2 >>>>>>>>>> http://manning.com/ghosh >>>>>>>>>> >>>>>>>>>> Twttr: @debasishg >>>>>>>>>> Blog: http://debasishg.blogspot.com >>>>>>>>>> Code: http://github.com/debasishg >>>>>>>>>> >>>>>>>>> -- >>>>>>>>> Sent from my iPhone >>>>>>>>> >>>>>>>> >>>>>>> >>>>>>> -- >>>>>>> Debasish Ghosh >>>>>>> http://manning.com/ghosh2 >>>>>>> http://manning.com/ghosh >>>>>>> >>>>>>> Twttr: @debasishg >>>>>>> Blog: http://debasishg.blogspot.com >>>>>>> Code: http://github.com/debasishg >>>>>>> >>>>>> >>>>>> >>>>>> -- >>>>>> Debasish Ghosh >>>>>> http://manning.com/ghosh2 >>>>>> http://manning.com/ghosh >>>>>> >>>>>> Twttr: @debasishg >>>>>> Blog: http://debasishg.blogspot.com >>>>>> Code: http://github.com/debasishg >>>>>> >>>>> >>>> >>>> -- >>>> Debasish Ghosh >>>> http://manning.com/ghosh2 >>>> http://manning.com/ghosh >>>> >>>> Twttr: @debasishg >>>> Blog: http://debasishg.blogspot.com >>>> Code: http://github.com/debasishg >>>> >>> >> >> -- >> Debasish Ghosh >> http://manning.com/ghosh2 >> http://manning.com/ghosh >> >> Twttr: @debasishg >> Blog: http://debasishg.blogspot.com >> Code: http://github.com/debasishg >> > -- Debasish Ghosh http://manning.com/ghosh2 http://manning.com/ghosh Twttr: @debasishg Blog: http://debasishg.blogspot.com Code: http://github.com/debasishg