Updated Branches: refs/heads/cassandra-2.0 38ae1beef -> 326d5454d
Track clients' remote addresses in ClientState patch by Aleksey Yeschenko; reviewed by Sylvain Lebresne for CASSANDRA-6070 Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/326d5454 Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/326d5454 Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/326d5454 Branch: refs/heads/cassandra-2.0 Commit: 326d5454dd22a40117791952a80fad37e9161d89 Parents: 38ae1be Author: Aleksey Yeschenko <alek...@apache.org> Authored: Tue Sep 24 16:59:09 2013 +0300 Committer: Aleksey Yeschenko <alek...@apache.org> Committed: Tue Sep 24 17:00:17 2013 +0300 ---------------------------------------------------------------------- CHANGES.txt | 1 + examples/client_only/src/ClientOnlyExample.java | 109 ++++--------------- src/java/org/apache/cassandra/auth/Auth.java | 5 +- .../cassandra/auth/CassandraAuthorizer.java | 2 +- .../cassandra/auth/PasswordAuthenticator.java | 2 +- .../apache/cassandra/cql3/QueryProcessor.java | 5 +- .../cql3/statements/ListUsersStatement.java | 2 +- .../apache/cassandra/service/ClientState.java | 64 +++++++---- .../apache/cassandra/service/QueryState.java | 8 ++ .../cassandra/thrift/ThriftClientState.java | 11 +- .../cassandra/thrift/ThriftSessionManager.java | 6 +- .../cassandra/transport/ServerConnection.java | 6 +- 12 files changed, 96 insertions(+), 125 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/326d5454/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index de51837..4bc1fd6 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -3,6 +3,7 @@ * Fix thrift validation refusing row markers on CQL3 tables (CASSANDRA-6081) * Fix insertion of collections with CAS (CASSANDRA-6069) * Correctly send metadata on SELECT COUNT (CASSANDRA-6080) + * Track clients' remote addresses in ClientState (CASSANDRA-6070) Merged from 1.2: * Allow where clause conditions to be in parenthesis (CASSANDRA-6037) * Do not open non-ssl storage port if encryption option is all (CASSANDRA-3916) http://git-wip-us.apache.org/repos/asf/cassandra/blob/326d5454/examples/client_only/src/ClientOnlyExample.java ---------------------------------------------------------------------- diff --git a/examples/client_only/src/ClientOnlyExample.java b/examples/client_only/src/ClientOnlyExample.java index e823ead..7b42140 100644 --- a/examples/client_only/src/ClientOnlyExample.java +++ b/examples/client_only/src/ClientOnlyExample.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -16,22 +16,18 @@ * limitations under the License. */ -import java.nio.ByteBuffer; -import java.util.*; import java.util.concurrent.TimeUnit; +import com.google.common.util.concurrent.Uninterruptibles; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + import org.apache.cassandra.cql3.QueryProcessor; +import org.apache.cassandra.cql3.UntypedResultSet; import org.apache.cassandra.db.ConsistencyLevel; -import org.apache.cassandra.db.marshal.AsciiType; import org.apache.cassandra.exceptions.RequestExecutionException; import org.apache.cassandra.exceptions.RequestValidationException; import org.apache.cassandra.service.*; -import org.apache.cassandra.transport.messages.ResultMessage; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import com.google.common.util.concurrent.Uninterruptibles; public class ClientOnlyExample { @@ -47,24 +43,15 @@ public class ClientOnlyExample private static void testWriting() throws Exception { - ClientState state = new ClientState(false); - state.setKeyspace(KEYSPACE); // do some writing. for (int i = 0; i < 100; i++) { - QueryProcessor.process( - new StringBuilder() - .append("INSERT INTO ") - .append(COLUMN_FAMILY) - .append(" (id, name, value) VALUES ( 'key") - .append(i) - .append("', 'colb', 'value") - .append(i) - .append("' )") - .toString(), - ConsistencyLevel.QUORUM, - new QueryState(state) - ); + QueryProcessor.process(String.format("INSERT INTO %s.%s (id, name, value) VALUES ( 'key%d', 'colb', 'value%d')", + KEYSPACE, + COLUMN_FAMILY, + i, + i), + ConsistencyLevel.QUORUM); System.out.println("wrote key" + i); } @@ -74,33 +61,14 @@ public class ClientOnlyExample private static void testReading() throws Exception { // do some queries. - ClientState state = new ClientState(false); - state.setKeyspace(KEYSPACE); for (int i = 0; i < 100; i++) { - List<List<ByteBuffer>> rows = ((ResultMessage.Rows)QueryProcessor.process( - new StringBuilder() - .append("SELECT id, name, value FROM ") - .append(COLUMN_FAMILY) - .append(" WHERE id = 'key") - .append(i) - .append("'") - .toString(), - ConsistencyLevel.QUORUM, - new QueryState(state) - )).result.rows; - - assert rows.size() == 1; - List<ByteBuffer> r = rows.get(0); - assert r.size() == 3; - System.out.println(new StringBuilder() - .append("ID: ") - .append(AsciiType.instance.compose(r.get(0))) - .append(", Name: ") - .append(AsciiType.instance.compose(r.get(1))) - .append(", Value: ") - .append(AsciiType.instance.compose(r.get(2))) - .toString()); + String query = String.format("SELECT id, name, value FROM %s.%s WHERE id = 'key%d'", + KEYSPACE, + COLUMN_FAMILY, + i); + UntypedResultSet.Row row = QueryProcessor.process(query, ConsistencyLevel.QUORUM).one(); + System.out.println(String.format("ID: %s, Name: %s, Value: %s", row.getString("id"), row.getString("name"), row.getString("value"))); } } @@ -137,41 +105,10 @@ public class ClientOnlyExample private static void setupKeyspace() throws RequestExecutionException, RequestValidationException, InterruptedException { - if (QueryProcessor.process( - new StringBuilder() - .append("SELECT * FROM system.schema_keyspaces WHERE keyspace_name='") - .append(KEYSPACE) - .append("'") - .toString(), ConsistencyLevel.QUORUM) - .isEmpty()) - { - QueryProcessor.process(new StringBuilder() - .append("CREATE KEYSPACE ") - .append(KEYSPACE) - .append(" WITH replication = { 'class': 'SimpleStrategy', 'replication_factor': '1' }") - .toString(), ConsistencyLevel.QUORUM); - Thread.sleep(1000); - } - - if (QueryProcessor.process( - new StringBuilder() - .append("SELECT * FROM system.schema_columnfamilies WHERE keyspace_name='") - .append(KEYSPACE) - .append("' AND columnfamily_name='") - .append(COLUMN_FAMILY) - .append("'") - .toString(), ConsistencyLevel.QUORUM) - .isEmpty()) - { - ClientState state = new ClientState(); - state.setKeyspace(KEYSPACE); - - QueryProcessor.process(new StringBuilder() - .append("CREATE TABLE ") - .append(COLUMN_FAMILY) - .append(" ( id ascii PRIMARY KEY, name ascii, value ascii )") - .toString(), ConsistencyLevel.QUORUM, new QueryState(state)); - Thread.sleep(1000); - } + QueryProcessor.process("CREATE KEYSPACE IF NOT EXISTS " + KEYSPACE + " WITH replication = {'class': 'SimpleStrategy', 'replication_factor': '1'}", + ConsistencyLevel.ANY); + QueryProcessor.process("CREATE TABLE IF NOT EXISTS " + KEYSPACE + "." + COLUMN_FAMILY + " (id ascii PRIMARY KEY, name ascii, value ascii )", + ConsistencyLevel.ANY); + TimeUnit.MILLISECONDS.sleep(1000); } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/326d5454/src/java/org/apache/cassandra/auth/Auth.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/auth/Auth.java b/src/java/org/apache/cassandra/auth/Auth.java index 5385d72..ef25a1b 100644 --- a/src/java/org/apache/cassandra/auth/Auth.java +++ b/src/java/org/apache/cassandra/auth/Auth.java @@ -234,8 +234,9 @@ public class Auth { try { - ResultMessage.Rows rows = selectUserStatement.execute(new QueryState(new ClientState(true)), - new QueryOptions(consistencyForUser(username), Lists.newArrayList(ByteBufferUtil.bytes(username)))); + ResultMessage.Rows rows = selectUserStatement.execute(QueryState.forInternalCalls(), + new QueryOptions(consistencyForUser(username), + Lists.newArrayList(ByteBufferUtil.bytes(username)))); return new UntypedResultSet(rows.result); } catch (RequestValidationException e) http://git-wip-us.apache.org/repos/asf/cassandra/blob/326d5454/src/java/org/apache/cassandra/auth/CassandraAuthorizer.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/auth/CassandraAuthorizer.java b/src/java/org/apache/cassandra/auth/CassandraAuthorizer.java index 3304f8e..deecfdb 100644 --- a/src/java/org/apache/cassandra/auth/CassandraAuthorizer.java +++ b/src/java/org/apache/cassandra/auth/CassandraAuthorizer.java @@ -72,7 +72,7 @@ public class CassandraAuthorizer implements IAuthorizer UntypedResultSet result; try { - ResultMessage.Rows rows = authorizeStatement.execute(new QueryState(new ClientState(true)), + ResultMessage.Rows rows = authorizeStatement.execute(QueryState.forInternalCalls(), new QueryOptions(ConsistencyLevel.ONE, Lists.newArrayList(ByteBufferUtil.bytes(user.getName()), ByteBufferUtil.bytes(resource.getName())))); http://git-wip-us.apache.org/repos/asf/cassandra/blob/326d5454/src/java/org/apache/cassandra/auth/PasswordAuthenticator.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/auth/PasswordAuthenticator.java b/src/java/org/apache/cassandra/auth/PasswordAuthenticator.java index 12cfb06..b498a3b 100644 --- a/src/java/org/apache/cassandra/auth/PasswordAuthenticator.java +++ b/src/java/org/apache/cassandra/auth/PasswordAuthenticator.java @@ -107,7 +107,7 @@ public class PasswordAuthenticator implements ISaslAwareAuthenticator UntypedResultSet result; try { - ResultMessage.Rows rows = authenticateStatement.execute(new QueryState(new ClientState(true)), + ResultMessage.Rows rows = authenticateStatement.execute(QueryState.forInternalCalls(), new QueryOptions(consistencyForUser(username), Lists.newArrayList(ByteBufferUtil.bytes(username)))); result = new UntypedResultSet(rows.result); http://git-wip-us.apache.org/repos/asf/cassandra/blob/326d5454/src/java/org/apache/cassandra/cql3/QueryProcessor.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/cql3/QueryProcessor.java b/src/java/org/apache/cassandra/cql3/QueryProcessor.java index 31bcaa8..52396e7 100644 --- a/src/java/org/apache/cassandra/cql3/QueryProcessor.java +++ b/src/java/org/apache/cassandra/cql3/QueryProcessor.java @@ -126,8 +126,7 @@ public class QueryProcessor { try { - QueryState state = new QueryState(new ClientState(true)); - ResultMessage result = process(query, state, new QueryOptions(cl, Collections.<ByteBuffer>emptyList())); + ResultMessage result = process(query, QueryState.forInternalCalls(), new QueryOptions(cl, Collections.<ByteBuffer>emptyList())); if (result instanceof ResultMessage.Rows) return new UntypedResultSet(((ResultMessage.Rows)result).result); else @@ -143,7 +142,7 @@ public class QueryProcessor { try { - ClientState state = new ClientState(true); + ClientState state = ClientState.forInternalCalls(); QueryState qState = new QueryState(state); state.setKeyspace(Keyspace.SYSTEM_KS); CQLStatement statement = getStatement(query, state).statement; http://git-wip-us.apache.org/repos/asf/cassandra/blob/326d5454/src/java/org/apache/cassandra/cql3/statements/ListUsersStatement.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/cql3/statements/ListUsersStatement.java b/src/java/org/apache/cassandra/cql3/statements/ListUsersStatement.java index 7a1dfec..561bf1c 100644 --- a/src/java/org/apache/cassandra/cql3/statements/ListUsersStatement.java +++ b/src/java/org/apache/cassandra/cql3/statements/ListUsersStatement.java @@ -42,6 +42,6 @@ public class ListUsersStatement extends AuthenticationStatement { return QueryProcessor.process(String.format("SELECT * FROM %s.%s", Auth.AUTH_KS, Auth.USERS_CF), ConsistencyLevel.QUORUM, - new QueryState(new ClientState(true))); + QueryState.forInternalCalls()); } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/cassandra/blob/326d5454/src/java/org/apache/cassandra/service/ClientState.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/ClientState.java b/src/java/org/apache/cassandra/service/ClientState.java index 32e21f4..3798845 100644 --- a/src/java/org/apache/cassandra/service/ClientState.java +++ b/src/java/org/apache/cassandra/service/ClientState.java @@ -17,6 +17,7 @@ */ package org.apache.cassandra.service; +import java.net.SocketAddress; import java.util.*; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; @@ -25,8 +26,6 @@ import com.google.common.cache.CacheBuilder; import com.google.common.cache.CacheLoader; import com.google.common.cache.LoadingCache; import org.apache.commons.lang3.StringUtils; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import org.apache.cassandra.auth.*; import org.apache.cassandra.config.DatabaseDescriptor; @@ -44,11 +43,10 @@ import org.apache.cassandra.utils.SemanticVersion; */ public class ClientState { - private static final Logger logger = LoggerFactory.getLogger(ClientState.class); public static final SemanticVersion DEFAULT_CQL_VERSION = org.apache.cassandra.cql3.QueryProcessor.CQL_VERSION; - private static final Set<IResource> READABLE_SYSTEM_RESOURCES = new HashSet<IResource>(5); - private static final Set<IResource> PROTECTED_AUTH_RESOURCES = new HashSet<IResource>(); + private static final Set<IResource> READABLE_SYSTEM_RESOURCES = new HashSet<>(); + private static final Set<IResource> PROTECTED_AUTH_RESOURCES = new HashSet<>(); // User-level permissions cache. private static final LoadingCache<Pair<AuthenticatedUser, IResource>, Set<Permission>> permissionsCache = initPermissionsCache(); @@ -70,29 +68,55 @@ public class ClientState // Current user for the session private volatile AuthenticatedUser user; - private String keyspace; + private volatile String keyspace; private SemanticVersion cqlVersion; - // internalCall is used to mark ClientState as used by some internal component - // that should have an ability to modify system keyspace - private final boolean internalCall; + // isInternal is used to mark ClientState as used by some internal component + // that should have an ability to modify system keyspace. + private final boolean isInternal; - public ClientState() - { - this(false); - } + // The remote address of the client - null for internal clients. + private final SocketAddress remoteAddress; /** - * Construct a new, empty ClientState + * Construct a new, empty ClientState for internal calls. */ - public ClientState(boolean internalCall) + private ClientState() + { + this.isInternal = true; + this.remoteAddress = null; + } + + protected ClientState(SocketAddress remoteAddress) { - this.internalCall = internalCall; + this.isInternal = false; + this.remoteAddress = remoteAddress; if (!DatabaseDescriptor.getAuthenticator().requireAuthentication()) this.user = AuthenticatedUser.ANONYMOUS_USER; } + /** + * @return a ClientState object for internal C* calls (not limited by any kind of auth). + */ + public static ClientState forInternalCalls() + { + return new ClientState(); + } + + /** + * @return a ClientState object for external clients (thrift/native protocol users). + */ + public static ClientState forExternalCalls(SocketAddress remoteAddress) + { + return new ClientState(remoteAddress); + } + + public SocketAddress getRemoteAddress() + { + return remoteAddress; + } + public String getRawKeyspace() { return keyspace; @@ -122,13 +146,12 @@ public class ClientState if (!user.isAnonymous() && !Auth.isExistingUser(user.getName())) throw new AuthenticationException(String.format("User %s doesn't exist - create it with CREATE USER query first", user.getName())); - this.user = user; } public void hasAllKeyspacesAccess(Permission perm) throws UnauthorizedException { - if (internalCall) + if (isInternal) return; validateLogin(); ensureHasPermission(perm, DataResource.root()); @@ -149,7 +172,7 @@ public class ClientState throws UnauthorizedException, InvalidRequestException { validateKeyspace(keyspace); - if (internalCall) + if (isInternal) return; validateLogin(); preventSystemKSSchemaModification(keyspace, resource, perm); @@ -164,10 +187,9 @@ public class ClientState public void ensureHasPermission(Permission perm, IResource resource) throws UnauthorizedException { for (IResource r : Resources.chain(resource)) - { if (authorize(r).contains(perm)) return; - } + throw new UnauthorizedException(String.format("User %s has no %s permission on %s or any of its parents", user.getName(), perm, http://git-wip-us.apache.org/repos/asf/cassandra/blob/326d5454/src/java/org/apache/cassandra/service/QueryState.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/QueryState.java b/src/java/org/apache/cassandra/service/QueryState.java index 90bbaad..12fc392 100644 --- a/src/java/org/apache/cassandra/service/QueryState.java +++ b/src/java/org/apache/cassandra/service/QueryState.java @@ -36,6 +36,14 @@ public class QueryState this.clientState = clientState; } + /** + * @return a QueryState object for internal C* calls (not limited by any kind of auth). + */ + public static QueryState forInternalCalls() + { + return new QueryState(ClientState.forInternalCalls()); + } + public ClientState getClientState() { return clientState; http://git-wip-us.apache.org/repos/asf/cassandra/blob/326d5454/src/java/org/apache/cassandra/thrift/ThriftClientState.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/thrift/ThriftClientState.java b/src/java/org/apache/cassandra/thrift/ThriftClientState.java index 795ac51..56dcf6f 100644 --- a/src/java/org/apache/cassandra/thrift/ThriftClientState.java +++ b/src/java/org/apache/cassandra/thrift/ThriftClientState.java @@ -17,6 +17,7 @@ */ package org.apache.cassandra.thrift; +import java.net.SocketAddress; import java.util.*; import org.apache.cassandra.config.DatabaseDescriptor; @@ -38,15 +39,17 @@ public class ThriftClientState extends ClientState private final QueryState queryState; // An LRU map of prepared statements - private final Map<Integer, CQLStatement> prepared = new LinkedHashMap<Integer, CQLStatement>(16, 0.75f, true) { - protected boolean removeEldestEntry(Map.Entry<Integer, CQLStatement> eldest) { + private final Map<Integer, CQLStatement> prepared = new LinkedHashMap<Integer, CQLStatement>(16, 0.75f, true) + { + protected boolean removeEldestEntry(Map.Entry<Integer, CQLStatement> eldest) + { return size() > MAX_CACHE_PREPARED; } }; - public ThriftClientState() + public ThriftClientState(SocketAddress remoteAddress) { - super(); + super(remoteAddress); this.queryState = new QueryState(this); } http://git-wip-us.apache.org/repos/asf/cassandra/blob/326d5454/src/java/org/apache/cassandra/thrift/ThriftSessionManager.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/thrift/ThriftSessionManager.java b/src/java/org/apache/cassandra/thrift/ThriftSessionManager.java index bbc4bff..daf2f9c 100644 --- a/src/java/org/apache/cassandra/thrift/ThriftSessionManager.java +++ b/src/java/org/apache/cassandra/thrift/ThriftSessionManager.java @@ -35,8 +35,8 @@ public class ThriftSessionManager private static final Logger logger = LoggerFactory.getLogger(ThriftSessionManager.class); public final static ThriftSessionManager instance = new ThriftSessionManager(); - private final ThreadLocal<SocketAddress> remoteSocket = new ThreadLocal<SocketAddress>(); - private final Map<SocketAddress, ThriftClientState> activeSocketSessions = new ConcurrentHashMap<SocketAddress, ThriftClientState>(); + private final ThreadLocal<SocketAddress> remoteSocket = new ThreadLocal<>(); + private final Map<SocketAddress, ThriftClientState> activeSocketSessions = new ConcurrentHashMap<>(); /** * @param socket the address on which the current thread will work on requests for until further notice @@ -57,7 +57,7 @@ public class ThriftSessionManager ThriftClientState cState = activeSocketSessions.get(socket); if (cState == null) { - cState = new ThriftClientState(); + cState = new ThriftClientState(socket); activeSocketSessions.put(socket, cState); } return cState; http://git-wip-us.apache.org/repos/asf/cassandra/blob/326d5454/src/java/org/apache/cassandra/transport/ServerConnection.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/transport/ServerConnection.java b/src/java/org/apache/cassandra/transport/ServerConnection.java index cb9081c..9bc07cb 100644 --- a/src/java/org/apache/cassandra/transport/ServerConnection.java +++ b/src/java/org/apache/cassandra/transport/ServerConnection.java @@ -23,16 +23,16 @@ import org.jboss.netty.channel.Channel; import org.apache.cassandra.auth.IAuthenticator; import org.apache.cassandra.auth.ISaslAwareAuthenticator; +import org.apache.cassandra.auth.ISaslAwareAuthenticator.SaslAuthenticator; import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.service.ClientState; import org.apache.cassandra.service.QueryState; -import org.apache.cassandra.auth.ISaslAwareAuthenticator.SaslAuthenticator; import org.cliffc.high_scale_lib.NonBlockingHashMap; public class ServerConnection extends Connection { - private enum State { UNINITIALIZED, AUTHENTICATION, READY; } + private enum State { UNINITIALIZED, AUTHENTICATION, READY } private volatile SaslAuthenticator saslAuthenticator; private final ClientState clientState; @@ -43,7 +43,7 @@ public class ServerConnection extends Connection public ServerConnection(Channel channel, int version, Connection.Tracker tracker) { super(channel, version, tracker); - this.clientState = new ClientState(); + this.clientState = ClientState.forExternalCalls(channel.getRemoteAddress()); this.state = State.UNINITIALIZED; }