Hmm, I'm not a Redis expert, but are you sure that you see a successful ping
reply in the logs of the TaskManagers and not only in the client logs?

Another thing: is the redisClient thread-safe? Multiple map tasks might be
accessing its set and get methods concurrently.
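
If it turns out not to be, one way around it is to give each parallel task its
own client by creating it in the open() method of a RichMapFunction instead of
sharing a singleton object. A rough, untested sketch (assuming the rediscala
client from your snippet; the event type and mapper name are just placeholders):

import org.apache.flink.api.common.functions.RichMapFunction
import org.apache.flink.configuration.Configuration
import akka.actor.ActorSystem
import redis.RedisClient

// Placeholder event type - replace with whatever flows through your stream.
case class MyEvent(key: String, value: String)

class RedisWritingMapper(redisHost: String, redisPort: Int)
    extends RichMapFunction[MyEvent, MyEvent] {

  // One actor system and redis client per parallel task, created on the TaskManager.
  @transient private var system: ActorSystem = _
  @transient private var client: RedisClient = _

  override def open(parameters: Configuration): Unit = {
    implicit val s: ActorSystem =
      ActorSystem("redis-client-" + getRuntimeContext.getIndexOfThisSubtask)
    system = s
    client = RedisClient(host = redisHost, port = redisPort)
  }

  override def map(event: MyEvent): MyEvent = {
    // Fire-and-forget write through this task's own client.
    client.set(event.key, event.value)
    event
  }

  override def close(): Unit = {
    if (system != null) system.shutdown() // or terminate(), depending on the Akka version
  }
}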

Another question: the DriverStreamHelper code you've just sent is not the code
you were running when you got the stack trace, right? The stack trace shows
DriverStreamHelper.set going through a RedisClientPool, which doesn't appear in
the code below.

Cheers,
Till


On Wed, Apr 6, 2016 at 11:42 AM, Balaji Rajagopalan <
balaji.rajagopa...@olacabs.com> wrote:

> Till,
>   I have checked from all the taskmanager nodes: I am able to establish a
> connection by installing redis-cli on those nodes. The thing is, in the
> constructor I am able to set and get values, and I also get PONG for the
> ping. But once the object is initialized, calls to DriverStreamHelper.get
> and DriverStreamHelper.set from the map/apply functions get the connection
> refused. This may not be related to Flink but rather to some security
> setting in Amazon AWS EMR; that is only an assumption for now. I have also
> tried 3 different redis libraries to rule out errors in the libraries, and
> I get the same exception with all of them.
>
> object DriverStreamHelper {
>
>   import scala.concurrent.Await
>   import scala.concurrent.duration._
>
>   implicit val akkaSystem = akka.actor.ActorSystem("flink-actorsystem")
>   import akkaSystem.dispatcher // implicit ExecutionContext for mapping over Futures
>
>   val redisClient = RedisClient(host = redisHost, port = redisPort)
>
>   val p = redisClient.ping()
>   p.map { res => LOG.info(s"Reply from Redis client : $res") }
>
>   val postFix = System.currentTimeMillis()
>   val key = "some-key" + postFix
>   val value = "some-value" + postFix
>   set(key, value, Some(10000L))
>   LOG.info(s"Going to get the value from Redis ${get(key)}")
>
>   def set(k: String, v: String): Unit = {
>     redisClient.set(k, v)
>   }
>
>   def set(k: String, v: String, exTime: Option[Long]): Unit = {
>     redisClient.set(k, v, exTime)
>   }
>
>   def get(k: String): Option[String] = {
>     val f = redisClient.get[String](k)
>     Await.result(f, 1.seconds) // FIXME - really bad, need to return future here.
>   }
> }
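>
> (What the FIXME above means, roughly: return the Future and let the caller
> decide how to wait; getAsync is just an illustrative name, not tested yet.)
>
> def getAsync(k: String): scala.concurrent.Future[Option[String]] =
>   redisClient.get[String](k) // no Await here; the caller can map/flatMap or await itself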
>
>
> On Wed, Apr 6, 2016 at 2:42 PM, Till Rohrmann <trohrm...@apache.org>
> wrote:
>
>> Hi Balaji,
>>
>> from the stack trace it looks as if you cannot open a connection to redis.
>> Have you checked that you can access redis from all your TaskManager nodes?
>>
>> Cheers,
>> Till
>>
>> On Wed, Apr 6, 2016 at 7:46 AM, Balaji Rajagopalan <
>> balaji.rajagopa...@olacabs.com> wrote:
>>
>>> I am running the Flink code on an AWS EMR YARN cluster. In one of the
>>> apply window functions I try to set some values in redis, and it fails. I
>>> have tried to access the same redis without any Flink code and get/set work,
>>> but from Flink I run into the exception below. Any inputs on what might be
>>> going wrong?
>>>
>>> org.apache.flink.runtime.io.network.netty.exception.RemoteTransportException:
>>> Error at remote task manager 'some-ip'.
>>>
>>> at
>>> org.apache.flink.runtime.io.network.netty.PartitionRequestClientHandler.decodeMsg(PartitionRequestClientHandler.java:241)
>>>
>>> at
>>> org.apache.flink.runtime.io.network.netty.PartitionRequestClientHandler.channelRead(PartitionRequestClientHandler.java:164)
>>>
>>> at
>>> io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:339)
>>>
>>> at
>>> io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:324)
>>>
>>> at
>>> io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:103)
>>>
>>> at
>>> io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:339)
>>>
>>> at
>>> io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:324)
>>>
>>> at
>>> io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:242)
>>>
>>> at
>>> io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:339)
>>>
>>> at
>>> io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:324)
>>>
>>> at
>>> io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:847)
>>>
>>> at
>>> io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:131)
>>>
>>> at
>>> io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:511)
>>>
>>> at
>>> io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:468)
>>>
>>> at
>>> io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:382)
>>>
>>> at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:354)
>>>
>>> at
>>> io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:111)
>>>
>>> at java.lang.Thread.run(Thread.java:745)
>>>
>>> Caused by:
>>> org.apache.flink.runtime.io.network.partition.ProducerFailedException
>>>
>>> at
>>> org.apache.flink.runtime.io.network.netty.PartitionRequestQueue.writeAndFlushNextMessageIfPossible(PartitionRequestQueue.java:164)
>>>
>>> at
>>> org.apache.flink.runtime.io.network.netty.PartitionRequestQueue.userEventTriggered(PartitionRequestQueue.java:96)
>>>
>>> at
>>> io.netty.channel.AbstractChannelHandlerContext.invokeUserEventTriggered(AbstractChannelHandlerContext.java:308)
>>>
>>> at
>>> io.netty.channel.AbstractChannelHandlerContext.fireUserEventTriggered(AbstractChannelHandlerContext.java:294)
>>>
>>> at
>>> io.netty.channel.ChannelInboundHandlerAdapter.userEventTriggered(ChannelInboundHandlerAdapter.java:108)
>>>
>>> at
>>> io.netty.channel.AbstractChannelHandlerContext.invokeUserEventTriggered(AbstractChannelHandlerContext.java:308)
>>>
>>> at
>>> io.netty.channel.AbstractChannelHandlerContext.fireUserEventTriggered(AbstractChannelHandlerContext.java:294)
>>>
>>> at
>>> io.netty.channel.ChannelInboundHandlerAdapter.userEventTriggered(ChannelInboundHandlerAdapter.java:108)
>>>
>>> at
>>> io.netty.channel.AbstractChannelHandlerContext.invokeUserEventTriggered(AbstractChannelHandlerContext.java:308)
>>>
>>> at
>>> io.netty.channel.AbstractChannelHandlerContext.fireUserEventTriggered(AbstractChannelHandlerContext.java:294)
>>>
>>> at
>>> io.netty.channel.ChannelInboundHandlerAdapter.userEventTriggered(ChannelInboundHandlerAdapter.java:108)
>>>
>>> at
>>> io.netty.channel.AbstractChannelHandlerContext.invokeUserEventTriggered(AbstractChannelHandlerContext.java:308)
>>>
>>> at
>>> io.netty.channel.AbstractChannelHandlerContext.access$500(AbstractChannelHandlerContext.java:32)
>>>
>>> at
>>> io.netty.channel.AbstractChannelHandlerContext$6.run(AbstractChannelHandlerContext.java:299)
>>>
>>> at
>>> io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:357)
>>>
>>> at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:357)
>>>
>>> ... 2 more
>>>
>>>
>>> Caused by: java.lang.RuntimeException: java.net.ConnectException:
>>> Connection refused
>>>
>>>         at com.redis.IO$class.connect(IO.scala:37)
>>>
>>>         at com.redis.RedisClient.connect(RedisClient.scala:94)
>>>
>>>         at com.redis.RedisCommand$class.initialize(RedisClient.scala:71)
>>>
>>>         at com.redis.RedisClient.initialize(RedisClient.scala:94)
>>>
>>>         at com.redis.RedisClient.<init>(RedisClient.scala:98)
>>>
>>>         at com.redis.RedisClientFactory.makeObject(Pool.scala:12)
>>>
>>>         at com.redis.RedisClientFactory.makeObject(Pool.scala:7)
>>>
>>>         at
>>> org.apache.commons.pool.impl.StackObjectPool.borrowObject(StackObjectPool.java:149)
>>>
>>>         at com.redis.RedisClientPool.withClient(Pool.scala:34)
>>>
>>>         at
>>> com.olacabs.peakpricing.utils.DriverStreamHelper$.set(DriverStreamHelper.scala:57)
>>>
>>>         at
>>> com.olacabs.peakpricing.datastream.TotalMappedFunction.join(TotalMappedFunction.scala:35)
>>>
>>>         at
>>> com.olacabs.peakpricing.datastream.TotalMappedFunction.join(TotalMappedFunction.scala:29)
>>>
>>>         at
>>> org.apache.flink.streaming.api.datastream.JoinedStreams$JoinCoGroupFunction.coGroup(JoinedStreams.java:312)
>>>
>>>         at
>>> org.apache.flink.streaming.api.datastream.CoGroupedStreams$CoGroupWindowFunction.apply(CoGroupedStreams.java:583)
>>>
>>>         at
>>> org.apache.flink.streaming.runtime.operators.windowing.functions.InternalIterableWindowFunction.apply(InternalIterableWindowFunction.java:44)
>>>
>>>         at
>>> org.apache.flink.streaming.runtime.operators.windowing.functions.InternalIterableWindowFunction.apply(InternalIterableWindowFunction.java:33)
>>>
>>>         at
>>> org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.processTriggerResult(WindowOperator.java:256)
>>>
>>>         at
>>> org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.processWatermark(WindowOperator.java:287)
>>>
>>>         at
>>> org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:158)
>>>
>>>         at
>>> org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:65)
>>>
>>>         at
>>> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:224)
>>>
>>>         at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
>>>
>>>         at java.lang.Thread.run(Thread.java:745)
>>>
>>> Caused by: java.net.ConnectException: Connection refused
>>>
>>>         at java.net.PlainSocketImpl.socketConnect(Native Method)
>>>
>>>         at
>>> java.net.AbstractPlainSocketImpl.doConnect(AbstractPlainSocketImpl.java:350)
>>>
>>>         at
>>> java.net.AbstractPlainSocketImpl.connectToAddress(AbstractPlainSocketImpl.java:206)
>>>
>>>         at
>>> java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:188)
>>>
>>>         at java.net.SocksSocketImpl.connect(SocksSocketImpl.java:392)
>>>
>>>         at java.net.Socket.connect(Socket.java:589)
>>>
>>>         at java.net.Socket.connect(Socket.java:538)
>>>
>>>         at java.net.Socket.<init>(Socket.java:434)
>>>
>>>         at java.net.Socket.<init>(Socket.java:211)
>>>
>>
>>
>
