This is an automated email from the ASF dual-hosted git repository.
chaokunyang pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fory.git
The following commit(s) were added to refs/heads/main by this push:
new 669de5040 feat(python): add async grpc mode for python (#3768)
669de5040 is described below
commit 669de50404c25299c588de2db079ba9362cca8f8
Author: Shawn Yang <[email protected]>
AuthorDate: Wed Jun 17 11:33:46 2026 +0530
feat(python): add async grpc mode for python (#3768)
## Why?
Python gRPC service generation needs first-class AsyncIO support while
still allowing existing synchronous `grpcio` applications to generate
sync-compatible companions.
## What does this PR do?
- Defaults generated Python gRPC companions to `grpc.aio` servicers and
async request handling.
- Adds `--grpc-python-mode={async,sync}` for choosing Python gRPC API
mode, with CLI validation that the option is only used with Python
`--grpc` output.
- Keeps synchronous Python gRPC generation available through explicit
sync mode.
- Updates compiler tests, generated-code docs, compiler guide pages, and
language gRPC support docs for the new Python mode behavior.
- Splits Java/Python gRPC interop coverage into async and sync Python
peers, shares common test values, and updates CI/test class names.
## Related issues
#3273
## AI Contribution Checklist
- [ ] Substantial AI assistance was used in this PR: `yes` / `no`
- [ ] If `yes`, I included a completed [AI Contribution
Checklist](https://github.com/apache/fory/blob/main/AI_POLICY.md#9-contributor-checklist-for-ai-assisted-prs)
in this PR description and the required `AI Usage Disclosure`.
- [ ] If `yes`, my PR description includes the required `ai_review`
summary and screenshot evidence of the final clean AI review results
from both fresh reviewers on the current PR diff or current HEAD after
the latest code changes.
## Does this PR introduce any user-facing change?
- [ ] Does this PR introduce any public API change?
- [ ] Does this PR introduce any binary protocol compatibility change?
## Benchmark
---
.github/workflows/ci.yml | 8 +-
compiler/fory_compiler/cli.py | 22 ++
compiler/fory_compiler/generators/base.py | 1 +
.../fory_compiler/generators/services/python.py | 35 +-
.../fory_compiler/tests/test_service_codegen.py | 109 +++++-
docs/compiler/compiler-guide.md | 20 +-
docs/compiler/flatbuffers-idl.md | 7 +-
docs/compiler/generated-code.md | 16 +-
docs/compiler/index.md | 6 +-
docs/compiler/protobuf-idl.md | 17 +-
docs/compiler/schema-idl.md | 4 +-
docs/guide/csharp/grpc-support.md | 2 +-
docs/guide/go/grpc-support.md | 2 +-
docs/guide/java/grpc-support.md | 2 +-
docs/guide/javascript/grpc-support.md | 2 +-
docs/guide/kotlin/grpc-support.md | 2 +-
docs/guide/python/grpc-support.md | 167 ++++++---
docs/guide/rust/grpc-support.md | 2 +-
docs/guide/scala/grpc-support.md | 2 +-
integration_tests/grpc_tests/generate_grpc.py | 14 +
.../{GoGrpcInteropTest.java => GoGrpcTest.java} | 2 +-
.../org/apache/fory/grpc_tests/GrpcTestBase.java | 32 +-
...linGrpcInteropTest.java => KotlinGrpcTest.java} | 2 +-
...pcInteropTest.java => PythonAsyncGrpcTest.java} | 8 +-
...rpcInteropTest.java => PythonSyncGrpcTest.java} | 11 +-
...{RustGrpcInteropTest.java => RustGrpcTest.java} | 2 +-
.../__init__.py} | 19 -
.../grpc_tests/python/grpc_sync_tests/grpc_peer.py | 244 +++++++++++++
.../__init__.py} | 19 -
.../grpc_tests/python/grpc_test_common/values.py | 194 ++++++++++
.../grpc_tests/python/grpc_tests/grpc_interop.py | 400 ---------------------
.../grpc_tests/python/grpc_tests/grpc_peer.py | 268 ++++++++++++++
integration_tests/grpc_tests/python/pyproject.toml | 2 +-
integration_tests/grpc_tests/run_tests.sh | 2 +-
34 files changed, 1097 insertions(+), 548 deletions(-)
diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml
index dee626009..287b9212b 100644
--- a/.github/workflows/ci.yml
+++ b/.github/workflows/ci.yml
@@ -729,7 +729,7 @@ jobs:
- name: Run Java/Python gRPC Tests
run: |
cd integration_tests/grpc_tests/java
- mvn -T16 --no-transfer-progress -Dtest=PythonGrpcInteropTest test
+ mvn -T16 --no-transfer-progress -Dtest=PythonAsyncGrpcTest,PythonSyncGrpcTest test
grpc_java_go_tests:
name: Java/Go gRPC Tests
@@ -768,7 +768,7 @@ jobs:
- name: Run Java/Go gRPC Tests
run: |
cd integration_tests/grpc_tests/java
- mvn -T16 --no-transfer-progress -Dtest=GoGrpcInteropTest test
+ mvn -T16 --no-transfer-progress -Dtest=GoGrpcTest test
grpc_java_rust_tests:
name: Java/Rust gRPC Tests
@@ -810,7 +810,7 @@ jobs:
- name: Run Java/Rust gRPC Tests
run: |
cd integration_tests/grpc_tests/java
- mvn -T16 --no-transfer-progress -Dtest=RustGrpcInteropTest test
+ mvn -T16 --no-transfer-progress -Dtest=RustGrpcTest test
grpc_java_kotlin_tests:
name: Java/Kotlin gRPC Tests
@@ -853,7 +853,7 @@ jobs:
- name: Run Java/Kotlin gRPC Tests
run: |
cd integration_tests/grpc_tests/java
- mvn -T16 --no-transfer-progress -Dtest=KotlinGrpcInteropTest test
+ mvn -T16 --no-transfer-progress -Dtest=KotlinGrpcTest test
javascript:
name: JavaScript CI
diff --git a/compiler/fory_compiler/cli.py b/compiler/fory_compiler/cli.py
index 004666cac..7410e727b 100644
--- a/compiler/fory_compiler/cli.py
+++ b/compiler/fory_compiler/cli.py
@@ -660,6 +660,12 @@ def parse_args(args: Optional[List[str]] = None) ->
argparse.Namespace:
action="store_true",
help="Generate JavaScript gRPC-Web client code",
)
+ parser.add_argument(
+ "--grpc-python-mode",
+ choices=["async", "sync"],
+ default=None,
+ help="Python gRPC API mode: async (default) or sync",
+ )
return parser.parse_args(args)
@@ -709,6 +715,7 @@ def compile_file(
resolve_cache: Optional[Dict[Path, Schema]] = None,
grpc: bool = False,
grpc_web: bool = False,
+ grpc_python_mode: str = "async",
*,
generated_outputs: Optional[Dict[Path, Path]] = None,
) -> bool:
@@ -780,6 +787,7 @@ def compile_file(
swift_namespace_style=swift_namespace_style,
grpc=grpc,
grpc_web=grpc_web,
+ grpc_python_mode=grpc_python_mode,
)
generator_class = GENERATORS[lang]
@@ -851,6 +859,7 @@ def compile_file_recursive(
generated_outputs: Dict[Path, Path],
grpc: bool = False,
grpc_web: bool = False,
+ grpc_python_mode: str = "async",
) -> bool:
file_path = file_path.resolve()
if file_path in generated:
@@ -917,6 +926,7 @@ def compile_file_recursive(
generated_outputs,
grpc,
grpc_web,
+ grpc_python_mode,
):
stack.remove(file_path)
return False
@@ -933,6 +943,7 @@ def compile_file_recursive(
resolve_cache,
grpc,
grpc_web,
+ grpc_python_mode,
generated_outputs=generated_outputs,
)
if ok:
@@ -1018,6 +1029,16 @@ def cmd_compile(args: argparse.Namespace) -> int:
file=sys.stderr,
)
return 1
+ grpc_python_mode = args.grpc_python_mode or "async"
+ if args.grpc_python_mode is not None and not args.grpc:
+ print("Error: --grpc-python-mode requires --grpc.", file=sys.stderr)
+ return 1
+ if args.grpc_python_mode is not None and "python" not in lang_output_dirs:
+ print(
+ "Error: --grpc-python-mode is only supported with Python output.",
+ file=sys.stderr,
+ )
+ return 1
# Create output directories
for out_dir in lang_output_dirs.values():
@@ -1049,6 +1070,7 @@ def cmd_compile(args: argparse.Namespace) -> int:
generated_outputs,
args.grpc,
args.grpc_web,
+ grpc_python_mode,
):
success = False
except ImportError as e:
diff --git a/compiler/fory_compiler/generators/base.py
b/compiler/fory_compiler/generators/base.py
index 438bcb446..d4900686f 100644
--- a/compiler/fory_compiler/generators/base.py
+++ b/compiler/fory_compiler/generators/base.py
@@ -54,6 +54,7 @@ class GeneratorOptions:
swift_namespace_style: Optional[str] = None
grpc: bool = False
grpc_web: bool = False
+ grpc_python_mode: str = "async"
class BaseGenerator(ABC):
diff --git a/compiler/fory_compiler/generators/services/python.py
b/compiler/fory_compiler/generators/services/python.py
index feb1a259b..f92944894 100644
--- a/compiler/fory_compiler/generators/services/python.py
+++ b/compiler/fory_compiler/generators/services/python.py
@@ -65,12 +65,15 @@ class PythonServiceGeneratorMixin:
def generate_grpc_module(self, services: List[Service]) -> GeneratedFile:
"""Generate a grpcio-style companion module for schema services."""
module_name = self.get_module_name()
+ async_mode = self.python_grpc_async()
lines = []
lines.append(self.get_license_header("#"))
lines.append("")
lines.append("from __future__ import annotations")
lines.append("")
lines.append("import grpc")
+ if async_mode:
+ lines.append("import grpc.aio")
lines.append(f"import {module_name} as _models")
lines.append("")
lines.append("")
@@ -86,7 +89,10 @@ class PythonServiceGeneratorMixin:
for service in services:
lines.extend(self.generate_python_grpc_stub(service))
lines.append("")
- lines.extend(self.generate_python_grpc_servicer(service))
+ if async_mode:
+ lines.extend(self.generate_python_grpc_async_servicer(service))
+ else:
+ lines.extend(self.generate_python_grpc_servicer(service))
lines.append("")
lines.extend(self.generate_python_grpc_registration(service))
lines.append("")
@@ -136,6 +142,27 @@ class PythonServiceGeneratorMixin:
lines.append(' raise NotImplementedError("Method not implemented!")')
return lines
+ def generate_python_grpc_async_servicer(self, service: Service) ->
List[str]:
+ lines = []
+ lines.append(f"class {service.name}Servicer(object):")
+ lines.append(f' """AsyncIO base servicer for {service.name}."""')
+ if not service.methods:
+ lines.append(" pass")
+ return lines
+ for method in service.methods:
+ python_name = self.python_grpc_method_name(method)
+ lines.append("")
+ if method.client_streaming:
+ lines.append(
+ f" async def {python_name}(self, request_iterator, context):"
+ )
+ else:
+ lines.append(f" async def {python_name}(self, request, context):")
+ lines.append(
+ ' await context.abort(grpc.StatusCode.UNIMPLEMENTED, "Method not implemented!")'
+ )
+ return lines
+
def generate_python_grpc_registration(self, service: Service) -> List[str]:
lines = []
add_fn = self.python_grpc_add_servicer_name(service)
@@ -194,3 +221,9 @@ class PythonServiceGeneratorMixin:
def python_grpc_add_servicer_name(self, service: Service) -> str:
return f"_add_{self.safe_name(self.to_snake_case(service.name))}_servicer"
+
+ def python_grpc_async(self) -> bool:
+ mode = self.options.grpc_python_mode
+ if mode not in ("async", "sync"):
+ raise ValueError(f"Unknown Python gRPC mode: {mode}")
+ return mode == "async"
diff --git a/compiler/fory_compiler/tests/test_service_codegen.py
b/compiler/fory_compiler/tests/test_service_codegen.py
index 8766929df..b7d2d2a97 100644
--- a/compiler/fory_compiler/tests/test_service_codegen.py
+++ b/compiler/fory_compiler/tests/test_service_codegen.py
@@ -184,12 +184,18 @@ def test_python_grpc_byte_callbacks():
files = generate_service_files(schema, PythonGenerator)
assert set(files) == {"demo_greeter_grpc.py"}
content = files["demo_greeter_grpc.py"]
+ assert "import grpc.aio" in content
assert "class GreeterStub(object):" in content
assert "class GreeterServicer(object):" in content
assert "def add_servicer(servicer, server):" in content
assert "add_GreeterServicer_to_server" not in content
assert "self.say_hello = channel.unary_unary(" in content
- assert "def say_hello(self, request, context):" in content
+ assert "async def say_hello(self, request, context):" in content
+ assert (
+ 'await context.abort(grpc.StatusCode.UNIMPLEMENTED, "Method not implemented!")'
+ in content
+ )
+ assert "raise NotImplementedError" not in content
assert ' "SayHello": grpc.unary_unary_rpc_method_handler(' in
content
assert "servicer.say_hello" in content
assert "return _models._get_fory().serialize(value)" in content
@@ -199,6 +205,29 @@ def test_python_grpc_byte_callbacks():
assert "FromString" not in content
+def test_python_grpc_sync_mode():
+ schema = parse_fdl(_GREETER_WITH_SERVICE)
+ generator = PythonGenerator(
+ schema,
+ GeneratorOptions(output_dir=Path("/tmp"), grpc=True,
grpc_python_mode="sync"),
+ )
+ files = {item.path: item.content for item in generator.generate_services()}
+ assert set(files) == {"demo_greeter_grpc.py"}
+ content = files["demo_greeter_grpc.py"]
+ assert "import grpc.aio" not in content
+ assert "class GreeterStub(object):" in content
+ assert "class GreeterServicer(object):" in content
+ assert "self.say_hello = channel.unary_unary(" in content
+ assert "def say_hello(self, request, context):" in content
+ assert "async def say_hello" not in content
+ assert "context.set_code(grpc.StatusCode.UNIMPLEMENTED)" in content
+ assert 'context.set_details("Method not implemented!")' in content
+ assert 'raise NotImplementedError("Method not implemented!")' in content
+ assert '"/demo.greeter.Greeter/SayHello"' in content
+ assert "SerializeToString" not in content
+ assert "FromString" not in content
+
+
def test_js_grpc_uses_model_fory():
schema = parse_fdl(_GREETER_WITH_SERVICE)
files = generate_service_files(schema, JavaScriptGenerator)
@@ -974,15 +1003,31 @@ def test_grpc_streaming_method_shapes():
assert "io.grpc.MethodDescriptor<Payload, Payload>" in java
python = next(iter(generate_service_files(schema,
PythonGenerator).values()))
+ assert "import grpc.aio" in python
assert "channel.unary_unary(" in python
assert "channel.unary_stream(" in python
assert "channel.stream_unary(" in python
assert "channel.stream_stream(" in python
+ assert "grpc.unary_unary_rpc_method_handler(" in python
+ assert "grpc.unary_stream_rpc_method_handler(" in python
+ assert "grpc.stream_unary_rpc_method_handler(" in python
assert "grpc.stream_stream_rpc_method_handler(" in python
assert "self.unary = channel.unary_unary(" in python
assert "self.server = channel.unary_stream(" in python
assert "self.client = channel.stream_unary(" in python
assert "self.bidi = channel.stream_stream(" in python
+ assert "async def unary(self, request, context):" in python
+ assert "async def server(self, request, context):" in python
+ assert "async def client(self, request_iterator, context):" in python
+ assert "async def bidi(self, request_iterator, context):" in python
+ assert (
+ python.count(
+ 'await context.abort(grpc.StatusCode.UNIMPLEMENTED, "Method not implemented!")'
+ )
+ == 4
+ )
+ assert "yield" not in python
+ assert "raise NotImplementedError" not in python
go = next(iter(generate_service_files(schema, GoGenerator).values()))
assert "ClientStreams:\ttrue" in go
@@ -1535,7 +1580,7 @@ def test_scala_grpc_preflight_collision(tmp_path: Path,
capsys):
assert "GreeterGrpc.scala" in err
-def test_java_grpc_service_class_collision_with_imported_type_fails(tmp_path:
Path):
+def test_java_grpc_class_import_collision(tmp_path: Path):
common = tmp_path / "common.fdl"
common.write_text(
dedent(
@@ -1959,6 +2004,28 @@ def test_compile_js_grpc_web(tmp_path: Path, capsys):
assert output.count("service_grpc_web.ts") == 1
+def test_compile_python_grpc_sync_mode(tmp_path: Path, capsys):
+ schema_path = tmp_path / "service.fdl"
+ schema_path.write_text(_GREETER_WITH_SERVICE)
+ python_out = tmp_path / "python"
+ ok = compile_file(
+ schema_path,
+ {"python": python_out},
+ grpc=True,
+ grpc_python_mode="sync",
+ generated_outputs={},
+ )
+ output = capsys.readouterr().out
+
+ assert ok is True
+ service_file = python_out / "demo_greeter_grpc.py"
+ assert service_file.exists()
+ content = service_file.read_text()
+ assert "import grpc.aio" not in content
+ assert "def say_hello(self, request, context):" in content
+ assert output.count("demo_greeter_grpc.py") == 1
+
+
def test_cli_rejects_grpc_web_non_js(tmp_path: Path, capsys):
schema_path = tmp_path / "service.fdl"
schema_path.write_text(_GREETER_WITH_SERVICE)
@@ -1977,6 +2044,44 @@ def test_cli_rejects_grpc_web_non_js(tmp_path: Path,
capsys):
)
+def test_cli_rejects_mode_without_grpc(tmp_path: Path, capsys):
+ schema_path = tmp_path / "service.fdl"
+ schema_path.write_text(_GREETER_WITH_SERVICE)
+ args = parse_args(
+ [
+ str(schema_path),
+ "--python_out",
+ str(tmp_path / "python"),
+ "--grpc-python-mode",
+ "sync",
+ ]
+ )
+
+ assert cmd_compile(args) == 1
+ assert "--grpc-python-mode requires --grpc" in capsys.readouterr().err
+
+
+def test_cli_rejects_mode_non_python(tmp_path: Path, capsys):
+ schema_path = tmp_path / "service.fdl"
+ schema_path.write_text(_GREETER_WITH_SERVICE)
+ args = parse_args(
+ [
+ str(schema_path),
+ "--java_out",
+ str(tmp_path / "java"),
+ "--grpc",
+ "--grpc-python-mode",
+ "sync",
+ ]
+ )
+
+ assert cmd_compile(args) == 1
+ assert (
+ "--grpc-python-mode is only supported with Python output"
+ in capsys.readouterr().err
+ )
+
+
def test_js_output_path_collision(tmp_path: Path, capsys):
first_dir = tmp_path / "first"
second_dir = tmp_path / "second"
diff --git a/docs/compiler/compiler-guide.md b/docs/compiler/compiler-guide.md
index 85d5bace5..0c5ebd491 100644
--- a/docs/compiler/compiler-guide.md
+++ b/docs/compiler/compiler-guide.md
@@ -73,6 +73,7 @@ Compile options:
| `--emit-fdl` | Emit translated FDL (for non-FDL
inputs) | `false` |
| `--emit-fdl-path` | Write translated FDL to this path
(file or directory) | (stdout) |
| `--grpc` | Generate gRPC service companions for
supported outputs | `false` |
+| `--grpc-python-mode=MODE` | Python gRPC mode: `async` or `sync`
| `async` |
| `--grpc-web` | Generate JavaScript gRPC-Web client
companions | `false` |
Schema-level file options are supported for language-specific generation
choices.
@@ -155,16 +156,23 @@ foryc compiler/examples/service.fdl
--java_out=./generated/java --python_out=./g
```
The generated gRPC service code uses Fory to serialize request and response
-payloads. Java output imports grpc-java APIs, Python output imports `grpc`, Go
-output imports grpc-go, Rust output imports `tonic` and `bytes`, Scala output
-imports grpc-java APIs, and Kotlin output imports grpc-java and grpc-kotlin
APIs
-and uses coroutine stubs. C# output imports `Grpc.Core.Api` types and can be
-hosted with normal .NET gRPC packages such as `Grpc.AspNetCore` or called
-through `Grpc.Net.Client`. JavaScript output imports `@grpc/grpc-js`.
+payloads. Java output imports grpc-java APIs, Python output defaults to
+`grpc.aio`, Go output imports grpc-go, Rust output imports `tonic` and `bytes`,
+Scala output imports grpc-java APIs, and Kotlin output imports grpc-java and
+grpc-kotlin APIs and uses coroutine stubs. C# output imports `Grpc.Core.Api`
+types and can be hosted with normal .NET gRPC packages such as
`Grpc.AspNetCore`
+or called through `Grpc.Net.Client`. JavaScript output imports `@grpc/grpc-js`.
Applications that compile or run those generated service files must provide
their own gRPC dependencies. Fory packages do not add a hard gRPC dependency
for
this feature.
+Generate synchronous Python gRPC companions for existing sync `grpcio`
+applications with:
+
+```bash
+foryc compiler/examples/service.fdl --python_out=./generated/python --grpc
--grpc-python-mode=sync
+```
+
**Generate JavaScript gRPC-Web browser clients:**
```bash
diff --git a/docs/compiler/flatbuffers-idl.md b/docs/compiler/flatbuffers-idl.md
index abf665218..89b375153 100644
--- a/docs/compiler/flatbuffers-idl.md
+++ b/docs/compiler/flatbuffers-idl.md
@@ -144,9 +144,10 @@ foryc api.fbs --java_out=./generated/java
--python_out=./generated/python --go_o
Generated service code imports grpc APIs, so applications must provide
grpc-java,
grpc-kotlin, Scala grpc-java APIs, `grpcio`, grpc-go, Rust `tonic` and `bytes`,
`@grpc/grpc-js`, or C# `Grpc.Core.Api` plus server/client dependencies when
they
-compile or run those files. Fory packages do not add gRPC as a hard dependency.
-Use `--grpc-web` with JavaScript output to generate browser clients that import
-`grpc-web`.
+compile or run those files. Python companions use `grpc.aio` by default and can
+be generated in sync mode with `--grpc-python-mode=sync`. Fory packages do not
+add gRPC as a hard dependency. Use `--grpc-web` with JavaScript output to
+generate browser clients that import `grpc-web`.
### Defaults and Metadata
diff --git a/docs/compiler/generated-code.md b/docs/compiler/generated-code.md
index 9180887bb..1f52db67e 100644
--- a/docs/compiler/generated-code.md
+++ b/docs/compiler/generated-code.md
@@ -373,9 +373,14 @@ restored = Person.from_bytes(data)
When a schema contains services and the compiler is run with `--grpc`, Python
generation emits a companion module named `<module>_grpc.py`. The module name
is
derived from the Fory package by replacing dots with underscores, or
`generated`
-when the schema has no package.
+when the schema has no package. Python gRPC output defaults to `grpc.aio`
+AsyncIO APIs.
```python
+import grpc
+import grpc.aio
+
+
class AddressBookServiceStub:
def __init__(self, channel):
self.lookup = channel.unary_unary(
@@ -386,8 +391,8 @@ class AddressBookServiceStub:
class AddressBookServiceServicer:
- def lookup(self, request, context):
- raise NotImplementedError("Method not implemented!")
+ async def lookup(self, request, context):
+ await context.abort(grpc.StatusCode.UNIMPLEMENTED, "Method not
implemented!")
def add_servicer(servicer, server): ...
@@ -400,6 +405,11 @@ companion module must install `grpcio`; `pyfory` does not
add a hard gRPC
dependency. The Python API uses snake_case method names while preserving the
original IDL method names in the gRPC wire paths.
+Generate synchronous Python `grpcio` companions with
+`--grpc --grpc-python-mode=sync`. Sync mode keeps the same generated filename
+and public names, but servicer methods are regular `def` methods and the
+module is used with sync `grpc.Channel` and `grpc.Server` instances.
+
## Rust
### Output Layout
diff --git a/docs/compiler/index.md b/docs/compiler/index.md
index be81f5bfc..da10b2cd1 100644
--- a/docs/compiler/index.md
+++ b/docs/compiler/index.md
@@ -99,8 +99,10 @@ The generated service code uses normal gRPC APIs, but
request and response
objects are serialized with Fory. Applications provide their own grpc-java,
grpc-kotlin, Scala grpc-java APIs, `grpcio`, grpc-go, Rust `tonic` and `bytes`,
or C# `Grpc.Core.Api` and hosting/client dependencies; Fory packages do not add
-gRPC as a hard dependency. JavaScript Node.js companions use `@grpc/grpc-js`;
-browser clients are generated separately with `--grpc-web` and use `grpc-web`.
+gRPC as a hard dependency. Python companions use `grpc.aio` by default and can
+be generated in sync mode with `--grpc-python-mode=sync`. JavaScript Node.js
+companions use `@grpc/grpc-js`; browser clients are generated separately with
+`--grpc-web` and use `grpc-web`.
## Why Fory IDL?
diff --git a/docs/compiler/protobuf-idl.md b/docs/compiler/protobuf-idl.md
index 2c2360242..da5deba31 100644
--- a/docs/compiler/protobuf-idl.md
+++ b/docs/compiler/protobuf-idl.md
@@ -319,14 +319,15 @@ foryc api.proto --java_out=./generated/java
--python_out=./generated/python --go
```
Generated Java service files compile against grpc-java, generated Python
service
-modules import `grpc`, generated Rust service files import `tonic` and `bytes`,
-generated Go service files import grpc-go, generated JavaScript Node.js service
-files import `@grpc/grpc-js`,
-generated C# service files import `Grpc.Core.Api` types, generated Scala
service
-files compile against grpc-java, and generated Kotlin service files compile
-against grpc-java and grpc-kotlin. Add those dependencies in your application
-build; Fory packages do not add gRPC as a hard dependency. Use `--grpc-web`
-with JavaScript output to generate browser clients that import `grpc-web`.
+modules use `grpc.aio` by default, generated Rust service files import `tonic`
+and `bytes`, generated Go service files import grpc-go, generated JavaScript
+Node.js service files import `@grpc/grpc-js`, generated C# service files import
+`Grpc.Core.Api` types, generated Scala service files compile against grpc-java,
+and generated Kotlin service files compile against grpc-java and grpc-kotlin.
+Add those dependencies in your application build; Fory packages do not add gRPC
+as a hard dependency. Use `--grpc-python-mode=sync` for sync Python `grpcio`
+companions. Use `--grpc-web` with JavaScript output to generate browser clients
+that import `grpc-web`.
Protobuf `oneof` fields are translated to Fory union fields inside request and
response messages. Direct union RPC request or response types are not part of
normal protobuf RPC syntax.
diff --git a/docs/compiler/schema-idl.md b/docs/compiler/schema-idl.md
index 28da5fa7d..d2e12c989 100644
--- a/docs/compiler/schema-idl.md
+++ b/docs/compiler/schema-idl.md
@@ -953,7 +953,9 @@ service PetDirectory {
Applications that compile or run those companions provide their own gRPC
dependency, such as grpc-java, grpc-kotlin, `grpcio`, grpc-go, Rust `tonic`
and `bytes`, Scala grpc-java APIs, `@grpc/grpc-js`, `grpc-web`, or C#
- `Grpc.Core.Api` plus a server or client package.
+ `Grpc.Core.Api` plus a server or client package. Python companions use
+ `grpc.aio` by default and can be generated in sync mode with
+ `--grpc-python-mode=sync`.
**Grammar:**
diff --git a/docs/guide/csharp/grpc-support.md
b/docs/guide/csharp/grpc-support.md
index 485900d82..e017664c4 100644
--- a/docs/guide/csharp/grpc-support.md
+++ b/docs/guide/csharp/grpc-support.md
@@ -284,7 +284,7 @@ A gRPC service named `Greeter` still generates the service
companion
schema files target the same C# namespace without colliding. No
namespace-derived or service-derived module alias is generated.
-## Operations
+## gRPC Runtime Behavior
The generated service code only replaces request and response serialization.
All normal gRPC operational features still belong to your gRPC stack:
diff --git a/docs/guide/go/grpc-support.md b/docs/guide/go/grpc-support.md
index 02418eba1..4c221dc90 100644
--- a/docs/guide/go/grpc-support.md
+++ b/docs/guide/go/grpc-support.md
@@ -201,7 +201,7 @@ Generated Go code follows grpc-go conventions:
- The generated codec is used for every message frame, including streaming
frames.
-## Operations
+## gRPC Runtime Behavior
The generated service companion only supplies Fory serialization. Operational
behavior remains standard grpc-go behavior:
diff --git a/docs/guide/java/grpc-support.md b/docs/guide/java/grpc-support.md
index d1c899329..2239444fa 100644
--- a/docs/guide/java/grpc-support.md
+++ b/docs/guide/java/grpc-support.md
@@ -366,7 +366,7 @@ final class StreamingClient {
The generated descriptors preserve the exact IDL service and method names for
the gRPC path.
-## Operations
+## gRPC Runtime Behavior
The generated service code only replaces request and response serialization.
All normal gRPC operational features still belong to grpc-java:
diff --git a/docs/guide/javascript/grpc-support.md
b/docs/guide/javascript/grpc-support.md
index 696ecef24..2cbc68dbe 100644
--- a/docs/guide/javascript/grpc-support.md
+++ b/docs/guide/javascript/grpc-support.md
@@ -291,7 +291,7 @@ stream.on("end", () => {
});
```
-## Operations
+## gRPC Runtime Behavior
Generated service code only replaces request and response serialization. Normal
gRPC operational features still belong to the transport package:
diff --git a/docs/guide/kotlin/grpc-support.md
b/docs/guide/kotlin/grpc-support.md
index cf517df44..c18eabebf 100644
--- a/docs/guide/kotlin/grpc-support.md
+++ b/docs/guide/kotlin/grpc-support.md
@@ -230,7 +230,7 @@ stub.chat(
}
```
-## Operations
+## gRPC Runtime Behavior
The generated service code only replaces request and response serialization.
All normal gRPC operational features still belong to grpc-java and
diff --git a/docs/guide/python/grpc-support.md
b/docs/guide/python/grpc-support.md
index cdcff9224..0c5921cc6 100644
--- a/docs/guide/python/grpc-support.md
+++ b/docs/guide/python/grpc-support.md
@@ -28,19 +28,16 @@ IDL, or FlatBuffers IDL and you want gRPC transport
semantics with Fory payload
encoding. Use standard protobuf gRPC code generation when clients or tools must
consume protobuf message bytes directly.
-Generated Python companions currently target the synchronous `grpcio` API. Use
-regular `def` servicer methods, `grpc.server(...)`, standard `grpc.Channel`
-instances, and Python iterators or generators for streaming RPCs. The generated
-stub accepts any channel configured by your application. The compiler does not
-generate `grpc.aio` stubs or service bases, so do not implement generated
-servicer methods as `async def` unless you add a custom adapter outside the
-generated companion. Python gRPC async support based on `grpc.aio` will be
-available in the next Fory release.
+Python gRPC generation defaults to the `grpc.aio` AsyncIO API. Generated
+servicer bases use `async def` methods, generated stubs are used with
+`grpc.aio.Channel` instances, and streaming RPCs use async iterables.
Synchronous
+`grpcio` companions are still available with `--grpc-python-mode=sync`.
## Install Dependencies
-Install `grpcio` alongside `pyfory`. The generated companion imports `grpc`,
but
-`pyfory` does not add gRPC as a hard dependency.
+Install `grpcio` alongside `pyfory`. The generated companion imports `grpc`
and,
+in the default mode, `grpc.aio`, but `pyfory` does not add gRPC as a hard
+dependency.
```bash
pip install pyfory grpcio
@@ -75,79 +72,82 @@ foryc service.fdl --python_out=./generated/python --grpc
For this schema, the Python generator emits:
-| File | Purpose |
-| ---------------------- | ------------------------------------------- |
-| `demo_greeter.py` | Fory dataclasses and registration helpers |
-| `demo_greeter_grpc.py` | `grpcio` stub, servicer base, and registrar |
+| File | Purpose |
+| ---------------------- | --------------------------------------------- |
+| `demo_greeter.py` | Fory dataclasses and registration helpers |
+| `demo_greeter_grpc.py` | `grpc.aio` stub, servicer base, and registrar |
The module name is derived from the Fory package by replacing dots with
underscores. A schema with no package uses `generated.py` and
`generated_grpc.py`.
-## Implement a Server
+## Implement an Async Server
-Subclass the generated servicer and register it with a normal `grpcio` server.
+Subclass the generated servicer and register it with a `grpc.aio` server.
Generated Python method names use snake_case, while the gRPC wire path keeps
the
original IDL method name.
```python
-from concurrent import futures
+import asyncio
-import grpc
+import grpc.aio
import demo_greeter
import demo_greeter_grpc
class Greeter(demo_greeter_grpc.GreeterServicer):
- def say_hello(self, request, context):
+ async def say_hello(self, request, context):
return demo_greeter.HelloReply(reply=f"Hello, {request.name}")
-def serve():
- server = grpc.server(futures.ThreadPoolExecutor(max_workers=8))
+async def serve():
+ server = grpc.aio.server()
demo_greeter_grpc.add_servicer(Greeter(), server)
server.add_insecure_port("[::]:50051")
- server.start()
- server.wait_for_termination()
+ await server.start()
+ await server.wait_for_termination()
if __name__ == "__main__":
- serve()
+ asyncio.run(serve())
```
Generated request and response types are serialized by the generated companion,
so service implementations do not perform manual Fory registration.
-## Create a Client
+## Create an Async Client
-Use the generated stub with a normal `grpcio` channel. Production clients
-usually pass a TLS/auth-configured channel:
+Use the generated stub with a `grpc.aio` channel. Production clients usually
+pass a TLS/auth-configured channel:
```python
+import asyncio
+
import grpc
+import grpc.aio
import demo_greeter
import demo_greeter_grpc
-def main():
+async def main():
credentials = grpc.ssl_channel_credentials()
- with grpc.secure_channel("api.example.com:443", credentials) as channel:
+ async with grpc.aio.secure_channel("api.example.com:443", credentials) as
channel:
stub = demo_greeter_grpc.GreeterStub(channel)
- reply = stub.say_hello(demo_greeter.HelloRequest(name="Fory"))
+ reply = await stub.say_hello(demo_greeter.HelloRequest(name="Fory"))
print(reply.reply)
if __name__ == "__main__":
- main()
+ asyncio.run(main())
```
For local tests and development, an insecure channel can be used explicitly:
```python
-# Test-only channel. Use a TLS/auth-configured grpc.Channel in production.
-with grpc.insecure_channel("localhost:50051") as channel:
+# Test-only channel. Use a TLS/auth-configured grpc.aio.Channel in production.
+async with grpc.aio.insecure_channel("localhost:50051") as channel:
stub = demo_greeter_grpc.GreeterStub(channel)
```
@@ -168,63 +168,118 @@ service Greeter {
}
```
-Generated Python code follows `grpcio` conventions:
+Default Python gRPC output follows `grpc.aio` conventions:
-| IDL shape | Servicer method shape | Stub method shape |
-| ----------------------------------------- | ------------------------------------------- | -------------------------------- |
-| `rpc A (Req) returns (Res)` | returns one response object | returns one response object |
-| `rpc A (Req) returns (stream Res)` | yields response objects | returns an iterator of responses |
-| `rpc A (stream Req) returns (Res)` | consumes an iterator and returns a response | accepts an iterator of requests |
-| `rpc A (stream Req) returns (stream Res)` | consumes and yields iterators | accepts and returns iterators |
+| IDL shape | Servicer method shape | Stub method shape |
+| ----------------------------------------- | ----------------------------------------------- | -------------------------------------- |
+| `rpc A (Req) returns (Res)` | `async def` returns one response object | awaitable returns one response object |
+| `rpc A (Req) returns (stream Res)` | `async def` yields response objects | returns an async iterator of responses |
+| `rpc A (stream Req) returns (Res)` | consumes an async iterator and returns response | accepts an async iterator of requests |
+| `rpc A (stream Req) returns (stream Res)` | consumes and yields async iterators | accepts and returns async iterators |
Servicer methods use snake_case names, while generated descriptors preserve the
exact IDL service and method names for the gRPC path.
-Server implementations can use Python iterators directly:
+Server implementations use async methods and async iteration:
```python
class Greeter(demo_greeter_grpc.GreeterServicer):
- def lots_of_replies(self, request, context):
+ async def lots_of_replies(self, request, context):
yield demo_greeter.HelloReply(reply=f"Hello, {request.name}")
yield demo_greeter.HelloReply(reply=f"Welcome, {request.name}")
- def lots_of_greetings(self, request_iterator, context):
- names = [request.name for request in request_iterator]
+ async def lots_of_greetings(self, request_iterator, context):
+ names = []
+ async for request in request_iterator:
+ names.append(request.name)
return demo_greeter.HelloReply(reply=", ".join(names))
- def chat(self, request_iterator, context):
- for request in request_iterator:
+ async def chat(self, request_iterator, context):
+ async for request in request_iterator:
yield demo_greeter.HelloReply(reply=f"Hello, {request.name}")
```
-Generated clients use the standard `grpcio` streaming call shapes:
+Generated clients use `grpc.aio` streaming call shapes:
```python
credentials = grpc.ssl_channel_credentials()
-with grpc.secure_channel("api.example.com:443", credentials) as channel:
+async with grpc.aio.secure_channel("api.example.com:443", credentials) as channel:
stub = demo_greeter_grpc.GreeterStub(channel)
- for reply in stub.lots_of_replies(
+ async for reply in stub.lots_of_replies(
demo_greeter.HelloRequest(name="Fory")
):
print(reply.reply)
- def greeting_requests():
+ async def greeting_requests():
yield demo_greeter.HelloRequest(name="Ada")
yield demo_greeter.HelloRequest(name="Grace")
- summary = stub.lots_of_greetings(greeting_requests())
+ summary = await stub.lots_of_greetings(greeting_requests())
print(summary.reply)
- def chat_requests():
+ async def chat_requests():
yield demo_greeter.HelloRequest(name="Fory")
yield demo_greeter.HelloRequest(name="RPC")
- for reply in stub.chat(chat_requests()):
+ async for reply in stub.chat(chat_requests()):
print(reply.reply)
```
-## Operations
+## Sync Mode
+
+Use sync mode for existing synchronous `grpcio` applications or environments
+that do not run an asyncio event loop. Generate sync companions explicitly:
+
+```bash
+foryc service.fdl --python_out=./generated/python --grpc --grpc-python-mode=sync
+```
+
+Sync mode emits the same `<module>_grpc.py` filename and public names, but the
+servicer methods use regular `def`, and applications use `grpc.server(...)` and
+standard `grpc.Channel` instances.
+
+Unary sync server example:
+
+```python
+from concurrent import futures
+
+import grpc
+
+import demo_greeter
+import demo_greeter_grpc
+
+
+class Greeter(demo_greeter_grpc.GreeterServicer):
+ def say_hello(self, request, context):
+ return demo_greeter.HelloReply(reply=f"Hello, {request.name}")
+
+
+server = grpc.server(futures.ThreadPoolExecutor(max_workers=8))
+demo_greeter_grpc.add_servicer(Greeter(), server)
+server.add_insecure_port("[::]:50051")
+server.start()
+server.wait_for_termination()
+```
+
+Unary sync client example:
+
+```python
+import grpc
+
+import demo_greeter
+import demo_greeter_grpc
+
+
+with grpc.insecure_channel("localhost:50051") as channel:
+ stub = demo_greeter_grpc.GreeterStub(channel)
+ reply = stub.say_hello(demo_greeter.HelloRequest(name="Fory"))
+ print(reply.reply)
+```
+
+Sync streaming follows the normal `grpcio` iterator and generator conventions.
+
+## gRPC Runtime Behavior
The generated service companion only supplies Fory serialization callbacks.
Operational behavior remains standard `grpcio` behavior:
@@ -233,8 +288,8 @@ Operational behavior remains standard `grpcio` behavior:
- TLS and authentication credentials
- Client and server interceptors
- Status codes, details, and metadata
-- Channel and server lifecycle
-- Thread pool sizing for synchronous servers
+- Async event loop, channel, and server lifecycle in default mode
+- Thread pool sizing for synchronous servers in sync mode
## Troubleshooting
diff --git a/docs/guide/rust/grpc-support.md b/docs/guide/rust/grpc-support.md
index 4c9ca55f6..d60989f87 100644
--- a/docs/guide/rust/grpc-support.md
+++ b/docs/guide/rust/grpc-support.md
@@ -300,7 +300,7 @@ reference metadata for a request or response type, Rust gRPC generation rejects
that service. Use thread-safe reference shapes for gRPC payloads, or keep the
non-thread-safe type out of the RPC boundary.
-## Operations
+## gRPC Runtime Behavior
The generated service companion only supplies Fory serialization and tonic
bindings. Operational behavior remains standard tonic behavior:
diff --git a/docs/guide/scala/grpc-support.md b/docs/guide/scala/grpc-support.md
index 0b97929db..f89f47d68 100644
--- a/docs/guide/scala/grpc-support.md
+++ b/docs/guide/scala/grpc-support.md
@@ -371,7 +371,7 @@ Server-streaming, client-streaming, and bidirectional server methods use
grpc-java `StreamObserver` APIs because streaming completion, request flow
control, cancellation, and backpressure follow grpc-java behavior.
-## Operations
+## gRPC Runtime Behavior
The generated service code only replaces request and response serialization.
All normal gRPC operational features still belong to grpc-java:
diff --git a/integration_tests/grpc_tests/generate_grpc.py b/integration_tests/grpc_tests/generate_grpc.py
index 9c45e5925..e89622b5c 100644
--- a/integration_tests/grpc_tests/generate_grpc.py
+++ b/integration_tests/grpc_tests/generate_grpc.py
@@ -33,6 +33,7 @@ SCHEMAS = [
OUTPUTS = {
"java": TEST_DIR / "java/src/main/java/generated",
"python": TEST_DIR / "python/grpc_tests/generated",
+ "python_sync": TEST_DIR / "python/grpc_sync_tests/generated",
"go": TEST_DIR / "go/generated",
"rust": TEST_DIR / "rust/generated/src",
"csharp": TEST_DIR / "csharp/generated",
@@ -84,6 +85,19 @@ def main() -> int:
],
env=env,
)
+ subprocess.check_call(
+ [
+ sys.executable,
+ "-m",
+ "fory_compiler",
+ "compile",
+ str(schema),
+ f"--python_out={OUTPUTS['python_sync']}",
+ "--grpc",
+ "--grpc-python-mode=sync",
+ ],
+ env=env,
+ )
return 0
diff --git a/integration_tests/grpc_tests/java/src/test/java/org/apache/fory/grpc_tests/GoGrpcInteropTest.java b/integration_tests/grpc_tests/java/src/test/java/org/apache/fory/grpc_tests/GoGrpcTest.java
similarity index 96%
rename from integration_tests/grpc_tests/java/src/test/java/org/apache/fory/grpc_tests/GoGrpcInteropTest.java
rename to integration_tests/grpc_tests/java/src/test/java/org/apache/fory/grpc_tests/GoGrpcTest.java
index 44d9b9c03..8bcb3d5ef 100644
--- a/integration_tests/grpc_tests/java/src/test/java/org/apache/fory/grpc_tests/GoGrpcInteropTest.java
+++ b/integration_tests/grpc_tests/java/src/test/java/org/apache/fory/grpc_tests/GoGrpcTest.java
@@ -23,7 +23,7 @@ import io.grpc.Server;
import java.util.concurrent.TimeUnit;
import org.testng.annotations.Test;
-public class GoGrpcInteropTest extends GrpcTestBase {
+public class GoGrpcTest extends GrpcTestBase {
@Test
public void testJavaServerGoClient() throws Exception {
diff --git a/integration_tests/grpc_tests/java/src/test/java/org/apache/fory/grpc_tests/GrpcTestBase.java b/integration_tests/grpc_tests/java/src/test/java/org/apache/fory/grpc_tests/GrpcTestBase.java
index 8e9fe616e..cfec07151 100644
--- a/integration_tests/grpc_tests/java/src/test/java/org/apache/fory/grpc_tests/GrpcTestBase.java
+++ b/integration_tests/grpc_tests/java/src/test/java/org/apache/fory/grpc_tests/GrpcTestBase.java
@@ -311,11 +311,27 @@ public abstract class GrpcTestBase {
}
protected PeerCommand pythonCommand(String... args) {
+ Path pythonRoot =
+ repoRoot().resolve("integration_tests").resolve("grpc_tests").resolve("python");
+ return pythonCommand(
+ "grpc_tests.grpc_peer", pythonRoot.resolve("grpc_tests").resolve("generated"), args);
+ }
+
+ protected PeerCommand pythonSyncCommand(String... args) {
+ Path pythonRoot =
+ repoRoot().resolve("integration_tests").resolve("grpc_tests").resolve("python");
+ return pythonCommand(
+ "grpc_sync_tests.grpc_peer",
+ pythonRoot.resolve("grpc_sync_tests").resolve("generated"),
+ args);
+ }
+
+ private PeerCommand pythonCommand(String moduleName, Path generatedRoot, String... args) {
Path repoRoot = repoRoot();
Path grpcRoot = repoRoot.resolve("integration_tests").resolve("grpc_tests");
Path pythonRoot = grpcRoot.resolve("python");
String pythonPath =
- pythonRoot.resolve("grpc_tests").resolve("generated")
+ generatedRoot
+ File.pathSeparator
+ pythonRoot
+ File.pathSeparator
@@ -327,7 +343,7 @@ public abstract class GrpcTestBase {
List<String> command = new ArrayList<>();
command.add("python");
command.add("-m");
- command.add("grpc_tests.grpc_interop");
+ command.add(moduleName);
command.addAll(Arrays.asList(args));
PeerCommand peerCommand = new PeerCommand();
peerCommand.command = command;
@@ -398,7 +414,17 @@ public abstract class GrpcTestBase {
}
protected void runPython(String peer, String... args) throws IOException, InterruptedException {
- Process process = startPeer(pythonCommand(args));
+ runPythonPeer(peer, pythonCommand(args));
+ }
+
+ protected void runPythonSync(String peer, String... args)
+ throws IOException, InterruptedException {
+ runPythonPeer(peer, pythonSyncCommand(args));
+ }
+
+ private void runPythonPeer(String peer, PeerCommand command)
+ throws IOException, InterruptedException {
+ Process process = startPeer(command);
PeerOutputCollector outputCollector = new PeerOutputCollector(process.getInputStream(), peer);
outputCollector.start();
boolean finished = process.waitFor(180, TimeUnit.SECONDS);
diff --git a/integration_tests/grpc_tests/java/src/test/java/org/apache/fory/grpc_tests/KotlinGrpcInteropTest.java b/integration_tests/grpc_tests/java/src/test/java/org/apache/fory/grpc_tests/KotlinGrpcTest.java
similarity index 96%
rename from integration_tests/grpc_tests/java/src/test/java/org/apache/fory/grpc_tests/KotlinGrpcInteropTest.java
rename to integration_tests/grpc_tests/java/src/test/java/org/apache/fory/grpc_tests/KotlinGrpcTest.java
index cc1d5e3a7..3a84e693c 100644
--- a/integration_tests/grpc_tests/java/src/test/java/org/apache/fory/grpc_tests/KotlinGrpcInteropTest.java
+++ b/integration_tests/grpc_tests/java/src/test/java/org/apache/fory/grpc_tests/KotlinGrpcTest.java
@@ -23,7 +23,7 @@ import io.grpc.Server;
import java.util.concurrent.TimeUnit;
import org.testng.annotations.Test;
-public class KotlinGrpcInteropTest extends GrpcTestBase {
+public class KotlinGrpcTest extends GrpcTestBase {
@Test
public void testJavaServerKotlinClient() throws Exception {
diff --git a/integration_tests/grpc_tests/java/src/test/java/org/apache/fory/grpc_tests/PythonGrpcInteropTest.java b/integration_tests/grpc_tests/java/src/test/java/org/apache/fory/grpc_tests/PythonAsyncGrpcTest.java
similarity index 86%
copy from integration_tests/grpc_tests/java/src/test/java/org/apache/fory/grpc_tests/PythonGrpcInteropTest.java
copy to integration_tests/grpc_tests/java/src/test/java/org/apache/fory/grpc_tests/PythonAsyncGrpcTest.java
index 676defdaa..b0e47838b 100644
--- a/integration_tests/grpc_tests/java/src/test/java/org/apache/fory/grpc_tests/PythonGrpcInteropTest.java
+++ b/integration_tests/grpc_tests/java/src/test/java/org/apache/fory/grpc_tests/PythonAsyncGrpcTest.java
@@ -23,13 +23,13 @@ import io.grpc.Server;
import java.util.concurrent.TimeUnit;
import org.testng.annotations.Test;
-public class PythonGrpcInteropTest extends GrpcTestBase {
+public class PythonAsyncGrpcTest extends GrpcTestBase {
@Test
public void testJavaServerPythonClient() throws Exception {
Server server = startJavaAllSchemasServer();
try {
- runPython("python-grpc-client", "client", "--target", "127.0.0.1:" + server.getPort());
+ runPython("python-async-grpc-client", "client", "--target", "127.0.0.1:" + server.getPort());
} finally {
server.shutdownNow();
server.awaitTermination(10, TimeUnit.SECONDS);
@@ -39,9 +39,9 @@ public class PythonGrpcInteropTest extends GrpcTestBase {
@Test
public void testJavaClientPythonServer() throws Exception {
exercisePeerServer(
- "python-grpc",
+ "python-async-grpc",
"Python",
- "fory-grpc-python-",
+ "fory-grpc-python-async-",
pythonCommand("server"),
this::exerciseAllSchemas);
}
diff --git a/integration_tests/grpc_tests/java/src/test/java/org/apache/fory/grpc_tests/PythonGrpcInteropTest.java b/integration_tests/grpc_tests/java/src/test/java/org/apache/fory/grpc_tests/PythonSyncGrpcTest.java
similarity index 83%
rename from integration_tests/grpc_tests/java/src/test/java/org/apache/fory/grpc_tests/PythonGrpcInteropTest.java
rename to integration_tests/grpc_tests/java/src/test/java/org/apache/fory/grpc_tests/PythonSyncGrpcTest.java
index 676defdaa..3ea572c21 100644
--- a/integration_tests/grpc_tests/java/src/test/java/org/apache/fory/grpc_tests/PythonGrpcInteropTest.java
+++ b/integration_tests/grpc_tests/java/src/test/java/org/apache/fory/grpc_tests/PythonSyncGrpcTest.java
@@ -23,13 +23,14 @@ import io.grpc.Server;
import java.util.concurrent.TimeUnit;
import org.testng.annotations.Test;
-public class PythonGrpcInteropTest extends GrpcTestBase {
+public class PythonSyncGrpcTest extends GrpcTestBase {
@Test
public void testJavaServerPythonClient() throws Exception {
Server server = startJavaAllSchemasServer();
try {
- runPython("python-grpc-client", "client", "--target", "127.0.0.1:" + server.getPort());
+ runPythonSync(
+ "python-sync-grpc-client", "client", "--target", "127.0.0.1:" + server.getPort());
} finally {
server.shutdownNow();
server.awaitTermination(10, TimeUnit.SECONDS);
@@ -39,10 +40,10 @@ public class PythonGrpcInteropTest extends GrpcTestBase {
@Test
public void testJavaClientPythonServer() throws Exception {
exercisePeerServer(
- "python-grpc",
+ "python-sync-grpc",
"Python",
- "fory-grpc-python-",
- pythonCommand("server"),
+ "fory-grpc-python-sync-",
+ pythonSyncCommand("server"),
this::exerciseAllSchemas);
}
}
diff --git a/integration_tests/grpc_tests/java/src/test/java/org/apache/fory/grpc_tests/RustGrpcInteropTest.java b/integration_tests/grpc_tests/java/src/test/java/org/apache/fory/grpc_tests/RustGrpcTest.java
similarity index 96%
rename from integration_tests/grpc_tests/java/src/test/java/org/apache/fory/grpc_tests/RustGrpcInteropTest.java
rename to integration_tests/grpc_tests/java/src/test/java/org/apache/fory/grpc_tests/RustGrpcTest.java
index 6dae4a745..c764a5f9e 100644
--- a/integration_tests/grpc_tests/java/src/test/java/org/apache/fory/grpc_tests/RustGrpcInteropTest.java
+++ b/integration_tests/grpc_tests/java/src/test/java/org/apache/fory/grpc_tests/RustGrpcTest.java
@@ -23,7 +23,7 @@ import io.grpc.Server;
import java.util.concurrent.TimeUnit;
import org.testng.annotations.Test;
-public class RustGrpcInteropTest extends GrpcTestBase {
+public class RustGrpcTest extends GrpcTestBase {
@Test
public void testJavaServerRustClient() throws Exception {
diff --git a/integration_tests/grpc_tests/python/pyproject.toml b/integration_tests/grpc_tests/python/grpc_sync_tests/__init__.py
similarity index 61%
copy from integration_tests/grpc_tests/python/pyproject.toml
copy to integration_tests/grpc_tests/python/grpc_sync_tests/__init__.py
index 3a17cdf27..13a83393a 100644
--- a/integration_tests/grpc_tests/python/pyproject.toml
+++ b/integration_tests/grpc_tests/python/grpc_sync_tests/__init__.py
@@ -14,22 +14,3 @@
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
-
-[build-system]
-requires = [
- "setuptools>=78.1.1; python_version >= '3.9'",
- "setuptools>=70.0.0,<78.1.1; python_version < '3.9'",
-]
-build-backend = "setuptools.build_meta"
-
-[project]
-name = "fory-grpc-tests"
-version = "1.3.0.dev0"
-description = "gRPC compiler integration tests for Apache Fory"
-requires-python = ">=3.8"
-license = {text = "Apache-2.0"}
-dependencies = ["grpcio>=1.62.2,<1.71", "pyfory"]
-
-[tool.setuptools.packages.find]
-where = ["."]
-include = ["grpc_tests"]
diff --git a/integration_tests/grpc_tests/python/grpc_sync_tests/grpc_peer.py b/integration_tests/grpc_tests/python/grpc_sync_tests/grpc_peer.py
new file mode 100644
index 000000000..88e26ae2f
--- /dev/null
+++ b/integration_tests/grpc_tests/python/grpc_sync_tests/grpc_peer.py
@@ -0,0 +1,244 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from __future__ import annotations
+
+import argparse
+from concurrent import futures
+from pathlib import Path
+from typing import Iterable, Sequence
+
+import grpc
+import grpc_fbs_grpc
+import grpc_fdl_grpc
+import grpc_pb_grpc
+from grpc_test_common import values
+
+
+class FdlService(grpc_fdl_grpc.FdlGrpcServiceServicer):
+ def unary_message(self, request, context):
+ return values.fdl_response(request, "unary", 10)
+
+ def server_stream_message(self, request, context):
+ for index in range(3):
+ yield values.fdl_response(request, f"server-{index}", index)
+
+ def client_stream_message(self, request_iterator, context):
+ return values.fdl_aggregate(list(request_iterator))
+
+ def bidi_stream_message(self, request_iterator, context):
+ for index, request in enumerate(request_iterator):
+ yield values.fdl_response(request, f"bidi-{index}", index)
+
+ def unary_union(self, request, context):
+ return values.fdl_union_response(
+ values.fdl_request_from_union(request), "unary", 10
+ )
+
+ def server_stream_union(self, request, context):
+ item = values.fdl_request_from_union(request)
+ for index in range(3):
+ yield values.fdl_union_response(item, f"server-{index}", index)
+
+ def client_stream_union(self, request_iterator, context):
+ requests = [values.fdl_request_from_union(item) for item in request_iterator]
+ return values.fdl_union_aggregate(requests)
+
+ def bidi_stream_union(self, request_iterator, context):
+ for index, item in enumerate(request_iterator):
+ yield values.fdl_union_response(
+ values.fdl_request_from_union(item), f"bidi-{index}", index
+ )
+
+
+class FbsService(grpc_fbs_grpc.FbsGrpcServiceServicer):
+ def unary_message(self, request, context):
+ return values.fbs_response(request, "unary", 10)
+
+ def server_stream_message(self, request, context):
+ for index in range(3):
+ yield values.fbs_response(request, f"server-{index}", index)
+
+ def client_stream_message(self, request_iterator, context):
+ return values.fbs_aggregate(list(request_iterator))
+
+ def bidi_stream_message(self, request_iterator, context):
+ for index, request in enumerate(request_iterator):
+ yield values.fbs_response(request, f"bidi-{index}", index)
+
+ def unary_union(self, request, context):
+ return values.fbs_union_response(
+ values.fbs_request_from_union(request), "unary", 10
+ )
+
+ def server_stream_union(self, request, context):
+ item = values.fbs_request_from_union(request)
+ for index in range(3):
+ yield values.fbs_union_response(item, f"server-{index}", index)
+
+ def client_stream_union(self, request_iterator, context):
+ requests = [values.fbs_request_from_union(item) for item in request_iterator]
+ return values.fbs_union_aggregate(requests)
+
+ def bidi_stream_union(self, request_iterator, context):
+ for index, item in enumerate(request_iterator):
+ yield values.fbs_union_response(
+ values.fbs_request_from_union(item), f"bidi-{index}", index
+ )
+
+
+class PbService(grpc_pb_grpc.PbGrpcServiceServicer):
+ def unary_message(self, request, context):
+ return values.pb_response(request, "unary", 10)
+
+ def server_stream_message(self, request, context):
+ for index in range(3):
+ yield values.pb_response(request, f"server-{index}", index)
+
+ def client_stream_message(self, request_iterator, context):
+ return values.pb_aggregate(list(request_iterator))
+
+ def bidi_stream_message(self, request_iterator, context):
+ for index, request in enumerate(request_iterator):
+ yield values.pb_response(request, f"bidi-{index}", index)
+
+
+def _assert_iterable_equal(
+ actual: Iterable[object], expected: Sequence[object]
+) -> None:
+ assert list(actual) == list(expected)
+
+
+def _exercise_message_stub(
+ stub,
+ requests: Sequence[object],
+ response_fn,
+ aggregate_fn,
+) -> None:
+ first = requests[0]
+ assert stub.unary_message(first) == response_fn(first, "unary", 10)
+ _assert_iterable_equal(
+ stub.server_stream_message(first),
+ [response_fn(first, f"server-{index}", index) for index in range(3)],
+ )
+ assert stub.client_stream_message(iter(requests)) == aggregate_fn(requests)
+ _assert_iterable_equal(
+ stub.bidi_stream_message(iter(requests)),
+ [
+ response_fn(request, f"bidi-{index}", index)
+ for index, request in enumerate(requests)
+ ],
+ )
+
+
+def _exercise_union_stub(
+ stub,
+ requests: Sequence[object],
+ union_request_fn,
+ union_response_fn,
+ union_aggregate_fn,
+) -> None:
+ union_requests = [union_request_fn(request) for request in requests]
+ first = union_requests[0]
+ first_request = requests[0]
+ assert stub.unary_union(first) == union_response_fn(first_request, "unary", 10)
+ _assert_iterable_equal(
+ stub.server_stream_union(first),
+ [
+ union_response_fn(first_request, f"server-{index}", index)
+ for index in range(3)
+ ],
+ )
+ assert stub.client_stream_union(iter(union_requests)) == union_aggregate_fn(
+ requests
+ )
+ _assert_iterable_equal(
+ stub.bidi_stream_union(iter(union_requests)),
+ [
+ union_response_fn(request, f"bidi-{index}", index)
+ for index, request in enumerate(requests)
+ ],
+ )
+
+
+def run_client(target: str) -> None:
+ with grpc.insecure_channel(target) as channel:
+ _exercise_message_stub(
+ grpc_fdl_grpc.FdlGrpcServiceStub(channel),
+ values.fdl_message_requests(),
+ values.fdl_response,
+ values.fdl_aggregate,
+ )
+ _exercise_union_stub(
+ grpc_fdl_grpc.FdlGrpcServiceStub(channel),
+ values.fdl_union_requests(),
+ values.fdl_union_request,
+ values.fdl_union_response,
+ values.fdl_union_aggregate,
+ )
+
+ _exercise_message_stub(
+ grpc_fbs_grpc.FbsGrpcServiceStub(channel),
+ values.fbs_message_requests(),
+ values.fbs_response,
+ values.fbs_aggregate,
+ )
+ _exercise_union_stub(
+ grpc_fbs_grpc.FbsGrpcServiceStub(channel),
+ values.fbs_union_requests(),
+ values.fbs_union_request,
+ values.fbs_union_response,
+ values.fbs_union_aggregate,
+ )
+
+ _exercise_message_stub(
+ grpc_pb_grpc.PbGrpcServiceStub(channel),
+ values.pb_requests(),
+ values.pb_response,
+ values.pb_aggregate,
+ )
+
+
+def run_server(port_file: Path) -> None:
+ server = grpc.server(futures.ThreadPoolExecutor(max_workers=8))
+ grpc_fdl_grpc.add_servicer(FdlService(), server)
+ grpc_fbs_grpc.add_servicer(FbsService(), server)
+ grpc_pb_grpc.add_servicer(PbService(), server)
+ port = server.add_insecure_port("127.0.0.1:0")
+ server.start()
+ port_file.write_text(str(port))
+ server.wait_for_termination()
+
+
+def main() -> int:
+ parser = argparse.ArgumentParser(description="Java/Python Fory gRPC sync peer")
+ subparsers = parser.add_subparsers(dest="command", required=True)
+ client_parser = subparsers.add_parser("client")
+ client_parser.add_argument("--target", required=True)
+ server_parser = subparsers.add_parser("server")
+ server_parser.add_argument("--port-file", type=Path, required=True)
+ args = parser.parse_args()
+
+ if args.command == "client":
+ run_client(args.target)
+ else:
+ run_server(args.port_file)
+ return 0
+
+
+if __name__ == "__main__":
+ raise SystemExit(main())
diff --git a/integration_tests/grpc_tests/python/pyproject.toml b/integration_tests/grpc_tests/python/grpc_test_common/__init__.py
similarity index 61%
copy from integration_tests/grpc_tests/python/pyproject.toml
copy to integration_tests/grpc_tests/python/grpc_test_common/__init__.py
index 3a17cdf27..13a83393a 100644
--- a/integration_tests/grpc_tests/python/pyproject.toml
+++ b/integration_tests/grpc_tests/python/grpc_test_common/__init__.py
@@ -14,22 +14,3 @@
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
-
-[build-system]
-requires = [
- "setuptools>=78.1.1; python_version >= '3.9'",
- "setuptools>=70.0.0,<78.1.1; python_version < '3.9'",
-]
-build-backend = "setuptools.build_meta"
-
-[project]
-name = "fory-grpc-tests"
-version = "1.3.0.dev0"
-description = "gRPC compiler integration tests for Apache Fory"
-requires-python = ">=3.8"
-license = {text = "Apache-2.0"}
-dependencies = ["grpcio>=1.62.2,<1.71", "pyfory"]
-
-[tool.setuptools.packages.find]
-where = ["."]
-include = ["grpc_tests"]
diff --git a/integration_tests/grpc_tests/python/grpc_test_common/values.py b/integration_tests/grpc_tests/python/grpc_test_common/values.py
new file mode 100644
index 000000000..5d38c7a7d
--- /dev/null
+++ b/integration_tests/grpc_tests/python/grpc_test_common/values.py
@@ -0,0 +1,194 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from __future__ import annotations
+
+from typing import List, Optional, Sequence
+
+import grpc_fbs
+import grpc_fdl
+import grpc_pb
+
+
+def fdl_request(id_value: str, count: int, body: str) -> grpc_fdl.GrpcFdlRequest:
+ return grpc_fdl.GrpcFdlRequest(id=id_value, count=count, payload=body)
+
+
+def fdl_response(
+ request: grpc_fdl.GrpcFdlRequest, tag: str, offset: int
+) -> grpc_fdl.GrpcFdlResponse:
+ return grpc_fdl.GrpcFdlResponse(
+ id=f"{tag}:{request.id}",
+ count=request.count + offset,
+ payload=f"{tag}:{request.payload}",
+ )
+
+
+def fdl_aggregate(
+ requests: Sequence[grpc_fdl.GrpcFdlRequest],
+) -> grpc_fdl.GrpcFdlResponse:
+ return grpc_fdl.GrpcFdlResponse(
+ id="client:" + "+".join(request.id for request in requests),
+ count=sum(request.count for request in requests),
+ payload="client:" + "+".join(request.payload for request in requests),
+ )
+
+
+def fdl_union_request(request: grpc_fdl.GrpcFdlRequest) -> grpc_fdl.GrpcFdlUnion:
+ return grpc_fdl.GrpcFdlUnion.request(request)
+
+
+def fdl_union_response(
+ request: grpc_fdl.GrpcFdlRequest, tag: str, offset: int
+) -> grpc_fdl.GrpcFdlUnion:
+ return grpc_fdl.GrpcFdlUnion.response(fdl_response(request, tag, offset))
+
+
+def fdl_union_aggregate(
+ requests: Sequence[grpc_fdl.GrpcFdlRequest],
+) -> grpc_fdl.GrpcFdlUnion:
+ return grpc_fdl.GrpcFdlUnion.response(fdl_aggregate(requests))
+
+
+def fdl_request_from_union(union: grpc_fdl.GrpcFdlUnion) -> grpc_fdl.GrpcFdlRequest:
+ assert union.is_request()
+ return union.request_value()
+
+
+def fdl_message_requests() -> List[grpc_fdl.GrpcFdlRequest]:
+ return [
+ fdl_request("fdl-a", 1, "alpha"),
+ fdl_request("fdl-b", 2, "beta"),
+ ]
+
+
+def fdl_union_requests() -> List[grpc_fdl.GrpcFdlRequest]:
+ return [
+ fdl_request("fdl-u-a", 3, "union-alpha"),
+ fdl_request("fdl-u-b", 4, "union-beta"),
+ ]
+
+
+def fbs_request(id_value: str, count: int, body: str) -> grpc_fbs.GrpcFbsRequest:
+ return grpc_fbs.GrpcFbsRequest(id=id_value, count=count, payload=body)
+
+
+def fbs_response(
+ request: grpc_fbs.GrpcFbsRequest, tag: str, offset: int
+) -> grpc_fbs.GrpcFbsResponse:
+ return grpc_fbs.GrpcFbsResponse(
+ id=f"{tag}:{request.id}",
+ count=request.count + offset,
+ payload=f"{tag}:{request.payload}",
+ )
+
+
+def fbs_aggregate(
+ requests: Sequence[grpc_fbs.GrpcFbsRequest],
+) -> grpc_fbs.GrpcFbsResponse:
+ return grpc_fbs.GrpcFbsResponse(
+ id="client:" + "+".join(request.id for request in requests),
+ count=sum(request.count for request in requests),
+ payload="client:" + "+".join(request.payload for request in requests),
+ )
+
+
+def fbs_union_request(request: grpc_fbs.GrpcFbsRequest) -> grpc_fbs.GrpcFbsUnion:
+ return grpc_fbs.GrpcFbsUnion.grpc_fbs_request(request)
+
+
+def fbs_union_response(
+ request: grpc_fbs.GrpcFbsRequest, tag: str, offset: int
+) -> grpc_fbs.GrpcFbsUnion:
+ return grpc_fbs.GrpcFbsUnion.grpc_fbs_response(fbs_response(request, tag, offset))
+
+
+def fbs_union_aggregate(
+ requests: Sequence[grpc_fbs.GrpcFbsRequest],
+) -> grpc_fbs.GrpcFbsUnion:
+ return grpc_fbs.GrpcFbsUnion.grpc_fbs_response(fbs_aggregate(requests))
+
+
+def fbs_request_from_union(union: grpc_fbs.GrpcFbsUnion) -> grpc_fbs.GrpcFbsRequest:
+ assert union.is_grpc_fbs_request()
+ return union.grpc_fbs_request_value()
+
+
+def fbs_message_requests() -> List[grpc_fbs.GrpcFbsRequest]:
+ return [
+ fbs_request("fbs-a", 5, "alpha"),
+ fbs_request("fbs-b", 6, "beta"),
+ ]
+
+
+def fbs_union_requests() -> List[grpc_fbs.GrpcFbsRequest]:
+ return [
+ fbs_request("fbs-u-a", 7, "union-alpha"),
+ fbs_request("fbs-u-b", 8, "union-beta"),
+ ]
+
+
+def pb_payload_text(value: str) -> grpc_pb.GrpcPbRequest.Payload:
+ return grpc_pb.GrpcPbRequest.Payload.text(value)
+
+
+def pb_response_payload(
+ payload: Optional[grpc_pb.GrpcPbRequest.Payload],
+ tag: str,
+ offset: int,
+) -> Optional[grpc_pb.GrpcPbResponse.Payload]:
+ if payload is None:
+ return None
+ if payload.is_text():
+ return grpc_pb.GrpcPbResponse.Payload.text(f"{tag}:{payload.text_value()}")
+ assert payload.is_number()
+ return grpc_pb.GrpcPbResponse.Payload.number(payload.number_value() + offset)
+
+
+def pb_request(
+ id_value: str, count: int, payload: grpc_pb.GrpcPbRequest.Payload
+) -> grpc_pb.GrpcPbRequest:
+ return grpc_pb.GrpcPbRequest(id=id_value, count=count, payload=payload)
+
+
+def pb_response(
+ request: grpc_pb.GrpcPbRequest, tag: str, offset: int
+) -> grpc_pb.GrpcPbResponse:
+ return grpc_pb.GrpcPbResponse(
+ id=f"{tag}:{request.id}",
+ count=request.count + offset,
+ payload=pb_response_payload(request.payload, tag, offset),
+ )
+
+
+def pb_aggregate(
+ requests: Sequence[grpc_pb.GrpcPbRequest],
+) -> grpc_pb.GrpcPbResponse:
+ return grpc_pb.GrpcPbResponse(
+ id="client:" + "+".join(request.id for request in requests),
+ count=sum(request.count for request in requests),
+ payload=grpc_pb.GrpcPbResponse.Payload.text(
+ "client:" + "+".join(request.id for request in requests)
+ ),
+ )
+
+
+def pb_requests() -> List[grpc_pb.GrpcPbRequest]:
+ return [
+ pb_request("pb-a", 9, pb_payload_text("alpha")),
+ pb_request("pb-b", 10, grpc_pb.GrpcPbRequest.Payload.number(42)),
+ ]
diff --git a/integration_tests/grpc_tests/python/grpc_tests/grpc_interop.py b/integration_tests/grpc_tests/python/grpc_tests/grpc_interop.py
deleted file mode 100644
index e8a3b4694..000000000
--- a/integration_tests/grpc_tests/python/grpc_tests/grpc_interop.py
+++ /dev/null
@@ -1,400 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-
-from __future__ import annotations
-
-import argparse
-from concurrent import futures
-from pathlib import Path
-from typing import Iterable, Optional, Sequence
-
-import grpc
-import grpc_fbs
-import grpc_fbs_grpc
-import grpc_fdl
-import grpc_fdl_grpc
-import grpc_pb
-import grpc_pb_grpc
-
-
-def _fdl_request(id_value: str, count: int, payload: str) -> grpc_fdl.GrpcFdlRequest:
- return grpc_fdl.GrpcFdlRequest(id=id_value, count=count, payload=payload)
-
-
-def _fdl_response(
- request: grpc_fdl.GrpcFdlRequest, tag: str, offset: int
-) -> grpc_fdl.GrpcFdlResponse:
- return grpc_fdl.GrpcFdlResponse(
- id=f"{tag}:{request.id}",
- count=request.count + offset,
- payload=f"{tag}:{request.payload}",
- )
-
-
-def _fdl_aggregate(
- requests: Sequence[grpc_fdl.GrpcFdlRequest],
-) -> grpc_fdl.GrpcFdlResponse:
- return grpc_fdl.GrpcFdlResponse(
- id="client:" + "+".join(request.id for request in requests),
- count=sum(request.count for request in requests),
- payload="client:" + "+".join(request.payload for request in requests),
- )
-
-
-def _fdl_union_request(
- request: grpc_fdl.GrpcFdlRequest,
-) -> grpc_fdl.GrpcFdlUnion:
- return grpc_fdl.GrpcFdlUnion.request(request)
-
-
-def _fdl_union_response(
- request: grpc_fdl.GrpcFdlRequest, tag: str, offset: int
-) -> grpc_fdl.GrpcFdlUnion:
- return grpc_fdl.GrpcFdlUnion.response(_fdl_response(request, tag, offset))
-
-
-def _fdl_union_aggregate(
- requests: Sequence[grpc_fdl.GrpcFdlRequest],
-) -> grpc_fdl.GrpcFdlUnion:
- return grpc_fdl.GrpcFdlUnion.response(_fdl_aggregate(requests))
-
-
-def _fdl_request_from_union(
- union: grpc_fdl.GrpcFdlUnion,
-) -> grpc_fdl.GrpcFdlRequest:
- assert union.is_request()
- return union.request_value()
-
-
-def _fbs_request(id_value: str, count: int, payload: str) -> grpc_fbs.GrpcFbsRequest:
- return grpc_fbs.GrpcFbsRequest(id=id_value, count=count, payload=payload)
-
-
-def _fbs_response(
- request: grpc_fbs.GrpcFbsRequest, tag: str, offset: int
-) -> grpc_fbs.GrpcFbsResponse:
- return grpc_fbs.GrpcFbsResponse(
- id=f"{tag}:{request.id}",
- count=request.count + offset,
- payload=f"{tag}:{request.payload}",
- )
-
-
-def _fbs_aggregate(
- requests: Sequence[grpc_fbs.GrpcFbsRequest],
-) -> grpc_fbs.GrpcFbsResponse:
- return grpc_fbs.GrpcFbsResponse(
- id="client:" + "+".join(request.id for request in requests),
- count=sum(request.count for request in requests),
- payload="client:" + "+".join(request.payload for request in requests),
- )
-
-
-def _fbs_union_request(
- request: grpc_fbs.GrpcFbsRequest,
-) -> grpc_fbs.GrpcFbsUnion:
- return grpc_fbs.GrpcFbsUnion.grpc_fbs_request(request)
-
-
-def _fbs_union_response(
- request: grpc_fbs.GrpcFbsRequest, tag: str, offset: int
-) -> grpc_fbs.GrpcFbsUnion:
- return grpc_fbs.GrpcFbsUnion.grpc_fbs_response(_fbs_response(request, tag, offset))
-
-
-def _fbs_union_aggregate(
- requests: Sequence[grpc_fbs.GrpcFbsRequest],
-) -> grpc_fbs.GrpcFbsUnion:
- return grpc_fbs.GrpcFbsUnion.grpc_fbs_response(_fbs_aggregate(requests))
-
-
-def _fbs_request_from_union(
- union: grpc_fbs.GrpcFbsUnion,
-) -> grpc_fbs.GrpcFbsRequest:
- assert union.is_grpc_fbs_request()
- return union.grpc_fbs_request_value()
-
-
-def _pb_payload_text(value: str) -> grpc_pb.GrpcPbRequest.Payload:
- return grpc_pb.GrpcPbRequest.Payload.text(value)
-
-
-def _pb_response_payload(
- payload: Optional[grpc_pb.GrpcPbRequest.Payload],
- tag: str,
- offset: int,
-) -> Optional[grpc_pb.GrpcPbResponse.Payload]:
- if payload is None:
- return None
- if payload.is_text():
- return grpc_pb.GrpcPbResponse.Payload.text(f"{tag}:{payload.text_value()}")
- assert payload.is_number()
- return grpc_pb.GrpcPbResponse.Payload.number(payload.number_value() + offset)
-
-
-def _pb_request(
- id_value: str, count: int, payload: grpc_pb.GrpcPbRequest.Payload
-) -> grpc_pb.GrpcPbRequest:
- return grpc_pb.GrpcPbRequest(id=id_value, count=count, payload=payload)
-
-
-def _pb_response(
- request: grpc_pb.GrpcPbRequest, tag: str, offset: int
-) -> grpc_pb.GrpcPbResponse:
- return grpc_pb.GrpcPbResponse(
- id=f"{tag}:{request.id}",
- count=request.count + offset,
- payload=_pb_response_payload(request.payload, tag, offset),
- )
-
-
-def _pb_aggregate(
- requests: Sequence[grpc_pb.GrpcPbRequest],
-) -> grpc_pb.GrpcPbResponse:
- return grpc_pb.GrpcPbResponse(
- id="client:" + "+".join(request.id for request in requests),
- count=sum(request.count for request in requests),
- payload=grpc_pb.GrpcPbResponse.Payload.text(
- "client:" + "+".join(request.id for request in requests)
- ),
- )
-
-
-class FdlService(grpc_fdl_grpc.FdlGrpcServiceServicer):
- def unary_message(self, request, context):
- return _fdl_response(request, "unary", 10)
-
- def server_stream_message(self, request, context):
- for index in range(3):
- yield _fdl_response(request, f"server-{index}", index)
-
- def client_stream_message(self, request_iterator, context):
- return _fdl_aggregate(list(request_iterator))
-
- def bidi_stream_message(self, request_iterator, context):
- for index, request in enumerate(request_iterator):
- yield _fdl_response(request, f"bidi-{index}", index)
-
- def unary_union(self, request, context):
- return _fdl_union_response(_fdl_request_from_union(request), "unary", 10)
-
- def server_stream_union(self, request, context):
- item = _fdl_request_from_union(request)
- for index in range(3):
- yield _fdl_union_response(item, f"server-{index}", index)
-
- def client_stream_union(self, request_iterator, context):
- requests = [_fdl_request_from_union(item) for item in request_iterator]
- return _fdl_union_aggregate(requests)
-
- def bidi_stream_union(self, request_iterator, context):
- for index, item in enumerate(request_iterator):
- yield _fdl_union_response(
- _fdl_request_from_union(item), f"bidi-{index}", index
- )
-
-
-class FbsService(grpc_fbs_grpc.FbsGrpcServiceServicer):
- def unary_message(self, request, context):
- return _fbs_response(request, "unary", 10)
-
- def server_stream_message(self, request, context):
- for index in range(3):
- yield _fbs_response(request, f"server-{index}", index)
-
- def client_stream_message(self, request_iterator, context):
- return _fbs_aggregate(list(request_iterator))
-
- def bidi_stream_message(self, request_iterator, context):
- for index, request in enumerate(request_iterator):
- yield _fbs_response(request, f"bidi-{index}", index)
-
- def unary_union(self, request, context):
- return _fbs_union_response(_fbs_request_from_union(request), "unary", 10)
-
- def server_stream_union(self, request, context):
- item = _fbs_request_from_union(request)
- for index in range(3):
- yield _fbs_union_response(item, f"server-{index}", index)
-
- def client_stream_union(self, request_iterator, context):
- requests = [_fbs_request_from_union(item) for item in request_iterator]
- return _fbs_union_aggregate(requests)
-
- def bidi_stream_union(self, request_iterator, context):
- for index, item in enumerate(request_iterator):
- yield _fbs_union_response(
- _fbs_request_from_union(item), f"bidi-{index}", index
- )
-
-
-class PbService(grpc_pb_grpc.PbGrpcServiceServicer):
- def unary_message(self, request, context):
- return _pb_response(request, "unary", 10)
-
- def server_stream_message(self, request, context):
- for index in range(3):
- yield _pb_response(request, f"server-{index}", index)
-
- def client_stream_message(self, request_iterator, context):
- return _pb_aggregate(list(request_iterator))
-
- def bidi_stream_message(self, request_iterator, context):
- for index, request in enumerate(request_iterator):
- yield _pb_response(request, f"bidi-{index}", index)
-
-
-def _assert_iterable_equal(
- actual: Iterable[object], expected: Sequence[object]
-) -> None:
- assert list(actual) == list(expected)
-
-
-def _exercise_message_stub(
- stub,
- requests: Sequence[object],
- response_fn,
- aggregate_fn,
-) -> None:
- first = requests[0]
- assert stub.unary_message(first) == response_fn(first, "unary", 10)
- _assert_iterable_equal(
- stub.server_stream_message(first),
- [response_fn(first, f"server-{index}", index) for index in range(3)],
- )
- assert stub.client_stream_message(iter(requests)) == aggregate_fn(requests)
- _assert_iterable_equal(
- stub.bidi_stream_message(iter(requests)),
- [
- response_fn(request, f"bidi-{index}", index)
- for index, request in enumerate(requests)
- ],
- )
-
-
-def _exercise_union_stub(
- stub,
- requests: Sequence[object],
- union_request_fn,
- union_response_fn,
- union_aggregate_fn,
-) -> None:
- union_requests = [union_request_fn(request) for request in requests]
- first = union_requests[0]
- first_request = requests[0]
- assert stub.unary_union(first) == union_response_fn(first_request, "unary", 10)
- _assert_iterable_equal(
- stub.server_stream_union(first),
- [
- union_response_fn(first_request, f"server-{index}", index)
- for index in range(3)
- ],
- )
- assert stub.client_stream_union(iter(union_requests)) == union_aggregate_fn(
- requests
- )
- _assert_iterable_equal(
- stub.bidi_stream_union(iter(union_requests)),
- [
- union_response_fn(request, f"bidi-{index}", index)
- for index, request in enumerate(requests)
- ],
- )
-
-
-def run_client(target: str) -> None:
- with grpc.insecure_channel(target) as channel:
- _exercise_message_stub(
- grpc_fdl_grpc.FdlGrpcServiceStub(channel),
- [
- _fdl_request("fdl-a", 1, "alpha"),
- _fdl_request("fdl-b", 2, "beta"),
- ],
- _fdl_response,
- _fdl_aggregate,
- )
- _exercise_union_stub(
- grpc_fdl_grpc.FdlGrpcServiceStub(channel),
- [
- _fdl_request("fdl-u-a", 3, "union-alpha"),
- _fdl_request("fdl-u-b", 4, "union-beta"),
- ],
- _fdl_union_request,
- _fdl_union_response,
- _fdl_union_aggregate,
- )
-
- _exercise_message_stub(
- grpc_fbs_grpc.FbsGrpcServiceStub(channel),
- [
- _fbs_request("fbs-a", 5, "alpha"),
- _fbs_request("fbs-b", 6, "beta"),
- ],
- _fbs_response,
- _fbs_aggregate,
- )
- _exercise_union_stub(
- grpc_fbs_grpc.FbsGrpcServiceStub(channel),
- [
- _fbs_request("fbs-u-a", 7, "union-alpha"),
- _fbs_request("fbs-u-b", 8, "union-beta"),
- ],
- _fbs_union_request,
- _fbs_union_response,
- _fbs_union_aggregate,
- )
-
- _exercise_message_stub(
- grpc_pb_grpc.PbGrpcServiceStub(channel),
- [
- _pb_request("pb-a", 9, _pb_payload_text("alpha")),
- _pb_request("pb-b", 10, grpc_pb.GrpcPbRequest.Payload.number(42)),
- ],
- _pb_response,
- _pb_aggregate,
- )
-
-
-def run_server(port_file: Path) -> None:
- server = grpc.server(futures.ThreadPoolExecutor(max_workers=8))
- grpc_fdl_grpc.add_servicer(FdlService(), server)
- grpc_fbs_grpc.add_servicer(FbsService(), server)
- grpc_pb_grpc.add_servicer(PbService(), server)
- port = server.add_insecure_port("127.0.0.1:0")
- server.start()
- port_file.write_text(str(port))
- server.wait_for_termination()
-
-
-def main() -> int:
- parser = argparse.ArgumentParser(description="Java/Python Fory gRPC interop peer")
- subparsers = parser.add_subparsers(dest="command", required=True)
- client_parser = subparsers.add_parser("client")
- client_parser.add_argument("--target", required=True)
- server_parser = subparsers.add_parser("server")
- server_parser.add_argument("--port-file", type=Path, required=True)
- args = parser.parse_args()
-
- if args.command == "client":
- run_client(args.target)
- else:
- run_server(args.port_file)
- return 0
-
-
-if __name__ == "__main__":
- raise SystemExit(main())
diff --git a/integration_tests/grpc_tests/python/grpc_tests/grpc_peer.py b/integration_tests/grpc_tests/python/grpc_tests/grpc_peer.py
new file mode 100644
index 000000000..d2d24cb4d
--- /dev/null
+++ b/integration_tests/grpc_tests/python/grpc_tests/grpc_peer.py
@@ -0,0 +1,268 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from __future__ import annotations
+
+import argparse
+import asyncio
+from pathlib import Path
+from typing import AsyncIterator, List, Sequence, TypeVar
+
+import grpc.aio
+import grpc_fbs_grpc
+import grpc_fdl_grpc
+import grpc_pb_grpc
+from grpc_test_common import values
+
+T = TypeVar("T")
+
+
+async def _async_iter(items: Sequence[T]) -> AsyncIterator[T]:
+ for item in items:
+ yield item
+
+
+async def _collect(call) -> List[object]:
+ result = []
+ async for item in call:
+ result.append(item)
+ return result
+
+
+class FdlService(grpc_fdl_grpc.FdlGrpcServiceServicer):
+ async def unary_message(self, request, context):
+ return values.fdl_response(request, "unary", 10)
+
+ async def server_stream_message(self, request, context):
+ for index in range(3):
+ yield values.fdl_response(request, f"server-{index}", index)
+
+ async def client_stream_message(self, request_iterator, context):
+ requests = []
+ async for request in request_iterator:
+ requests.append(request)
+ return values.fdl_aggregate(requests)
+
+ async def bidi_stream_message(self, request_iterator, context):
+ index = 0
+ async for request in request_iterator:
+ yield values.fdl_response(request, f"bidi-{index}", index)
+ index += 1
+
+ async def unary_union(self, request, context):
+ return values.fdl_union_response(
+ values.fdl_request_from_union(request), "unary", 10
+ )
+
+ async def server_stream_union(self, request, context):
+ item = values.fdl_request_from_union(request)
+ for index in range(3):
+ yield values.fdl_union_response(item, f"server-{index}", index)
+
+ async def client_stream_union(self, request_iterator, context):
+ requests = []
+ async for item in request_iterator:
+ requests.append(values.fdl_request_from_union(item))
+ return values.fdl_union_aggregate(requests)
+
+ async def bidi_stream_union(self, request_iterator, context):
+ index = 0
+ async for item in request_iterator:
+ yield values.fdl_union_response(
+ values.fdl_request_from_union(item), f"bidi-{index}", index
+ )
+ index += 1
+
+
+class FbsService(grpc_fbs_grpc.FbsGrpcServiceServicer):
+ async def unary_message(self, request, context):
+ return values.fbs_response(request, "unary", 10)
+
+ async def server_stream_message(self, request, context):
+ for index in range(3):
+ yield values.fbs_response(request, f"server-{index}", index)
+
+ async def client_stream_message(self, request_iterator, context):
+ requests = []
+ async for request in request_iterator:
+ requests.append(request)
+ return values.fbs_aggregate(requests)
+
+ async def bidi_stream_message(self, request_iterator, context):
+ index = 0
+ async for request in request_iterator:
+ yield values.fbs_response(request, f"bidi-{index}", index)
+ index += 1
+
+ async def unary_union(self, request, context):
+ return values.fbs_union_response(
+ values.fbs_request_from_union(request), "unary", 10
+ )
+
+ async def server_stream_union(self, request, context):
+ item = values.fbs_request_from_union(request)
+ for index in range(3):
+ yield values.fbs_union_response(item, f"server-{index}", index)
+
+ async def client_stream_union(self, request_iterator, context):
+ requests = []
+ async for item in request_iterator:
+ requests.append(values.fbs_request_from_union(item))
+ return values.fbs_union_aggregate(requests)
+
+ async def bidi_stream_union(self, request_iterator, context):
+ index = 0
+ async for item in request_iterator:
+ yield values.fbs_union_response(
+ values.fbs_request_from_union(item), f"bidi-{index}", index
+ )
+ index += 1
+
+
+class PbService(grpc_pb_grpc.PbGrpcServiceServicer):
+ async def unary_message(self, request, context):
+ return values.pb_response(request, "unary", 10)
+
+ async def server_stream_message(self, request, context):
+ for index in range(3):
+ yield values.pb_response(request, f"server-{index}", index)
+
+ async def client_stream_message(self, request_iterator, context):
+ requests = []
+ async for request in request_iterator:
+ requests.append(request)
+ return values.pb_aggregate(requests)
+
+ async def bidi_stream_message(self, request_iterator, context):
+ index = 0
+ async for request in request_iterator:
+ yield values.pb_response(request, f"bidi-{index}", index)
+ index += 1
+
+
+async def _exercise_message_stub(
+ stub,
+ requests: Sequence[object],
+ response_fn,
+ aggregate_fn,
+) -> None:
+ first = requests[0]
+ assert await stub.unary_message(first) == response_fn(first, "unary", 10)
+ assert await _collect(stub.server_stream_message(first)) == [
+ response_fn(first, f"server-{index}", index) for index in range(3)
+ ]
+ assert await stub.client_stream_message(_async_iter(requests)) == aggregate_fn(
+ requests
+ )
+ assert await _collect(stub.bidi_stream_message(_async_iter(requests))) == [
+ response_fn(request, f"bidi-{index}", index)
+ for index, request in enumerate(requests)
+ ]
+
+
+async def _exercise_union_stub(
+ stub,
+ requests: Sequence[object],
+ union_request_fn,
+ union_response_fn,
+ union_aggregate_fn,
+) -> None:
+ union_requests = [union_request_fn(request) for request in requests]
+ first = union_requests[0]
+ first_request = requests[0]
+ assert await stub.unary_union(first) == union_response_fn(
+ first_request, "unary", 10
+ )
+ assert await _collect(stub.server_stream_union(first)) == [
+ union_response_fn(first_request, f"server-{index}", index) for index in range(3)
+ ]
+ assert await stub.client_stream_union(
+ _async_iter(union_requests)
+ ) == union_aggregate_fn(requests)
+ assert await _collect(stub.bidi_stream_union(_async_iter(union_requests))) == [
+ union_response_fn(request, f"bidi-{index}", index)
+ for index, request in enumerate(requests)
+ ]
+
+
+async def run_client(target: str) -> None:
+ async with grpc.aio.insecure_channel(target) as channel:
+ await _exercise_message_stub(
+ grpc_fdl_grpc.FdlGrpcServiceStub(channel),
+ values.fdl_message_requests(),
+ values.fdl_response,
+ values.fdl_aggregate,
+ )
+ await _exercise_union_stub(
+ grpc_fdl_grpc.FdlGrpcServiceStub(channel),
+ values.fdl_union_requests(),
+ values.fdl_union_request,
+ values.fdl_union_response,
+ values.fdl_union_aggregate,
+ )
+
+ await _exercise_message_stub(
+ grpc_fbs_grpc.FbsGrpcServiceStub(channel),
+ values.fbs_message_requests(),
+ values.fbs_response,
+ values.fbs_aggregate,
+ )
+ await _exercise_union_stub(
+ grpc_fbs_grpc.FbsGrpcServiceStub(channel),
+ values.fbs_union_requests(),
+ values.fbs_union_request,
+ values.fbs_union_response,
+ values.fbs_union_aggregate,
+ )
+
+ await _exercise_message_stub(
+ grpc_pb_grpc.PbGrpcServiceStub(channel),
+ values.pb_requests(),
+ values.pb_response,
+ values.pb_aggregate,
+ )
+
+
+async def run_server(port_file: Path) -> None:
+ server = grpc.aio.server()
+ grpc_fdl_grpc.add_servicer(FdlService(), server)
+ grpc_fbs_grpc.add_servicer(FbsService(), server)
+ grpc_pb_grpc.add_servicer(PbService(), server)
+ port = server.add_insecure_port("127.0.0.1:0")
+ await server.start()
+ port_file.write_text(str(port))
+ await server.wait_for_termination()
+
+
+def main() -> int:
+ parser = argparse.ArgumentParser(description="Java/Python Fory gRPC async peer")
+ subparsers = parser.add_subparsers(dest="command", required=True)
+ client_parser = subparsers.add_parser("client")
+ client_parser.add_argument("--target", required=True)
+ server_parser = subparsers.add_parser("server")
+ server_parser.add_argument("--port-file", type=Path, required=True)
+ args = parser.parse_args()
+
+ if args.command == "client":
+ asyncio.run(run_client(args.target))
+ else:
+ asyncio.run(run_server(args.port_file))
+ return 0
+
+
+if __name__ == "__main__":
+ raise SystemExit(main())
diff --git a/integration_tests/grpc_tests/python/pyproject.toml b/integration_tests/grpc_tests/python/pyproject.toml
index 3a17cdf27..de8858ea9 100644
--- a/integration_tests/grpc_tests/python/pyproject.toml
+++ b/integration_tests/grpc_tests/python/pyproject.toml
@@ -32,4 +32,4 @@ dependencies = ["grpcio>=1.62.2,<1.71", "pyfory"]
[tool.setuptools.packages.find]
where = ["."]
-include = ["grpc_tests"]
+include = ["grpc_test_common", "grpc_sync_tests", "grpc_tests"]
diff --git a/integration_tests/grpc_tests/run_tests.sh b/integration_tests/grpc_tests/run_tests.sh
index c1e532e94..34fdcc59b 100755
--- a/integration_tests/grpc_tests/run_tests.sh
+++ b/integration_tests/grpc_tests/run_tests.sh
@@ -21,7 +21,7 @@ set -euo pipefail
SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)"
ROOT_DIR="$(cd "${SCRIPT_DIR}/../.." && pwd)"
-TEST_CLASSES="${1:-PythonGrpcInteropTest,RustGrpcInteropTest,GoGrpcInteropTest,KotlinGrpcInteropTest}"
+TEST_CLASSES="${1:-PythonAsyncGrpcTest,PythonSyncGrpcTest,RustGrpcTest,GoGrpcTest,KotlinGrpcTest}"
python -m pip install "grpcio>=1.62.2,<1.71"
python -m pip install -v -e "${ROOT_DIR}/python"
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]