Hi Till,

Sorry to bother you again. I managed to build and run the akka http client 
locally, but after deploying to a YARN node, the ActorSystem never comes up.
 ```
PPLogger.getActivityLogger.info("########### 1")
implicit val system = ActorSystem("my-system")
PPLogger.getActivityLogger.info("########### 2")
```
The `########### 2` line was never printed, and the method ended up timing 
out. Honestly, I don’t know how to debug this case.

I’m just curious: how do people end up using any async http client without 
this kind of hassle?

Thanks,

Andy,

> On Apr 12, 2019, at 10:23 PM, Till Rohrmann <trohrm...@apache.org> wrote:
> 
> Hi Andy,
> 
> you can do some micro benchmarks where you instantiate your AsyncHttpClient 
> and call the invoke method. But better would be to benchmark it end-to-end by 
> running it on a cluster with a realistic workload which you also expect to 
> occur in production.
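> 
> For the micro benchmark part, a minimal sketch could look like the 
> following. It is only an illustration: `fakeRequest` is a hypothetical 
> stand-in for your real async call, and the request count and timeout are 
> arbitrary assumptions.
> 
> ```scala
> import scala.concurrent.{Await, Future}
> import scala.concurrent.ExecutionContext.Implicits.global
> import scala.concurrent.duration._
> 
> object MicroBench {
>   // Hypothetical stand-in for the real async call; swap in the client under test.
>   def fakeRequest(i: Int): Future[Int] = Future { i * 2 }
> 
>   // Fires `n` requests concurrently, waits for all of them, and returns
>   // the elapsed wall-clock time in milliseconds.
>   def timeRequests(n: Int)(request: Int => Future[Int]): Long = {
>     val start = System.nanoTime()
>     val all = Future.sequence((1 to n).map(request))
>     Await.result(all, 30.seconds)
>     (System.nanoTime() - start) / 1000000
>   }
> 
>   def main(args: Array[String]): Unit = {
>     timeRequests(1000)(fakeRequest) // warm-up run, result discarded
>     val elapsedMs = timeRequests(1000)(fakeRequest)
>     println(s"1000 requests took $elapsedMs ms")
>   }
> }
> ```
> 
> Numbers from such a loop only tell you about the client in isolation, which 
> is why the end-to-end run matters more.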
> 
> Cheers,
> Till
> 
> On Fri, Apr 12, 2019 at 11:29 AM Andy Hoang <a...@parcelperform.com> wrote:
> Hi Till,
> Unfortunately I have to wait for the cluster to be upgraded to 1.8 to use 
> that feature: https://issues.apache.org/jira/browse/FLINK-6756
> Meanwhile I can reimplement it in a copy-paste manner, but I’m still curious 
> whether my AsyncHttpClient works well or not, and what the downsides would 
> be when you look at it.
> I understand that the open/close methods help with initializing and 
> cleaning up resources, but how can we benchmark the solutions to make sure 
> one is better than the other? What is the key factor to decide on here, or 
> do we have to try it in production first?
> Thanks a lot, again
> 
> Andy,
> 
> 
>> On Apr 12, 2019, at 2:44 PM, Till Rohrmann <trohrm...@apache.org> wrote:
>> 
>> Hi Andy,
>> 
>> there is also a Scala version of the `RichAsyncFunction`.
>> 
>> In Scala you have to specify a value for class members. This is different 
>> from Java.
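>> 
>> For illustration, a minimal sketch (Scala 2; `Holder` and its `String` 
>> field are hypothetical stand-ins for your function and its `ActorSystem`): 
>> a `var` without a value is an abstract member, so you can give it the 
>> default initializer `_` and assign the real value later.
>> 
>> ```scala
>> // Hypothetical stand-in for a rich function with late-initialized fields.
>> class Holder extends Serializable {
>>   // `= _` assigns the default value (null for reference types),
>>   // which makes the member concrete instead of abstract.
>>   @transient var system: String = _
>> 
>>   // Mimics the role of open(): runs after construction.
>>   def open(): Unit = {
>>     system = "my-system"
>>   }
>> }
>> 
>> object HolderDemo {
>>   def main(args: Array[String]): Unit = {
>>     val holder = new Holder
>>     println(holder.system == null) // the field starts out as null
>>     holder.open()
>>     println(holder.system)
>>   }
>> }
>> ```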
>> 
>> User code is first instantiated on the client where you create the job 
>> topology (basically where you call new RichAsyncHttpClient). The code is 
>> then serialized and shipped to the cluster where it is actually executed.
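>> 
>> A minimal sketch of that round trip, using plain Java serialization as a 
>> stand-in for the shipping step (an assumption; `Connection`, `MyFunction` 
>> and `InitCounter` are hypothetical names). The `@transient lazy val` is 
>> not shipped and is created on first use by the deserialized copy:
>> 
>> ```scala
>> import java.io.{ByteArrayInputStream, ByteArrayOutputStream}
>> import java.io.{ObjectInputStream, ObjectOutputStream}
>> import java.util.concurrent.atomic.AtomicInteger
>> 
>> // Hypothetical stand-in for a non-serializable resource such as an ActorSystem.
>> class Connection(val name: String)
>> 
>> object InitCounter {
>>   // Counts how often the lazy resource is created in this JVM.
>>   val created = new AtomicInteger(0)
>> }
>> 
>> // Hypothetical stand-in for the user function that gets shipped.
>> class MyFunction extends Serializable {
>>   // Skipped during serialization; created on first access afterwards.
>>   @transient lazy val connection: Connection = {
>>     InitCounter.created.incrementAndGet()
>>     new Connection("my-system")
>>   }
>> 
>>   def invoke(input: String): String = s"${connection.name}:$input"
>> }
>> 
>> object RoundTrip {
>>   // Serializes and deserializes a value with plain Java serialization.
>>   def roundTrip[T <: AnyRef](value: T): T = {
>>     val bytes = new ByteArrayOutputStream()
>>     val out = new ObjectOutputStream(bytes)
>>     out.writeObject(value)
>>     out.close()
>>     val in = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray))
>>     try in.readObject().asInstanceOf[T] finally in.close()
>>   }
>> 
>>   def main(args: Array[String]): Unit = {
>>     val onClient = new MyFunction      // "client": nothing is created yet
>>     val onWorker = roundTrip(onClient) // "cluster": the deserialized copy
>>     println(onWorker.invoke("a"))
>>     println(onWorker.invoke("b"))
>>     println(InitCounter.created.get()) // created once, by the copy
>>   }
>> }
>> ```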
>> 
>> Cheers,
>> Till
>> 
>> On Fri, Apr 12, 2019 at 5:44 AM Andy Hoang <a...@parcelperform.com> wrote:
>> Hi Till, 
>> Thanks for your reply. I managed to do some experiments; some worked and 
>> some did not. I hope you can give me a bit more insight:
>> 
>> Following your suggestion, I implemented a `RichAsyncFunction` with 
>> transient fields like this, and got this error:
>> 
>> ```
>> Class 'RichAsyncHttpClient' must either be declared abstract or implement 
>> abstract member 'executionContext: ExecutionContextExecutor' in 
>> ‘com.parcelperform.util.RichAsyncHttpClient’
>> ```
>> 
>> ```
>> class RichAsyncHttpClient() extends RichAsyncFunction[Shipment, 
>> Either[Throwable, ResponseEntity]]{
>> 
>>   PPLogger.getActivityLogger.info("###########INIT ------------------- ")
>>   @transient implicit var materializer: ActorMaterializer
>>   @transient implicit var system: ActorSystem
>>   @transient implicit var executionContext: ExecutionContextExecutor
>> 
>> 
>>   override def asyncInvoke(input: Shipment, resultFuture: 
>> async.ResultFuture[Either[Throwable, ResponseEntity]]): Unit = {
>> 
>>     val resultFutureRequested: Future[HttpResponse] =
>>       Http().singleRequest(HttpRequest(uri = "https://httpbin.org/json"))
>> 
>>     resultFutureRequested.onComplete {
>>       case Success(res) => {
>>         resultFuture.complete(Iterable(Either(res.entity)).asJavaCollection)
>>       }
>>       case Failure(x)   => {
>>         resultFuture.complete(Iterable(Either(x)).asJavaCollection)
>>       }
>>     }
>>   }
>> 
>>   override def open(parameters: Configuration): Unit = {
>>     super.open(parameters)
>>     system = ActorSystem("my-system")
>>     executionContext = system.dispatcher
>>     materializer = ActorMaterializer()
>>   }
>> }
>> ```
>> 
>> Also, when using that class I get an error; I guess it’s because of a 
>> Java/Scala issue. In the Flink docs, the Java code uses RichAsyncFunction 
>> and the Scala code uses AsyncFunction:
>> ```
>> // AsyncDataStream.unorderedWait(streamShipment, new RichAsyncHttpClient(),
>> //   5, TimeUnit.SECONDS, 2).print()
>> // <= ###### error: Type mismatch, expected: AsyncFunction[NotInferedIN,
>> //    NotInferedOUT], actual: RichAsyncHttpClient
>> 
>> ```
>> 
>> ### 
>> 
>> So I tried to fix my current code again, using transient fields and moving 
>> them into the constructor:
>> ```
>> 
>> 
>> class AsyncHttpClient( args: Array[String] = Array()) extends 
>> AsyncFunction[Shipment, Either[Throwable, ResponseEntity]]{
>> 
>>   @transient implicit lazy val system = {
>>     PPLogger.getActivityLogger.info("###########INIT ------------------- ")
>>     ActorSystem("my-system")
>>   }
>>   @transient implicit lazy val executionContext = {
>>     system.dispatcher
>>   }
>>   @transient implicit lazy val materializer: ActorMaterializer = {
>>     PPLogger.getActivityLogger.info("###########DONE ------------------- ")
>>     ActorMaterializer()
>>   }
>> 
>>   override def asyncInvoke(input: Shipment, resultFuture: 
>> ResultFuture[Either[Throwable, ResponseEntity]]): Unit = {
>> //    PPLogger.getActivityLogger.info("###########INIT ------------------- ")
>> //    implicit val system = ActorSystem("my-system")
>> //    implicit val executionContext = system.dispatcher
>> //    implicit val materializer: ActorMaterializer = ActorMaterializer()
>> //    PPLogger.getActivityLogger.info("###########DONE ------------------- ")
>> 
>>     val resultFutureRequested: Future[HttpResponse] =
>>       Http().singleRequest(HttpRequest(uri = "https://httpbin.org/json"))
>>     resultFutureRequested.onComplete {
>>       case Success(res) => {
>>         resultFuture.complete(Iterable(Right(res.entity)))
>>       }
>>       case Failure(x)   => {
>>         resultFuture.complete(Iterable(Left(x)))
>>       }
>>     }
>> 
>>   }
>> 
>>   override def timeout(input: Shipment, resultFuture: 
>> ResultFuture[Either[Throwable, ResponseEntity]]): Unit = {
>>     resultFuture.complete(
>>       Iterable(Left(new TimeoutException("Async function call has timed out."))))
>>   }
>> }
>> ```
>> And it runs OK. The log was printed only once.
>> 
>> I’m still asking about this because I haven’t understood the statement "That 
>> way you only create the `ActorMaterializer` on the `TaskManager` where the 
>> operator is executed and solve the problem of serializability". I thought 
>> all of the code is executed inside the TaskManager?
>> 
>> Thanks for being patient with me up to this point.
>> 
>> Andy,
>> 
>> 
>> 
>> 
>>> On Apr 11, 2019, at 7:12 PM, Till Rohrmann <trohrm...@apache.org> wrote:
>>> 
>>> Hi Andy,
>>> 
>>> without being an expert on Akka's http client, I think you should not 
>>> create a new ActorSystem for every call to `AsyncFunction#asyncInvoke`. 
>>> What I would recommend instead is to implement a `RichAsyncFunction` 
>>> with a transient field for the `ActorMaterializer` which you initialize in 
>>> the `RichAsyncFunction#open` method. That way you only create the 
>>> `ActorMaterializer` on the `TaskManager` where the operator is executed, 
>>> which solves the problem of serializability, and you make it much more 
>>> efficient because you don't create a new `ActorSystem` for every request.
>>> 
>>> Cheers,
>>> Till
>>> 
>>> On Thu, Apr 11, 2019 at 1:17 PM Andy Hoang <a...@parcelperform.com> wrote:
>>> Hi guys,
>>> 
>>> I’m trying to decide which http client to use with Flink. So far I have 
>>> tested scalaj and the akka http client, and both work OK in our current 
>>> dev environment.
>>> For scalaj it is pretty straightforward, since it is just making an http 
>>> request with a timeout.
>>> 
>>> For the akka http client it’s a bit more complicated (I’m new to Scala and 
>>> all), so I’m asking whether I am doing it right by creating an 
>>> AsyncFunction like this:
>>> ```
>>> class AsyncHttpClient( args: Array[String] = Array()) extends 
>>> AsyncFunction[Shipment, Either[Throwable, ResponseEntity]]{
>>> 
>>>   override def asyncInvoke(input: Shipment, resultFuture: 
>>> ResultFuture[Either[Throwable, ResponseEntity]]): Unit = {
>>> 
>>>     PPLogger.getActivityLogger.info("###########INIT ------------------- ")
>>>     implicit val system = ActorSystem("my-system")
>>>     implicit val executionContext = system.dispatcher
>>>     implicit val materializer: ActorMaterializer = ActorMaterializer()
>>>     val resultFutureRequested: Future[HttpResponse] =
>>>       Http().singleRequest(HttpRequest(uri = "https://httpbin.org/json"))
>>>     PPLogger.getActivityLogger.info("###########DONE ------------------- ")
>>> 
>>> 
>>>     resultFutureRequested.onComplete {
>>>       case Success(res) => {
>>>         resultFuture.complete(Iterable(Right(res.entity)))
>>>       }
>>>       case Failure(x)   => {
>>>         resultFuture.complete(Iterable(Left(x)))
>>>       }
>>>     }
>>> 
>>>   }
>>> 
>>>   override def timeout(input: Shipment, resultFuture: 
>>> ResultFuture[Either[Throwable, ResponseEntity]]): Unit = {
>>>     resultFuture.complete(
>>>       Iterable(Left(new TimeoutException("Async function call has timed out."))))
>>>   }
>>> }
>>> ```
>>> I notice that I have to create a bunch of implicit values inside the 
>>> asyncInvoke method. I’m not sure if I’m doing it right or just adding 
>>> overhead. I did try to initialize them in the constructor of the class, 
>>> but the compiler just threw a bunch of "Not implemented Serializer" errors.
>>> 
>>> My lib:
>>>   "com.typesafe.akka" %% "akka-http" % "10.1.8",
>>>   "com.typesafe.akka" %% "akka-http-testkit" % "10.1.8" % Test,
>>> 
>>> My flink:
>>> scala 2.12
>>> flink 1.7.0
>>> 
>>> 
>>> 
>>> Any reply is appreciated!
>>> 
>>> Thanks a bunch
>>> 
>>> Andy,
>>> 
>>> 
>>> 
>> 
> 
