[ 
https://issues.apache.org/jira/browse/FLINK-3930?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15927451#comment-15927451
 ] 

ASF GitHub Bot commented on FLINK-3930:
---------------------------------------

Github user WangTaoTheTonic commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2425#discussion_r106335560
  
    --- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/CookieHandler.java
 ---
    @@ -0,0 +1,130 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.runtime.io.network.netty;
    +
    +import io.netty.buffer.ByteBuf;
    +import io.netty.buffer.Unpooled;
    +import io.netty.channel.Channel;
    +import io.netty.channel.ChannelHandlerContext;
    +import io.netty.channel.ChannelInboundHandlerAdapter;
    +import io.netty.handler.codec.MessageToMessageDecoder;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import java.nio.charset.Charset;
    +import java.util.ArrayList;
    +import java.util.Arrays;
    +import java.util.List;
    +
    +public class CookieHandler {
    +
    +   public static class ClientCookieHandler extends 
ChannelInboundHandlerAdapter {
    +
    +           private final Logger LOG = 
LoggerFactory.getLogger(ClientCookieHandler.class);
    +
    +           private final String secureCookie;
    +
    +           final Charset DEFAULT_CHARSET = Charset.forName("utf-8");
    +
    +           public ClientCookieHandler(String secureCookie) {
    +                   this.secureCookie = secureCookie;
    +           }
    +
    +           @Override
    +           public void channelActive(ChannelHandlerContext ctx) throws 
Exception {
    +                   super.channelActive(ctx);
    +                   LOG.debug("In channelActive method of 
ClientCookieHandler");
    +
    +                   if(this.secureCookie != null && 
this.secureCookie.length() != 0) {
    +                           LOG.debug("In channelActive method of 
ClientCookieHandler -> sending secure cookie");
    +                           final ByteBuf buffer = Unpooled.buffer(4 + 
this.secureCookie.getBytes(DEFAULT_CHARSET).length);
    +                           
buffer.writeInt(secureCookie.getBytes(DEFAULT_CHARSET).length);
    +                           
buffer.writeBytes(secureCookie.getBytes(DEFAULT_CHARSET));
    +                           ctx.writeAndFlush(buffer);
    +                   }
    +           }
    +   }
    +
    +   public static class ServerCookieDecoder extends 
MessageToMessageDecoder<ByteBuf> {
    +
    +           private final String secureCookie;
    +
    +           private final List<Channel> channelList = new ArrayList<>();
    +
    +           private final Charset DEFAULT_CHARSET = 
Charset.forName("utf-8");
    +
    +           private final Logger LOG = 
LoggerFactory.getLogger(ServerCookieDecoder.class);
    +
    +           public ServerCookieDecoder(String secureCookie) {
    +                   this.secureCookie = secureCookie;
    +           }
    +
    +           /**
    +            * Decode from one message to an other. This method will be 
called for each written message that can be handled
    +            * by this encoder.
    +            *
    +            * @param ctx the {@link ChannelHandlerContext} which this 
{@link MessageToMessageDecoder} belongs to
    +            * @param msg the message to decode to an other one
    +            * @param out the {@link List} to which decoded messages should 
be added
    +            * @throws Exception is thrown if an error accour
    +            */
    +           @Override
    +           protected void decode(ChannelHandlerContext ctx, ByteBuf msg, 
List<Object> out) throws Exception {
    +
    +                   LOG.debug("ChannelHandlerContext name: {}, channel: 
{}", ctx.name(), ctx.channel());
    +
    +                   if(secureCookie == null || secureCookie.length() == 0) {
    +                           LOG.debug("Not validating secure cookie since 
the server configuration is not enabled to use cookie");
    +                           return;
    +                   }
    +
    +                   LOG.debug("Going to decode the secure cookie passed by 
the remote client");
    +
    +                   if(channelList.contains(ctx.channel())) {
    +                           LOG.debug("Channel: {} already authorized", 
ctx.channel());
    +                           return;
    +                   }
    +
    +                   //read cookie based on the cookie length passed
    +                   int cookieLength = msg.readInt();
    +                   if(cookieLength != 
secureCookie.getBytes(DEFAULT_CHARSET).length) {
    +                           ctx.channel().close();
    +                           String message = "Cookie length does not match 
with source cookie. Invalid secure cookie passed.";
    +                           throw new IllegalStateException(message);
    +                   }
    +
    +                   //read only if cookie length is greater than zero
    +                   if(cookieLength > 0) {
    +
    +                           final byte[] buffer = new 
byte[secureCookie.getBytes(DEFAULT_CHARSET).length];
    +                           msg.readBytes(buffer, 0, cookieLength);
    +
    +                           
if(!Arrays.equals(secureCookie.getBytes(DEFAULT_CHARSET), buffer)) {
    +                                   LOG.error("Secure cookie from the 
client is not matching with the server's identity");
    +                                   throw new 
IllegalStateException("Invalid secure cookie passed.");
    +                           }
    +
    +                           LOG.info("Secure cookie validation passed");
    +
    +                           channelList.add(ctx.channel());
    --- End diff --
    
    It seems channels are only ever added to `channelList` and never removed. 
Would this list grow indefinitely?


> Implement Service-Level Authorization
> -------------------------------------
>
>                 Key: FLINK-3930
>                 URL: https://issues.apache.org/jira/browse/FLINK-3930
>             Project: Flink
>          Issue Type: New Feature
>          Components: Security
>            Reporter: Eron Wright 
>            Assignee: Vijay Srinivasaraghavan
>              Labels: security
>   Original Estimate: 672h
>  Remaining Estimate: 672h
>
> _This issue is part of a series of improvements detailed in the [Secure Data 
> Access|https://docs.google.com/document/d/1-GQB6uVOyoaXGwtqwqLV8BHDxWiMO2WnVzBoJ8oPaAs/edit?usp=sharing]
>  design doc._
> Service-level authorization is the initial authorization mechanism to ensure 
> clients (or servers) connecting to the Flink cluster are authorized to do so. 
>   The purpose is to prevent a cluster from being used by an unauthorized 
> user, whether to execute jobs, disrupt cluster functionality, or gain access 
> to secrets stored within the cluster.
> Implement service-level authorization as described in the design doc.
> - Introduce a shared secret cookie
> - Enable Akka security cookie
> - Implement data transfer authentication
> - Secure the web dashboard



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)

Reply via email to