Till,
  I have checked from all the taskmanager nodes I am able to establish a
connection by installing a redis-cli on those nodes. The thing is in the
constructor I am able to set and get values, also I am getting PONG for the
ping. But once object is initialized when I try to call DriverStreamHelper.get
and DriverStreamHelper.set from map/apply function I get the connection
refused. This may not be related to flink but rather to some security
setting with Amazon AWS EMR, this is assumption now. I have also tried with
3 different redis libraries to rule out any errors with libraries the same
exception in all.

object DriverStreamHelper {


  implicit val akkaSystem = akka.actor.ActorSystem("flink-actorsystem")

  val redisClient = RedisClient(host=redisHost, port=redisPort)

  val p = redisClient.ping()
  p.map{ res => LOG.info(s"Reply from Redis client : $res") }



  val postFix = System.currentTimeMillis()
  val key = "some-key" + postFix
  val value = "some-value" + postFix
  set(key, value, Some(10000L))
  LOG.info(s"Going to get the value from Redis ${get(key)}")

  def set(k: String, v: String): Unit = {
    redisClient.set(k,v)
  }

  def set(k: String, v: String, exTime: Option[Long]): Unit = {
      redisClient.set(k,v,exTime)
  }


def get(k: String): Option[String] = {
import scala.concurrent.duration._
val f = redisClient.get[String](k)
Await.result(f, 1.seconds) //FIXME - really bad need to return future here.
}

}


On Wed, Apr 6, 2016 at 2:42 PM, Till Rohrmann <trohrm...@apache.org> wrote:

> Hi Balaji,
>
> from the stack trace it looks as if you cannot open a connection redis.
> Have you checked that you can access redis from all your TaskManager nodes?
>
> Cheers,
> Till
>
> On Wed, Apr 6, 2016 at 7:46 AM, Balaji Rajagopalan <
> balaji.rajagopa...@olacabs.com> wrote:
>
>> I am trying to use AWS EMR yarn cluster where the flink code runs, in one
>> of apply window function, I try to set some values in redis it fails. I
>> have tried to access the same redis with no flink code and get/set works,
>> but from the flink I get  into this exception. Any inputs on what might be
>> going wrong.
>>
>> org.apache.flink.runtime.io.network.netty.exception.RemoteTransportException:
>> Error at remote task manager 'some-ip'.
>>
>> at
>> org.apache.flink.runtime.io.network.netty.PartitionRequestClientHandler.decodeMsg(PartitionRequestClientHandler.java:241)
>>
>> at
>> org.apache.flink.runtime.io.network.netty.PartitionRequestClientHandler.channelRead(PartitionRequestClientHandler.java:164)
>>
>> at
>> io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:339)
>>
>> at
>> io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:324)
>>
>> at
>> io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:103)
>>
>> at
>> io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:339)
>>
>> at
>> io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:324)
>>
>> at
>> io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:242)
>>
>> at
>> io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:339)
>>
>> at
>> io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:324)
>>
>> at
>> io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:847)
>>
>> at
>> io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:131)
>>
>> at
>> io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:511)
>>
>> at
>> io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:468)
>>
>> at
>> io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:382)
>>
>> at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:354)
>>
>> at
>> io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:111)
>>
>> at java.lang.Thread.run(Thread.java:745)
>>
>> Caused by:
>> org.apache.flink.runtime.io.network.partition.ProducerFailedException
>>
>> at
>> org.apache.flink.runtime.io.network.netty.PartitionRequestQueue.writeAndFlushNextMessageIfPossible(PartitionRequestQueue.java:164)
>>
>> at
>> org.apache.flink.runtime.io.network.netty.PartitionRequestQueue.userEventTriggered(PartitionRequestQueue.java:96)
>>
>> at
>> io.netty.channel.AbstractChannelHandlerContext.invokeUserEventTriggered(AbstractChannelHandlerContext.java:308)
>>
>> at
>> io.netty.channel.AbstractChannelHandlerContext.fireUserEventTriggered(AbstractChannelHandlerContext.java:294)
>>
>> at
>> io.netty.channel.ChannelInboundHandlerAdapter.userEventTriggered(ChannelInboundHandlerAdapter.java:108)
>>
>> at
>> io.netty.channel.AbstractChannelHandlerContext.invokeUserEventTriggered(AbstractChannelHandlerContext.java:308)
>>
>> at
>> io.netty.channel.AbstractChannelHandlerContext.fireUserEventTriggered(AbstractChannelHandlerContext.java:294)
>>
>> at
>> io.netty.channel.ChannelInboundHandlerAdapter.userEventTriggered(ChannelInboundHandlerAdapter.java:108)
>>
>> at
>> io.netty.channel.AbstractChannelHandlerContext.invokeUserEventTriggered(AbstractChannelHandlerContext.java:308)
>>
>> at
>> io.netty.channel.AbstractChannelHandlerContext.fireUserEventTriggered(AbstractChannelHandlerContext.java:294)
>>
>> at
>> io.netty.channel.ChannelInboundHandlerAdapter.userEventTriggered(ChannelInboundHandlerAdapter.java:108)
>>
>> at
>> io.netty.channel.AbstractChannelHandlerContext.invokeUserEventTriggered(AbstractChannelHandlerContext.java:308)
>>
>> at
>> io.netty.channel.AbstractChannelHandlerContext.access$500(AbstractChannelHandlerContext.java:32)
>>
>> at
>> io.netty.channel.AbstractChannelHandlerContext$6.run(AbstractChannelHandlerContext.java:299)
>>
>> at
>> io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:357)
>>
>> at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:357)
>>
>> ... 2 more
>>
>>
>> Caused by: java.lang.RuntimeException: java.net.ConnectException:
>> Connection refused
>>
>>         at com.redis.IO <http://com.redis.io/>$class.connect(IO.scala:37)
>>
>>         at com.redis.RedisClient.connect(RedisClient.scala:94)
>>
>>         at com.redis.RedisCommand$class.initialize(RedisClient.scala:71)
>>
>>         at com.redis.RedisClient.initialize(RedisClient.scala:94)
>>
>>         at com.redis.RedisClient.<init>(RedisClient.scala:98)
>>
>>         at com.redis.RedisClientFactory.makeObject(Pool.scala:12)
>>
>>         at com.redis.RedisClientFactory.makeObject(Pool.scala:7)
>>
>>         at
>> org.apache.commons.pool.impl.StackObjectPool.borrowObject(StackObjectPool.java:149)
>>
>>         at com.redis.RedisClientPool.withClient(Pool.scala:34)
>>
>>         at
>> com.olacabs.peakpricing.utils.DriverStreamHelper$.set(DriverStreamHelper.scala:57)
>>
>>         at
>> com.olacabs.peakpricing.datastream.TotalMappedFunction.join(TotalMappedFunction.scala:35)
>>
>>         at
>> com.olacabs.peakpricing.datastream.TotalMappedFunction.join(TotalMappedFunction.scala:29)
>>
>>         at
>> org.apache.flink.streaming.api.datastream.JoinedStreams$JoinCoGroupFunction.coGroup(JoinedStreams.java:312)
>>
>>         at
>> org.apache.flink.streaming.api.datastream.CoGroupedStreams$CoGroupWindowFunction.apply(CoGroupedStreams.java:583)
>>
>>         at
>> org.apache.flink.streaming.runtime.operators.windowing.functions.InternalIterableWindowFunction.apply(InternalIterableWindowFunction.java:44)
>>
>>         at
>> org.apache.flink.streaming.runtime.operators.windowing.functions.InternalIterableWindowFunction.apply(InternalIterableWindowFunction.java:33)
>>
>>         at
>> org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.processTriggerResult(WindowOperator.java:256)
>>
>>         at
>> org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.processWatermark(WindowOperator.java:287)
>>
>>         at
>> org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:158)
>>
>>         at
>> org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:65)
>>
>>         at
>> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:224)
>>
>>         at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
>>
>>         at java.lang.Thread.run(Thread.java:745)
>>
>> Caused by: java.net.ConnectException: Connection refused
>>
>>         at java.net.PlainSocketImpl.socketConnect(Native Method)
>>
>>         at
>> java.net.AbstractPlainSocketImpl.doConnect(AbstractPlainSocketImpl.java:350)
>>
>>         at
>> java.net.AbstractPlainSocketImpl.connectToAddress(AbstractPlainSocketImpl.java:206)
>>
>>         at
>> java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:188)
>>
>>         at java.net.SocksSocketImpl.connect(SocksSocketImpl.java:392)
>>
>>         at java.net.Socket.connect(Socket.java:589)
>>
>>         at java.net.Socket.connect(Socket.java:538)
>>
>>         at java.net.Socket.<init>(Socket.java:434)
>>
>>         at java.net.Socket.<init>(Socket.java:211)
>>
>
>

Reply via email to