When an AWS CloudWatch event is passed to a consumer it looks like this:

    {
        "awslogs": {
            "data": "ewogICAgIm1l..."
        }
    }

To get the actual message I do this:

    def _decode(data):
        compressed_payload = b64decode(data)
        json_payload = zlib.decompress(compressed_payload, 16+zlib.MAX_WBITS)
        return json.loads(json_payload)

    message = _decode(json.dumps(event['awslogs']['data']))

This returns the log message as a string.

For my unit tests I need to reverse this - given a message as a string I want to generate the compressed, encoded event structure.

I have not been able to get this to work. I have this:

    message = b'test message'
    compressed = zlib.compress(message)
    event['awslogs']['data'] = str(compressed)

    message = _decode(json.dumps(event['awslogs']['data']))

    Traceback (most recent call last):
      File "<stdin>", line 1, in <module>
      File "<stdin>", line 3, in _decode
    zlib.error: Error -3 while decompressing data: incorrect header check

Does anyone see how to make this work?

TIA!