Hi devs! I'm working on enhancing a custom 0.8.2.1 producer/consumer to support establishing connections a secured 0.9.0 cluster with strict ACLs on each topic. I'm pretty new to (read: first day working with) GSS-API/JAAS and not really sure how to approach this problem. Our existing implementation is pretty straight forward, establish a socket to the broker and send/receive byte arrays conforming to the Kafka protocol accordingly.
Now--with 0.9.0 brokers configured to listen on 9093 with SASL_PLAINTEXT. I'm performing a auth with Kerberos on our existing Kerberos infrastructure that's in a known functional state and I'm running into a world of confusion. My initial experiments consisted of sending a metadata fetch request for a single topic only that has ACLs restricting metadata, consuming and producing to only a single principal. Something that should be pretty straight forward. I'm launching my client with the following system properties configured: JAVA_OPTS="$JAVA_OPTS -Djavax.security.auth.useSubjectCredsOnly=false" JAVA_OPTS="$JAVA_OPTS -Djava.security.auth.login.config=$CONFIGDIR/jaas.conf" JAVA_OPTS="$JAVA_OPTS -Djava.security.debug=gssloginconfig,configfile,configparser,logincontext" JAVA_OPTS="$JAVA_OPTS -Djava.security.krb5.realm=MY.REALM" JAVA_OPTS="$JAVA_OPTS -Djava.security.krb5.kdc=my.kdc" ...and my jaas.conf contains: com.sun.security.jgss.krb5.initiate { com.sun.security.auth.module.Krb5LoginModule required principal="dariens@MY.REALM" useKeyTab=true keyTab="/opt/klogger/config/dariens.keytab" useTicketCache=false doNotPrompt=true storeKey=true; }; I then open the socket as I did with 0.8.2.1 and then use GSS-API to authenticate myself with a little util that accepts the socket then returns the GSSContext (code below). I then tried a number of things (being unfamiliar with everything in this space I relied on trial and error/brute force to attempt to get it working). I tried sending/receiving byte arrays as I would have with 0.8.2.1 with combinations of encrypt/verifyIntegrity, I tried wrapping the original byte array with the context and sending/verifying the token, all to no avail and gave up after a number of hours. FWIW, I do see that I am authenticating via GSS-API properly regardless of what I then do over the socket. Could anyone advise how far off my approach is? I'll be reading up on this more today and have some packet captures that I'll examine to glean more information on but in the meantime some help would be greatly appreciated! Logs: [2015-12-10 14:59:03,716] INFO Listening on port 2002 (com.blackberry.bdp.klogger.TcpListener) [2015-12-10 14:59:05,408] INFO Creating new producer for topic dariens-only-rw, key 1 (com.blackberry.bdp.krackle.producer.Producer) [2015-12-10 14:59:05,624] INFO Updating metadata (com.blackberry.bdp.krackle.producer.Producer) [2015-12-10 14:59:05,626] INFO Getting metadata for dariens-only-rw (com.blackberry.bdp.krackle.meta.MetaData) [2015-12-10 14:59:05,629] INFO established socket to k1-kafka090.kafka.lab:9093 (com.blackberry.bdp.krackle.meta.MetaData) [2015-12-10 14:59:05,776] INFO sending token of size 572 from initSecContext (com.blackberry.bdp.krackle.security.GssKrb5AuthHandler) [2015-12-10 14:59:05,776] INFO context established (com.blackberry.bdp.krackle.security.GssKrb5AuthHandler) [2015-12-10 14:59:05,776] INFO client: dariens@MY.REALM (com.blackberry.bdp.krackle.security.GssKrb5AuthHandler) [2015-12-10 14:59:05,776] INFO server: kafka (com.blackberry.bdp.krackle.security.GssKrb5AuthHandler) [2015-12-10 14:59:05,776] INFO mutual auth took place?: false (com.blackberry.bdp.krackle.security.GssKrb5AuthHandler) [2015-12-10 14:59:05,776] INFO context supports data integrity?: true (com.blackberry.bdp.krackle.security.GssKrb5AuthHandler) [2015-12-10 14:59:05,776] INFO context supports data confidentiality?: false (com.blackberry.bdp.krackle.security.GssKrb5AuthHandler) configfile: reading file:/opt/klogger/config/jaas.conf configparser: Reading next config entry: com.sun.security.jgss.krb5.initiate configparser: com.sun.security.auth.module.Krb5LoginModule, required configparser: storeKey=true configparser: useKeyTab=true configparser: principal=dariens@MY.REALM configparser: keyTab=/opt/klogger/config/dariens.keytab configparser: useTicketCache=false configparser: doNotPrompt=true [GSS LoginConfigImpl]: Trying com.sun.security.jgss.krb5.initiate: Found! [LoginContext]: login success [LoginContext]: commit success Some GSS-API related source: public GssKrb5AuthHandler() throws GSSException { krb5Oid = new Oid("1.2.840.113554.1.2.2"); manager = GSSManager.getInstance(); requestMutualAuth = false; encrypt = false; verifyIntegrity = true; } public GSSContext getContext(Socket sock, String peerPrincipal) throws GSSException, IOException { DataInputStream inStream = new DataInputStream(sock.getInputStream()); DataOutputStream outStream = new DataOutputStream(sock.getOutputStream()); GSSName peerName = manager.createName(peerPrincipal, null); GSSContext context = manager.createContext(peerName, krb5Oid, null, GSSContext.DEFAULT_LIFETIME); context.requestMutualAuth(requestMutualAuth); // Mutual authentication context.requestConf(encrypt); // Confidentility/encryption context.requestInteg(verifyIntegrity); // Integrity(i.e. require wrap() and getMICmethods() byte[] token = new byte[0]; while (!context.isEstablished()) { token = context.initSecContext(token, 0, token.length); if (token != null) { LOG.info("sending token of size {} from initSecContext", token.length); outStream.writeInt(token.length); outStream.write(token); outStream.flush(); } if (!context.isEstablished()) { token = new byte[inStream.readInt()]; LOG.info("read input token (size {}) for processing by initSecContext", token.length); inStream.readFully(token); } } LOG.info("context established"); LOG.info("client: {}", context.getSrcName()); LOG.info("server: {} ", context.getTargName()); LOG.info("mutual auth took place?: {}", context.getMutualAuthState()); LOG.info("context supports data integrity?: {}", context.getIntegState()); LOG.info("context supports data confidentiality?: {}", context.getConfState()); return context; }