Github user vijikarthi commented on the issue: https://github.com/apache/flink/pull/2425 > The cookie is added to every single message/buffer that is transferred. That is too much - securing the integrity of the stream is responsibility of the encryption layer. The cookie should be added to requests messages that establish connections only. I will change the code to address cookie handling right after the SSL handshake using a new handler and drop the cookie passing for every messages. The handler will be added to the pipeline of both `NettyClient` and `NettyServer`. Client will send the cookie when the channel becomes active and the server will validate and keep track of the clients that are authorized. Here is the pseudo-code for Client and Server handlers. Please take a look and let me know if you are okay with this approach and I will modify the code. --- public static class ClientCookieHandler extends ChannelInboundHandlerAdapter { private final String secureCookie; final Charset DEFAULT_CHARSET = Charset.forName("utf-8"); public ClientCookieHandler(String secureCookie) { this.secureCookie = secureCookie; } @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { super.channelActive(ctx); if(this.secureCookie != null && this.secureCookie.length() != 0) { final ByteBuf buffer = Unpooled.buffer(4 + this.secureCookie.getBytes(DEFAULT_CHARSET).length); buffer.writeInt(secureCookie.getBytes(DEFAULT_CHARSET).length); buffer.writeBytes(secureCookie.getBytes(DEFAULT_CHARSET)); ctx.writeAndFlush(buffer); } } } public static class ServerCookieDecoder extends MessageToMessageDecoder<ByteBuf> { private final String secureCookie; private final List<Channel> channelList = new ArrayList<>(); private final Charset DEFAULT_CHARSET = Charset.forName("utf-8"); public ServerCookieDecoder(String secureCookie) { this.secureCookie = secureCookie; } @Override protected void decode(ChannelHandlerContext ctx, ByteBuf msg, List<Object> out) throws Exception { if(secureCookie == null || secureCookie.length() == 0) { return; } if(channelList.contains(ctx.channel())) { return; } //read cookie based on the cookie length passed int cookieLength = msg.readInt(); if(cookieLength != secureCookie.getBytes(DEFAULT_CHARSET).length) { String message = "Cookie length does not match with source cookie. Invalid secure cookie passed."; throw new IllegalStateException(message); } //read only if cookie length is greater than zero if(cookieLength > 0) { final byte[] buffer = new byte[secureCookie.getBytes(DEFAULT_CHARSET).length]; msg.readBytes(buffer, 0, cookieLength); if(!Arrays.equals(secureCookie.getBytes(DEFAULT_CHARSET), buffer)) { LOG.error("Secure cookie from the client is not matching with the server's identity"); throw new IllegalStateException("Invalid secure cookie passed."); } LOG.info("Secure cookie validation passed"); channelList.add(ctx.channel()); } } } ---
--- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---