Hi devs!

I'm working on enhancing a custom 0.8.2.1 producer/consumer so that it can 
connect to a secured 0.9.0 cluster with strict ACLs on each topic.  I'm 
pretty new to (read: first day working with) GSS-API/JAAS and not really sure 
how to approach this problem.  Our existing implementation is pretty 
straightforward: establish a socket to the broker and send/receive byte 
arrays conforming to the Kafka protocol.
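To make the existing exchange concrete, here is a rough sketch of the 
size-prefixed framing involved.  It is an illustration only, not the actual 
Krackle code: the class and method names are hypothetical, and it assumes the 
4-byte big-endian length prefix that the Kafka wire protocol puts in front of 
each request and response.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;

// Hypothetical helper: length-prefixed frames over any stream pair.
class FramedIo {

  // Writes a 4-byte big-endian length followed by the payload bytes.
  static void writeFrame(OutputStream out, byte[] payload) throws IOException {
    DataOutputStream data = new DataOutputStream(out);
    data.writeInt(payload.length);
    data.write(payload);
    data.flush();
  }

  // Reads one frame: a 4-byte length, then exactly that many bytes.
  static byte[] readFrame(InputStream in) throws IOException {
    DataInputStream data = new DataInputStream(in);
    int size = data.readInt();
    if (size < 0) {
      throw new IOException("negative frame size: " + size);
    }
    byte[] payload = new byte[size];
    data.readFully(payload);  // throws EOFException if the stream ends early
    return payload;
  }

  public static void main(String[] args) throws IOException {
    // In-memory round trip standing in for the socket streams.
    ByteArrayOutputStream wire = new ByteArrayOutputStream();
    writeFrame(wire, new byte[] {1, 2, 3});
    byte[] back = readFrame(new ByteArrayInputStream(wire.toByteArray()));
    System.out.println("round-tripped " + back.length + " bytes");
  }
}
```

With a real connection the two streams would come from sock.getOutputStream() 
and sock.getInputStream() instead of the in-memory buffers.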

Now the 0.9.0 brokers are configured to listen on 9093 with SASL_PLAINTEXT.  
I'm authenticating with Kerberos against our existing Kerberos 
infrastructure, which is in a known functional state, and I'm running into a 
world of confusion.

My initial experiments consisted of sending a metadata fetch request for a 
single topic whose ACLs restrict metadata, consuming and producing to a 
single principal.  That should be pretty straightforward.

I'm launching my client with the following system properties configured:

JAVA_OPTS="$JAVA_OPTS -Djavax.security.auth.useSubjectCredsOnly=false"
JAVA_OPTS="$JAVA_OPTS -Djava.security.auth.login.config=$CONFIGDIR/jaas.conf"
JAVA_OPTS="$JAVA_OPTS -Djava.security.debug=gssloginconfig,configfile,configparser,logincontext"
JAVA_OPTS="$JAVA_OPTS -Djava.security.krb5.realm=MY.REALM"
JAVA_OPTS="$JAVA_OPTS -Djava.security.krb5.kdc=my.kdc"

...and my jaas.conf contains:

com.sun.security.jgss.krb5.initiate {
    com.sun.security.auth.module.Krb5LoginModule required
    principal="dariens@MY.REALM"
    useKeyTab=true
    keyTab="/opt/klogger/config/dariens.keytab"
    useTicketCache=false
    doNotPrompt=true
    storeKey=true;
};
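
For reference, roughly the same entry could be built in code rather than 
loaded from jaas.conf.  This is only a sketch and has not been run against a 
real KDC: the class name KeytabJaasConfig is made up, and it assumes the JGSS 
login lookup consults whatever is installed through 
Configuration.setConfiguration(), which is worth verifying before relying on 
it.

```java
import java.util.HashMap;
import java.util.Map;
import javax.security.auth.login.AppConfigurationEntry;
import javax.security.auth.login.AppConfigurationEntry.LoginModuleControlFlag;
import javax.security.auth.login.Configuration;

// Hypothetical in-code equivalent of the jaas.conf entry above.
class KeytabJaasConfig extends Configuration {

  static final String ENTRY_NAME = "com.sun.security.jgss.krb5.initiate";

  private final AppConfigurationEntry[] entries;

  KeytabJaasConfig(String principal, String keyTabPath) {
    // Same options as the file-based entry, expressed as strings.
    Map<String, String> options = new HashMap<>();
    options.put("principal", principal);
    options.put("useKeyTab", "true");
    options.put("keyTab", keyTabPath);
    options.put("useTicketCache", "false");
    options.put("doNotPrompt", "true");
    options.put("storeKey", "true");
    entries = new AppConfigurationEntry[] {
      new AppConfigurationEntry(
        "com.sun.security.auth.module.Krb5LoginModule",
        LoginModuleControlFlag.REQUIRED,
        options)
    };
  }

  // Returns the single entry for ENTRY_NAME and null for any other name.
  @Override
  public AppConfigurationEntry[] getAppConfigurationEntry(String name) {
    return ENTRY_NAME.equals(name) ? entries : null;
  }

  public static void main(String[] args) {
    Configuration config = new KeytabJaasConfig(
      "dariens@MY.REALM", "/opt/klogger/config/dariens.keytab");
    // Installing it replaces the file-based configuration for this JVM.
    Configuration.setConfiguration(config);
    System.out.println(config.getAppConfigurationEntry(ENTRY_NAME)[0].getOptions());
  }
}
```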

I then open the socket as I did with 0.8.2.1 and then use GSS-API to 
authenticate myself with a little util that accepts the socket then returns the 
GSSContext (code below).  I then tried a number of things (being unfamiliar 
with everything in this space I relied on trial and error/brute force to 
attempt to get it working).

I tried sending/receiving byte arrays as I would have with 0.8.2.1, with 
various combinations of encrypt/verifyIntegrity.  I also tried wrapping the 
original byte array with the context and sending/verifying the token.  All to 
no avail, and I gave up after a number of hours.

FWIW, I do see that I am authenticating via GSS-API properly regardless of what 
I then do over the socket.

Could anyone advise how far off my approach is?  I'll be reading up on this 
more today and have some packet captures that I'll examine to glean more 
information, but in the meantime some help would be greatly appreciated!

Logs:

[2015-12-10 14:59:03,716] INFO Listening on port 2002 (com.blackberry.bdp.klogger.TcpListener)
[2015-12-10 14:59:05,408] INFO Creating new producer for topic dariens-only-rw, key 1 (com.blackberry.bdp.krackle.producer.Producer)
[2015-12-10 14:59:05,624] INFO Updating metadata (com.blackberry.bdp.krackle.producer.Producer)
[2015-12-10 14:59:05,626] INFO Getting metadata for dariens-only-rw (com.blackberry.bdp.krackle.meta.MetaData)
[2015-12-10 14:59:05,629] INFO established socket to k1-kafka090.kafka.lab:9093 (com.blackberry.bdp.krackle.meta.MetaData)
[2015-12-10 14:59:05,776] INFO sending token of size 572 from initSecContext (com.blackberry.bdp.krackle.security.GssKrb5AuthHandler)
[2015-12-10 14:59:05,776] INFO context established (com.blackberry.bdp.krackle.security.GssKrb5AuthHandler)
[2015-12-10 14:59:05,776] INFO client: dariens@MY.REALM (com.blackberry.bdp.krackle.security.GssKrb5AuthHandler)
[2015-12-10 14:59:05,776] INFO server: kafka (com.blackberry.bdp.krackle.security.GssKrb5AuthHandler)
[2015-12-10 14:59:05,776] INFO mutual auth took place?: false (com.blackberry.bdp.krackle.security.GssKrb5AuthHandler)
[2015-12-10 14:59:05,776] INFO context supports data integrity?: true (com.blackberry.bdp.krackle.security.GssKrb5AuthHandler)
[2015-12-10 14:59:05,776] INFO context supports data confidentiality?: false (com.blackberry.bdp.krackle.security.GssKrb5AuthHandler)

configfile: reading file:/opt/klogger/config/jaas.conf
configparser:     Reading next config entry: com.sun.security.jgss.krb5.initiate
configparser:         com.sun.security.auth.module.Krb5LoginModule, required
configparser:             storeKey=true
configparser:             useKeyTab=true
configparser:             principal=dariens@MY.REALM
configparser:             keyTab=/opt/klogger/config/dariens.keytab
configparser:             useTicketCache=false
configparser:             doNotPrompt=true
    [GSS LoginConfigImpl]: Trying com.sun.security.jgss.krb5.initiate: Found!
    [LoginContext]: login success
    [LoginContext]: commit success

Some GSS-API related source:

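import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.net.Socket;
import org.ietf.jgss.GSSContext;
import org.ietf.jgss.GSSException;
import org.ietf.jgss.GSSManager;
import org.ietf.jgss.GSSName;
import org.ietf.jgss.Oid;

public GssKrb5AuthHandler() throws GSSException {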
  krb5Oid = new Oid("1.2.840.113554.1.2.2");
  manager = GSSManager.getInstance();
  requestMutualAuth = false;
  encrypt = false;
  verifyIntegrity = true;
}

public GSSContext getContext(Socket sock,
  String peerPrincipal)
    throws GSSException, IOException {
  DataInputStream inStream = new DataInputStream(sock.getInputStream());
  DataOutputStream outStream = new DataOutputStream(sock.getOutputStream());
  GSSName peerName = manager.createName(peerPrincipal, null);
  GSSContext context = manager.createContext(peerName,
    krb5Oid,
    null,
    GSSContext.DEFAULT_LIFETIME);
  context.requestMutualAuth(requestMutualAuth);  // Mutual authentication
  context.requestConf(encrypt);  // Confidentiality/encryption
  // Integrity (i.e. require wrap() and getMIC() methods)
  context.requestInteg(verifyIntegrity);

  byte[] token = new byte[0];
  while (!context.isEstablished()) {
    token = context.initSecContext(token, 0, token.length);
    if (token != null) {
      LOG.info("sending token of size {} from initSecContext", token.length);
      outStream.writeInt(token.length);
      outStream.write(token);
      outStream.flush();
    }
    if (!context.isEstablished()) {
      token = new byte[inStream.readInt()];
      LOG.info("read input token (size {}) for processing by initSecContext",
        token.length);
      inStream.readFully(token);
    }
  }
  LOG.info("context established");
  LOG.info("client: {}", context.getSrcName());
  LOG.info("server: {} ", context.getTargName());
  LOG.info("mutual auth took place?: {}", context.getMutualAuthState());
  LOG.info("context supports data integrity?: {}", context.getIntegState());
  LOG.info("context supports data confidentiality?: {}",
    context.getConfState());
  return context;
}



