Hi Shixiong: Thanks for the reply. You are right. It seems it only supports the following two types.
I will retry by adding FileRegion type. protected long calculateSize(Object msg) { if (msg instanceof ByteBuf) { return ((ByteBuf) msg).readableBytes(); } if (msg instanceof ByteBufHolder) { return ((ByteBufHolder) msg).content().readableBytes(); } return -1; } 2017-06-14 1:34 GMT+08:00 Shixiong(Ryan) Zhu <shixi...@databricks.com>: > I took a look at ChannelTrafficShapingHandler. Looks like it's because it > doesn't support FileRegion. Spark's messages use this interface. > See org.apache.spark.network.protocol.MessageWithHeader. > > On Tue, Jun 13, 2017 at 4:17 AM, Niu Zhaojie <nzjem...@gmail.com> wrote: > >> Hi All: >> >> I am trying to control the network read/write speed with >> ChannelTrafficShapingHandler provided by Netty. >> >> >> In TransportContext.java >> >> I modify it as below: >> >> public TransportChannelHandler initializePipeline( >> SocketChannel channel, >> RpcHandler channelRpcHandler) { >> try { >> // added by zhaojie >> logger.info("want to try control read bandwidth on host: " + host); >> final ChannelTrafficShapingHandler channelShaping = new >> ChannelTrafficShapingHandler(50, 50, 1000); >> >> TransportChannelHandler channelHandler = createChannelHandler(channel, >> channelRpcHandler); >> >> channel.pipeline() >> .addLast("encoder", ENCODER) >> .addLast(TransportFrameDecoder.HANDLER_NAME, >> NettyUtils.createFrameDecoder()) >> .addLast("decoder", DECODER) >> .addLast("channelTrafficShaping", channelShaping) >> .addLast("idleStateHandler", new IdleStateHandler(0, 0, >> conf.connectionTimeoutMs() / 1000)) >> // NOTE: Chunks are currently guaranteed to be returned in the >> order of request, but this >> // would require more logic to guarantee if this were not part >> of the same event loop. >> .addLast("handler", channelHandler); >> >> >> I create a ChannelTrafficShapingHandler and register it into the >> pipeline of the channel. I set the write and read speed as 50kb/sec in the >> constructor. >> Except for it, what else do I need to do? >> >> However, it does not work. Is this idea correct? Am I missing something? >> >> Is there any better way ? >> >> Thanks. >> >> -- >> *Regards,* >> *Zhaojie* >> >> > -- *Regards,* *Zhaojie*