Hi,

I'm testing a pipeline using the Beam Python SDK and the DirectRunner that 
reads from a Pub/Sub subscription. After "some time" (I don't see a predictable 
pattern yet), I get a bundle failure exception that points to a failed gRPC 
call to Pub/Sub:

ERROR:apache_beam.runners.direct.executor:Exception at bundle <apache_beam.runners.direct.bundle_factory._Bundle object at 0x120aaf550>, due to an exception.
Traceback (most recent call last):
  File "<redacted venv path>/lib/python3.7/site-packages/google/api_core/grpc_helpers.py", line 57, in error_remapped_callable
    return callable_(*args, **kwargs)
  File "<redacted venv path>/lib/python3.7/site-packages/grpc/_channel.py", line 826, in __call__
    return _end_unary_response_blocking(state, call, False, None)
  File "<redacted venv path>/lib/python3.7/site-packages/grpc/_channel.py", line 729, in _end_unary_response_blocking
    raise _InactiveRpcError(state)
grpc._channel._InactiveRpcError: <_InactiveRpcError of RPC that terminated with:
    status = StatusCode.DEADLINE_EXCEEDED
    details = "Deadline Exceeded"
    debug_error_string = "{"created":"@1591255982.041826000","description":"Deadline Exceeded","file":"src/core/ext/filters/deadline/deadline_filter.cc","file_line":69,"grpc_status":4}"

This then causes a number of knock-on exceptions (shown farther below). 
When this happens, the subscription metrics make it clear that the unacked and 
old message counts both start increasing, and there is a long delay before 
messages are output by downstream GBK/Combine operations. I'm not really sure 
how to begin debugging this and would love to get the community's feedback.

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "<redacted venv path>/lib/python3.7/site-packages/apache_beam/runners/direct/executor.py", line 381, in call
    finish_state)
  File "<redacted venv path>/lib/python3.7/site-packages/apache_beam/runners/direct/executor.py", line 421, in attempt_call
    result = evaluator.finish_bundle()
  File "<redacted venv path>/lib/python3.7/site-packages/apache_beam/runners/direct/transform_evaluator.py", line 574, in finish_bundle
    data = self._read_from_pubsub(self.source.timestamp_attribute)
  File "<redacted venv path>/lib/python3.7/site-packages/apache_beam/runners/direct/transform_evaluator.py", line 562, in _read_from_pubsub
    return_immediately=True)
  File "<redacted venv path>/lib/python3.7/site-packages/google/cloud/pubsub_v1/_gapic.py", line 40, in <lambda>
    fx = lambda self, *a, **kw: wrapped_fx(self.api, *a, **kw)  # noqa
  File "<redacted venv path>/lib/python3.7/site-packages/google/cloud/pubsub_v1/gapic/subscriber_client.py", line 1005, in pull
    request, retry=retry, timeout=timeout, metadata=metadata
  File "<redacted venv path>/lib/python3.7/site-packages/google/api_core/gapic_v1/method.py", line 143, in __call__
    return wrapped_func(*args, **kwargs)
  File "<redacted venv path>/lib/python3.7/site-packages/google/api_core/retry.py", line 286, in retry_wrapped_func
    on_error=on_error,
  File "<redacted venv path>/lib/python3.7/site-packages/google/api_core/retry.py", line 184, in retry_target
    return target()
  File "<redacted venv path>/lib/python3.7/site-packages/google/api_core/timeout.py", line 214, in func_with_timeout
    return func(*args, **kwargs)
  File "<redacted venv path>/lib/python3.7/site-packages/google/api_core/grpc_helpers.py", line 59, in error_remapped_callable
    six.raise_from(exceptions.from_grpc_error(exc), exc)
  File "<string>", line 3, in raise_from
google.api_core.exceptions.DeadlineExceeded: 504 Deadline Exceeded

ERROR:grpc._plugin_wrapping:AuthMetadataPluginCallback "<google.auth.transport.grpc.AuthMetadataPlugin object at 0x12093c1d0>" raised exception!
Traceback (most recent call last):
  File "<redacted venv path>/lib/python3.7/site-packages/urllib3/connection.py", line 160, in _new_conn
    (self._dns_host, self.port), self.timeout, **extra_kw
  File "<redacted venv path>/lib/python3.7/site-packages/urllib3/util/connection.py", line 84, in create_connection
    raise err
  File "<redacted venv path>/lib/python3.7/site-packages/urllib3/util/connection.py", line 74, in create_connection
    sock.connect(sa)
TimeoutError: [Errno 60] Operation timed out

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "<redacted venv path>/lib/python3.7/site-packages/urllib3/connectionpool.py", line 677, in urlopen
    chunked=chunked,
  File "<redacted venv path>/lib/python3.7/site-packages/urllib3/connectionpool.py", line 381, in _make_request
    self._validate_conn(conn)
  File "<redacted venv path>/lib/python3.7/site-packages/urllib3/connectionpool.py", line 976, in _validate_conn
    conn.connect()
  File "<redacted venv path>/lib/python3.7/site-packages/urllib3/connection.py", line 308, in connect
    conn = self._new_conn()
  File "<redacted venv path>/lib/python3.7/site-packages/urllib3/connection.py", line 172, in _new_conn
    self, "Failed to establish a new connection: %s" % e
urllib3.exceptions.NewConnectionError: <urllib3.connection.HTTPSConnection object at 0x120257d90>: Failed to establish a new connection: [Errno 60] Operation timed out

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "<redacted venv path>/lib/python3.7/site-packages/requests/adapters.py", line 449, in send
    timeout=timeout
  File "<redacted venv path>/lib/python3.7/site-packages/urllib3/connectionpool.py", line 725, in urlopen
    method, url, error=e, _pool=self, _stacktrace=sys.exc_info()[2]
  File "<redacted venv path>/lib/python3.7/site-packages/urllib3/util/retry.py", line 439, in increment
    raise MaxRetryError(_pool, url, error or ResponseError(cause))
urllib3.exceptions.MaxRetryError: HTTPSConnectionPool(host='oauth2.googleapis.com', port=443): Max retries exceeded with url: /token (Caused by NewConnectionError('<urllib3.connection.HTTPSConnection object at 0x120257d90>: Failed to establish a new connection: [Errno 60] Operation timed out'))

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "<redacted venv path>/lib/python3.7/site-packages/google/auth/transport/requests.py", line 181, in __call__
    method, url, data=body, headers=headers, timeout=timeout, **kwargs
  File "<redacted venv path>/lib/python3.7/site-packages/requests/sessions.py", line 530, in request
    resp = self.send(prep, **send_kwargs)
  File "<redacted venv path>/lib/python3.7/site-packages/requests/sessions.py", line 643, in send
    r = adapter.send(request, **kwargs)
  File "<redacted venv path>/lib/python3.7/site-packages/requests/adapters.py", line 516, in send
    raise ConnectionError(e, request=request)
requests.exceptions.ConnectionError: HTTPSConnectionPool(host='oauth2.googleapis.com', port=443): Max retries exceeded with url: /token (Caused by NewConnectionError('<urllib3.connection.HTTPSConnection object at 0x120257d90>: Failed to establish a new connection: [Errno 60] Operation timed out'))

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "<redacted venv path>/lib/python3.7/site-packages/grpc/_plugin_wrapping.py", line 78, in __call__
    context, _AuthMetadataPluginCallback(callback_state, callback))
  File "<redacted venv path>/lib/python3.7/site-packages/google/auth/transport/grpc.py", line 84, in __call__
    callback(self._get_authorization_headers(context), None)
  File "<redacted venv path>/lib/python3.7/site-packages/google/auth/transport/grpc.py", line 71, in _get_authorization_headers
    self._request, context.method_name, context.service_url, headers
  File "<redacted venv path>/lib/python3.7/site-packages/google/auth/credentials.py", line 124, in before_request
    self.refresh(request)
  File "<redacted venv path>/lib/python3.7/site-packages/google/oauth2/service_account.py", line 334, in refresh
    access_token, expiry, _ = _client.jwt_grant(request, self._token_uri, assertion)
  File "<redacted venv path>/lib/python3.7/site-packages/google/oauth2/_client.py", line 153, in jwt_grant
    response_data = _token_endpoint_request(request, token_uri, body)
  File "<redacted venv path>/lib/python3.7/site-packages/google/oauth2/_client.py", line 105, in _token_endpoint_request
    response = request(method="POST", url=token_uri, headers=headers, body=body)
  File "<redacted venv path>/lib/python3.7/site-packages/google/auth/transport/requests.py", line 186, in __call__
    six.raise_from(new_exc, caught_exc)
  File "<string>", line 3, in raise_from
google.auth.exceptions.TransportError: HTTPSConnectionPool(host='oauth2.googleapis.com', port=443): Max retries exceeded with url: /token (Caused by NewConnectionError('<urllib3.connection.HTTPSConnection object at 0x120257d90>: Failed to establish a new connection: [Errno 60] Operation timed out'))
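
The last traceback shows the credential refresh timing out while opening a 
connection to oauth2.googleapis.com on port 443. Below is a minimal sketch of 
a connectivity probe that could be run next to the pipeline to see whether 
those timeouts line up with the bundle failures. It rests on the assumption 
that plain network reachability is a factor here, which the logs do not 
confirm. The helper name probe_tcp and its parameters are hypothetical; it 
uses only the Python standard library.

```python
import socket
import time


def probe_tcp(host, port, timeout=5.0, connect=socket.create_connection):
    """Attempt one TCP connection and report how it went.

    Returns a tuple (ok, elapsed_seconds, error_text). The `connect`
    parameter exists only so the probe can be exercised without a network;
    by default it is socket.create_connection.
    """
    start = time.monotonic()
    try:
        conn = connect((host, port), timeout=timeout)
    except OSError as exc:
        # OSError covers TimeoutError, socket.timeout and DNS failures.
        return False, time.monotonic() - start, repr(exc)
    conn.close()
    return True, time.monotonic() - start, None
```

Calling probe_tcp("oauth2.googleapis.com", 443) in a loop and logging the 
result with a timestamp would show whether connection failures to the token 
endpoint coincide with the DEADLINE_EXCEEDED errors from the Pub/Sub pull.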

-Pradip
