Great to hear that you solved your problem :-)

On Wed, Apr 6, 2016 at 2:29 PM, Balaji Rajagopalan <balaji.rajagopa...@olacabs.com> wrote:
> Till,
> Found the issue: it was my bad assumption about GlobalConfiguration.
> I thought that once the configuration is read on the client machine, the
> GlobalConfiguration params would be passed on to the task manager nodes as
> well. They were not, and the default values were getting picked up, which
> were localhost 6379, and there was no Redis running on localhost of the
> task manager.
>
> balaji
>
> On Wed, Apr 6, 2016 at 3:29 PM, Till Rohrmann <trohrm...@apache.org> wrote:
>
>> Hmm I'm not a Redis expert, but are you sure that you see a successful
>> ping reply in the logs of the TaskManagers and not only in the client logs?
>>
>> Another thing: Is the redisClient thread safe? Multiple map tasks might
>> be accessing the set and get methods concurrently.
>>
>> Another question: The code of DriverStreamHelper you've just sent is not
>> the code you've used when receiving the stack trace, right? Because in the
>> stack trace it's written that you access a RedisClientPool from the
>> DriverStreamHelper.set method.
>>
>> Cheers,
>> Till
>>
>>
>> On Wed, Apr 6, 2016 at 11:42 AM, Balaji Rajagopalan <balaji.rajagopa...@olacabs.com> wrote:
>>
>>> Till,
>>> I have checked from all the taskmanager nodes; I am able to establish a
>>> connection by installing redis-cli on those nodes. The thing is, in the
>>> constructor I am able to set and get values, and I am also getting PONG
>>> for the ping. But once the object is initialized, when I try to call
>>> DriverStreamHelper.get and DriverStreamHelper.set from the map/apply
>>> function, I get connection refused. This may not be related to Flink but
>>> rather to some security setting with Amazon AWS EMR; this is an assumption
>>> for now. I have also tried 3 different Redis libraries to rule out any
>>> errors with the libraries, and I get the same exception with all of them.
>>>
>>> object DriverStreamHelper {
>>>
>>>   implicit val akkaSystem = akka.actor.ActorSystem("flink-actorsystem")
>>>
>>>   val redisClient = RedisClient(host=redisHost, port=redisPort)
>>>
>>>   val p = redisClient.ping()
>>>   p.map{ res => LOG.info(s"Reply from Redis client : $res") }
>>>
>>>   val postFix = System.currentTimeMillis()
>>>   val key = "some-key" + postFix
>>>   val value = "some-value" + postFix
>>>   set(key, value, Some(10000L))
>>>   LOG.info(s"Going to get the value from Redis ${get(key)}")
>>>
>>>   def set(k: String, v: String): Unit = {
>>>     redisClient.set(k,v)
>>>   }
>>>
>>>   def set(k: String, v: String, exTime: Option[Long]): Unit = {
>>>     redisClient.set(k,v,exTime)
>>>   }
>>>
>>>   def get(k: String): Option[String] = {
>>>     import scala.concurrent.duration._
>>>     val f = redisClient.get[String](k)
>>>     Await.result(f, 1.seconds) //FIXME - really bad need to return future here.
>>>   }
>>>
>>> }
>>>
>>>
>>> On Wed, Apr 6, 2016 at 2:42 PM, Till Rohrmann <trohrm...@apache.org> wrote:
>>>
>>>> Hi Balaji,
>>>>
>>>> from the stack trace it looks as if you cannot open a connection to Redis.
>>>> Have you checked that you can access Redis from all your TaskManager nodes?
>>>>
>>>> Cheers,
>>>> Till
>>>>
>>>> On Wed, Apr 6, 2016 at 7:46 AM, Balaji Rajagopalan <balaji.rajagopa...@olacabs.com> wrote:
>>>>
>>>>> I am trying to use an AWS EMR YARN cluster where the Flink code runs. In
>>>>> one of the apply window functions, I try to set some values in Redis and
>>>>> it fails. I have tried to access the same Redis without Flink code and
>>>>> get/set works, but from Flink I run into this exception. Any inputs on
>>>>> what might be going wrong?
>>>>>
>>>>> org.apache.flink.runtime.io.network.netty.exception.RemoteTransportException:
>>>>> Error at remote task manager 'some-ip'.
>>>>>     at org.apache.flink.runtime.io.network.netty.PartitionRequestClientHandler.decodeMsg(PartitionRequestClientHandler.java:241)
>>>>>     at org.apache.flink.runtime.io.network.netty.PartitionRequestClientHandler.channelRead(PartitionRequestClientHandler.java:164)
>>>>>     at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:339)
>>>>>     at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:324)
>>>>>     at io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:103)
>>>>>     at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:339)
>>>>>     at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:324)
>>>>>     at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:242)
>>>>>     at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:339)
>>>>>     at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:324)
>>>>>     at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:847)
>>>>>     at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:131)
>>>>>     at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:511)
>>>>>     at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:468)
>>>>>     at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:382)
>>>>>     at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:354)
>>>>>     at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:111)
>>>>>     at java.lang.Thread.run(Thread.java:745)
>>>>>
>>>>> Caused by: org.apache.flink.runtime.io.network.partition.ProducerFailedException
>>>>>     at org.apache.flink.runtime.io.network.netty.PartitionRequestQueue.writeAndFlushNextMessageIfPossible(PartitionRequestQueue.java:164)
>>>>>     at org.apache.flink.runtime.io.network.netty.PartitionRequestQueue.userEventTriggered(PartitionRequestQueue.java:96)
>>>>>     at io.netty.channel.AbstractChannelHandlerContext.invokeUserEventTriggered(AbstractChannelHandlerContext.java:308)
>>>>>     at io.netty.channel.AbstractChannelHandlerContext.fireUserEventTriggered(AbstractChannelHandlerContext.java:294)
>>>>>     at io.netty.channel.ChannelInboundHandlerAdapter.userEventTriggered(ChannelInboundHandlerAdapter.java:108)
>>>>>     at io.netty.channel.AbstractChannelHandlerContext.invokeUserEventTriggered(AbstractChannelHandlerContext.java:308)
>>>>>     at io.netty.channel.AbstractChannelHandlerContext.fireUserEventTriggered(AbstractChannelHandlerContext.java:294)
>>>>>     at io.netty.channel.ChannelInboundHandlerAdapter.userEventTriggered(ChannelInboundHandlerAdapter.java:108)
>>>>>     at io.netty.channel.AbstractChannelHandlerContext.invokeUserEventTriggered(AbstractChannelHandlerContext.java:308)
>>>>>     at io.netty.channel.AbstractChannelHandlerContext.fireUserEventTriggered(AbstractChannelHandlerContext.java:294)
>>>>>     at io.netty.channel.ChannelInboundHandlerAdapter.userEventTriggered(ChannelInboundHandlerAdapter.java:108)
>>>>>     at io.netty.channel.AbstractChannelHandlerContext.invokeUserEventTriggered(AbstractChannelHandlerContext.java:308)
>>>>>     at io.netty.channel.AbstractChannelHandlerContext.access$500(AbstractChannelHandlerContext.java:32)
>>>>>     at io.netty.channel.AbstractChannelHandlerContext$6.run(AbstractChannelHandlerContext.java:299)
>>>>>     at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:357)
>>>>>     at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:357)
>>>>>     ... 2 more
>>>>>
>>>>> Caused by: java.lang.RuntimeException: java.net.ConnectException: Connection refused
>>>>>     at com.redis.IO$class.connect(IO.scala:37)
>>>>>     at com.redis.RedisClient.connect(RedisClient.scala:94)
>>>>>     at com.redis.RedisCommand$class.initialize(RedisClient.scala:71)
>>>>>     at com.redis.RedisClient.initialize(RedisClient.scala:94)
>>>>>     at com.redis.RedisClient.<init>(RedisClient.scala:98)
>>>>>     at com.redis.RedisClientFactory.makeObject(Pool.scala:12)
>>>>>     at com.redis.RedisClientFactory.makeObject(Pool.scala:7)
>>>>>     at org.apache.commons.pool.impl.StackObjectPool.borrowObject(StackObjectPool.java:149)
>>>>>     at com.redis.RedisClientPool.withClient(Pool.scala:34)
>>>>>     at com.olacabs.peakpricing.utils.DriverStreamHelper$.set(DriverStreamHelper.scala:57)
>>>>>     at com.olacabs.peakpricing.datastream.TotalMappedFunction.join(TotalMappedFunction.scala:35)
>>>>>     at com.olacabs.peakpricing.datastream.TotalMappedFunction.join(TotalMappedFunction.scala:29)
>>>>>     at org.apache.flink.streaming.api.datastream.JoinedStreams$JoinCoGroupFunction.coGroup(JoinedStreams.java:312)
>>>>>     at org.apache.flink.streaming.api.datastream.CoGroupedStreams$CoGroupWindowFunction.apply(CoGroupedStreams.java:583)
>>>>>     at org.apache.flink.streaming.runtime.operators.windowing.functions.InternalIterableWindowFunction.apply(InternalIterableWindowFunction.java:44)
>>>>>     at org.apache.flink.streaming.runtime.operators.windowing.functions.InternalIterableWindowFunction.apply(InternalIterableWindowFunction.java:33)
>>>>>     at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.processTriggerResult(WindowOperator.java:256)
>>>>>     at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.processWatermark(WindowOperator.java:287)
>>>>>     at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:158)
>>>>>     at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:65)
>>>>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:224)
>>>>>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
>>>>>     at java.lang.Thread.run(Thread.java:745)
>>>>>
>>>>> Caused by: java.net.ConnectException: Connection refused
>>>>>     at java.net.PlainSocketImpl.socketConnect(Native Method)
>>>>>     at java.net.AbstractPlainSocketImpl.doConnect(AbstractPlainSocketImpl.java:350)
>>>>>     at java.net.AbstractPlainSocketImpl.connectToAddress(AbstractPlainSocketImpl.java:206)
>>>>>     at java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:188)
>>>>>     at java.net.SocksSocketImpl.connect(SocksSocketImpl.java:392)
>>>>>     at java.net.Socket.connect(Socket.java:589)
>>>>>     at java.net.Socket.connect(Socket.java:538)
>>>>>     at java.net.Socket.<init>(Socket.java:434)
>>>>>     at java.net.Socket.<init>(Socket.java:211)
>>>>>
>>>>
>>>>
>>>
>>
>
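
For anyone who finds this thread later: the root cause reported at the top was that values read into GlobalConfiguration on the client were not visible on the TaskManagers, so the Redis client there fell back to localhost 6379. One common way around this is to keep the connection settings as a field of the user function itself, since the function object is serialized and shipped to the workers. Below is a minimal sketch of that idea in plain Scala with no Flink dependency. RedisSettings, SetInRedis, ShipSettingsDemo and the host name are all hypothetical names made up for illustration, and the Java serialization round trip only stands in for what the cluster does when it ships a function. In a real job the client would probably be created in the open() method of a rich function rather than in a lazy val.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}

// Hypothetical settings holder; case classes are serializable by default.
final case class RedisSettings(host: String, port: Int)

// Hypothetical user function. The settings are a constructor field, so they
// are part of the serialized function and travel with it to the worker.
class SetInRedis(settings: RedisSettings) extends Serializable {
  // Stand-in for a real Redis client: built lazily on first use and marked
  // @transient so it is never serialized, only the settings are.
  @transient private lazy val endpoint: String = s"${settings.host}:${settings.port}"

  def target: String = endpoint
}

object ShipSettingsDemo {
  // Assumption: a Java serialization round trip imitates shipping the
  // function object from the client JVM to a worker JVM.
  def roundTrip[T <: java.io.Serializable](value: T): T = {
    val bytes = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(bytes)
    out.writeObject(value)
    out.close()
    val in = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray))
    try in.readObject().asInstanceOf[T] finally in.close()
  }

  def main(args: Array[String]): Unit = {
    // Settings are read once on the client and handed to the function.
    val fn = new SetInRedis(RedisSettings("redis.example.internal", 6379))
    val shipped = roundTrip(fn)
    println(shipped.target) // prints redis.example.internal:6379
  }
}
```

The point of the sketch is only that the deserialized copy still knows the host and port chosen on the client, whereas a singleton object such as DriverStreamHelper is initialized separately in every JVM and sees only whatever configuration is available locally.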