Hi Till,

Sorry to bother you again. I managed to build and run the Akka HTTP client locally, but after deploying to a YARN node, the ActorSystem cannot be created:

```
PPLogger.getActivityLogger.info("########### 1")
implicit val system = ActorSystem("my-system")
PPLogger.getActivityLogger.info("########### 2")
```

The "########### 2" line is never printed, and the method ends up timing out. Honestly, I don't know how to debug this case.
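One way to narrow a case like this down, sketched in plain Scala with no Akka or Flink dependency: log immediately before and after the blocking call, log any `Throwable` it throws, and log which jar a class was actually loaded from. A different version of a library on the cluster classpath is one possible (unconfirmed) reason code behaves differently on YARN than locally. `InitDiagnostics`, `initLogged`, and `whereLoaded` are hypothetical helper names, not part of Flink or Akka.

```scala
// Hypothetical debugging helpers; not a Flink or Akka API.
object InitDiagnostics {

  /** Returns the jar or directory a class was loaded from, or a marker if the JVM does not expose one. */
  def whereLoaded(cls: Class[_]): String =
    Option(cls.getProtectionDomain)
      .flatMap(pd => Option(pd.getCodeSource))
      .flatMap(cs => Option(cs.getLocation))
      .map(_.toString)
      .getOrElse("<unknown code source>")

  /** Runs `init`, logging before, after, and on any Throwable (which is rethrown unchanged). */
  def initLogged[T](name: String, log: String => Unit)(init: => T): T = {
    log(s"$name: starting")
    try {
      val result = init
      log(s"$name: done")
      result
    } catch {
      case t: Throwable =>
        log(s"$name: failed with $t")
        throw t
    }
  }
}
```

On the TaskManager this could be called as `InitDiagnostics.initLogged("ActorSystem", msg => PPLogger.getActivityLogger.info(msg))(ActorSystem("my-system"))`, alongside logging `InitDiagnostics.whereLoaded(classOf[akka.actor.ActorSystem])`. If the log shows the "starting" line but neither "done" nor "failed", the call is blocking rather than failing, and a thread dump of the TaskManager JVM (for example with `jstack`) shows where it is stuck.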
I'm just curious: how do people manage to use an async HTTP client without this hassle?

Thanks,
Andy

> On Apr 12, 2019, at 10:23 PM, Till Rohrmann <trohrm...@apache.org> wrote:
>
> Hi Andy,
>
> you can do some micro benchmarks where you instantiate your AsyncHttpClient
> and call the invoke method. But it would be better to benchmark it end-to-end by
> running it on a cluster with a realistic workload which you also expect to
> occur in production.
>
> Cheers,
> Till
>
> On Fri, Apr 12, 2019 at 11:29 AM Andy Hoang <a...@parcelperform.com> wrote:
> Hi Till,
> Unfortunately I have to wait for the cluster to be upgraded to 1.8 to use that
> feature: https://issues.apache.org/jira/browse/FLINK-6756
> Meanwhile, I can reimplement it by copy-pasting, but I'm still curious
> whether my AsyncHttpClient works well or not, and what the downsides would be when you look at it.
> I understand the open/close methods help with initializing/cleaning up
> resources, but how can we benchmark the solution to make sure one is better
> than the other? What is the key to deciding here, or do we have to try it in
> production first?
> Thanks a lot, again
>
> Andy
>
>> On Apr 12, 2019, at 2:44 PM, Till Rohrmann <trohrm...@apache.org> wrote:
>>
>> Hi Andy,
>>
>> there is also a Scala version of the `RichAsyncFunction`.
>>
>> In Scala you have to specify a value for class members. This is different
>> from Java.
>>
>> User code is first instantiated on the client where you create the job
>> topology (basically where you call new RichAsyncHttpClient). The code is
>> then serialized and shipped to the cluster where it is actually executed.
>>
>> Cheers,
>> Till
>>
>> On Fri, Apr 12, 2019 at 5:44 AM Andy Hoang <a...@parcelperform.com> wrote:
>> Hi Till,
>> Thanks for your reply. I managed to do some experiments; some
>> worked and some did not.
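Till's first suggestion, a micro benchmark that instantiates the function and calls it repeatedly, can be sketched as a small timing loop in plain Scala. This is a crude sketch assuming each call can be awaited: JIT warm-up, GC pauses, and network variance make single-machine numbers rough, and a dedicated harness such as JMH gives more trustworthy results. `MicroBench`, `meanNanos`, and `meanNanosAwait` are hypothetical names.

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.duration.FiniteDuration

// Hypothetical timing helpers for a rough comparison of two implementations.
object MicroBench {

  /** Calls `op` `warmup` times untimed, then `iterations` times, returning mean nanoseconds per timed call. */
  def meanNanos(warmup: Int, iterations: Int)(op: () => Unit): Double = {
    require(warmup >= 0 && iterations > 0, "warmup must be >= 0 and iterations > 0")
    var i = 0
    while (i < warmup) { op(); i += 1 }
    val start = System.nanoTime()
    i = 0
    while (i < iterations) { op(); i += 1 }
    (System.nanoTime() - start).toDouble / iterations
  }

  /** Like meanNanos, but waits for each Future so the whole request is timed, not just its submission. */
  def meanNanosAwait[T](warmup: Int, iterations: Int, timeout: FiniteDuration)(op: () => Future[T]): Double =
    meanNanos(warmup, iterations) { () =>
      Await.result(op(), timeout)
      ()
    }
}
```

To compare the two designs in this thread, `op` could issue one request with a shared `ActorSystem` in one run and create a new `ActorSystem` per call in the other. As Till says, an end-to-end run on the cluster with a realistic workload remains the more reliable comparison.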
>> I hope you can give me a bit more insight:
>>
>> Following your suggestion to implement a `RichAsyncFunction` with a transient field,
>> like this, I get an error:
>>
>> ```
>> Class 'RichAsyncHttpClient' must either be declared abstract or implement
>> abstract member 'executionContext: ExecutionContextExecutor' in
>> 'com.parcelperform.util.RichAsyncHttpClient'
>> ```
>>
>> ```
>> class RichAsyncHttpClient() extends RichAsyncFunction[Shipment,
>>     Either[Throwable, ResponseEntity]]{
>>
>>   PPLogger.getActivityLogger.info("###########INIT ------------------- ")
>>   @transient implicit var materializer: ActorMaterializer
>>   @transient implicit var system: ActorSystem
>>   @transient implicit var executionContext: ExecutionContextExecutor
>>
>>
>>   override def asyncInvoke(input: Shipment, resultFuture:
>>       async.ResultFuture[Either[Throwable, ResponseEntity]]): Unit = {
>>
>>     val resultFutureRequested: Future[HttpResponse] =
>>       Http().singleRequest(HttpRequest(uri = "https://httpbin.org/json"))
>>
>>     resultFutureRequested.onComplete {
>>       case Success(res) => {
>>         resultFuture.complete(Iterable(Either(res.entity)).asJavaCollection)
>>       }
>>       case Failure(x) => {
>>         resultFuture.complete(Iterable(Either(x)).asJavaCollection)
>>       }
>>     }
>>   }
>>
>>   override def open(parameters: Configuration): Unit = {
>>     super.open(parameters)
>>     system = ActorSystem("my-system")
>>     executionContext = system.dispatcher
>>     materializer = ActorMaterializer()
>>   }
>> }
>> ```
>>
>> Also, I get an error when using that class; I guess it's because of a Java/Scala
>> issue.
>> In the Flink docs, the Java example uses RichAsyncFunction and the Scala
>> example uses AsyncFunction:
>> ```
>> // AsyncDataStream.unorderedWait(streamShipment, new RichAsyncHttpClient(), 5, TimeUnit.SECONDS, 2).print()
>> // <= ###### error: Type mismatch, expected: AsyncFunction[NotInferedIN, NotInferedOUT], actual: RichAsyncHttpClient
>> ```
>>
>> ###
>>
>> So I tried to fix my current code again with transient fields, moving them into
>> the constructor:
>> ```
>> class AsyncHttpClient(args: Array[String] = Array()) extends
>>     AsyncFunction[Shipment, Either[Throwable, ResponseEntity]] {
>>
>>   @transient implicit lazy val system = {
>>     PPLogger.getActivityLogger.info("###########INIT ------------------- ")
>>     ActorSystem("my-system")
>>   }
>>   @transient implicit lazy val executionContext = {
>>     system.dispatcher
>>   }
>>   @transient implicit lazy val materializer: ActorMaterializer = {
>>     PPLogger.getActivityLogger.info("###########DONE ------------------- ")
>>     ActorMaterializer()
>>   }
>>
>>   override def asyncInvoke(input: Shipment, resultFuture:
>>       ResultFuture[Either[Throwable, ResponseEntity]]): Unit = {
>>     // PPLogger.getActivityLogger.info("###########INIT ------------------- ")
>>     // implicit val system = ActorSystem("my-system")
>>     // implicit val executionContext = system.dispatcher
>>     // implicit val materializer: ActorMaterializer = ActorMaterializer()
>>     // PPLogger.getActivityLogger.info("###########DONE ------------------- ")
>>
>>     val resultFutureRequested: Future[HttpResponse] =
>>       Http().singleRequest(HttpRequest(uri = "https://httpbin.org/json"))
>>     resultFutureRequested.onComplete {
>>       case Success(res) => {
>>         resultFuture.complete(Iterable(Right(res.entity)))
>>       }
>>       case Failure(x) => {
>>         resultFuture.complete(Iterable(Left(x)))
>>       }
>>     }
>>
>>   }
>>
>>   override def timeout(input: Shipment, resultFuture:
>>       ResultFuture[Either[Throwable, ResponseEntity]]): Unit = {
>>     resultFuture.complete(Iterable(Left(new TimeoutException("Async function call has timed out."))))
>>   }
>> }
>> ```
>> And it runs OK. The log is printed only once.
>>
>> I'm still asking about this because I don't understand the phrase "That way
>> you only create the `ActorMaterializer` on the `TaskManager` where the
>> operator is executed and solve the problem of serializability". I thought
>> all the code is executed inside the TaskManager?
>>
>> Thanks for being patient with me so far,
>>
>> Andy
>>
>>> On Apr 11, 2019, at 7:12 PM, Till Rohrmann <trohrm...@apache.org> wrote:
>>>
>>> Hi Andy,
>>>
>>> without being an expert on Akka's HTTP client, I think you should not
>>> create a new ActorSystem for every call to `AsyncFunction#asyncInvoke`.
>>> What I would recommend instead is to implement a `RichAsyncFunction`
>>> with a transient field for `ActorMaterializer` which you initialize in the
>>> `RichAsyncFunction#open` method. That way you only create the
>>> `ActorMaterializer` on the `TaskManager` where the operator is executed and
>>> solve the problem of serializability, and you make it much more efficient
>>> because you don't create a new `ActorSystem` for every request.
>>>
>>> Cheers,
>>> Till
>>>
>>> On Thu, Apr 11, 2019 at 1:17 PM Andy Hoang <a...@parcelperform.com> wrote:
>>> Hi guys,
>>>
>>> I'm trying to decide which HTTP client to use with Flink. So far I have tested
>>> scalaj and the Akka HTTP client, and both work OK in our current dev
>>> environment.
>>> For scalaj it is pretty straightforward, since it just makes an HTTP
>>> request with a timeout.
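Till's point about where code runs can be imitated without Flink: the function object is built on the client, serialized, deserialized on the cluster, and only then is `open` called. The plain-Scala sketch below approximates that shipping step with Java serialization (an assumption about the mechanism, used only for illustration). `ExpensiveResource` is a hypothetical, deliberately non-serializable stand-in for `ActorSystem`/`ActorMaterializer`, and `FakeRichFunction`, `BrokenFunction`, and `ShipSimulation.shipToCluster` are hypothetical names, not Flink APIs. It also shows the Scala detail Till mentioned: a field written `var x: T` with no value is abstract (hence the compile error earlier in the thread), while `var x: T = _` starts out as `null`.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}

// Hypothetical stand-in for ActorSystem/ActorMaterializer: NOT serializable; counts how often one is built.
final class ExpensiveResource {
  val id: Int = ExpensiveResource.created.incrementAndGet()
}
object ExpensiveResource {
  val created = new java.util.concurrent.atomic.AtomicInteger(0)
}

// Imitates a RichAsyncFunction: constructed on the "client", used on the "cluster" after open().
class FakeRichFunction extends Serializable {
  // `= _` initializes the field to null; without it Scala treats the var as abstract.
  @transient private var opened: ExpensiveResource = _

  // Built on first access, i.e. only where it is used; @transient keeps it out of serialization.
  @transient lazy val onDemand: ExpensiveResource = new ExpensiveResource

  def open(): Unit = { opened = new ExpensiveResource }
  def openedResource: Option[ExpensiveResource] = Option(opened)
}

// Cannot be shipped: the non-transient field pulls ExpensiveResource into serialization.
class BrokenFunction extends Serializable {
  val resource: ExpensiveResource = new ExpensiveResource
}

object ShipSimulation {
  /** Serializes and deserializes `obj`, roughly like submitting a job to a cluster. */
  def shipToCluster[T <: Serializable](obj: T): T = {
    val bytes = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(bytes)
    try out.writeObject(obj) finally out.close()
    val in = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray))
    try in.readObject().asInstanceOf[T] finally in.close()
  }
}
```

In Flink terms the pattern is: keep such fields `@transient`, create them in `open` (or lazily on first use), and release them in `close`. The constructor body runs on the client, which is why initializing non-serializable objects there fails.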
>>>
>>> For the Akka HTTP client it's a bit more complicated (I'm new to Scala and all),
>>> so I'm asking whether I'm doing it right by creating an AsyncFunction like this:
>>> ```
>>> class AsyncHttpClient(args: Array[String] = Array()) extends
>>>     AsyncFunction[Shipment, Either[Throwable, ResponseEntity]] {
>>>
>>>   override def asyncInvoke(input: Shipment, resultFuture:
>>>       ResultFuture[Either[Throwable, ResponseEntity]]): Unit = {
>>>
>>>     PPLogger.getActivityLogger.info("###########INIT ------------------- ")
>>>     implicit val system = ActorSystem("my-system")
>>>     implicit val executionContext = system.dispatcher
>>>     implicit val materializer: ActorMaterializer = ActorMaterializer()
>>>     val resultFutureRequested: Future[HttpResponse] =
>>>       Http().singleRequest(HttpRequest(uri = "https://httpbin.org/json"))
>>>     PPLogger.getActivityLogger.info("###########DONE ------------------- ")
>>>
>>>
>>>     resultFutureRequested.onComplete {
>>>       case Success(res) => {
>>>         resultFuture.complete(Iterable(Right(res.entity)))
>>>       }
>>>       case Failure(x) => {
>>>         resultFuture.complete(Iterable(Left(x)))
>>>       }
>>>     }
>>>
>>>   }
>>>
>>>   override def timeout(input: Shipment, resultFuture:
>>>       ResultFuture[Either[Throwable, ResponseEntity]]): Unit = {
>>>     resultFuture.complete(Iterable(Left(new TimeoutException("Async function call has timed out."))))
>>>   }
>>> }
>>> ```
>>> I noticed that I have to create a bunch of implicit variables inside the
>>> asyncInvoke method. I'm not sure if I'm doing it right, or just adding
>>> overhead. I did try to initialize them in the constructor of the class, but the
>>> compiler just threw a bunch of Not implemented Serializer errors.
>>>
>>> My libs:
>>> "com.typesafe.akka" %% "akka-http" % "10.1.8",
>>> "com.typesafe.akka" %% "akka-http-testkit" % "10.1.8" % Test,
>>>
>>> My Flink setup:
>>> Scala 2.12
>>> Flink 1.7.0
>>>
>>> Any replies are appreciated!
>>>
>>> Thanks a bunch
>>>
>>> Andy
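The `onComplete` mapping in the code above (success to `Right`, failure to `Left`) can also be written with `Try.toEither`, available since Scala 2.12. The sketch below is plain Scala; `complete` is a hypothetical callback standing in for Flink's `ResultFuture.complete`, and `EitherCompletion`, `completeAsEither`, and `timedOut` are hypothetical names.

```scala
import java.util.concurrent.TimeoutException
import scala.concurrent.{ExecutionContext, Future}

// Hypothetical helpers; `complete` plays the role of ResultFuture.complete.
object EitherCompletion {

  /** When `request` finishes, passes one element to `complete`: Right(value) on success, Left(error) on failure. */
  def completeAsEither[T](request: Future[T])(complete: Iterable[Either[Throwable, T]] => Unit)
                         (implicit ec: ExecutionContext): Unit =
    request.onComplete(result => complete(Iterable(result.toEither)))

  /** The value a timeout handler would emit, mirroring the `timeout` override above. */
  def timedOut[T](message: String): Iterable[Either[Throwable, T]] =
    Iterable(Left(new TimeoutException(message)))
}
```

Inside `asyncInvoke` this could be used as `EitherCompletion.completeAsEither(resultFutureRequested.map(_.entity))(resultFuture.complete)`, assuming an implicit `ExecutionContext` is in scope as in the original code.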