Check the logs to see what Akka is logging, and verify that the port you are trying to bind to is free.
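A minimal sketch of the port check, assuming you run it on the YARN node itself and know which port the ActorSystem tries to bind. The default of 2552 below is only a hypothetical placeholder, so pass the port from your own Akka configuration. The check binds a plain `java.net.ServerSocket` to the same host and port. If that fails with an `IOException` (typically a `java.net.BindException`), something else already holds the port.

```scala
import java.io.IOException
import java.net.{InetSocketAddress, ServerSocket}

object PortCheck {
  /** Returns true if a TCP listener can currently be bound to host:port. */
  def isPortFree(host: String, port: Int): Boolean = {
    val socket = new ServerSocket()
    try {
      socket.bind(new InetSocketAddress(host, port))
      true
    } catch {
      // BindException ("Address already in use") is a subclass of IOException
      case _: IOException => false
    } finally {
      socket.close() // release the port again immediately
    }
  }

  def main(args: Array[String]): Unit = {
    // Hypothetical placeholder; pass the port your Akka config actually uses.
    val port = if (args.nonEmpty) args(0).toInt else 2552
    val host = if (args.length > 1) args(1) else "0.0.0.0"
    println(s"$host:$port is ${if (isPortFree(host, port)) "free" else "in use"}")
  }
}
```

The result is only a snapshot. Another process can still grab the port between this check and the moment the ActorSystem starts.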
Cheers,
Till

On Wed, Apr 17, 2019 at 12:50 PM Andy Hoang <a...@parcelperform.com> wrote:

> Hi Till,
>
> Sorry to bother you again. I managed to build and run the Akka HTTP client
> locally, but after deploying to a YARN node, the ActorSystem never comes up:
> ```
> PPLogger.getActivityLogger.info("########### 1")
> implicit val system = ActorSystem("my-system")
> PPLogger.getActivityLogger.info("########### 2")
> ```
> The line "### 2" is never printed, and the method ends up timing out.
> Honestly, I don't know how to debug this case.
>
> I’m just curious: how do people end up using an async HTTP client without
> hassle?
>
> Thanks,
>
> Andy
>
> On Apr 12, 2019, at 10:23 PM, Till Rohrmann <trohrm...@apache.org> wrote:
>
> Hi Andy,
>
> you can do some micro benchmarks where you instantiate your
> AsyncHttpClient and call the invoke method. But it would be better to
> benchmark it end-to-end by running it on a cluster with a realistic
> workload, one you also expect to occur in production.
>
> Cheers,
> Till
>
> On Fri, Apr 12, 2019 at 11:29 AM Andy Hoang <a...@parcelperform.com> wrote:
>
>> Hi Till,
>> Unfortunately, I have to wait for the cluster to be upgraded to 1.8 to use
>> that feature: https://issues.apache.org/jira/browse/FLINK-6756
>> Meanwhile, I can reimplement it by copy-pasting, but I’m still curious
>> whether my AsyncHttpClient works well or not, and what the downsides are
>> when you look at it.
>> I understand that the open/close methods help with initializing and
>> cleaning up resources, but how can we benchmark the solutions to make sure
>> one is better than the other? What is the deciding factor here, or do we
>> have to try it in production first?
>> Thanks a lot, again
>>
>> Andy
>>
>>
>> On Apr 12, 2019, at 2:44 PM, Till Rohrmann <trohrm...@apache.org> wrote:
>>
>> Hi Andy,
>>
>> there is also a Scala version of the `RichAsyncFunction`.
>>
>> In Scala you have to specify a value for class members. This is different
>> from Java.
>>
>> User code is first instantiated on the client where you create the job
>> topology (basically where you call new RichAsyncHttpClient). The code is
>> then serialized and shipped to the cluster where it is actually executed.
>>
>> Cheers,
>> Till
>>
>> On Fri, Apr 12, 2019 at 5:44 AM Andy Hoang <a...@parcelperform.com> wrote:
>>
>>> Hi Till,
>>> Thanks for your reply. I managed to do some experiments; some worked and
>>> some did not. I hope you can give me a bit more insight:
>>>
>>> Following your suggestion, I implemented a `RichAsyncFunction` with
>>> transient fields, like this, and got an error:
>>>
>>> ```
>>> Class 'RichAsyncHttpClient' must either be declared abstract or
>>> implement abstract member 'executionContext: ExecutionContextExecutor' in
>>> ‘com.parcelperform.util.RichAsyncHttpClient’
>>> ```
>>>
>>> ```
>>> class RichAsyncHttpClient() extends RichAsyncFunction[Shipment,
>>>     Either[Throwable, ResponseEntity]] {
>>>
>>>   PPLogger.getActivityLogger.info("###########INIT ------------------- ")
>>>   @transient implicit var materializer: ActorMaterializer
>>>   @transient implicit var system: ActorSystem
>>>   @transient implicit var executionContext: ExecutionContextExecutor
>>>
>>>   override def asyncInvoke(input: Shipment, resultFuture:
>>>       async.ResultFuture[Either[Throwable, ResponseEntity]]): Unit = {
>>>
>>>     val resultFutureRequested: Future[HttpResponse] =
>>>       Http().singleRequest(HttpRequest(uri = "https://httpbin.org/json"))
>>>
>>>     resultFutureRequested.onComplete {
>>>       case Success(res) => {
>>>         resultFuture.complete(Iterable(Either(res.entity)).asJavaCollection)
>>>       }
>>>       case Failure(x) => {
>>>         resultFuture.complete(Iterable(Either(x)).asJavaCollection)
>>>       }
>>>     }
>>>   }
>>>
>>>   override def open(parameters: Configuration): Unit = {
>>>     super.open(parameters)
>>>     system = ActorSystem("my-system")
>>>     executionContext = system.dispatcher
>>>     materializer = ActorMaterializer()
>>>   }
>>> }
>>> ```
>>>
>>> Also, when using that class,
>>> I get an error; I guess it's because of a Java/Scala issue. In the Flink
>>> docs, the Java example uses RichAsyncFunction while the Scala example uses
>>> AsyncFunction:
>>> ```
>>> // AsyncDataStream.unorderedWait(streamShipment, new RichAsyncHttpClient(), 5, TimeUnit.SECONDS, 2).print()
>>> //   <= ###### error: Type mismatch, expected: AsyncFunction[NotInferedIN, NotInferedOUT], actual: RichAsyncHttpClient
>>> ```
>>>
>>> ###
>>>
>>> So I tried to fix my current code again, using transient fields and moving
>>> them into the constructor:
>>> ```
>>> class AsyncHttpClient(args: Array[String] = Array()) extends
>>>     AsyncFunction[Shipment, Either[Throwable, ResponseEntity]] {
>>>
>>>   @transient implicit lazy val system = {
>>>     PPLogger.getActivityLogger.info("###########INIT ------------------- ")
>>>     ActorSystem("my-system")
>>>   }
>>>   @transient implicit lazy val executionContext = {
>>>     system.dispatcher
>>>   }
>>>   @transient implicit lazy val materializer: ActorMaterializer = {
>>>     PPLogger.getActivityLogger.info("###########DONE ------------------- ")
>>>     ActorMaterializer()
>>>   }
>>>
>>>   override def asyncInvoke(input: Shipment, resultFuture:
>>>       ResultFuture[Either[Throwable, ResponseEntity]]): Unit = {
>>>     // PPLogger.getActivityLogger.info("###########INIT ------------------- ")
>>>     // implicit val system = ActorSystem("my-system")
>>>     // implicit val executionContext = system.dispatcher
>>>     // implicit val materializer: ActorMaterializer = ActorMaterializer()
>>>     // PPLogger.getActivityLogger.info("###########DONE ------------------- ")
>>>
>>>     val resultFutureRequested: Future[HttpResponse] =
>>>       Http().singleRequest(HttpRequest(uri = "https://httpbin.org/json"))
>>>     resultFutureRequested.onComplete {
>>>       case Success(res) => {
>>>         resultFuture.complete(Iterable(Right(res.entity)))
>>>       }
>>>       case Failure(x) => {
>>>         resultFuture.complete(Iterable(Left(x)))
>>>       }
>>>     }
>>>
>>>   }
>>>
>>>   override def timeout(input: Shipment, resultFuture:
>>>       ResultFuture[Either[Throwable, ResponseEntity]]): Unit = {
>>>     resultFuture.complete(Iterable(Left(new TimeoutException("Async function call has timed out."))))
>>>   }
>>> }
>>> ```
>>> And it runs OK. The log was printed only once.
>>>
>>> I’m still asking about this because I haven’t understood the sentence "That
>>> way you only create the `ActorMaterializer` on the `TaskManager` where the
>>> operator is executed and solve the problem of serializability". I thought
>>> all of the code was executed inside the TaskManager?
>>>
>>> Thanks for being patient with me so far,
>>>
>>> Andy
>>>
>>>
>>> On Apr 11, 2019, at 7:12 PM, Till Rohrmann <trohrm...@apache.org> wrote:
>>>
>>> Hi Andy,
>>>
>>> without being an expert on Akka's HTTP client, I think you should not
>>> create a new ActorSystem for every call to `AsyncFunction#asyncInvoke`.
>>> What I would recommend instead is to implement a `RichAsyncFunction`
>>> with a transient field for the `ActorMaterializer` which you initialize in
>>> the `RichAsyncFunction#open` method. That way you only create the
>>> `ActorMaterializer` on the `TaskManager` where the operator is executed and
>>> solve the problem of serializability, and you make it much more efficient
>>> because you don't create a new `ActorSystem` for every request.
>>>
>>> Cheers,
>>> Till
>>>
>>> On Thu, Apr 11, 2019 at 1:17 PM Andy Hoang <a...@parcelperform.com> wrote:
>>>
>>>> Hi guys,
>>>>
>>>> I’m trying to decide which HTTP client to use with Flink. So far I have
>>>> tested scalaj and the Akka HTTP client, and both work OK in our current
>>>> dev environment.
>>>> For scalaj it is pretty straightforward, since it just makes an HTTP
>>>> request with a timeout.
>>>>
>>>> For the Akka HTTP client it's a bit more complicated (I’m new to Scala and
>>>> all), so I’m asking whether I am doing it right by creating an
>>>> AsyncFunction like this:
>>>> ```
>>>> class AsyncHttpClient(args: Array[String] = Array()) extends
>>>>     AsyncFunction[Shipment, Either[Throwable, ResponseEntity]] {
>>>>
>>>>   override def asyncInvoke(input: Shipment, resultFuture:
>>>>       ResultFuture[Either[Throwable, ResponseEntity]]): Unit = {
>>>>
>>>>     PPLogger.getActivityLogger.info("###########INIT ------------------- ")
>>>>     implicit val system = ActorSystem("my-system")
>>>>     implicit val executionContext = system.dispatcher
>>>>     implicit val materializer: ActorMaterializer = ActorMaterializer()
>>>>     val resultFutureRequested: Future[HttpResponse] =
>>>>       Http().singleRequest(HttpRequest(uri = "https://httpbin.org/json"))
>>>>     PPLogger.getActivityLogger.info("###########DONE ------------------- ")
>>>>
>>>>     resultFutureRequested.onComplete {
>>>>       case Success(res) => {
>>>>         resultFuture.complete(Iterable(Right(res.entity)))
>>>>       }
>>>>       case Failure(x) => {
>>>>         resultFuture.complete(Iterable(Left(x)))
>>>>       }
>>>>     }
>>>>
>>>>   }
>>>>
>>>>   override def timeout(input: Shipment, resultFuture:
>>>>       ResultFuture[Either[Throwable, ResponseEntity]]): Unit = {
>>>>     resultFuture.complete(Iterable(Left(new TimeoutException("Async function call has timed out."))))
>>>>   }
>>>> }
>>>> ```
>>>> I notice that I have to implicitly create a bunch of variables inside the
>>>> asyncInvoke method. I’m not sure if I’m doing it right, or just adding
>>>> overhead. I did try to initialize them in the constructor of the class,
>>>> but then I just got a bunch of "not serializable" errors.
>>>>
>>>> My libs:
>>>> "com.typesafe.akka" %% "akka-http" % "10.1.8",
>>>> "com.typesafe.akka" %% "akka-http-testkit" % "10.1.8" % Test,
>>>>
>>>> My Flink setup:
>>>> Scala 2.12
>>>> Flink 1.7.0
>>>>
>>>> Any replies are appreciated!
>>>>
>>>> Thanks a bunch
>>>>
>>>> Andy
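Till's explanation in this thread (the operator object is constructed on the client where the job topology is built, then serialized and shipped to the cluster) is why initializing Akka objects eagerly in the constructor fails and why `@transient` fields help. The sketch below reproduces that lifecycle without Flink or Akka. It assumes plain Java serialization as a stand-in for shipping the job. All class names are hypothetical: `ExpensiveClient` plays the role of the non-serializable `ActorSystem`/`ActorMaterializer`, and `open()` only mimics the role of `RichFunction#open`; it is not Flink's API. Three variants are compared: an eagerly initialized field, where serialization fails; a `@transient var` set in `open()`, which arrives as `null` and must be created on the worker; and a `@transient lazy val`, as in Andy's working `AsyncHttpClient`, which is created on first use after deserialization.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}

// Hypothetical stand-in for a non-serializable resource such as an ActorSystem.
final class ExpensiveClient {
  def call(input: String): String = s"response for $input"
}

// Eager field: serializing this object fails because ExpensiveClient
// does not implement java.io.Serializable.
class EagerOperator extends Serializable {
  private val client = new ExpensiveClient
  def invoke(input: String): String = client.call(input)
}

// open()-style initialization, mimicking RichFunction#open (not Flink's API).
class OpenOperator extends Serializable {
  @transient private var client: ExpensiveClient = _ // skipped by serialization
  def open(): Unit = { client = new ExpensiveClient } // run on the "worker"
  def isOpen: Boolean = client != null
  def invoke(input: String): String = client.call(input)
}

// @transient lazy val: also skipped by serialization, then re-created
// on first access after deserialization.
class LazyOperator extends Serializable {
  @transient private lazy val client = new ExpensiveClient
  def invoke(input: String): String = client.call(input)
}

object ShipDemo {
  // Java-serialize and deserialize, standing in for "client -> cluster".
  def ship[T <: java.io.Serializable](obj: T): T = {
    val bytes = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(bytes)
    try out.writeObject(obj) finally out.close()
    val in = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray))
    try in.readObject().asInstanceOf[T] finally in.close()
  }

  def main(args: Array[String]): Unit = {
    val eager = scala.util.Try(ship(new EagerOperator))
    println(s"EagerOperator shipped: ${eager.isSuccess}") // false

    val opened = new OpenOperator
    opened.open()                   // opened on the "client"...
    val onWorker = ship(opened)
    println(s"still open after shipping: ${onWorker.isOpen}") // false
    onWorker.open()                 // ...so it must be opened again on the "worker"
    println(onWorker.invoke("shipment-1"))

    println(ship(new LazyOperator).invoke("shipment-2"))
  }
}
```

The same split should apply in a real job. Whatever is assigned in `open()` or lazily on first use is created once per parallel operator instance on the TaskManager, not once per record as in the first `asyncInvoke` version.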