Refactor AWS v4 signing into smaller tested functions Signed-off-by: Tomaz Muraus <[email protected]>
Project: http://git-wip-us.apache.org/repos/asf/libcloud/repo Commit: http://git-wip-us.apache.org/repos/asf/libcloud/commit/7bb56899 Tree: http://git-wip-us.apache.org/repos/asf/libcloud/tree/7bb56899 Diff: http://git-wip-us.apache.org/repos/asf/libcloud/diff/7bb56899 Branch: refs/heads/trunk Commit: 7bb56899ed76cbae162c6264eddfde3602756f0c Parents: bf5e60e Author: Gertjan Oude Lohuis <[email protected]> Authored: Thu Mar 5 15:08:08 2015 +0100 Committer: Tomaz Muraus <[email protected]> Committed: Fri Mar 6 15:58:12 2015 +0100 ---------------------------------------------------------------------- libcloud/common/aws.py | 106 ++++++++++------- libcloud/test/common/test_aws.py | 211 ++++++++++++++++++++++++++++++++++ 2 files changed, 277 insertions(+), 40 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/libcloud/blob/7bb56899/libcloud/common/aws.py ---------------------------------------------------------------------- diff --git a/libcloud/common/aws.py b/libcloud/common/aws.py index 4bee406..9cc269d 100644 --- a/libcloud/common/aws.py +++ b/libcloud/common/aws.py @@ -194,52 +194,78 @@ class V4SignedAWSConnection(AWSTokenConnection): return params, headers def _get_authorization_v4_header(self, params, headers, dt): - # TODO: according to AWS spec (and RFC 2616 Section 4.2.) excess whitespace - # from inside non-quoted strings should be stripped. Now we only strip the - # start and end of the string. See http://docs.aws.amazon.com/general/latest/gr/sigv4-create-canonical-request.html + assert self.method == 'GET', 'AWS Signature V4 not implemented for other methods than GET' + + return 'AWS4-HMAC-SHA256 Credential=%(u)s/%(c)s, SignedHeaders=%(sh)s, Signature=%(s)s' % { + 'u': self.user_id, + 'c': self._get_credential_scope(dt), + 'sh': self._get_signed_headers(headers), + 's': self._get_signature(params, headers, dt) + } + + def _get_signature(self, params, headers, dt): + return _sign( + self._get_key_to_sign_with(dt), + self._get_string_to_sign(params, headers, dt), + hex=True) + + def _get_key_to_sign_with(self, dt): + return _sign( + _sign( + _sign( + _sign(('AWS4' + self.key), dt.strftime('%Y%m%d')), + self.driver.region_name), + self.service_name), + 'aws4_request') + + def _get_string_to_sign(self, params, headers, dt): + credential_scope = self._get_credential_scope(dt) + canonical_request = self._get_canonical_request(params, headers) + + return '\n'.join(['AWS4-HMAC-SHA256', + dt.strftime('%Y%m%dT%H%M%SZ'), + credential_scope, + hashlib.sha256(canonical_request).hexdigest()]) + + def _get_credential_scope(self, dt): + return '/'.join([dt.strftime('%Y%m%d'), + self.driver.region_name, + self.service_name, + 'aws4_request']) + + def _get_signed_headers(self, headers): + return ';'.join([k.lower() for k in sorted(headers.keys())]) + + def _get_canonical_headers(self, headers): canonical_headers = '\n'.join([':'.join([k.lower(), v.strip()]) for k, v in sorted(headers.items())]) canonical_headers += '\n' + return canonical_headers - signed_headers = ';'.join([k.lower() for k in sorted(headers.keys())]) + def _get_payload_hash(self): + return hashlib.sha256('').hexdigest() + def _get_request_params(self, params): # For self.method == GET - request_params = '&'.join(["%s=%s" % (urlquote(str(k), safe=''), urlquote(str(v), safe='-_~')) - for k, v in sorted(params.items())]) - payload_hash = hashlib.sha256('').hexdigest() - - canonical_request = '\n'.join([self.method, - self.action, - request_params, - canonical_headers, - signed_headers, - payload_hash]) - - credential_scope = '/'.join([dt.strftime('%Y%m%d'), - self.driver.region_name, - self.service_name, - 'aws4_request']) - string_to_sign = '\n'.join(['AWS4-HMAC-SHA256', - dt.strftime('%Y%m%dT%H%M%SZ'), - credential_scope, - hashlib.sha256(canonical_request).hexdigest()]) - - # Key derivation functions. See: - # http://docs.aws.amazon.com/general/latest/gr/signature-v4-examples.html#signature-v4-examples-python - def getSignatureKey(key, date_stamp, regionName, serviceName): - def sign(key, msg): - return hmac.new(key, msg.encode('utf-8'), hashlib.sha256).digest() - - signed_date = sign(('AWS4' + key).encode('utf-8'), date_stamp) - signed_region = sign(signed_date, regionName) - signed_service = sign(signed_region, serviceName) - return sign(signed_service, 'aws4_request') - - signing_key = getSignatureKey(self.key, dt.strftime('%Y%m%d'), self.driver.region_name, self.service_name) - signature = hmac.new(signing_key, string_to_sign.encode('utf-8'), hashlib.sha256).hexdigest() - - return 'AWS4-HMAC-SHA256 Credential=%s/%s, SignedHeaders=%s, Signature=%s' % \ - (self.user_id, credential_scope, signed_headers, signature) + return '&'.join(["%s=%s" % (urlquote(k, safe=''), urlquote(str(v), safe='~')) + for k, v in sorted(params.items())]) + + def _get_canonical_request(self, params, headers): + return '\n'.join([ + self.method, + self.action, + self._get_request_params(params), + self._get_canonical_headers(headers), + self._get_signed_headers(headers), + self._get_payload_hash() + ]) + + +def _sign(key, msg, hex=False): + if hex: + return hmac.new(key, b(msg), hashlib.sha256).hexdigest() + else: + return hmac.new(key, b(msg), hashlib.sha256).digest() class AWSDriver(BaseDriver): http://git-wip-us.apache.org/repos/asf/libcloud/blob/7bb56899/libcloud/test/common/test_aws.py ---------------------------------------------------------------------- diff --git a/libcloud/test/common/test_aws.py b/libcloud/test/common/test_aws.py new file mode 100644 index 0000000..1f207df --- /dev/null +++ b/libcloud/test/common/test_aws.py @@ -0,0 +1,211 @@ +from datetime import datetime +import unittest +import mock +from libcloud.common.aws import V4SignedAWSConnection + + +class EC2MockDriver(object): + region_name = 'my_region' + + +class V4SignedAWSConnectionTest(unittest.TestCase): + + def setUp(self): + V4SignedAWSConnection.service_name = 'my_service' + V4SignedAWSConnection.method = 'GET' + V4SignedAWSConnection.action = '/my_action/' + V4SignedAWSConnection.driver = EC2MockDriver() + + self.conn = V4SignedAWSConnection('my_key', 'my_secret') + self.now = datetime(2015, 3, 4, hour=17, minute=34, second=52) + + def test_v4_signature(self): + sig = self.conn._get_authorization_v4_header({ + 'Action': 'DescribeInstances', + 'Version': '2013-10-15' + }, { + 'Host': 'ec2.eu-west-1.amazonaws.com', + 'Accept-Encoding': 'gzip,deflate', + 'X-AMZ-Date': '20150304T173452Z', + 'User-Agent': 'libcloud/0.17.0 (Amazon EC2 (eu-central-1)) ' + }, self.now) + self.assertEqual(sig, 'AWS4-HMAC-SHA256 ' + 'Credential=my_key/20150304/my_region/my_service/aws4_request, ' + 'SignedHeaders=accept-encoding;host;user-agent;x-amz-date, ' + 'Signature=f9868f8414b3c3f856c7955019cc1691265541f5162b9b772d26044280d39bd3') + + def test_v4_signature_raises_error_if_request_method_not_GET(self): + V4SignedAWSConnection.method = 'POST' + + with self.assertRaises(Exception): + self.conn._get_authorization_v4_header({}, {}, self.now) + + def test_v4_signature_contains_user_id(self): + sig = self.conn._get_authorization_v4_header({}, {}, self.now) + self.assertIn('Credential=my_key/', sig) + + def test_v4_signature_contains_credential_scope(self): + with mock.patch('libcloud.common.aws.V4SignedAWSConnection._get_credential_scope') as mock_get_creds: + mock_get_creds.return_value = 'my_credential_scope' + sig = self.conn._get_authorization_v4_header({}, {}, self.now) + + self.assertIn('Credential=my_key/my_credential_scope, ', sig) + + def test_v4_signature_contains_signed_headers(self): + with mock.patch('libcloud.common.aws.V4SignedAWSConnection._get_signed_headers') as mock_get_headers: + mock_get_headers.return_value = 'my_signed_headers' + sig = self.conn._get_authorization_v4_header({}, {}, self.now) + self.assertIn('SignedHeaders=my_signed_headers, ', sig) + + def test_v4_signature_contains_signature(self): + with mock.patch('libcloud.common.aws.V4SignedAWSConnection._get_signature') as mock_get_signature: + mock_get_signature.return_value = 'my_signature' + sig = self.conn._get_authorization_v4_header({}, {}, self.now) + self.assertIn('Signature=my_signature', sig) + + def test_get_signature_(self): + def _sign(key, msg, hex=False): + if hex: + return 'H|%s|%s' % (key, msg) + else: + return '%s|%s' % (key, msg) + + with mock.patch('libcloud.common.aws.V4SignedAWSConnection._get_key_to_sign_with') as mock_get_key: + with mock.patch('libcloud.common.aws.V4SignedAWSConnection._get_string_to_sign') as mock_get_string: + with mock.patch('libcloud.common.aws._sign', new=_sign): + mock_get_key.return_value = 'my_signing_key' + mock_get_string.return_value = 'my_string_to_sign' + sig = self.conn._get_signature({}, {}, self.now) + + self.assertEqual(sig, 'H|my_signing_key|my_string_to_sign') + + def test_get_string_to_sign(self): + with mock.patch('hashlib.sha256') as mock_sha256: + mock_sha256.return_value.hexdigest.return_value = 'chksum_of_canonical_request' + to_sign = self.conn._get_string_to_sign({}, {}, self.now) + + self.assertEqual(to_sign, + 'AWS4-HMAC-SHA256\n' + '20150304T173452Z\n' + '20150304/my_region/my_service/aws4_request\n' + 'chksum_of_canonical_request') + + def test_get_key_to_sign_with(self): + def _sign(key, msg, hex=False): + return '%s|%s' % (key, msg) + + with mock.patch('libcloud.common.aws._sign', new=_sign): + key = self.conn._get_key_to_sign_with(self.now) + + self.assertEqual(key, 'AWS4my_secret|20150304|my_region|my_service|aws4_request') + + def test_get_signed_headers_contains_all_headers_lowercased(self): + headers = {'Content-Type': 'text/plain', 'Host': 'my_host', 'X-Special-Header': ''} + signed_headers = self.conn._get_signed_headers(headers) + + self.assertIn('content-type', signed_headers) + self.assertIn('host', signed_headers) + self.assertIn('x-special-header', signed_headers) + + def test_get_signed_headers_concats_headers_sorted_lexically(self): + headers = {'Host': 'my_host', 'X-Special-Header': '', '1St-Header': '2', 'Content-Type': 'text/plain'} + signed_headers = self.conn._get_signed_headers(headers) + + self.assertEqual(signed_headers, '1st-header;content-type;host;x-special-header') + + def test_get_credential_scope(self): + scope = self.conn._get_credential_scope(self.now) + self.assertEqual(scope, '20150304/my_region/my_service/aws4_request') + + def test_get_canonical_headers_joins_all_headers(self): + headers = { + 'accept-encoding': 'gzip,deflate', + 'host': 'my_host', + } + self.assertEqual(self.conn._get_canonical_headers(headers), + 'accept-encoding:gzip,deflate\n' + 'host:my_host\n') + + def test_get_canonical_headers_sorts_headers_lexically(self): + headers = { + 'accept-encoding': 'gzip,deflate', + 'host': 'my_host', + '1st-header': '2', + 'x-amz-date': '20150304T173452Z', + 'user-agent': 'my-ua' + } + self.assertEqual(self.conn._get_canonical_headers(headers), + '1st-header:2\n' + 'accept-encoding:gzip,deflate\n' + 'host:my_host\n' + 'user-agent:my-ua\n' + 'x-amz-date:20150304T173452Z\n') + + def test_get_canonical_headers_lowercases_headers_names(self): + headers = { + 'Accept-Encoding': 'GZIP,DEFLATE', + 'User-Agent': 'My-UA' + } + self.assertEqual(self.conn._get_canonical_headers(headers), + 'accept-encoding:GZIP,DEFLATE\n' + 'user-agent:My-UA\n') + + def test_get_canonical_headers_trims_header_values(self): + # TODO: according to AWS spec (and RFC 2616 Section 4.2.) excess whitespace + # from inside non-quoted strings should be stripped. Now we only strip the + # start and end of the string. See + # http://docs.aws.amazon.com/general/latest/gr/sigv4-create-canonical-request.html + headers = { + 'accept-encoding': ' gzip,deflate', + 'user-agent': 'libcloud/0.17.0 ' + } + self.assertEqual(self.conn._get_canonical_headers(headers), + 'accept-encoding:gzip,deflate\n' + 'user-agent:libcloud/0.17.0\n') + + def test_get_request_params_joins_params_sorted_lexically(self): + self.assertEqual(self.conn._get_request_params({ + 'Action': 'DescribeInstances', + 'Filter.1.Name': 'state', + 'Version': '2013-10-15' + }), + 'Action=DescribeInstances&Filter.1.Name=state&Version=2013-10-15') + + def test_get_request_params_allows_integers_as_value(self): + self.assertEqual(self.conn._get_request_params({'Action': 'DescribeInstances', 'Port': 22}), + 'Action=DescribeInstances&Port=22') + + def test_get_request_params_urlquotes_params_keys(self): + self.assertEqual(self.conn._get_request_params({'Action+Reaction': 'DescribeInstances'}), + 'Action%2BReaction=DescribeInstances') + + def test_get_request_params_urlquotes_params_values(self): + self.assertEqual(self.conn._get_request_params({ + 'Action': 'DescribeInstances&Addresses', + 'Port-Range': '2000 3000' + }), + 'Action=DescribeInstances%26Addresses&Port-Range=2000%203000') + + def test_get_request_params_urlquotes_params_values_allows_safe_chars_in_value(self): + # http://docs.aws.amazon.com/general/latest/gr/sigv4-create-canonical-request.html + self.assertEqual('Action=a~b.c_d-e', + self.conn._get_request_params({'Action': 'a~b.c_d-e'})) + + def test_get_payload_hash_returns_digest_of_empty_string_for_GET_requests(self): + V4SignedAWSConnection.method = 'GET' + self.assertEqual(self.conn._get_payload_hash(), + 'e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855') + + def test_get_canonical_request(self): + req = self.conn._get_canonical_request( + {'Action': 'DescribeInstances', 'Version': '2013-10-15'}, + {'Accept-Encoding': 'gzip,deflate', 'User-Agent': 'My-UA'} + ) + self.assertEqual(req, 'GET\n' + '/my_action/\n' + 'Action=DescribeInstances&Version=2013-10-15\n' + 'accept-encoding:gzip,deflate\n' + 'user-agent:My-UA\n' + '\n' + 'accept-encoding;user-agent\n' + 'e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855')
