Hi, I'm testing a pipeline using Python Beam and the DirectRunner that reads from a Pub/Sub subscription however after "some time" (i.e. I don't quite see a predictable pattern yet) I get a bundle failure exception that flags a failure with a gRPC call to Pub/Sub:
ERROR:apache_beam.runners.direct.executor:Exception at bundle <apache_beam.runners.direct.bundle_factory._Bundle object at 0x120aaf550>, due to an exception. Traceback (most recent call last): File "<redacted venv path>/lib/python3.7/site-packages/google/api_core/grpc_helpers.py", line 57, in error_remapped_callable return callable_(*args, **kwargs) File "<redacted venv path>/lib/python3.7/site-packages/grpc/_channel.py", line 826, in __call__ return _end_unary_response_blocking(state, call, False, None) File "<redacted venv path>/lib/python3.7/site-packages/grpc/_channel.py", line 729, in _end_unary_response_blocking raise _InactiveRpcError(state) grpc._channel._InactiveRpcError: <_InactiveRpcError of RPC that terminated with: status = StatusCode.DEADLINE_EXCEEDED details = "Deadline Exceeded" debug_error_string = "{"created":"@1591255982.041826000","description":"Deadline Exceeded","file":"src/core/ext/filters/deadline/deadline_filter.cc","file_line":69,"grpc_status":4}" This then causes a number of other knock-on exceptions (shown farther below). When this happens, looking at the subscription its clear that the number of unacked/old messages both start increasing and there's a long ensuing delay until messages are output by downstream GBK/Combine operations. Not really sure how to begin debugging this and would love to get the communities feedback. The above exception was the direct cause of the following exception: Traceback (most recent call last): File "<redacted venv path>/lib/python3.7/site-packages/apache_beam/runners/direct/executor.py", line 381, in call finish_state) File "<redacted venv path>/lib/python3.7/site-packages/apache_beam/runners/direct/executor.py", line 421, in attempt_call result = evaluator.finish_bundle() File "<redacted venv path>/lib/python3.7/site-packages/apache_beam/runners/direct/transform_evaluator.py", line 574, in finish_bundle data = self._read_from_pubsub(self.source.timestamp_attribute) File "<redacted venv path>/lib/python3.7/site-packages/apache_beam/runners/direct/transform_evaluator.py", line 562, in _read_from_pubsub return_immediately=True) File "<redacted venv path>/lib/python3.7/site-packages/google/cloud/pubsub_v1/_gapic.py", line 40, in <lambda> fx = lambda self, *a, **kw: wrapped_fx(self.api, *a, **kw) # noqa File "<redacted venv path>/lib/python3.7/site-packages/google/cloud/pubsub_v1/gapic/subscriber_client.py", line 1005, in pull request, retry=retry, timeout=timeout, metadata=metadata File "<redacted venv path>/lib/python3.7/site-packages/google/api_core/gapic_v1/method.py", line 143, in __call__ return wrapped_func(*args, **kwargs) File "<redacted venv path>/lib/python3.7/site-packages/google/api_core/retry.py", line 286, in retry_wrapped_func on_error=on_error, File "<redacted venv path>/lib/python3.7/site-packages/google/api_core/retry.py", line 184, in retry_target return target() File "<redacted venv path>/lib/python3.7/site-packages/google/api_core/timeout.py", line 214, in func_with_timeout return func(*args, **kwargs) File "<redacted venv path>/lib/python3.7/site-packages/google/api_core/grpc_helpers.py", line 59, in error_remapped_callable six.raise_from(exceptions.from_grpc_error(exc), exc) File "<string>", line 3, in raise_from google.api_core.exceptions.DeadlineExceeded: 504 Deadline Exceeded ERROR:grpc._plugin_wrapping:AuthMetadataPluginCallback "<google.auth.transport.grpc.AuthMetadataPlugin object at 0x12093c1d0>" raised exception! Traceback (most recent call last): File "<redacted venv path>/lib/python3.7/site-packages/urllib3/connection.py", line 160, in _new_conn (self._dns_host, self.port), self.timeout, **extra_kw File "<redacted venv path>/lib/python3.7/site-packages/urllib3/util/connection.py", line 84, in create_connection raise err File "<redacted venv path>/lib/python3.7/site-packages/urllib3/util/connection.py", line 74, in create_connection sock.connect(sa) TimeoutError: [Errno 60] Operation timed out During handling of the above exception, another exception occurred: Traceback (most recent call last): File "<redacted venv path>/lib/python3.7/site-packages/urllib3/connectionpool.py", line 677, in urlopen chunked=chunked, File "<redacted venv path>/lib/python3.7/site-packages/urllib3/connectionpool.py", line 381, in _make_request self._validate_conn(conn) File "<redacted venv path>/lib/python3.7/site-packages/urllib3/connectionpool.py", line 976, in _validate_conn conn.connect() File "<redacted venv path>/lib/python3.7/site-packages/urllib3/connection.py", line 308, in connect conn = self._new_conn() File "<redacted venv path>/lib/python3.7/site-packages/urllib3/connection.py", line 172, in _new_conn self, "Failed to establish a new connection: %s" % e urllib3.exceptions.NewConnectionError: <urllib3.connection.HTTPSConnection object at 0x120257d90>: Failed to establish a new connection: [Errno 60] Operation timed out During handling of the above exception, another exception occurred: Traceback (most recent call last): File "<redacted venv path>/lib/python3.7/site-packages/requests/adapters.py", line 449, in send timeout=timeout File "<redacted venv path>/lib/python3.7/site-packages/urllib3/connectionpool.py", line 725, in urlopen method, url, error=e, _pool=self, _stacktrace=sys.exc_info()[2] File "<redacted venv path>/lib/python3.7/site-packages/urllib3/util/retry.py", line 439, in increment raise MaxRetryError(_pool, url, error or ResponseError(cause)) urllib3.exceptions.MaxRetryError: HTTPSConnectionPool(host='oauth2.googleapis.com', port=443): Max retries exceeded with url: /token (Caused by NewConnectionError('<urllib3.connection.HTTPSConnection object at 0x120257d90>: Failed to establish a new connection: [Errno 60] Operation timed out')) During handling of the above exception, another exception occurred: Traceback (most recent call last): File "<redacted venv path>/lib/python3.7/site-packages/google/auth/transport/requests.py", line 181, in __call__ method, url, data=body, headers=headers, timeout=timeout, **kwargs File "<redacted venv path>/lib/python3.7/site-packages/requests/sessions.py", line 530, in request resp = self.send(prep, **send_kwargs) File "<redacted venv path>/lib/python3.7/site-packages/requests/sessions.py", line 643, in send r = adapter.send(request, **kwargs) File "<redacted venv path>/lib/python3.7/site-packages/requests/adapters.py", line 516, in send raise ConnectionError(e, request=request) requests.exceptions.ConnectionError: HTTPSConnectionPool(host='oauth2.googleapis.com', port=443): Max retries exceeded with url: /token (Caused by NewConnectionError('<urllib3.connection.HTTPSConnection object at 0x120257d90>: Failed to establish a new connection: [Errno 60] Operation timed out')) The above exception was the direct cause of the following exception: Traceback (most recent call last): File "<redacted venv path>/lib/python3.7/site-packages/grpc/_plugin_wrapping.py", line 78, in __call__ context, _AuthMetadataPluginCallback(callback_state, callback)) File "<redacted venv path>/lib/python3.7/site-packages/google/auth/transport/grpc.py", line 84, in __call__ callback(self._get_authorization_headers(context), None) File "<redacted venv path>/lib/python3.7/site-packages/google/auth/transport/grpc.py", line 71, in _get_authorization_headers self._request, context.method_name, context.service_url, headers File "<redacted venv path>/lib/python3.7/site-packages/google/auth/credentials.py", line 124, in before_request self.refresh(request) File "<redacted venv path>/lib/python3.7/site-packages/google/oauth2/service_account.py", line 334, in refresh access_token, expiry, _ = _client.jwt_grant(request, self._token_uri, assertion) File "<redacted venv path>/lib/python3.7/site-packages/google/oauth2/_client.py", line 153, in jwt_grant response_data = _token_endpoint_request(request, token_uri, body) File "<redacted venv path>/lib/python3.7/site-packages/google/oauth2/_client.py", line 105, in _token_endpoint_request response = request(method="POST", url=token_uri, headers=headers, body=body) File "<redacted venv path>/lib/python3.7/site-packages/google/auth/transport/requests.py", line 186, in __call__ six.raise_from(new_exc, caught_exc) File "<string>", line 3, in raise_from google.auth.exceptions.TransportError: HTTPSConnectionPool(host='oauth2.googleapis.com', port=443): Max retries exceeded with url: /token (Caused by NewConnectionError('<urllib3.connection.HTTPSConnection object at 0x120257d90>: Failed to establish a new connection: [Errno 60] Operation timed out')) -Pradip