Check the logs to see what Akka is logging, and verify that the port you are
trying to bind to is free.
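
One way to test whether a port is free is to try binding a plain server
socket to it on the affected node. This is only a minimal sketch: `PortCheck`
and `isPortFree` are hypothetical names, and 2552 is just an assumed example
port, so substitute whatever port your Akka configuration actually uses.

```scala
import java.net.ServerSocket
import scala.util.Try

object PortCheck {
  // True if a server socket can currently be bound to `port` on this host.
  // The probe socket is closed again immediately after a successful bind.
  def isPortFree(port: Int): Boolean =
    Try(new ServerSocket(port).close()).isSuccess

  def main(args: Array[String]): Unit = {
    // 2552 is an assumed example value, not taken from the job in question.
    val port = args.headOption.map(_.toInt).getOrElse(2552)
    println(s"port $port free: ${isPortFree(port)}")
  }
}
```

A successful bind only shows the port was free at that moment; another
process can still take it afterwards.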

Cheers,
Till

On Wed, Apr 17, 2019 at 12:50 PM Andy Hoang <a...@parcelperform.com> wrote:

> Hi Till,
>
> Sorry to bother you again. I managed to build and run the Akka HTTP
> client locally.
> After deploying to a YARN node, the ActorSystem can't be created.
>  ```
> PPLogger.getActivityLogger.info("########### 1")
> implicit  val system = ActorSystem("my-system")
> PPLogger.getActivityLogger.info("########### 2")
> ```
> So the line ### 2 was never printed, and the method ended up timing out.
> Honestly, I don't know how to debug this case.
>
> I’m just curious how people ended up using any async http client without
> hassle?
>
> Thanks,
>
> Andy,
>
> On Apr 12, 2019, at 10:23 PM, Till Rohrmann <trohrm...@apache.org> wrote:
>
> Hi Andy,
>
> you can do some micro benchmarks where you instantiate your
> AsyncHttpClient and call the invoke method. But better would be to
> benchmark it end-to-end by running it on a cluster with a realistic
> workload which you also expect to occur in production.
>
> Cheers,
> Till
>
> On Fri, Apr 12, 2019 at 11:29 AM Andy Hoang <a...@parcelperform.com>
> wrote:
>
>> Hi Till,
>> Unfortunately I have to wait for the cluster to upgrade to 1.8 to use
>> that feature: https://issues.apache.org/jira/browse/FLINK-6756
>> Meanwhile I can reimplement it in a copy-paste manner, but I’m still
>> curious whether my AsyncHttpClient
>> works nicely or not, and what the downsides would be when you look at it.
>> I understand the open/close methods will help in terms of initializing and
>> cleaning up resources, but how can we benchmark the solutions to make sure
>> one is better than the other? What is the key to deciding here, or do we
>> have to try it in production first?
>> Thanks a lot, again
>>
>> Andy,
>>
>>
>> On Apr 12, 2019, at 2:44 PM, Till Rohrmann <trohrm...@apache.org> wrote:
>>
>> Hi Andy,
>>
>> there is also a Scala version of the `RichAsyncFunction`.
>>
>> In Scala you have to specify a value for class members. This is different
>> from Java.
>>
>> User code is first instantiated on the client where you create the job
>> topology (basically where you call new RichAsyncHttpClient). The code is
>> then serialized and shipped to the cluster where it is actually executed.
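>>
>> As a rough, self-contained illustration of both points (no Flink or Akka
>> involved): `Resource`, `Client` and `TransientDemo` are hypothetical
>> stand-ins, and plain Java serialization is assumed here to mimic shipping
>> the function to the cluster.
>>
>> ```scala
>> import java.io.{ByteArrayInputStream, ByteArrayOutputStream}
>> import java.io.{ObjectInputStream, ObjectOutputStream}
>>
>> // Hypothetical stand-in for a non-serializable resource (e.g. an ActorSystem).
>> class Resource(val name: String)
>>
>> // Hypothetical stand-in for a rich function with an open() hook.
>> class Client extends Serializable {
>>   // `= _` assigns the default value (null); without it the member is abstract.
>>   @transient var resource: Resource = _
>>
>>   def open(): Unit = { resource = new Resource("my-system") }
>> }
>>
>> object TransientDemo {
>>   // Serialize and deserialize a value, roughly what happens on job submission.
>>   def roundTrip[T <: java.io.Serializable](value: T): T = {
>>     val bytes = new ByteArrayOutputStream()
>>     val out = new ObjectOutputStream(bytes)
>>     try out.writeObject(value) finally out.close()
>>     val in = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray))
>>     try in.readObject().asInstanceOf[T] finally in.close()
>>   }
>>
>>   def main(args: Array[String]): Unit = {
>>     val onClient = new Client          // created where the topology is built
>>     val onWorker = roundTrip(onClient) // the copy that would run on the cluster
>>     println(onWorker.resource == null) // the transient field is not shipped
>>     onWorker.open()                    // initialized only where it executes
>>     println(onWorker.resource.name)
>>   }
>> }
>> ```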
>>
>> Cheers,
>> Till
>>
>> On Fri, Apr 12, 2019 at 5:44 AM Andy Hoang <a...@parcelperform.com>
>> wrote:
>>
>>> Hi Till,
>>> Thanks for your reply. I managed to do some experiments; some worked
>>> and some did not. I hope you can give me a bit more insight:
>>>
>>> Following your suggestion, I implemented a `RichAsyncFunction` with
>>> transient fields like this, and got this error:
>>>
>>> ```
>>> Class 'RichAsyncHttpClient' must either be declared abstract or
>>> implement abstract member 'executionContext: ExecutionContextExecutor' in
>>> ‘com.parcelperform.util.RichAsyncHttpClient’
>>> ```
>>>
>>> ```
>>>
>>> class RichAsyncHttpClient() extends RichAsyncFunction[Shipment, 
>>> Either[Throwable, ResponseEntity]]{
>>>
>>>   PPLogger.getActivityLogger.info("###########INIT ------------------- ")
>>>   @transient implicit var materializer: ActorMaterializer
>>>   @transient implicit var system: ActorSystem
>>>   @transient implicit var executionContext: ExecutionContextExecutor
>>>
>>>
>>>   override def asyncInvoke(input: Shipment, resultFuture: 
>>> async.ResultFuture[Either[Throwable, ResponseEntity]]): Unit = {
>>>
>>>     val resultFutureRequested: Future[HttpResponse] = 
>>> Http().singleRequest(HttpRequest(uri = "https://httpbin.org/json"))
>>>
>>>     resultFutureRequested.onComplete {
>>>       case Success(res) => {
>>>         resultFuture.complete(Iterable(Either(res.entity)).asJavaCollection)
>>>       }
>>>       case Failure(x)   => {
>>>         resultFuture.complete(Iterable(Either(x)).asJavaCollection)
>>>       }
>>>     }
>>>   }
>>>
>>>   override def open(parameters: Configuration): Unit = {
>>>     super.open(parameters)
>>>     system = ActorSystem("my-system")
>>>     executionContext = system.dispatcher
>>>     materializer = ActorMaterializer()
>>>   }
>>> }
>>>
>>> ```
>>>
>>> Also, when using that class I get an error; I guess it's because of a
>>> Java/Scala issue. In the Flink docs, the Java code uses RichAsyncFunction
>>> and the Scala code uses AsyncFunction:
>>> ```
>>> //    AsyncDataStream.unorderedWait(streamShipment, new
>>> //    RichAsyncHttpClient(), 5, TimeUnit.SECONDS, 2).print() <= ###### error Type
>>> //    mismatch, expected: AsyncFunction[NotInferedIN, NotInferedOUT], actual:
>>> //    RichAsyncHttpClient
>>>
>>> ```
>>>
>>> ###
>>>
>>> So I tried to fix my current code again with transient fields, moving
>>> them into the constructor:
>>> ```
>>>
>>>
>>>
>>> class AsyncHttpClient( args: Array[String] = Array()) extends 
>>> AsyncFunction[Shipment, Either[Throwable, ResponseEntity]]{
>>>
>>>   @transient implicit lazy val system = {
>>>     PPLogger.getActivityLogger.info("###########INIT ------------------- ")
>>>     ActorSystem("my-system")
>>>   }
>>>   @transient implicit lazy val executionContext = {
>>>     system.dispatcher
>>>   }
>>>   @transient implicit lazy val materializer: ActorMaterializer = {
>>>     PPLogger.getActivityLogger.info("###########DONE ------------------- ")
>>>     ActorMaterializer()
>>>   }
>>>
>>>   override def asyncInvoke(input: Shipment, resultFuture: 
>>> ResultFuture[Either[Throwable, ResponseEntity]]): Unit = {
>>> //    PPLogger.getActivityLogger.info("###########INIT ------------------- ")
>>> //    implicit val system = ActorSystem("my-system")
>>> //    implicit val executionContext = system.dispatcher
>>> //    implicit val materializer: ActorMaterializer = ActorMaterializer()
>>> //    PPLogger.getActivityLogger.info("###########DONE ------------------- ")
>>>
>>>     val resultFutureRequested: Future[HttpResponse] = 
>>> Http().singleRequest(HttpRequest(uri = "https://httpbin.org/json"))
>>>     resultFutureRequested.onComplete {
>>>       case Success(res) => {
>>>         resultFuture.complete(Iterable(Right(res.entity)))
>>>       }
>>>       case Failure(x)   => {
>>>         resultFuture.complete(Iterable(Left(x)))
>>>       }
>>>     }
>>>
>>>   }
>>>
>>>   override def timeout(input: Shipment, resultFuture: 
>>> ResultFuture[Either[Throwable, ResponseEntity]]): Unit = {
>>>     resultFuture.complete(Iterable(Left(new TimeoutException(
>>>       "Async function call has timed out."))))
>>>   }
>>> }
>>>
>>> ```
>>> And it runs OK. The log was printed only once.
>>>
>>> I'm still asking about this because I haven’t understood the statement `That
>>> way you only create the `ActorMaterializer` on the `TaskManager` where the
>>> operator is executed and solve the problem of serializability`. I thought
>>> all the code is executed inside the TaskManager?
>>>
>>> Thanks for being patient with me, till here
>>>
>>> Andy,
>>>
>>>
>>>
>>>
>>> On Apr 11, 2019, at 7:12 PM, Till Rohrmann <trohrm...@apache.org> wrote:
>>>
>>> Hi Andy,
>>>
>>> without being an expert of Akka's http client, I think you should not
>>> create a new ActorSystem for every call to `AsyncFunction#asyncInvoke`.
>>> What I would recommend you instead is to implement a `RichAsyncFunction`
>>> with a transient field for `ActorMaterializer` which you initialize in the
>>> `RichAsyncFunction#open` method. That way you only create the
>>> `ActorMaterializer` on the `TaskManager` where the operator is executed and
>>> solve the problem of serializability and you make it much more efficient
>>> because you don't create a new `ActorSystem` for every request.
>>>
>>> Cheers,
>>> Till
>>>
>>> On Thu, Apr 11, 2019 at 1:17 PM Andy Hoang <a...@parcelperform.com>
>>> wrote:
>>>
>>>> Hi guys,
>>>>
>>>> I’m trying to decide which HTTP client to use with Flink. So far I have
>>>> tested scalaj and the Akka HTTP client, and both work OK in our current
>>>> dev environment.
>>>> For scalaj it is pretty straightforward, since it is just making an
>>>> HTTP request with a timeout.
>>>>
>>>> For the Akka HTTP client it's a bit more complicated (I’m new to Scala and
>>>> all), so I’m asking whether I am doing it right by creating an
>>>> AsyncFunction like this
>>>> ```
>>>>
>>>> class AsyncHttpClient( args: Array[String] = Array()) extends 
>>>> AsyncFunction[Shipment, Either[Throwable, ResponseEntity]]{
>>>>
>>>>   override def asyncInvoke(input: Shipment, resultFuture: 
>>>> ResultFuture[Either[Throwable, ResponseEntity]]): Unit = {
>>>>
>>>>     PPLogger.getActivityLogger.info("###########INIT ------------------- ")
>>>>     implicit val system = ActorSystem("my-system")
>>>>     implicit val executionContext = system.dispatcher
>>>>     implicit val materializer: ActorMaterializer = ActorMaterializer()
>>>>     val resultFutureRequested: Future[HttpResponse] = 
>>>> Http().singleRequest(HttpRequest(uri = "https://httpbin.org/json"))
>>>>     PPLogger.getActivityLogger.info("###########DONE ------------------- ")
>>>>
>>>>
>>>>     resultFutureRequested.onComplete {
>>>>       case Success(res) => {
>>>>         resultFuture.complete(Iterable(Right(res.entity)))
>>>>       }
>>>>       case Failure(x)   => {
>>>>         resultFuture.complete(Iterable(Left(x)))
>>>>       }
>>>>     }
>>>>
>>>>   }
>>>>
>>>>   override def timeout(input: Shipment, resultFuture: 
>>>> ResultFuture[Either[Throwable, ResponseEntity]]): Unit = {
>>>>     resultFuture.complete(Iterable(Left(new TimeoutException(
>>>>       "Async function call has timed out."))))
>>>>   }
>>>> }
>>>>
>>>> ```
>>>> I notice that I have to create a bunch of implicit variables inside the
>>>> asyncInvoke method. I’m not sure if I’m doing it right, or just adding
>>>> overhead. I did try to initialize them in the constructor of the class, but
>>>> the compiler just throws a bunch of Not implemented Serializer errors.
>>>>
>>>> My lib:
>>>>    "com.typesafe.akka" %% "akka-http" % "10.1.8",
>>>>   "com.typesafe.akka" %% "akka-http-testkit" % "10.1.8" % Test,
>>>>
>>>> My flink:
>>>> scala 2.12
>>>> flink 1.7.0
>>>>
>>>>
>>>>
>>>> Any reply is appreciated!
>>>>
>>>> Thanks a bunch
>>>>
>>>> Andy,
>>>>
>>>>
>>>>
>>>>
>>>
>>
>
