BaldDemian opened a new pull request, #3738:
URL: https://github.com/apache/fory/pull/3738
## Why?
Support Rust gRPC code generation.
## What does this PR do?
**overview**:
- support Rust unary/streaming gRPC code generation
- add Rust&Java interop tests
Note: in local environment such as Ubuntu, run the test using virtual
environment:
```shell
cd integration_tests/grpc_tests
python3 -m venv .venv
source .venv/bin/activate
./run_tests.sh
```
**details**:
- `compiler/fory_compiler/generators/rust.py`: validate that there is no
`thread_safe=false` ref usage in gRPC payload types since tonic requires all
payload types to be thread safe.
- `compiler/fory_compiler/generators/services/rust.py`: for the given IDL,
generate tonic-compatiable client and server stub code, i.e. `service.rs`,
`service_grpc.rs`.
**Note**: considering that codec logic is relatively small, codec code is
also generated as a separate `mod` inside `service_grpc.rs`, instead of
defining a new crate under `rust/`.
<details>
<summary>Example</summary>
For this simple service definition:
```
package hello;
message HelloRequest {
string name = 1;
}
message HelloResponse {
string greeting = 1;
}
service Greeter {
rpc SayHello (HelloRequest) returns (HelloResponse);
}
```
The generated `service.rs` (ignore the copyright header):
```rust
#[::tonic::async_trait]
pub trait Greeter: ::std::marker::Send + ::std::marker::Sync + 'static {
async fn say_hello(
&self,
request: ::tonic::Request<crate::hello::HelloRequest>,
) -> ::std::result::Result<
::tonic::Response<crate::hello::HelloResponse>,
::tonic::Status,
>;
}
pub const GREETER_SERVICE_NAME: &str = "hello.Greeter";
pub const GREETER_SAY_HELLO_PATH: &str = "/hello.Greeter/SayHello";
```
The generated `service_grpc.rs` (also ignore the copyright header):
```rust
pub mod codec {
pub trait ForyGrpcPayload: Sized + ::std::marker::Send + 'static {
fn encode_fory_payload(
&self,
) -> ::std::result::Result<::std::vec::Vec<u8>, ::fory::Error>;
fn decode_fory_payload(
payload: &[u8],
) -> ::std::result::Result<Self, ::fory::Error>;
}
#[derive(Debug, Clone)]
pub struct ForyCodec<Encode, Decode> {
marker: ::std::marker::PhantomData<(Encode, Decode)>,
}
impl<Encode, Decode> ForyCodec<Encode, Decode> {
pub fn new() -> Self {
Self {
marker: ::std::marker::PhantomData,
}
}
}
impl<Encode, Decode> ::std::default::Default for ForyCodec<Encode,
Decode> {
fn default() -> Self {
Self::new()
}
}
impl<Encode, Decode> ::tonic::codec::Codec for ForyCodec<Encode,
Decode>
where
Encode: ForyGrpcPayload,
Decode: ForyGrpcPayload,
{
type Encode = Encode;
type Decode = Decode;
type Encoder = ForyEncoder<Encode>;
type Decoder = ForyDecoder<Decode>;
fn encoder(&mut self) -> Self::Encoder {
ForyEncoder::default()
}
fn decoder(&mut self) -> Self::Decoder {
ForyDecoder::default()
}
}
#[derive(Debug, Clone)]
pub struct ForyEncoder<T> {
marker: ::std::marker::PhantomData<T>,
}
impl<T> ::std::default::Default for ForyEncoder<T> {
fn default() -> Self {
Self {
marker: ::std::marker::PhantomData,
}
}
}
#[derive(Debug, Clone)]
pub struct ForyDecoder<T> {
marker: ::std::marker::PhantomData<T>,
}
impl<T> ::std::default::Default for ForyDecoder<T> {
fn default() -> Self {
Self {
marker: ::std::marker::PhantomData,
}
}
}
fn fory_error_to_tonic_status(error: ::fory::Error) -> ::tonic::Status
{
::tonic::Status::internal(error.to_string())
}
impl<T> ::tonic::codec::Encoder for ForyEncoder<T>
where
T: ForyGrpcPayload,
{
type Item = T;
type Error = ::tonic::Status;
fn encode(
&mut self,
item: Self::Item,
dst: &mut ::tonic::codec::EncodeBuf<'_>,
) -> ::std::result::Result<(), Self::Error> {
let bytes =
item.encode_fory_payload().map_err(fory_error_to_tonic_status)?;
::tonic::codec::EncodeBuf::reserve(dst, bytes.len());
::bytes::BufMut::put_slice(dst, &bytes);
Ok(())
}
}
impl<T> ::tonic::codec::Decoder for ForyDecoder<T>
where
T: ForyGrpcPayload,
{
type Item = T;
type Error = ::tonic::Status;
fn decode(
&mut self,
src: &mut ::tonic::codec::DecodeBuf<'_>,
) -> ::std::result::Result<::std::option::Option<Self::Item>,
Self::Error> {
let len = ::bytes::Buf::remaining(src);
if len == 0 {
return Ok(None);
}
let chunk = ::bytes::Buf::chunk(src);
if chunk.len() == len {
let result =
T::decode_fory_payload(chunk).map_err(fory_error_to_tonic_status);
::bytes::Buf::advance(src, len);
return result.map(Some);
}
let payload = ::bytes::Buf::copy_to_bytes(src, len);
T::decode_fory_payload(&payload)
.map(Some)
.map_err(fory_error_to_tonic_status)
}
}
}
impl codec::ForyGrpcPayload for crate::hello::HelloRequest {
fn encode_fory_payload(&self) ->
::std::result::Result<::std::vec::Vec<u8>, ::fory::Error> {
self.to_bytes()
}
fn decode_fory_payload(payload: &[u8]) -> ::std::result::Result<Self,
::fory::Error> {
Self::from_bytes(payload)
}
}
impl codec::ForyGrpcPayload for crate::hello::HelloResponse {
fn encode_fory_payload(&self) ->
::std::result::Result<::std::vec::Vec<u8>, ::fory::Error> {
self.to_bytes()
}
fn decode_fory_payload(payload: &[u8]) -> ::std::result::Result<Self,
::fory::Error> {
Self::from_bytes(payload)
}
}
pub mod greeter_client {
#[derive(Debug, Clone)]
pub struct GreeterClient<T> {
inner: ::tonic::client::Grpc<T>,
}
impl GreeterClient<::tonic::transport::Channel> {
pub async fn connect<D>(
dst: D,
) -> ::std::result::Result<Self, ::tonic::transport::Error>
where
D: ::std::convert::TryInto<::tonic::transport::Endpoint>,
D::Error: Into<::tonic::codegen::StdError>,
{
let conn =
::tonic::transport::Endpoint::new(dst)?.connect().await?;
Ok(Self::new(conn))
}
}
impl<T> GreeterClient<T>
where
T: ::tonic::client::GrpcService<::tonic::body::Body>,
T::Error: Into<::tonic::codegen::StdError>,
T::ResponseBody: ::tonic::codegen::Body<Data =
::tonic::codegen::Bytes>
+ ::std::marker::Send
+ 'static,
<T::ResponseBody as ::tonic::codegen::Body>::Error:
Into<::tonic::codegen::StdError> + ::std::marker::Send,
{
pub fn new(inner: T) -> Self {
let inner = ::tonic::client::Grpc::new(inner);
Self { inner }
}
pub async fn say_hello(
&mut self,
request: impl ::tonic::IntoRequest<crate::hello::HelloRequest>,
) -> ::std::result::Result<
::tonic::Response<crate::hello::HelloResponse>,
::tonic::Status,
> {
self.inner.ready().await.map_err(|e| {
::tonic::Status::unknown(format!("Service was not ready:
{}", e.into()))
})?;
let codec = super::codec::ForyCodec::<
crate::hello::HelloRequest,
crate::hello::HelloResponse,
>::default();
let path =
::tonic::codegen::http::uri::PathAndQuery::from_static(
crate::service::GREETER_SAY_HELLO_PATH,
);
let mut req = request.into_request();
req.extensions_mut().insert(::tonic::codegen::GrpcMethod::new(
crate::service::GREETER_SERVICE_NAME,
"SayHello",
));
self.inner.unary(req, path, codec).await
}
}
}
pub mod greeter_server {
#[derive(Debug)]
pub struct GreeterServer<T> {
inner: ::std::sync::Arc<T>,
}
impl<T> GreeterServer<T> {
pub fn new(inner: T) -> Self {
Self::from_arc(::std::sync::Arc::new(inner))
}
pub fn from_arc(inner: ::std::sync::Arc<T>) -> Self {
Self { inner }
}
}
impl<T, B>
::tonic::codegen::Service<::tonic::codegen::http::Request<B>> for
GreeterServer<T>
where
T: crate::service::Greeter,
B: ::tonic::codegen::Body + ::std::marker::Send + 'static,
B::Error: Into<::tonic::codegen::StdError> + ::std::marker::Send +
'static,
{
type Response =
::tonic::codegen::http::Response<::tonic::body::Body>;
type Error = ::std::convert::Infallible;
type Future = ::tonic::codegen::BoxFuture<Self::Response,
Self::Error>;
fn poll_ready(
&mut self,
_cx: &mut ::std::task::Context<'_>,
) -> ::std::task::Poll<::std::result::Result<(), Self::Error>> {
::std::task::Poll::Ready(Ok(()))
}
fn call(&mut self, req: ::tonic::codegen::http::Request<B>) ->
Self::Future {
match req.uri().path() {
crate::service::GREETER_SAY_HELLO_PATH => {
struct SayHelloSvc<T: crate::service::Greeter>(pub
::std::sync::Arc<T>);
impl<T: crate::service::Greeter>
::tonic::server::UnaryService<crate::hello::HelloRequest>
for SayHelloSvc<T>
{
type Response = crate::hello::HelloResponse;
type Future = ::tonic::codegen::BoxFuture<
::tonic::Response<Self::Response>,
::tonic::Status,
>;
fn call(
&mut self,
request:
::tonic::Request<crate::hello::HelloRequest>,
) -> Self::Future {
let inner = ::std::sync::Arc::clone(&self.0);
let fut = async move {
<T as
crate::service::Greeter>::say_hello(&inner, request).await
};
::std::boxed::Box::pin(fut)
}
}
let inner = self.inner.clone();
let fut = async move {
let method = SayHelloSvc(inner);
let codec = super::codec::ForyCodec::<
crate::hello::HelloResponse,
crate::hello::HelloRequest,
>::default();
let mut grpc = ::tonic::server::Grpc::new(codec);
let res = grpc.unary(method, req).await;
Ok(res)
};
::std::boxed::Box::pin(fut)
}
_ => ::std::boxed::Box::pin(async move {
let mut response =
::tonic::codegen::http::Response::new(
::tonic::body::Body::default(),
);
let headers = response.headers_mut();
headers.insert(
::tonic::Status::GRPC_STATUS,
(::tonic::Code::Unimplemented as i32).into(),
);
headers.insert(
::tonic::codegen::http::header::CONTENT_TYPE,
::tonic::metadata::GRPC_CONTENT_TYPE,
);
Ok(response)
}),
}
}
}
impl<T> ::std::clone::Clone for GreeterServer<T> {
fn clone(&self) -> Self {
Self {
inner: self.inner.clone(),
}
}
}
pub const SERVICE_NAME: &str = crate::service::GREETER_SERVICE_NAME;
impl<T> ::tonic::server::NamedService for GreeterServer<T> {
const NAME: &'static str = SERVICE_NAME;
}
}
```
</details>
-
`integration_tests/grpc_tests/java/src/test/java/org/apache/fory/grpc_tests/GrpcInteropTest.java`:
add interop testing logic for Java and Rust; shared logic for Rust and Python
is extracted as functions to prevent redundancy.
- `integration_tests/grpc_tests/rust/interop`: implement gRPC handlers and
assertions so that Rust&Java interop testing can work.
- `integration_tests/grpc_tests/rust/pb`,
`integration_tests/grpc_tests/rust/fdl`,
`integration_tests/grpc_tests/rust/fbs`: placeholder crates to hold the
generated gRPC code.
**next:**
- add documentation when above implementation is stable.
- performance (mainly deserializaion) benchmarking and tuning.
## Related issues
https://github.com/apache/fory/issues/3275
## AI Contribution Checklist
<!-- Full requirements and disclosure template:
https://github.com/apache/fory/blob/main/AI_POLICY.md#9-contributor-checklist-for-ai-assisted-prs
-->
- [ ] Substantial AI assistance was used in this PR: `yes` / `no`
- [ ] If `yes`, I included a completed [AI Contribution
Checklist](https://github.com/apache/fory/blob/main/AI_POLICY.md#9-contributor-checklist-for-ai-assisted-prs)
in this PR description and the required `AI Usage Disclosure`.
- [ ] If `yes`, my PR description includes the required `ai_review` summary
and screenshot evidence of the final clean AI review results from both fresh
reviewers on the current PR diff or current HEAD after the latest code changes.
<!-- If substantial AI assistance = `yes`, paste the completed checklist and
disclosure block here, including the final ai_review summary and screenshot
evidence from both fresh reviewers on the current PR diff or current HEAD after
the latest code changes. -->
## Does this PR introduce any user-facing change?
N/A.
## Benchmark
N/A.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]