This is an automated email from the ASF dual-hosted git repository.

Cole-Greer pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/tinkerpop.git


The following commit(s) were added to refs/heads/master by this push:
     new 86147b9e68 Standardized `gremlin-python` connection options (#3469)
86147b9e68 is described below

commit 86147b9e684ea1508b02d3525996558c22e369d4
Author: Guian Gumpac <[email protected]>
AuthorDate: Fri Jun 26 19:46:51 2026 -0700

    Standardized `gremlin-python` connection options (#3469)
    
    # Standardize `gremlin-python` connection options
    
    Implements the Python portion of the TinkerPop 4.x GLV connection-options 
standardization. Renames two options to canonical names (old names kept as 
deprecated aliases), adds a set of new options, removes two, and raises the 
minimum supported `aiohttp` version. Python driver changes only; the other GLVs 
follow in separate PRs.
    
    **Proposal:** 
https://lists.apache.org/thread/yqtr2wnb1kq2pqqq4002cz511q5o0bkg
    
    ## Renames (deprecated aliases retained)
    
    | Old           | New               | Default      |
    | ------------- | ----------------- | ------------ |
    | `pool_size`   | `max_connections` | 128 (was 8)  |
    | `ssl_options` | `ssl`             | -            |
    
    The old names still work but emit a `DeprecationWarning` (marked "As of 
release 4.0.0, ..."); callers should migrate to the new names. 
`max_connections` is now also applied to the aiohttp `TCPConnector` `limit`, so 
the transport layer reflects the option in addition to sizing the connection 
pool.
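
    As a rough illustration of the rename table, an application could translate 
the old keyword names itself before they reach the driver. This is only a 
sketch: `migrate_connection_kwargs` and `_RENAMES` are hypothetical names that 
are not part of `gremlin-python`, and the driver's own alias handling may 
differ.

```python
import warnings

# Old name -> new name, taken from the rename table above.
_RENAMES = {"pool_size": "max_connections", "ssl_options": "ssl"}


def migrate_connection_kwargs(kwargs):
    """Return a copy of kwargs with deprecated names mapped to the new ones.

    Hypothetical helper for illustration only; not part of gremlin-python.
    """
    migrated = dict(kwargs)
    for old, new in _RENAMES.items():
        if old not in migrated:
            continue
        if new in migrated:
            # Refuse ambiguous input instead of silently picking one value.
            raise ValueError("pass only one of %r or %r" % (old, new))
        warnings.warn("%r is deprecated; use %r" % (old, new),
                      DeprecationWarning, stacklevel=2)
        migrated[new] = migrated.pop(old)
    return migrated
```

    The input dict is copied rather than edited in place, so a configuration 
shared between call sites is not changed as a side effect.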
    
    ## Behavior changes (breaking)
    * **`compression`** is a new option defaulting to `'deflate'` (on); the 
driver advertises `Accept-Encoding: deflate` by default. Set 
`compression='none'` to disable, which also suppresses aiohttp's automatic 
`Accept-Encoding` injection so compression is not silently negotiated. 
Defaulting on is a deliberate deviation from the proposal's agreed default-off, 
applied consistently across the GLVs by later agreement.
    * **`connect_timeout`** defaults to 5s. Combined with the existing 
`read_timeout`, it is wired into a single aiohttp `ClientTimeout` via the 
socket-level `sock_connect`/`sock_read` knobs (not a whole-request `total`), so 
long but legitimate streaming responses are not aborted, while a stalled server 
can no longer hang the client forever.
    * The minimum supported **`aiohttp`** is raised to `3.11` (required for the 
`socket_factory` used by `keep_alive_time`).
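
    The timeout wiring described above can be sketched without importing 
aiohttp. `build_timeout_kwargs` is a hypothetical helper that loosely mirrors 
the `timeout_kwargs` logic in `transport.py`. The dict it returns is what would 
be unpacked into `aiohttp.ClientTimeout(**kwargs)`, and the values are in 
seconds.

```python
def build_timeout_kwargs(connect_timeout=5.0, read_timeout=None):
    """Collect only the socket-level timeouts that are actually set.

    Hypothetical helper for illustration; values are in seconds.
    """
    kwargs = {}
    if connect_timeout is not None:
        kwargs["sock_connect"] = connect_timeout  # bounds connection setup
    if read_timeout is not None:
        kwargs["sock_read"] = read_timeout  # bounds the gap between reads
    # "total" is deliberately never set, so a long stream is not cut off.
    return kwargs


defaults_only = build_timeout_kwargs()
with_read = build_timeout_kwargs(read_timeout=30)
```

    With the defaults only `sock_connect` is present, so a response that keeps 
producing chunks is never cut off by the driver.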
    
    ## New options
    * **`connect_timeout`** (5s) - socket-connect timeout (the existing 
`read_timeout` was rewired into the unified `ClientTimeout`).
    * **`idle_timeout`** (180s) - mapped to the aiohttp `TCPConnector` 
keep-alive timeout.
    * **`keep_alive_time`** (30s) - enables TCP keep-alive probes via the 
connector socket factory (`TCP_KEEPIDLE`/`TCP_KEEPALIVE`, guarded by platform 
availability).
    * **`compression`** (`'none'`/`'deflate'`, default `'deflate'`) - the wire 
compression negotiated with the server.
    * **`default_batch_size`** (64) - connection-level default that fills the 
per-request `batchSize` when unset.
    * **`proxy`** and **`trust_env`** - explicit HTTP proxy and 
environment-trust options for the aiohttp transport.
    * **`auth.sigv4`** gains a credentials-provider variant accepting an 
optional credentials provider or callable, falling back to the AWS environment 
variables.
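
    For `keep_alive_time`, the platform guard can be tried with the standard 
library alone. The sketch below follows the same idea as the 
`_keep_alive_socket_options` helper added in `transport.py`, but 
`keep_alive_options` is an illustrative name and the socket is created directly 
rather than through aiohttp's `socket_factory`.

```python
import socket


def keep_alive_options(idle_seconds):
    """Socket options enabling TCP keep-alive with the given idle time."""
    options = [(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)]
    # Linux exposes TCP_KEEPIDLE; macOS names the same knob TCP_KEEPALIVE.
    idle_opt = getattr(socket, "TCP_KEEPIDLE", None)
    if idle_opt is None:
        idle_opt = getattr(socket, "TCP_KEEPALIVE", None)
    if idle_opt is not None:
        options.append((socket.IPPROTO_TCP, idle_opt, int(idle_seconds)))
    return options


# Apply the options to a fresh, unconnected TCP socket.
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
try:
    for level, name, value in keep_alive_options(30):
        try:
            sock.setsockopt(level, name, value)
        except OSError:
            pass  # option not supported on this platform; skip it
    enabled = sock.getsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE)
finally:
    sock.close()
```

    On a platform with neither constant, only `SO_KEEPALIVE` is set and the 
operating system's default idle time applies.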
    
    ## Removed (breaking)
    * **`max_content_length`** - previously accepted but discarded.
    * **`headers`** kwarg on `Client`/`DriverRemoteConnection` - custom headers 
must now be set via an interceptor.
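
    A header-setting interceptor can be exercised without a server. In this 
sketch `header_interceptor` is a hypothetical factory, and `SimpleNamespace` 
stands in for the driver's request object, of which only a mutable `.headers` 
mapping is assumed.

```python
from types import SimpleNamespace


def header_interceptor(extra_headers):
    """Build an interceptor that adds fixed headers to every request."""
    def intercept(request):
        # Mutate the headers in place, as the updated docs example does.
        request.headers.update(extra_headers)
    return intercept


# Stand-in for the driver's request object; only `.headers` is modeled here.
request = SimpleNamespace(headers={})
add_custom = header_interceptor({"x-custom-header": "example"})
add_custom(request)
```

    The resulting callable would be passed as `interceptors=[add_custom]`, in 
place of the removed `headers` kwarg.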
    
    ## Bug fix
    * Fixed `Client.submit`/`submit_async` mutating a caller-supplied 
`RequestMessage` in place; the message fields are now cloned before applying 
`request_options`/`default_batch_size`, so resubmitting the same message no 
longer accumulates state (matching the no-mutate contract of the .NET/JS 
drivers). This is required for the new `default_batch_size` to fill safely.
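
    The no-mutate contract can be illustrated with a plain dict standing in 
for the fields of a `RequestMessage`. `prepare_fields` and the `bindings` key 
are illustrative assumptions rather than the driver's actual code. The point is 
only that defaults are applied to a copy.

```python
import copy

DEFAULT_BATCH_SIZE = 64  # connection-level default named in this commit


def prepare_fields(message_fields, request_options=None,
                   default_batch_size=DEFAULT_BATCH_SIZE):
    """Return new fields with options and defaults applied; input untouched.

    Hypothetical helper for illustration only.
    """
    fields = copy.deepcopy(message_fields)
    fields.update(request_options or {})
    # Fill batchSize only when neither the message nor the options set it.
    fields.setdefault("batchSize", default_batch_size)
    return fields


original = {"bindings": {"x": 1}}
first = prepare_fields(original, {"batchSize": 10})
second = prepare_fields(original)
```

    Because each call works on its own copy, `second` picks up the default of 
64 rather than the 10 supplied on the first call.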
    
    ## Testing
    * `gremlin-python` unit tests pass, including new `test_client_options`, 
`test_connection_options`, and `test_transport_compression` suites. The 
compression suite uses an in-process aiohttp loopback server to verify real 
`Accept-Encoding` negotiation and transparent deflate decompression over a 
socket.
    * CHANGELOG, reference config table (`gremlin-variants.asciidoc`), and 
upgrade guide (`release-4.x.x.asciidoc`) updated for the Python slice.
    
    ## Notes
    * The minimum `aiohttp` is raised from `3.8` to `3.11` (`pyproject.toml`). 
This is a dependency-floor bump, required for the 
`TCPConnector(socket_factory=...)` API that backs `keep_alive_time`; users 
pinned to `aiohttp` 3.8-3.10 must upgrade.
    
    Assisted-by: Kiro: Claude Opus 4.8
---
 CHANGELOG.asciidoc                                 |  16 +-
 docs/src/reference/gremlin-variants.asciidoc       |  49 +++-
 docs/src/upgrade/release-4.x.x.asciidoc            |  53 +++-
 .../src/main/python/examples/connections.py        |  14 +-
 .../gremlin_python/driver/aiohttp/transport.py     | 186 ++++++++++--
 .../src/main/python/gremlin_python/driver/auth.py  |  49 +++-
 .../main/python/gremlin_python/driver/client.py    |  34 ++-
 .../python/gremlin_python/driver/connection.py     |   7 +-
 .../driver/driver_remote_connection.py             |  14 +-
 gremlin-python/src/main/python/pyproject.toml      |   2 +-
 .../src/main/python/tests/integration/conftest.py  |   4 +-
 .../python/tests/integration/driver/test_client.py |  47 +++-
 .../integration/driver/test_client_behavior.py     |   2 +-
 .../test_driver_remote_connection_threaded.py      |   2 +-
 .../tests/unit/driver/test_client_options.py       | 264 +++++++++++++++++
 .../tests/unit/driver/test_connection_options.py   | 312 +++++++++++++++++++++
 16 files changed, 976 insertions(+), 79 deletions(-)

diff --git a/CHANGELOG.asciidoc b/CHANGELOG.asciidoc
index 31085a634c..d2e0ad55cf 100644
--- a/CHANGELOG.asciidoc
+++ b/CHANGELOG.asciidoc
@@ -25,6 +25,21 @@ 
image::https://raw.githubusercontent.com/apache/tinkerpop/master/docs/static/ima
 [[release-4-0-0]]
 === TinkerPop 4.0.0 (Release Date: NOT OFFICIALLY RELEASED YET)
 
+* Standardized `gremlin-python` connection options per the TinkerPop 4.x GLV 
proposal:
+** Renamed `pool_size` to `max_connections` (breaking; the old name has been 
removed) and changed the default from 8 to 128; `max_connections` is now also 
applied to the aiohttp `TCPConnector` `limit` so the transport layer reflects 
the option in addition to sizing the Connection pool.
+** Renamed `ssl_options` to `ssl` accepting an `ssl.SSLContext` (breaking; the 
old name has been removed).
+** Added `connect_timeout` (default 5s) and rewired the existing 
`read_timeout` into a single aiohttp `ClientTimeout` 
(`sock_connect`/`sock_read`); the socket-level knobs are used rather than a 
whole-request `total` so long but legitimate streaming responses are not 
aborted, while a stalled server no longer hangs forever.
+** Added `idle_timeout` (default 180s) mapped to the aiohttp `TCPConnector` 
keep-alive timeout.
+** Added `keep_alive_time` (default 30s) enabling TCP keep-alive probes via 
the connector socket factory (`TCP_KEEPIDLE`/`TCP_KEEPALIVE` guarded by 
platform availability); this required raising the minimum `aiohttp` to `3.11` 
(the `socket_factory` floor). *(breaking)*
+** Each timeout option is named with a millisecond suffix as the primary form 
(`connect_timeout_millis`, `idle_timeout_millis`, `read_timeout_millis`, 
`keep_alive_time_millis`); the unsuffixed canonical name (`connect_timeout`, 
etc.) accepts the idiomatic seconds value. Both resolve to seconds internally 
for aiohttp.
+** Added `compression` accepting `'none'`/`'deflate'`, default `'deflate'` 
(on), advertising `Accept-Encoding: deflate` by default; when set to `'none'`, 
aiohttp's automatic `Accept-Encoding` injection is suppressed so compression is 
not silently negotiated.
+** Added `batch_size` (default 64) connection-level default that fills the 
per-request `batchSize` when unset.
+** Added explicit HTTP `proxy` and `trust_env` options surfaced on the aiohttp 
`ClientSession`.
+** Added a credentials-provider variant to `auth.sigv4` that accepts an 
optional credentials provider or callable, falling back to the AWS environment 
variables.
+** Removed the `max_content_length` kwarg from the `gremlin-python` driver 
(previously accepted but discarded).
+** Removed the standalone `headers` kwarg from `gremlin-python` 
`Client`/`DriverRemoteConnection`; custom headers must now be set via 
interceptors.
+** Fixed `gremlin-python` `Client.submit`/`submit_async` mutating a 
caller-supplied `RequestMessage` in place; the message fields are now cloned 
before applying `request_options`/`batch_size`, so resubmitting the same 
message no longer accumulates state, matching the no-mutate contract of the 
.NET/JS drivers.
+** Fixed `gremlin-python` `Client.submit`/`submit_async` mutating a 
caller-supplied `RequestMessage` in place; the message fields are now cloned 
before applying `request_options`/`default_batch_size`, so resubmitting the 
same message no longer accumulates state, matching the no-mutate contract of 
the .NET/JS drivers.
 * Standardized `gremlin-driver` (Java) connection options
 ** Renamed the `connectionSetupTimeoutMillis` builder option to 
`connectTimeout` (default lowered from 15s to 5s) and actually wired it to 
`ChannelOption.CONNECT_TIMEOUT_MILLIS` on the connection bootstrap; it 
previously was validated but never applied. The old name is removed. Settable 
as `connectTimeout(Duration)` or `connectTimeoutMillis(int)` (YAML key 
`connectionPool.connectTimeoutMillis`). *(breaking)*
 ** Renamed `maxConnectionPoolSize` to `maxConnections` (default 128), 
`idleConnectionTimeoutMillis` to `idleTimeout` (default 180s), and 
`resultIterationBatchSize` to `batchSize` (default 64; the connection-level 
default that fills a request's per-request `batchSize` when unset, mirroring 
how `bulkResults` shares one name across scopes); the old builder methods, 
accessors, and YAML keys are removed. *(breaking)*
@@ -61,7 +76,6 @@ 
image::https://raw.githubusercontent.com/apache/tinkerpop/master/docs/static/ima
 * Added `Character`, `Binary`, `Duration` to the `gremlin-lang` grammar, GLVs, 
and Translators.
 * Refactored `gremlin-python` receive path to stream results directly from the 
HTTP response through the configured `response_serializer`.
 * Replaced `read()` with `get_stream()` on `AbstractBaseTransport` in 
`gremlin-python` for streaming deserialization.
-* Removed `max_content_length` from `gremlin-python` driver (no longer 
applicable with streaming deserialization).
 * Removed GraphSON parameterized test fixtures from `gremlin-python` 
integration tests (GraphBinary only).
 * Removed `AbstractBaseTransport`, `AbstractBaseProtocol`, and 
`GremlinServerHTTPProtocol` from `gremlin-python`, merging protocol logic into 
`Connection`.
 * Removed `GraphSONSerializersV4` and the 
`gremlin_python.structure.io.graphsonV4` module from `gremlin-python`. 
GraphBinary is the only supported wire format.
diff --git a/docs/src/reference/gremlin-variants.asciidoc 
b/docs/src/reference/gremlin-variants.asciidoc
index b78f894aa8..3cca5fe43c 100644
--- a/docs/src/reference/gremlin-variants.asciidoc
+++ b/docs/src/reference/gremlin-variants.asciidoc
@@ -3177,13 +3177,15 @@ the "g" provided to the `DriverRemoteConnection` 
corresponds to the name of a `G
 g = 
traversal().with_(DriverRemoteConnection('http://localhost:8182/gremlin','g'))
 ----
 
-If you need to send additional headers in the HTTP connection, you can pass an 
optional `headers` parameter
-to the `DriverRemoteConnection` constructor.
+If you need to send additional headers on each HTTP request, set them via a 
request interceptor rather than a
+connection parameter. Pass a callable to the `interceptors` keyword argument 
that mutates the request headers in
+place. See <<gremlin-python-interceptors, RequestInterceptor>> for details.
 
 [source,python]
 ----
 g = traversal().with_(DriverRemoteConnection(
-    'http://localhost:8182/gremlin', 'g', headers={'Header':'Value'}))
+    'http://localhost:8182/gremlin', 'g',
+    interceptors=[lambda req: req.headers.update({'Header': 'Value'})]))
 ----
 
 Gremlin-Python contains an `auth` module that provides built-in authentication 
functions. These are passed to the
@@ -3207,7 +3209,7 @@ with 'https://'. If Gremlin-Server uses a self-signed 
certificate for SSL, Greml
 the CA certificate file (in openssl .pem format), to be specified in the 
SSL_CERT_FILE environment variable.
 
 NOTE: If connecting from an inherently single-threaded Python process where 
blocking while waiting for Gremlin
-traversals to complete is acceptable, it might be helpful to set `pool_size` 
and `max_workers` parameters to 1.
+traversals to complete is acceptable, it might be helpful to set 
`max_connections` and `max_workers` parameters to 1.
 See the <<python-configuration,Configuration>> section just below.  Examples 
where this could apply are serverless cloud functions or WSGI
 worker processes.
 
@@ -3311,13 +3313,21 @@ can be passed to the `Client` or 
`DriverRemoteConnection` instance as keyword ar
 [width="100%",cols="3,10,^2",options="header"]
 |=========================================================
 |Key |Description |Default
-|headers |Additional headers that will be added to each request message. 
|`None`
-|max_workers |Maximum number of worker threads. |Number of CPUs * 5
-|request_serializer |The request serializer 
implementation.|`gremlin_python.driver.serializer.GraphBinarySerializersV4`
+|max_workers |Maximum number of worker threads. |Same as `max_connections` 
(128)
 |response_serializer |The response serializer 
implementation.|`gremlin_python.driver.serializer.GraphBinarySerializersV4`
-|interceptors |The request interceptors to run after request 
serialization.|`None`
+|interceptors |The request interceptors to run before the request body is 
serialized.|`None`
 |auth |An authentication interceptor. Always appended to the end of the 
interceptor list so it runs last. |`None`
-|pool_size |The number of connections used by the pool. |4
+|max_connections |The maximum number of connections used by the pool. |128
+|connect_timeout_millis |Timeout in milliseconds for establishing the 
connection (TCP connect plus TLS handshake). Also settable as `connect_timeout` 
(in seconds). |5000
+|read_timeout_millis |Per-read idle timeout in milliseconds applied while 
streaming a response. Resets per chunk. Also settable as `read_timeout` (in 
seconds). |`None`
+|write_timeout |Timeout in seconds for writing a request to the transport. 
|`None`
+|ssl |An `ssl.SSLContext` used for TLS connections. |`None`
+|idle_timeout_millis |How long in milliseconds an idle connection remains in 
the pool before being closed. Also settable as `idle_timeout` (in seconds). 
|180000
+|keep_alive_time_millis |TCP keep-alive idle time in milliseconds before 
probes begin on an otherwise idle connection. Also settable as 
`keep_alive_time` (in seconds). |30000
+|compression |The wire compression negotiated with the server (`'none'` or 
`'deflate'`). |`'deflate'`
+|batch_size |The connection-level default batch size used when a request does 
not specify one. |64
+|proxy |HTTP proxy URL routed through the underlying `ClientSession`. |`None`
+|trust_env |Whether to trust environment variables for proxy and SSL 
configuration. |False
 |enable_user_agent_on_connect |Enables sending a user agent to the server 
during connection requests.
 More details can be found in provider docs
 
link:https://tinkerpop.apache.org/docs/x.y.z/dev/provider/#_graph_driver_provider_requirements[here].|True
@@ -3334,10 +3344,29 @@ import ssl
 ...
 g = traversal().with_(
   DriverRemoteConnection('https://localhost:8182/gremlin','g',
-                         ssl_options=ssl.create_default_context(),
+                         ssl=ssl.create_default_context(),
                          read_timeout=30))
 ----
 
+Note that no driver timeout bounds the *total* duration of a request once it 
is under way. `read_timeout` only bounds
+the gap between response chunks, so a response that keeps producing chunks 
will not time out no matter how long it
+runs overall, and there is no client-side "overall" request timeout. If you 
need an absolute deadline, impose it in
+your application. `submit_async()` and `ResultSet.all()` return 
`concurrent.futures.Future` objects, so pass a
+`timeout` to `result()`:
+
+[source,python]
+----
+from concurrent.futures import TimeoutError
+
+# bound the entire request (submit plus full result iteration) to 30 seconds
+try:
+    result_set = client.submit_async("g.V().out().out()").result(timeout=30)
+    results = result_set.all().result(timeout=30)
+except TimeoutError:
+    # deadline exceeded; stop waiting on the request
+    ...
+----
+
 [[gremlin-python-interceptors]]
 === RequestInterceptor
 
diff --git a/docs/src/upgrade/release-4.x.x.asciidoc 
b/docs/src/upgrade/release-4.x.x.asciidoc
index 7bbe5d6438..1558bcd027 100644
--- a/docs/src/upgrade/release-4.x.x.asciidoc
+++ b/docs/src/upgrade/release-4.x.x.asciidoc
@@ -32,6 +32,57 @@ complete list of all the modifications that are part of this 
release.
 
 === Upgrading for Users
 
+==== Standardizing Python Connection Options
+
+TinkerPop 4.x standardizes connection option names and defaults across the 
GLVs. In `gremlin-python`, several
+connection options passed to `Client`/`DriverRemoteConnection` have been 
renamed for consistency. Because this is a
+major version, the old names have been removed rather than retained as 
aliases, and a number of new options have been
+added. The notes below describe the Python changes. See <<glv-driver-changes, 
GLV Driver Changes>> for the equivalent changes in
+the other drivers.
+
+Renames (breaking). The following options have been renamed and the old names 
removed. Migrate to the new names:
+
+- `pool_size` is now `max_connections` (default raised from 8 to 128). 
`max_connections` is also applied to the
+  aiohttp `TCPConnector` `limit`, so the transport layer reflects the option 
in addition to sizing the connection
+  pool.
+- `ssl_options` is now `ssl` (accepts an `ssl.SSLContext`).
+
+Behavior changes. These change runtime behavior on upgrade, even if you do not 
change your configuration:
+
+- `compression` is a new option that defaults to `'deflate'` (on), so the 
driver advertises `Accept-Encoding: deflate`
+  by default. Set `compression='none'` to disable it; when disabled, aiohttp's 
automatic `Accept-Encoding` injection is
+  suppressed so compression is not silently negotiated. Defaulting compression 
on is a deliberate deviation from the
+  proposal's agreed default-off, applied consistently across the GLVs by later 
agreement.
+- `connect_timeout` now defaults to 5s. Together with `read_timeout` it is 
wired into a single aiohttp `ClientTimeout`
+  via the socket-level `sock_connect`/`sock_read` knobs rather than a 
whole-request `total`, so long but legitimate
+  streaming responses are not aborted while a stalled server no longer hangs 
forever.
+- The minimum supported `aiohttp` has been raised to `3.11` (required for the 
`socket_factory` used by
+  `keep_alive_time`). *(breaking)*
+
+New options:
+
+- `connect_timeout` (default 5s): a new socket-connect timeout. It is combined 
with the existing `read_timeout`
+  (rewired into a single aiohttp `ClientTimeout` via 
`sock_connect`/`sock_read`, as described above).
+  Each timeout option's primary form is the millisecond-suffixed name 
(`connect_timeout_millis`, `idle_timeout_millis`,
+  `read_timeout_millis`, `keep_alive_time_millis`); the unsuffixed canonical 
name (`connect_timeout`, etc.) accepts the
+  idiomatic seconds value.
+- `idle_timeout` (default 180s): mapped to the aiohttp `TCPConnector` 
keep-alive timeout.
+- `keep_alive_time` (default 30s): enables TCP keep-alive probes via the 
connector socket factory
+  (`TCP_KEEPIDLE`/`TCP_KEEPALIVE`, guarded by platform availability).
+- `compression` (`'none'`/`'deflate'`, default `'deflate'`): the wire 
compression negotiated with the server.
+- `batch_size` (default 64): a connection-level default that fills the 
per-request `batchSize` when unset.
+- `proxy` and `trust_env`: explicit HTTP proxy and environment-trust options 
surfaced on the aiohttp `ClientSession`.
+- `auth.sigv4` gains a credentials-provider variant that accepts an optional 
credentials provider or callable, falling
+  back to the AWS environment variables.
+
+Removals (breaking):
+
+- The `max_content_length` kwarg has been removed (it was previously accepted 
but discarded).
+- The standalone `headers` kwarg has been removed from 
`Client`/`DriverRemoteConnection`; custom headers must now be
+  set via interceptors.
+
+See: 
link:https://lists.apache.org/thread/yqtr2wnb1kq2pqqq4002cz511q5o0bkg[[DISCUSS] 
Standardizing GLV connection options in TinkerPop 4].
+
 ==== Standardizing Java Connection Options
 
 TinkerPop 4.x standardizes connection option names and defaults across the 
GLVs. In the Java reference driver
@@ -406,7 +457,7 @@ from gremlin_python.driver.aiohttp.transport import 
AiohttpHTTPTransport
 Client(url, 'g', transport_factory=lambda: 
AiohttpHTTPTransport(ssl_options=ctx, read_timeout=30))
 
 # After
-Client(url, 'g', ssl_options=ctx, read_timeout=30)
+Client(url, 'g', ssl=ctx, read_timeout=30)
 ----
 
 The `Connection` constructor signature has changed. The `protocol` and 
`transport_factory` positional arguments have
diff --git a/gremlin-python/src/main/python/examples/connections.py 
b/gremlin-python/src/main/python/examples/connections.py
index a496d730df..21f8434c86 100644
--- a/gremlin-python/src/main/python/examples/connections.py
+++ b/gremlin-python/src/main/python/examples/connections.py
@@ -65,7 +65,7 @@ def with_auth():
         ssl_opts.check_hostname = False
         ssl_opts.verify_mode = ssl.CERT_NONE
         rc = DriverRemoteConnection(server_url, 'g', auth=basic('stephen', 
'password'),
-                                    ssl_options=ssl_opts)
+                                    ssl=ssl_opts)
     else:
         rc = DriverRemoteConnection(server_url, 'g', auth=basic('stephen', 
'password'))
 
@@ -80,9 +80,19 @@ def with_auth():
 # connecting with customized configurations
 def with_configs():
     server_url = os.getenv('GREMLIN_SERVER_URL', 
'http://localhost:8182/gremlin').format(45940)
+    # Custom headers are no longer a connection kwarg; set them via an 
interceptor.
+    def add_headers(request):
+        request.headers['x-custom-header'] = 'example'
+
     rc = DriverRemoteConnection(
         server_url, 'g',
-        headers=None,
+        max_connections=8,
+        connect_timeout_millis=5000,
+        idle_timeout_millis=180000,
+        keep_alive_time_millis=30000,
+        batch_size=64,
+        compression='none',
+        interceptors=[add_headers],
     )
     g = traversal().with_remote(rc)
 
diff --git 
a/gremlin-python/src/main/python/gremlin_python/driver/aiohttp/transport.py 
b/gremlin-python/src/main/python/gremlin_python/driver/aiohttp/transport.py
index bdd100ce24..2367f8aaaa 100644
--- a/gremlin-python/src/main/python/gremlin_python/driver/aiohttp/transport.py
+++ b/gremlin-python/src/main/python/gremlin_python/driver/aiohttp/transport.py
@@ -18,6 +18,7 @@
 #
 import aiohttp
 import asyncio
+import socket
 import sys
 
 if sys.version_info >= (3, 11):
@@ -27,6 +28,81 @@ else:
 
 __author__ = 'Lyndon Bauto ([email protected])'
 
+# Default connection option values (canonical TinkerPop 4.x GLV defaults). The 
millisecond-suffixed
+# options are the primary form (mirroring the other GLVs); aiohttp itself 
works in seconds, so the
+# values are converted internally.
+DEFAULT_CONNECT_TIMEOUT_MILLIS = 5000
+DEFAULT_IDLE_TIMEOUT_MILLIS = 180000
+DEFAULT_KEEP_ALIVE_TIME_MILLIS = 30000
+# Seconds equivalents retained for internal use / backwards reference.
+DEFAULT_CONNECT_TIMEOUT = DEFAULT_CONNECT_TIMEOUT_MILLIS / 1000
+DEFAULT_IDLE_TIMEOUT = DEFAULT_IDLE_TIMEOUT_MILLIS / 1000
+DEFAULT_KEEP_ALIVE_TIME = DEFAULT_KEEP_ALIVE_TIME_MILLIS / 1000
+DEFAULT_COMPRESSION = 'deflate'
+
+def _resolve_timeout_seconds(millis, seconds, default_millis):
+    """Resolve a timeout to seconds (aiohttp's unit) from the ``*_millis`` 
number or the idiomatic
+    unsuffixed seconds number (``None`` means not supplied for either). 
Supplying both raises
+    ``ValueError``; if neither is given, ``default_millis`` is used (``None`` 
leaves it unset).
+    """
+    if millis is not None and seconds is not None:
+        raise ValueError("provide only one of the milliseconds option or the 
seconds option, not both")
+    if seconds is not None:
+        return seconds
+    if millis is not None:
+        return millis / 1000
+    if default_millis is None:
+        return None
+    return default_millis / 1000
+
+
+def _normalize_compression(compression):
+    """Normalize the compression option to a canonical string ('none' or 
'deflate').
+
+    Accepts the string forms 'none'/'deflate'.
+    """
+    if compression is None:
+        return DEFAULT_COMPRESSION
+    if isinstance(compression, str):
+        normalized = compression.lower()
+        if normalized in ('none', 'deflate'):
+            return normalized
+        raise ValueError("compression must be one of 'none', 'deflate', got 
'%s'" % compression)
+    raise TypeError("compression must be a str ('none'|'deflate'), got %s" % 
type(compression).__name__)
+
+
+def _keep_alive_socket_options(keep_alive_time):
+    """Build the list of socket options that enable TCP keep-alive with the
+    given idle time before probes begin. TCP_KEEPIDLE is platform dependent
+    (Linux); macOS exposes the equivalent as TCP_KEEPALIVE. Both are guarded so
+    platforms lacking the option (e.g. Windows) simply enable SO_KEEPALIVE."""
+    options = [(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)]
+    idle_opt = getattr(socket, 'TCP_KEEPIDLE', None)
+    if idle_opt is None:
+        # macOS names the idle-before-probe option TCP_KEEPALIVE
+        idle_opt = getattr(socket, 'TCP_KEEPALIVE', None)
+    if idle_opt is not None:
+        options.append((socket.IPPROTO_TCP, idle_opt, int(keep_alive_time)))
+    return options
+
+
+def _keep_alive_socket_factory(keep_alive_time):
+    """Return a socket_factory (aiohttp >= 3.11) that applies the keep-alive
+    socket options when each connection socket is created."""
+    options = _keep_alive_socket_options(keep_alive_time)
+
+    def factory(addr_info):
+        family, type_, proto, _, _ = addr_info
+        sock = socket.socket(family=family, type=type_, proto=proto)
+        for level, optname, value in options:
+            try:
+                sock.setsockopt(level, optname, value)
+            except (OSError, AttributeError):
+                pass
+        return sock
+
+    return factory
+
 
 class AiohttpSyncStream:
     """Wraps aiohttp's async StreamReader as a synchronous file-like object.
@@ -78,7 +154,13 @@ class AiohttpSyncStream:
 class AiohttpHTTPTransport:
     nest_asyncio_applied = False
 
-    def __init__(self, call_from_event_loop=None, read_timeout=None, 
write_timeout=None, **kwargs):
+    def __init__(self, call_from_event_loop=None, write_timeout=None,
+                 connect_timeout_millis=None, connect_timeout=None,
+                 idle_timeout_millis=None, idle_timeout=None,
+                 read_timeout_millis=None, read_timeout=None,
+                 keep_alive_time_millis=None, keep_alive_time=None,
+                 compression=DEFAULT_COMPRESSION, proxy=None, trust_env=False,
+                 max_connections=None, **kwargs):
         if call_from_event_loop is not None and call_from_event_loop and not 
AiohttpHTTPTransport.nest_asyncio_applied:
             """ 
                 The AiohttpTransport implementation uses the asyncio event 
loop. Because of this, it cannot be called 
@@ -95,17 +177,36 @@ class AiohttpHTTPTransport:
         self._client_session = None
         self._http_req_resp = None
         self._enable_ssl = False
+        self._ssl_context = None
         self._url = None
 
         # Set all inner variables to parameters passed in.
         self._aiohttp_kwargs = kwargs
         self._write_timeout = write_timeout
-        self._read_timeout = read_timeout
-        # max_content_length is no longer enforced with streaming 
deserialization, but pop it
-        # to prevent it from leaking to aiohttp as an unknown kwarg
-        self._aiohttp_kwargs.pop("max_content_length", None)
-        if "ssl_options" in self._aiohttp_kwargs:
-            self._ssl_context = self._aiohttp_kwargs.pop("ssl_options")
+
+        # Timeouts accept a millisecond number (*_millis, primary form) or the 
idiomatic seconds number
+        # (unsuffixed). read_timeout defaults off; the others use the 
canonical millisecond defaults.
+        self._read_timeout = _resolve_timeout_seconds(read_timeout_millis, 
read_timeout, None)
+
+        # Connection-level pooling / lifecycle options.
+        self._connect_timeout = 
_resolve_timeout_seconds(connect_timeout_millis, connect_timeout, 
DEFAULT_CONNECT_TIMEOUT_MILLIS)
+        self._idle_timeout = _resolve_timeout_seconds(idle_timeout_millis, 
idle_timeout, DEFAULT_IDLE_TIMEOUT_MILLIS)
+        self._keep_alive_time = 
_resolve_timeout_seconds(keep_alive_time_millis, keep_alive_time, 
DEFAULT_KEEP_ALIVE_TIME_MILLIS)
+        # Caps the aiohttp connector's simultaneous connections per Connection
+        # (the Client also sizes its Connection pool by this value).
+        self._max_connections = max_connections
+
+        # Compression negotiation. Default 'deflate' (on); advertises
+        # Accept-Encoding: deflate. Set 'none' to opt out.
+        self._compression = _normalize_compression(compression)
+
+        # HTTP proxy support routed through the ClientSession.
+        self._proxy = proxy
+        self._trust_env = trust_env
+
+        # ssl: canonical name accepting an ssl.SSLContext.
+        if "ssl" in self._aiohttp_kwargs:
+            self._ssl_context = self._aiohttp_kwargs.pop("ssl")
             self._enable_ssl = True
 
     def __del__(self):
@@ -117,26 +218,75 @@ class AiohttpHTTPTransport:
         self._url = url
         # Inner function to perform async connect.
         async def async_connect():
-            # Start client session and use it to send all HTTP requests. 
Headers can be set here.
+            # Build the TCP connector with the standardized pooling / lifecycle
+            # options. keepalive_timeout maps to the idle connection timeout;
+            # the socket factory enables TCP keep-alive probes after
+            # keep_alive_time idle.
+            connector_kwargs = {}
             if self._enable_ssl:
-                # ssl context is established through tcp connector
-                tcp_conn = aiohttp.TCPConnector(ssl_context=self._ssl_context)
-                self._client_session = 
aiohttp.ClientSession(connector=tcp_conn,
-                                                             headers=headers, 
loop=self._loop)
-            else:
-                self._client_session = aiohttp.ClientSession(headers=headers, 
loop=self._loop)
+                # ssl context is established through the tcp connector
+                connector_kwargs['ssl_context'] = self._ssl_context
+            if self._idle_timeout is not None:
+                connector_kwargs['keepalive_timeout'] = self._idle_timeout
+            if self._keep_alive_time is not None:
+                self._apply_keep_alive(connector_kwargs)
+            # Reflect max_connections at the aiohttp layer so the connector's
+            # simultaneous-connection limit matches the driver option.
+            if self._max_connections is not None:
+                connector_kwargs['limit'] = self._max_connections
+
+            session_kwargs = {'headers': headers, 'loop': self._loop,
+                              'trust_env': self._trust_env}
+            # Use the per-socket timeouts (sock_connect/sock_read) rather than a
+            # whole-request total, which would abort long but legitimate streaming
+            # responses. sock_read bounds idle time between chunks so a stalled
+            # server cannot hang forever.
+            timeout_kwargs = {}
+            if self._connect_timeout is not None:
+                timeout_kwargs['sock_connect'] = self._connect_timeout
+            if self._read_timeout is not None:
+                timeout_kwargs['sock_read'] = self._read_timeout
+            if timeout_kwargs:
+                session_kwargs['timeout'] = aiohttp.ClientTimeout(**timeout_kwargs)
+            if connector_kwargs:
+                session_kwargs['connector'] = aiohttp.TCPConnector(**connector_kwargs)
+
+            self._client_session = aiohttp.ClientSession(**session_kwargs)
 
         # Execute the async connect synchronously.
         self._loop.run_until_complete(async_connect())
 
+    def _apply_keep_alive(self, connector_kwargs):
+        """Wire TCP keep-alive into the connector via the aiohttp socket_factory.
+        The factory sets SO_KEEPALIVE plus the per-socket idle time; unsupported
+        platforms degrade gracefully inside the factory."""
+        connector_kwargs['socket_factory'] = _keep_alive_socket_factory(self._keep_alive_time)
+
     def write(self, message):
+        # Negotiate compression unless the caller already set Accept-Encoding:
+        # deflate advertises Accept-Encoding: deflate; none suppresses aiohttp's
+        # auto-injected Accept-Encoding so compression is not silently negotiated.
+        headers = message['headers']
+        has_accept_encoding = any(k.lower() == 'accept-encoding' for k in headers)
+        skip_auto_headers = None
+        if self._compression == 'deflate':
+            if not has_accept_encoding:
+                headers['accept-encoding'] = 'deflate'
+        elif not has_accept_encoding:
+            skip_auto_headers = ['Accept-Encoding']
+
         # Inner function to perform async write.
         async def async_write():
+            post_kwargs = dict(self._aiohttp_kwargs)
+            if self._proxy is not None:
+                post_kwargs['proxy'] = self._proxy
+            if skip_auto_headers is not None:
+                post_kwargs['skip_auto_headers'] = skip_auto_headers
             async with async_timeout.timeout(self._write_timeout):
                 self._http_req_resp = await self._client_session.post(url=self._url,
                                                                       data=message['payload'],
-                                                                      headers=message['headers'],
-                                                                      **self._aiohttp_kwargs)
+                                                                      headers=headers,
+                                                                      **post_kwargs)
 
         # Execute the async write synchronously.
         self._loop.run_until_complete(async_write())
@@ -183,5 +333,5 @@ class AiohttpHTTPTransport:
 
     @property
     def closed(self):
-        # Connection is closed when client session is closed.
-        return self._client_session.closed
+        # Connection is closed when client session is closed (or not yet created).
+        return self._client_session is None or self._client_session.closed
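A minimal sketch of the Accept-Encoding negotiation performed in `write()`, pulled out as a pure function so each branch can be checked without aiohttp. The helper name `negotiate_accept_encoding` and its return shape are assumptions for illustration only; the driver does this inline and mutates `message['headers']` in place, whereas this sketch works on a copy.

```python
def negotiate_accept_encoding(headers, compression):
    """Return (headers, skip_auto_headers) following the same branches as write().

    Hypothetical helper for illustration; not part of the driver.
    """
    headers = dict(headers)  # copy: the real transport mutates the dict in place
    has_accept_encoding = any(k.lower() == 'accept-encoding' for k in headers)
    skip_auto_headers = None
    if compression == 'deflate':
        # Advertise deflate unless the caller already chose an encoding.
        if not has_accept_encoding:
            headers['accept-encoding'] = 'deflate'
    elif not has_accept_encoding:
        # Ask the HTTP client not to inject its own Accept-Encoding header.
        skip_auto_headers = ['Accept-Encoding']
    return headers, skip_auto_headers


deflate_headers, deflate_skip = negotiate_accept_encoding({}, 'deflate')
none_headers, none_skip = negotiate_accept_encoding({}, 'none')
custom_headers, custom_skip = negotiate_accept_encoding({'Accept-Encoding': 'gzip'}, 'none')
```

In every branch a caller-supplied `Accept-Encoding` header wins: it is neither overwritten nor suppressed.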
diff --git a/gremlin-python/src/main/python/gremlin_python/driver/auth.py b/gremlin-python/src/main/python/gremlin_python/driver/auth.py
index 89d6ccf889..991c5cbaca 100644
--- a/gremlin-python/src/main/python/gremlin_python/driver/auth.py
+++ b/gremlin-python/src/main/python/gremlin_python/driver/auth.py
@@ -28,30 +28,53 @@ def basic(username, password):
     return interceptor
 
 
-def sigv4(region, service):
-    """Returns an interceptor that signs the request with AWS SigV4."""
+def sigv4(region, service, credentials=None):
+    """Returns an interceptor that signs the request with AWS SigV4.
+
+    By default credentials are sourced from the standard AWS environment
+    variables (``AWS_ACCESS_KEY_ID``, ``AWS_SECRET_ACCESS_KEY`` and the optional
+    ``AWS_SESSION_TOKEN``). A custom credentials provider may be supplied via
+    ``credentials``; it accepts either:
+
+    * a callable returning a botocore ``Credentials`` object (or any object with
+      ``access_key``/``secret_key``/``token`` attributes), evaluated per request, or
+    * a botocore ``Credentials`` object (or ``Session``) used directly.
+
+    When no provider is given, signing falls back to the environment.
+    """
     import os
     from boto3 import Session
     from botocore.auth import SigV4Auth
     from botocore.awsrequest import AWSRequest
 
+    def _resolve_credentials():
+        if credentials is None:
+            access_key = os.environ.get('AWS_ACCESS_KEY_ID', '')
+            secret_key = os.environ.get('AWS_SECRET_ACCESS_KEY', '')
+            session_token = os.environ.get('AWS_SESSION_TOKEN', '')
+            session = Session(
+                aws_access_key_id=access_key,
+                aws_secret_access_key=secret_key,
+                aws_session_token=session_token,
+                region_name=region
+            )
+            return session.get_credentials()
+
+        provider = credentials() if callable(credentials) else credentials
+        # A botocore Session exposes get_credentials(); a Credentials object is
+        # already usable as-is.
+        if hasattr(provider, 'get_credentials'):
+            return provider.get_credentials()
+        return provider
+
     def interceptor(request):
         # Ensure body is serialized so we can sign it
         body_bytes = request.serialize_body()
 
-        access_key = os.environ.get('AWS_ACCESS_KEY_ID', '')
-        secret_key = os.environ.get('AWS_SECRET_ACCESS_KEY', '')
-        session_token = os.environ.get('AWS_SESSION_TOKEN', '')
-
-        session = Session(
-            aws_access_key_id=access_key,
-            aws_secret_access_key=secret_key,
-            aws_session_token=session_token,
-            region_name=region
-        )
+        resolved = _resolve_credentials()
 
         sigv4_request = AWSRequest(method=request.method, url=request.url, data=body_bytes)
-        SigV4Auth(session.get_credentials(), service, region).add_auth(sigv4_request)
+        SigV4Auth(resolved, service, region).add_auth(sigv4_request)
         request.headers.update(dict(sigv4_request.headers))
 
     return interceptor
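The provider dispatch in `_resolve_credentials()` can be sketched without boto3. `FakeCredentials`, `FakeSession`, and `resolve` below are hypothetical stand-ins for illustration; only the three-line dispatch is taken from the diff, and the environment-variable fallback is left out.

```python
class FakeCredentials:
    """Stand-in for a botocore Credentials object (hypothetical)."""

    def __init__(self, access_key, secret_key, token=None):
        self.access_key = access_key
        self.secret_key = secret_key
        self.token = token


class FakeSession:
    """Stand-in for a Session: not callable, but exposes get_credentials()."""

    def __init__(self, creds):
        self._creds = creds

    def get_credentials(self):
        return self._creds


def resolve(credentials):
    """Mirror of the non-environment branch of _resolve_credentials()."""
    # A callable provider is evaluated on every request, so it can refresh.
    provider = credentials() if callable(credentials) else credentials
    # A Session-like object is unwrapped; a Credentials object is used as-is.
    if hasattr(provider, 'get_credentials'):
        return provider.get_credentials()
    return provider


creds = FakeCredentials('AKIDEXAMPLE', 'not-a-real-secret')
from_object = resolve(creds)
from_session = resolve(FakeSession(creds))
from_callable = resolve(lambda: creds)
from_callable_session = resolve(lambda: FakeSession(creds))
```

With the real libraries, the equivalent call would presumably look like `sigv4('us-east-1', 'neptune-db', credentials=lambda: boto3.Session().get_credentials())`, where the region and service values are placeholders.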
diff --git a/gremlin-python/src/main/python/gremlin_python/driver/client.py b/gremlin-python/src/main/python/gremlin_python/driver/client.py
index a32e8f9d0b..7cb073f696 100644
--- a/gremlin-python/src/main/python/gremlin_python/driver/client.py
+++ b/gremlin-python/src/main/python/gremlin_python/driver/client.py
@@ -38,10 +38,11 @@ __author__ = 'David M. Brown ([email protected]), Lyndon Bauto (lyndonb@bitqui
 
 class Client:
 
-    def __init__(self, url, traversal_source, pool_size=None, max_workers=None,
+    def __init__(self, url, traversal_source, max_connections=128, max_workers=None,
                  response_serializer=None, interceptors=None, auth=None,
-                 headers=None, enable_user_agent_on_connect=True,
-                 bulk_results=False, pdt_registry=None, **transport_kwargs):
+                 enable_user_agent_on_connect=True,
+                 bulk_results=False, pdt_registry=None, batch_size=None,
+                 **transport_kwargs):
         log.info("Creating Client with url '%s'", url)
 
         self._closed = False
@@ -49,10 +50,12 @@ class Client:
         # A raw list is safe here because Python's GIL ensures list.append and
         # list.remove are atomic at the bytecode level.
         self._tracked_transactions = []
-        self._headers = headers
         self._enable_user_agent_on_connect = enable_user_agent_on_connect
         self._bulk_results = bulk_results
         self._traversal_source = traversal_source
+        if batch_size is None:
+            batch_size = 64
+        self._batch_size = batch_size
         if response_serializer is None:
             response_serializer = serializer.GraphBinarySerializersV4()
         if pdt_registry is not None:
@@ -64,14 +67,12 @@ class Client:
 
         self._transport_kwargs = transport_kwargs
 
-        if pool_size is None:
-            pool_size = 8
-        self._pool_size = pool_size
+        self._max_connections = max_connections
         # This is until concurrent.futures backport 3.1.0 release
         if max_workers is None:
             # If your application is overlapping Gremlin I/O on multiple threads
             # consider passing kwarg max_workers = (cpu_count() or 1) * 5
-            max_workers = pool_size
+            max_workers = max_connections
         self._executor = ThreadPoolExecutor(max_workers=max_workers)
         # Threadsafe queue
         self._pool = queue.Queue()
@@ -93,7 +94,7 @@ class Client:
         return self._traversal_source
 
     def _fill_pool(self):
-        for i in range(self._pool_size):
+        for i in range(self._max_connections):
             conn = self._get_connection()
             self._pool.put_nowait(conn)
 
@@ -147,9 +148,9 @@ class Client:
             self._executor, self._pool,
             response_serializer=self._response_serializer,
             auth=self._auth, interceptors=self._interceptors,
-            headers=self._headers,
             enable_user_agent_on_connect=self._enable_user_agent_on_connect,
             bulk_results=self._bulk_results,
+            max_connections=self._max_connections,
             **self._transport_kwargs)
 
     def submit(self, message, bindings=None, request_options=None):
@@ -181,6 +182,14 @@ class Client:
         if isinstance(message, str):
             log.debug("fields='%s', gremlin='%s'", str(fields), str(message))
             message = request.RequestMessage(fields=fields, gremlin=message)
+        else:
+            # A caller-supplied RequestMessage must not be mutated in place:
+            # resubmitting the same message (e.g. on retry) would otherwise
+            # accumulate request_options/batchSize from prior submits. Clone the
+            # fields dict so this submit's mutations stay local, matching the
+            # no-mutate contract of the .NET/JS drivers. Freshly built messages
+            # (the string path above) already own a private fields dict.
+            message = message._replace(fields=dict(message.fields))
 
         conn = self._pool.get(True)
         if request_options:
@@ -193,4 +202,9 @@ class Client:
                     bindings_val = GremlinLang.convert_parameters_to_string(bindings_val)
                 message.fields['bindings'] = bindings_val
 
+        # Fill in the connection-level default batch size when the caller did
+        # not set a per-request batchSize.
+        if self._batch_size is not None and 'batchSize' not in message.fields:
+            message.fields['batchSize'] = self._batch_size
+
         return conn.write(message)
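A small sketch of the no-mutate contract and the default `batchSize` fill-in described in the comments above. The `RequestMessage` stand-in is assumed to be a namedtuple (the diff calls `message._replace(...)`), and `prepare` is a hypothetical helper. Merging `request_options` with `dict.update` is also an assumption, since that part of `submit` is not shown in the hunk.

```python
from collections import namedtuple

# Stand-in for the driver's RequestMessage; assumed to be a namedtuple.
RequestMessage = namedtuple('RequestMessage', ['fields', 'gremlin'])


def prepare(message, request_options=None, default_batch_size=64):
    """Clone fields, apply per-request options, then the default batchSize."""
    # Rebind to a copy whose fields dict is private to this submit.
    message = message._replace(fields=dict(message.fields))
    if request_options:
        message.fields.update(request_options)  # assumed merge strategy
    # Only fill the default when the caller did not set batchSize.
    if default_batch_size is not None and 'batchSize' not in message.fields:
        message.fields['batchSize'] = default_batch_size
    return message


original = RequestMessage(fields={'g': 'g'}, gremlin='g.V()')
first = prepare(original, {'evaluationTimeout': 1000})
second = prepare(original, {'batchSize': 5})
```

Because each call works on its own copy of `fields`, resubmitting `original` never carries `evaluationTimeout` or `batchSize` over from an earlier submit.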
diff --git a/gremlin-python/src/main/python/gremlin_python/driver/connection.py b/gremlin-python/src/main/python/gremlin_python/driver/connection.py
index c9b65eab78..7a46a53a44 100644
--- a/gremlin-python/src/main/python/gremlin_python/driver/connection.py
+++ b/gremlin-python/src/main/python/gremlin_python/driver/connection.py
@@ -37,7 +37,7 @@ class Connection:
     def __init__(self, url, traversal_source,
                  executor, pool,
                  response_serializer=None, auth=None, interceptors=None,
-                 headers=None, enable_user_agent_on_connect=True,
+                 enable_user_agent_on_connect=True,
                  bulk_results=False, **transport_kwargs):
         if callable(interceptors):
             interceptors = [interceptors]
@@ -53,7 +53,10 @@ class Connection:
             interceptors = (interceptors or []) + [auth]
 
         self._url = url
-        self._headers = headers
+        # Custom request headers are set via interceptors. This internal dict
+        # only carries connection-level headers managed by the driver itself
+        # (user agent, bulkResults) and is established at construction time.
+        self._headers = None
         self._traversal_source = traversal_source
         self._transport_kwargs = transport_kwargs
         self._executor = executor
diff --git a/gremlin-python/src/main/python/gremlin_python/driver/driver_remote_connection.py b/gremlin-python/src/main/python/gremlin_python/driver/driver_remote_connection.py
index 8c82b3d729..3c0f094569 100644
--- a/gremlin-python/src/main/python/gremlin_python/driver/driver_remote_connection.py
+++ b/gremlin-python/src/main/python/gremlin_python/driver/driver_remote_connection.py
@@ -32,17 +32,17 @@ __author__ = 'David M. Brown ([email protected]), Lyndon Bauto (lyndonb@bitqui
 class DriverRemoteConnection(RemoteConnection):
 
     def __init__(self, url, traversal_source="g",
-                 pool_size=None, max_workers=None,
+                 max_connections=128, max_workers=None,
                  response_serializer=None, interceptors=None, auth=None,
-                 headers=None, enable_user_agent_on_connect=True,
-                 bulk_results=False, pdt_registry=None, **transport_kwargs):
+                 enable_user_agent_on_connect=True,
+                 bulk_results=False, pdt_registry=None, batch_size=None,
+                 **transport_kwargs):
         log.info("Creating DriverRemoteConnection with url '%s'", str(url))
         self.__url = url
         self.__traversal_source = traversal_source
-        self.__pool_size = pool_size
+        self.__max_connections = max_connections
         self.__max_workers = max_workers
         self.__auth = auth
-        self.__headers = headers
         self.__enable_user_agent_on_connect = enable_user_agent_on_connect
         self.__bulk_results = bulk_results
         self.__transport_kwargs = transport_kwargs
@@ -51,14 +51,14 @@ class DriverRemoteConnection(RemoteConnection):
         if response_serializer is None:
             response_serializer = serializer.GraphBinarySerializersV4()
         self._client = client.Client(url, traversal_source,
-                                     pool_size=pool_size,
+                                     max_connections=max_connections,
                                      max_workers=max_workers,
                                      response_serializer=response_serializer,
                                      interceptors=interceptors, auth=auth,
-                                     headers=headers,
                                      enable_user_agent_on_connect=enable_user_agent_on_connect,
                                      bulk_results=bulk_results,
                                      pdt_registry=pdt_registry,
+                                     batch_size=batch_size,
                                      **transport_kwargs)
         self._url = self._client._url
         self._traversal_source = self._client._traversal_source
diff --git a/gremlin-python/src/main/python/pyproject.toml b/gremlin-python/src/main/python/pyproject.toml
index 8332cb6d9e..8510bc701b 100644
--- a/gremlin-python/src/main/python/pyproject.toml
+++ b/gremlin-python/src/main/python/pyproject.toml
@@ -29,7 +29,7 @@ maintainers = [{name = "Apache TinkerPop", email = "[email protected]"}]
 requires-python = ">=3.10,<3.14"
 dependencies = [
     "nest_asyncio",
-    "aiohttp>=3.8.0,<4.0.0",
+    "aiohttp>=3.11.0,<4.0.0",
     "aenum>=1.4.5,<4.0.0",
     "isodate>=0.6.0,<1.0.0",
     "boto3",
diff --git a/gremlin-python/src/main/python/tests/integration/conftest.py b/gremlin-python/src/main/python/tests/integration/conftest.py
index 8776859ed2..fb505e8cb8 100644
--- a/gremlin-python/src/main/python/tests/integration/conftest.py
+++ b/gremlin-python/src/main/python/tests/integration/conftest.py
@@ -98,7 +98,7 @@ def authenticated_client(request):
             ssl_opts.verify_mode = ssl.CERT_NONE
             client = Client(basic_url, 'gmodern',
                             auth=basic('stephen', 'password'),
-                            ssl_options=ssl_opts)
+                            ssl=ssl_opts)
         else:
             raise ValueError("Invalid authentication option - " + request.param)
     except OSError:
@@ -155,7 +155,7 @@ def remote_connection_authenticated(request):
             ssl_opts.verify_mode = ssl.CERT_NONE
             remote_conn = DriverRemoteConnection(basic_url, 'gmodern',
                                                  auth=basic('stephen', 'password'),
-                                                 ssl_options=ssl_opts)
+                                                 ssl=ssl_opts)
         else:
             raise ValueError("Invalid authentication option - " + request.param)
     except OSError:
diff --git a/gremlin-python/src/main/python/tests/integration/driver/test_client.py b/gremlin-python/src/main/python/tests/integration/driver/test_client.py
index cc972204d0..0981e7737b 100644
--- a/gremlin-python/src/main/python/tests/integration/driver/test_client.py
+++ b/gremlin-python/src/main/python/tests/integration/driver/test_client.py
@@ -70,6 +70,33 @@ def test_client_simple_eval(client):
     assert client.submit('g.inject(2)').all().result()[0] == 2
 
 
+def test_client_deflate_compression_round_trip():
+    # With compression enabled the driver must (1) advertise Accept-Encoding: deflate
+    # on the request and (2) transparently decompress the server's deflate-compressed
+    # response. The interceptor captures a reference to the outgoing headers dict, which
+    # the transport mutates in place to add Accept-Encoding before sending. Use a large,
+    # repetitive payload so the response is actually compressed and spans multiple buffer fills.
+    captured_headers = {}
+
+    def capture(http_request):
+        # keep a reference; the transport adds Accept-Encoding to this same dict at send time
+        captured_headers['ref'] = http_request.headers
+
+    client = Client(test_no_auth_url, 'g', compression='deflate', interceptors=capture)
+    try:
+        result = client.submit('[" ".repeat(200000), " ".repeat(100000)]',
+                               request_options={'language': 'gremlin-groovy'}).all().result()
+        # (2) decompression succeeded end to end
+        assert len(result[0]) == 200000
+        assert len(result[1]) == 100000
+        # (1) the request advertised deflate
+        sent = captured_headers.get('ref', {})
+        assert sent.get('accept-encoding') == 'deflate', \
+            "expected Accept-Encoding: deflate on the request, got %r" % sent.get('accept-encoding')
+    finally:
+        client.close()
+
+
 def test_client_simple_eval_bindings(client):
     assert client.submit('g.V(x).values("age")', {'x': 1}).all().result()[0] == 29
 
@@ -121,8 +148,8 @@ def test_bad_serialization(client):
 
 
 def test_client_connection_pool_after_error(client):
-    # Overwrite fixture with pool_size=1 client
-    client = Client(test_no_auth_url, 'gmodern', pool_size=1)
+    # Overwrite fixture with max_connections=1 client
+    client = Client(test_no_auth_url, 'gmodern', max_connections=1)
 
     try:
         # should fire an exception
@@ -149,7 +176,7 @@ def test_client_no_hang_if_submit_on_closed(client):
 
 
 def test_client_close_all_connection_in_pool(client):
-    client = Client(test_no_auth_url, 'g', pool_size=1)
+    client = Client(test_no_auth_url, 'g', max_connections=1)
     assert client.available_pool_size == 1
     client.submit('g.inject(4)').all().result()
     client.close()
@@ -158,7 +185,7 @@ def test_client_close_all_connection_in_pool(client):
 
 def test_client_side_timeout_set_for_aiohttp(client):
     client = Client(test_no_auth_url, 'gmodern',
-                    read_timeout=1, write_timeout=1)
+                    read_timeout_millis=1000, write_timeout=1)
 
     try:
         # should fire an exception
@@ -282,8 +309,8 @@ def test_client_async(client):
 
 
 def test_connection_share(client):
-    # Overwrite fixture with pool_size=1 client
-    client = Client(test_no_auth_url, 'gmodern', pool_size=1)
+    # Overwrite fixture with max_connections=1 client
+    client = Client(test_no_auth_url, 'gmodern', max_connections=1)
     g = GraphTraversalSource(Graph(), TraversalStrategies())
     t = g.V()
     message = create_basic_request_message(t)
@@ -294,7 +321,7 @@ def test_connection_share(client):
     result_set2 = future2.result()
     assert len(result_set2.all().result()) == 6
 
-    # This future has to finish for the second to yield result - pool_size=1
+    # This future has to finish for the second to yield result - max_connections=1
     assert future.done()
     result_set = future.result()
     assert len(result_set.all().result()) == 6
@@ -305,7 +332,7 @@ def test_multi_conn_pool(client):
     t = g.V()
     message = create_basic_request_message(t)
     message2 = create_basic_request_message(t)
-    client = Client(test_no_auth_url, 'g', pool_size=1)
+    client = Client(test_no_auth_url, 'g', max_connections=1)
     future = client.submit_async(message)
     future2 = client.submit_async(message2)
 
@@ -604,7 +631,7 @@ def test_auto_serializes_request_message_with_interceptor_mutation():
             http_request.body = RequestMessage(fields={"g": "gmodern"}, gremlin="g.inject(99)")
 
     client = Client(test_no_auth_url, 'gmodern',
-                    pool_size=1, interceptors=swap_query)
+                    max_connections=1, interceptors=swap_query)
     try:
         result = client.submit("g.inject(1)").next()
         assert 99 == result
@@ -623,7 +650,7 @@ def test_interceptor_errors_propagate():
             raise RuntimeError("interceptor broke")
 
     client = Client(test_no_auth_url, 'gmodern',
-                    pool_size=1, interceptors=failing_interceptor)
+                    max_connections=1, interceptors=failing_interceptor)
     try:
         # First request should fail with interceptor error
         try:
diff --git a/gremlin-python/src/main/python/tests/integration/driver/test_client_behavior.py b/gremlin-python/src/main/python/tests/integration/driver/test_client_behavior.py
index 149f2735f4..013280e39f 100644
--- a/gremlin-python/src/main/python/tests/integration/driver/test_client_behavior.py
+++ b/gremlin-python/src/main/python/tests/integration/driver/test_client_behavior.py
@@ -136,7 +136,7 @@ def test_should_handle_slow_response(socket_server_client):
 def test_should_timeout_when_server_never_responds():
     # Use a short read_timeout so the test fails fast instead of waiting for
     # aiohttp's 5-minute default. aiohttp surfaces this as asyncio.TimeoutError.
-    client = Client(url, 'g', read_timeout=2)
+    client = Client(url, 'g', read_timeout_millis=2000)
     try:
         with pytest.raises(asyncio.TimeoutError):
             client.submit(GREMLIN_NO_RESPONSE).all().result()
diff --git a/gremlin-python/src/main/python/tests/integration/driver/test_driver_remote_connection_threaded.py b/gremlin-python/src/main/python/tests/integration/driver/test_driver_remote_connection_threaded.py
index 898872d1f8..d9edf298f0 100644
--- a/gremlin-python/src/main/python/tests/integration/driver/test_driver_remote_connection_threaded.py
+++ b/gremlin-python/src/main/python/tests/integration/driver/test_driver_remote_connection_threaded.py
@@ -63,7 +63,7 @@ def _executor(q, conn):
     if not conn:
         # This isn't a fixture so close manually
         close = True
-        conn = DriverRemoteConnection(test_no_auth_url, 'gmodern', pool_size=4)
+        conn = DriverRemoteConnection(test_no_auth_url, 'gmodern', max_connections=4)
     try:
         g = traversal().with_(conn)
         future = g.V().promise()
diff --git a/gremlin-python/src/main/python/tests/unit/driver/test_client_options.py b/gremlin-python/src/main/python/tests/unit/driver/test_client_options.py
new file mode 100644
index 0000000000..215891e966
--- /dev/null
+++ b/gremlin-python/src/main/python/tests/unit/driver/test_client_options.py
@@ -0,0 +1,264 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+"""Unit tests for connection-level options surfaced on Client,
+DriverRemoteConnection, and the SigV4 credentials-provider variant."""
+
+import warnings
+from unittest.mock import MagicMock, patch
+
+import pytest
+
+from gremlin_python.driver.client import Client
+from gremlin_python.driver.connection import Connection
+from gremlin_python.driver.request import RequestMessage
+
+
+# Patch Connection so Client._fill_pool does not attempt any real connections.
+def _make_client(**kwargs):
+    with patch('gremlin_python.driver.client.connection.Connection', MagicMock()):
+        return Client('http://localhost:8182/gremlin', 'g', **kwargs)
+
+
+class TestMaxConnections:
+
+    def test_default_is_128(self):
+        client = _make_client()
+        assert client._max_connections == 128
+
+    def test_explicit_max_connections(self):
+        client = _make_client(max_connections=4)
+        assert client._max_connections == 4
+
+
+class TestBatchSize:
+
+    def test_default_is_64(self):
+        client = _make_client()
+        assert client._batch_size == 64
+
+    def test_explicit_value(self):
+        client = _make_client(batch_size=200)
+        assert client._batch_size == 200
+
+    def test_fills_batch_size_when_unset(self):
+        client = _make_client(batch_size=64)
+        conn = MagicMock()
+        client._pool.get = MagicMock(return_value=conn)
+        client.submit_async('g.V()')
+        sent = conn.write.call_args[0][0]
+        assert sent.fields['batchSize'] == 64
+
+    def test_does_not_override_per_request_batch_size(self):
+        client = _make_client(batch_size=64)
+        conn = MagicMock()
+        client._pool.get = MagicMock(return_value=conn)
+        client.submit_async('g.V()', request_options={'batchSize': 10})
+        sent = conn.write.call_args[0][0]
+        assert sent.fields['batchSize'] == 10
+
+
+class TestMaxConnectionsThreadedToTransport:
+
+    def test_max_connections_forwarded_to_connection(self):
+        # Client must pass max_connections through to each Connection so the
+        # aiohttp connector limit can be set, in addition to sizing the pool.
+        captured = {}
+
+        def fake_connection(*args, **kwargs):
+            captured.update(kwargs)
+            return MagicMock()
+
+        with patch('gremlin_python.driver.client.connection.Connection',
+                   side_effect=fake_connection):
+            Client('http://localhost:8182/gremlin', 'g', max_connections=5)
+        assert captured.get('max_connections') == 5
+
+    def test_default_max_connections_forwarded_to_connection(self):
+        captured = {}
+
+        def fake_connection(*args, **kwargs):
+            captured.update(kwargs)
+            return MagicMock()
+
+        with patch('gremlin_python.driver.client.connection.Connection',
+                   side_effect=fake_connection):
+            Client('http://localhost:8182/gremlin', 'g')
+        assert captured.get('max_connections') == 128
+
+
+class TestRequestMessageNoMutation:
+
+    def test_resubmit_does_not_accumulate_fields(self):
+        # A caller-supplied RequestMessage must not be mutated in place: the
+        # second submit with different options must not see the first submit's
+        # batchSize/request_options leak in.
+        client = _make_client(batch_size=64)
+        conn = MagicMock()
+        client._pool.get = MagicMock(return_value=conn)
+
+        original = RequestMessage(fields={'g': 'g'}, gremlin='g.V()')
+
+        client.submit_async(original, request_options={'evaluationTimeout': 1000})
+        # The caller's original message must be untouched.
+        assert 'batchSize' not in original.fields
+        assert 'evaluationTimeout' not in original.fields
+        assert original.fields == {'g': 'g'}
+
+        # Resubmit the same message with different options; it must not carry
+        # over state from the first submit.
+        client.submit_async(original, request_options={'batchSize': 5})
+        sent = conn.write.call_args[0][0]
+        assert sent.fields['batchSize'] == 5
+        assert 'evaluationTimeout' not in sent.fields
+        # And the original is still pristine.
+        assert original.fields == {'g': 'g'}
+
+    def test_batch_size_not_written_to_caller_message(self):
+        # Use a non-default value so the assertion proves the configured
+        # batch_size flowed through, not the library default (64).
+        client = _make_client(batch_size=32)
+        conn = MagicMock()
+        client._pool.get = MagicMock(return_value=conn)
+        original = RequestMessage(fields={'g': 'g'}, gremlin='g.V()')
+        client.submit_async(original)
+        # default batchSize was applied to the sent clone, not the caller's msg
+        sent = conn.write.call_args[0][0]
+        assert sent.fields['batchSize'] == 32
+        assert 'batchSize' not in original.fields
+
+
+class TestHeadersKwargRemoved:
+
+    def test_client_rejects_headers_kwarg(self):
+        # headers is no longer a named parameter; it lands in transport_kwargs
+        # and would be forwarded to the transport. Verify Client has no _headers.
+        client = _make_client()
+        assert not hasattr(client, '_headers')
+
+    def test_connection_has_no_headers_param(self):
+        import inspect
+        params = inspect.signature(Connection.__init__).parameters
+        assert 'headers' not in params
+
+    def test_connection_internal_headers_default_none(self):
+        conn = Connection(
+            url='http://localhost:8182/gremlin',
+            traversal_source='g',
+            executor=MagicMock(),
+            pool=MagicMock(),
+            enable_user_agent_on_connect=False,
+        )
+        assert conn._headers is None
+
+
+class TestDriverRemoteConnectionOptions:
+
+    def test_max_connections_forwarded(self):
+        from gremlin_python.driver.driver_remote_connection import DriverRemoteConnection
+        with patch('gremlin_python.driver.driver_remote_connection.client.Client') as MockClient:
+            instance = MockClient.return_value
+            instance._url = 'http://localhost:8182/gremlin'
+            instance._traversal_source = 'g'
+            DriverRemoteConnection('http://localhost:8182/gremlin', 'g',
+                                   max_connections=16, batch_size=32)
+        _, kwargs = MockClient.call_args
+        assert kwargs['max_connections'] == 16
+        assert kwargs['batch_size'] == 32
+
+    def test_no_headers_param(self):
+        import inspect
+        from gremlin_python.driver.driver_remote_connection import DriverRemoteConnection
+        params = inspect.signature(DriverRemoteConnection.__init__).parameters
+        assert 'headers' not in params
+        assert 'pool_size' not in params
+        assert 'max_connections' in params
+
+    def test_submit_async_signature_single_arg(self):
+        import inspect
+        from gremlin_python.driver.driver_remote_connection import DriverRemoteConnection
+        params = list(inspect.signature(DriverRemoteConnection.submit_async).parameters)
+        # self + gremlin_lang only
+        assert params == ['self', 'gremlin_lang']
+
+
+
+class TestSigV4CredentialsProvider:
+
+    def _fake_request(self):
+        req = MagicMock()
+        req.method = 'POST'
+        req.url = 'http://localhost:8182/gremlin'
+        req.serialize_body.return_value = b'{}'
+        req.headers = {}
+        return req
+
+    def test_callable_credentials_provider_used(self):
+        from gremlin_python.driver.auth import sigv4
+
+        sentinel_creds = object()
+        provider = MagicMock(return_value=sentinel_creds)
+
+        with patch('botocore.auth.SigV4Auth') as MockAuth:
+            signer = MockAuth.return_value
+            signer.add_auth = MagicMock()
+            interceptor = sigv4('us-east-1', 'neptune-db', credentials=provider)
+            req = self._fake_request()
+            interceptor(req)
+
+        provider.assert_called_once()
+        # SigV4Auth must be constructed with the credentials returned by the provider
+        args, _ = MockAuth.call_args
+        assert args[0] is sentinel_creds
+
+    def test_credentials_object_with_get_credentials(self):
+        from gremlin_python.driver.auth import sigv4
+
+        resolved = object()
+
+        class FakeSession:  # not callable; mimics a botocore Session
+            def get_credentials(self_inner):
+                return resolved
+
+        with patch('botocore.auth.SigV4Auth') as MockAuth:
+            MockAuth.return_value.add_auth = MagicMock()
+            interceptor = sigv4('us-east-1', 'neptune-db', credentials=FakeSession())
+            interceptor(self._fake_request())
+
+        args, _ = MockAuth.call_args
+        assert args[0] is resolved
+
+    def test_env_fallback_when_no_provider(self):
+        from gremlin_python.driver.auth import sigv4
+
+        resolved = object()
+        with patch('boto3.Session') as MockSession, \
+                patch('botocore.auth.SigV4Auth') as MockAuth:
+            MockSession.return_value.get_credentials.return_value = resolved
+            MockAuth.return_value.add_auth = MagicMock()
+            interceptor = sigv4('us-east-1', 'neptune-db')
+            interceptor(self._fake_request())
+
+        # With no provider supplied, credentials must be resolved from the AWS
+        # environment via a boto3 Session, and those resolved credentials must be
+        # the ones handed to the SigV4 signer.
+        MockSession.assert_called_once()
+        MockSession.return_value.get_credentials.assert_called_once()
+        args, _ = MockAuth.call_args
+        assert args[0] is resolved
diff --git a/gremlin-python/src/main/python/tests/unit/driver/test_connection_options.py b/gremlin-python/src/main/python/tests/unit/driver/test_connection_options.py
new file mode 100644
index 0000000000..4411db936c
--- /dev/null
+++ b/gremlin-python/src/main/python/tests/unit/driver/test_connection_options.py
@@ -0,0 +1,312 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+"""Unit tests for the standardized GLV connection options (TinkerPop 4.x).
+
+These cover option wiring on the aiohttp transport, the Client pool, the
+Connection request path, the DriverRemoteConnection surface, and the SigV4
+credentials-provider variant. They avoid any network I/O.
+"""
+
+import socket
+import warnings
+from unittest.mock import MagicMock, patch
+
+import pytest
+
+from gremlin_python.driver.aiohttp.transport import (
+    AiohttpHTTPTransport,
+    _normalize_compression,
+    _keep_alive_socket_options,
+    _keep_alive_socket_factory,
+    DEFAULT_CONNECT_TIMEOUT,
+    DEFAULT_IDLE_TIMEOUT,
+    DEFAULT_KEEP_ALIVE_TIME,
+)
+
+
+# ---------------------------------------------------------------------------
+# Compression normalization
+# ---------------------------------------------------------------------------
+
+class TestCompressionNormalization:
+
+    def test_default_is_deflate(self):
+        assert _normalize_compression(None) == 'deflate'
+
+    def test_string_none(self):
+        assert _normalize_compression('none') == 'none'
+
+    def test_string_deflate(self):
+        assert _normalize_compression('deflate') == 'deflate'
+
+    def test_case_insensitive(self):
+        assert _normalize_compression('DEFLATE') == 'deflate'
+
+    def test_invalid_string_raises(self):
+        with pytest.raises(ValueError):
+            _normalize_compression('gzip')
+
+    def test_invalid_type_raises(self):
+        with pytest.raises(TypeError):
+            _normalize_compression(5)
+
+
+# ---------------------------------------------------------------------------
+# Keep-alive socket option construction
+# ---------------------------------------------------------------------------
+
+class TestKeepAliveSocketOptions:
+
+    def test_includes_so_keepalive(self):
+        opts = _keep_alive_socket_options(30)
+        assert (socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1) in opts
+
+    def test_includes_idle_when_platform_supports(self):
+        idle_opt = getattr(socket, 'TCP_KEEPIDLE', None) or getattr(socket, 'TCP_KEEPALIVE', None)
+        opts = _keep_alive_socket_options(45)
+        if idle_opt is not None:
+            assert (socket.IPPROTO_TCP, idle_opt, 45) in opts
+        else:
+            # Only SO_KEEPALIVE is present when no idle option exists (e.g. Windows)
+            assert len(opts) == 1
+
+    def test_socket_factory_applies_options(self):
+        factory = _keep_alive_socket_factory(30)
+        addr_info = (socket.AF_INET, socket.SOCK_STREAM, 0, '', ('127.0.0.1', 0))
+        sock = factory(addr_info)
+        try:
+            assert sock.getsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE) != 0
+        finally:
+            sock.close()
+
+
+# ---------------------------------------------------------------------------
+# Transport option wiring
+# ---------------------------------------------------------------------------
+
+class TestTransportDefaults:
+
+    def test_defaults(self):
+        t = AiohttpHTTPTransport()
+        assert t._connect_timeout == DEFAULT_CONNECT_TIMEOUT == 5
+        assert t._idle_timeout == DEFAULT_IDLE_TIMEOUT == 180
+        assert t._keep_alive_time == DEFAULT_KEEP_ALIVE_TIME == 30
+        assert t._compression == 'deflate'
+        assert t._proxy is None
+        assert t._trust_env is False
+        t.close()
+
+    def test_explicit_values(self):
+        t = AiohttpHTTPTransport(connect_timeout_millis=2000, idle_timeout_millis=60000,
+                                 keep_alive_time_millis=15000, compression='deflate',
+                                 proxy='http://proxy:3128', trust_env=True)
+        assert t._connect_timeout == 2
+        assert t._idle_timeout == 60
+        assert t._keep_alive_time == 15
+        assert t._compression == 'deflate'
+        assert t._proxy == 'http://proxy:3128'
+        assert t._trust_env is True
+        t.close()
+
+    def test_timeouts_via_seconds(self):
+        t = AiohttpHTTPTransport(connect_timeout=2,
+                                 idle_timeout=60,
+                                 read_timeout=30,
+                                 keep_alive_time=15)
+        # the unsuffixed seconds form sets the same internal seconds values as the *_millis form
+        assert t._connect_timeout == 2
+        assert t._idle_timeout == 60
+        assert t._read_timeout == 30
+        assert t._keep_alive_time == 15
+        t.close()
+
+    def test_timeout_rejects_both_millis_and_seconds(self):
+        with pytest.raises(ValueError):
+            AiohttpHTTPTransport(connect_timeout_millis=2000, connect_timeout=2)
+
+    def test_ssl_canonical_option(self):
+        import ssl as ssl_module
+        ctx = ssl_module.SSLContext(ssl_module.PROTOCOL_TLS_CLIENT)
+        t = AiohttpHTTPTransport(ssl=ctx)
+        assert t._enable_ssl is True
+        assert t._ssl_context is ctx
+        t.close()
+
+    def test_max_content_length_removed(self):
+        # max_content_length is no longer accepted; it would now be forwarded to
+        # aiohttp as an unknown kwarg. Verify it is not silently stored anywhere.
+        t = AiohttpHTTPTransport()
+        assert 'max_content_length' not in t._aiohttp_kwargs
+        t.close()
+
+
+class TestTransportConnectWiring:
+
+    def _connect_capture(self, **transport_kwargs):
+        """Construct a transport and capture the kwargs passed to TCPConnector
+        and ClientSession during connect(), without doing real I/O."""
+        t = AiohttpHTTPTransport(**transport_kwargs)
+        captured = {}
+
+        class FakeConnector:
+            def __init__(self, **kwargs):
+                captured['connector'] = kwargs
+
+        class FakeSession:
+            def __init__(self, **kwargs):
+                captured['session'] = kwargs
+
+        with patch('aiohttp.TCPConnector', FakeConnector), \
+                patch('aiohttp.ClientSession', FakeSession):
+            t.connect('http://localhost:8182/gremlin', headers={'a': 'b'})
+        t._client_session = None  # FakeSession isn't a real session
+        return captured, t
+
+    def test_idle_timeout_maps_to_keepalive_timeout(self):
+        captured, t = self._connect_capture(idle_timeout_millis=90000)
+        assert captured['connector']['keepalive_timeout'] == 90
+        t.close()
+
+    def test_connect_timeout_sets_sock_connect(self):
+        captured, t = self._connect_capture(connect_timeout_millis=3000)
+        timeout = captured['session']['timeout']
+        assert timeout.sock_connect == 3
+        t.close()
+
+    def test_keep_alive_wires_socket_factory(self):
+        captured, t = self._connect_capture(keep_alive_time_millis=30000)
+        # aiohttp >= 3.11 is the declared floor, so socket_factory is always used.
+        assert 'socket_factory' in captured['connector']
+        assert 'socket_options' not in captured['connector']
+        t.close()
+
+    def test_max_connections_sets_connector_limit(self):
+        captured, t = self._connect_capture(max_connections=7)
+        assert captured['connector']['limit'] == 7
+        t.close()
+
+    def test_no_max_connections_leaves_limit_unset(self):
+        captured, t = self._connect_capture()
+        assert 'limit' not in captured['connector']
+        t.close()
+
+    def test_read_timeout_sets_sock_read(self):
+        captured, t = self._connect_capture(connect_timeout_millis=3000, read_timeout_millis=11000)
+        timeout = captured['session']['timeout']
+        assert timeout.sock_connect == 3
+        assert timeout.sock_read == 11
+        t.close()
+
+    def test_default_timeout_has_no_unbounded_total(self):
+        # Defaults must still arm a socket-connect bound; building ClientTimeout
+        # from the socket knobs (not a whole-request total) keeps streaming safe.
+        captured, t = self._connect_capture()
+        timeout = captured['session']['timeout']
+        assert timeout.sock_connect == 5
+        assert timeout.total is None
+        t.close()
+
+    def test_trust_env_passed_to_session(self):
+        captured, t = self._connect_capture(trust_env=True)
+        assert captured['session']['trust_env'] is True
+        t.close()
+
+    def test_ssl_context_passed_to_connector(self):
+        import ssl as ssl_module
+        ctx = ssl_module.SSLContext(ssl_module.PROTOCOL_TLS_CLIENT)
+        captured, t = self._connect_capture(ssl=ctx)
+        assert captured['connector']['ssl_context'] is ctx
+        t.close()
+
+
+class TestTransportWriteCompression:
+
+    def _make_transport(self, **kwargs):
+        t = AiohttpHTTPTransport(**kwargs)
+        t._url = 'http://localhost:8182/gremlin'
+
+        captured = {}
+
+        async def fake_post(url, data, headers, **post_kwargs):
+            captured['headers'] = headers
+            captured['post_kwargs'] = post_kwargs
+            return MagicMock()
+
+        session = MagicMock()
+        session.post = fake_post
+        t._client_session = session
+        return t, captured
+
+    def test_default_offers_deflate(self):
+        # With the default compression ('deflate'), the transport advertises
+        # Accept-Encoding: deflate and does not skip the auto-header.
+        t, captured = self._make_transport()
+        t.write({'headers': {'accept': 'x'}, 'payload': b'{}'})
+        assert captured['headers'].get('accept-encoding') == 'deflate'
+        assert 'skip_auto_headers' not in captured['post_kwargs']
+        t._client_session = None
+        t.close()
+
+    def test_deflate_sets_accept_encoding(self):
+        t, captured = self._make_transport(compression='deflate')
+        t.write({'headers': {'accept': 'x'}, 'payload': b'{}'})
+        assert captured['headers'].get('accept-encoding') == 'deflate'
+        # deflate must NOT skip the Accept-Encoding auto-header
+        assert 'skip_auto_headers' not in captured['post_kwargs']
+        t._client_session = None
+        t.close()
+
+    def test_none_suppresses_auto_accept_encoding(self):
+        # compression='none' must stop aiohttp from auto-injecting
+        # Accept-Encoding (gzip, deflate, ...), which would otherwise silently
+        # negotiate compression. No explicit header is added; instead the
+        # auto-header is skipped.
+        t, captured = self._make_transport(compression='none')
+        t.write({'headers': {'accept': 'x'}, 'payload': b'{}'})
+        assert 'accept-encoding' not in captured['headers']
+        assert captured['post_kwargs'].get('skip_auto_headers') == ['Accept-Encoding']
+        t._client_session = None
+        t.close()
+
+    def test_none_respects_explicit_accept_encoding(self):
+        # A caller/interceptor that explicitly sets Accept-Encoding is honored,
+        # and the auto-header skip is not applied so their value is sent.
+        t, captured = self._make_transport(compression='none')
+        t.write({'headers': {'Accept-Encoding': 'gzip'}, 'payload': b'{}'})
+        assert captured['headers']['Accept-Encoding'] == 'gzip'
+        assert 'skip_auto_headers' not in captured['post_kwargs']
+        t._client_session = None
+        t.close()
+
+    def test_deflate_respects_existing_header(self):
+        t, captured = self._make_transport(compression='deflate')
+        t.write({'headers': {'Accept-Encoding': 'gzip'}, 'payload': b'{}'})
+        assert captured['headers']['Accept-Encoding'] == 'gzip'
+        assert 'accept-encoding' not in captured['headers']
+        assert 'skip_auto_headers' not in captured['post_kwargs']
+        t._client_session = None
+        t.close()
+
+    def test_proxy_passed_to_post(self):
+        t, captured = self._make_transport(proxy='http://proxy:3128')
+        t.write({'headers': {}, 'payload': b'{}'})
+        assert captured['post_kwargs'].get('proxy') == 'http://proxy:3128'
+        t._client_session = None
+        t.close()
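
For readers following the keep-alive tests in this diff, here is a minimal standalone sketch of how a per-socket keep-alive option list might be built and applied using only the standard library. The names `build_keep_alive_options` and `apply_options` are hypothetical and are not the driver's `_keep_alive_socket_options` or `_keep_alive_socket_factory`; the sketch only mirrors the behavior the tests assert (`SO_KEEPALIVE` is always set, and an idle-time option is added where the platform exposes one).

```python
import socket


def build_keep_alive_options(idle_seconds):
    """Hypothetical helper: return (level, optname, value) tuples."""
    # SO_KEEPALIVE turns on TCP keep-alive probing for the socket.
    opts = [(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)]
    # The idle-time constant is platform dependent: TCP_KEEPIDLE on Linux,
    # TCP_KEEPALIVE on macOS. Either may be missing from the socket module.
    idle_opt = (getattr(socket, 'TCP_KEEPIDLE', None)
                or getattr(socket, 'TCP_KEEPALIVE', None))
    if idle_opt is not None:
        opts.append((socket.IPPROTO_TCP, idle_opt, int(idle_seconds)))
    return opts


def apply_options(sock, opts):
    """Hypothetical helper: apply each option tuple to an open socket."""
    for level, name, value in opts:
        sock.setsockopt(level, name, value)


# Apply the options to an unconnected TCP socket; no network I/O happens.
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
try:
    apply_options(sock, build_keep_alive_options(30))
    keepalive_enabled = sock.getsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE) != 0
finally:
    sock.close()
```

Returning plain tuples keeps the platform probing separate from the socket itself, which is why a test can check the option list without opening a connection.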
