This is an automated email from the ASF dual-hosted git repository.

lizhanhui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-clients.git


The following commit(s) were added to refs/heads/master by this push:
     new 0bc97568 api(rust): change api signature (#487)
0bc97568 is described below

commit 0bc97568082051f7c0f96f59f7b12a415ce642f7
Author: SSpirits <[email protected]>
AuthorDate: Fri Apr 21 12:10:23 2023 +0800

    api(rust): change api signature (#487)
    
    * api(rust): change api signature
    
    * chore(rust): rename SendResult to SendReceipt
---
 rust/Cargo.toml                  |  2 +-
 rust/examples/producer.rs        |  2 +-
 rust/examples/simple_consumer.rs | 51 +++++++++++++++++++++++-----------------
 rust/src/client.rs               | 21 +++++++++++------
 rust/src/error.rs                | 38 ++++++++++++++++++++++++++----
 rust/src/lib.rs                  |  6 ++---
 rust/src/model/common.rs         | 26 ++++++++++++++++++++
 rust/src/producer.rs             | 20 +++++++---------
 rust/src/session.rs              | 27 ++++++++++++---------
 rust/src/simple_consumer.rs      | 22 +++++++++++++----
 10 files changed, 150 insertions(+), 65 deletions(-)

diff --git a/rust/Cargo.toml b/rust/Cargo.toml
index 689ab0e5..fc84e0d8 100644
--- a/rust/Cargo.toml
+++ b/rust/Cargo.toml
@@ -26,7 +26,7 @@ authors = [
 
 license = "MIT/Apache-2.0"
 readme = "./README.md"
-repository = "https://github.com/apache/rocketmq-clients"
+repository = "https://github.com/apache/rocketmq-clients/tree/master/rust"
 documentation = "https://docs.rs/rocketmq"
 description = "Rust client for Apache RocketMQ"
 keywords = ["rocketmq", "api", "client", "sdk", "grpc"]
diff --git a/rust/examples/producer.rs b/rust/examples/producer.rs
index a7ad0ab7..acacabb8 100644
--- a/rust/examples/producer.rs
+++ b/rust/examples/producer.rs
@@ -46,6 +46,6 @@ async fn main() {
     debug_assert!(result.is_ok(), "send message failed: {:?}", result);
     println!(
         "send message success, message_id={}",
-        result.unwrap().message_id
+        result.unwrap().message_id()
     );
 }
diff --git a/rust/examples/simple_consumer.rs b/rust/examples/simple_consumer.rs
index 4011556d..33aae9d4 100644
--- a/rust/examples/simple_consumer.rs
+++ b/rust/examples/simple_consumer.rs
@@ -29,33 +29,42 @@ async fn main() {
     // set which rocketmq proxy to connect
     let mut client_option = ClientOption::default();
     client_option.set_access_url("localhost:8081");
+    client_option.set_enable_tls(false);
 
     // build and start simple consumer
     let consumer = SimpleConsumer::new(consumer_option, client_option).unwrap();
     consumer.start().await.unwrap();
 
-    // pop message from rocketmq proxy
-    let receive_result = consumer
-        .receive(
-            "test_topic".to_string(),
-            &FilterExpression::new(FilterType::Tag, "test_tag"),
-        )
-        .await;
-    debug_assert!(
-        receive_result.is_ok(),
-        "receive message failed: {:?}",
-        receive_result.unwrap_err()
-    );
-
-    let messages = receive_result.unwrap();
-    for message in messages {
-        println!("receive message: {:?}", message);
-        // ack message to rocketmq proxy
-        let ack_result = consumer.ack(message).await;
+    loop {
+        // pop message from rocketmq proxy
+        let receive_result = consumer
+            .receive(
+                "test_topic".to_string(),
+                &FilterExpression::new(FilterType::Tag, "test_tag"),
+            )
+            .await;
         debug_assert!(
-            ack_result.is_ok(),
-            "ack message failed: {:?}",
-            ack_result.unwrap_err()
+            receive_result.is_ok(),
+            "receive message failed: {:?}",
+            receive_result.unwrap_err()
         );
+
+        let messages = receive_result.unwrap();
+
+        if messages.is_empty() {
+            println!("no message received");
+            return;
+        }
+
+        for message in messages {
+            println!("receive message: {:?}", message);
+            // ack message to rocketmq proxy
+            let ack_result = consumer.ack(&message).await;
+            debug_assert!(
+                ack_result.is_ok(),
+                "ack message failed: {:?}",
+                ack_result.unwrap_err()
+            );
+        }
     }
 }
diff --git a/rust/src/client.rs b/rust/src/client.rs
index 11e4419a..99d410f2 100644
--- a/rust/src/client.rs
+++ b/rust/src/client.rs
@@ -28,14 +28,14 @@ use tokio::sync::oneshot;
 
 use crate::conf::ClientOption;
 use crate::error::{ClientError, ErrorKind};
-use crate::model::common::{ClientType, Endpoints, Route, RouteStatus};
+use crate::model::common::{ClientType, Endpoints, Route, RouteStatus, SendReceipt};
 use crate::model::message::AckMessageEntry;
 use crate::pb;
 use crate::pb::receive_message_response::Content;
 use crate::pb::{
     AckMessageRequest, AckMessageResultEntry, Code, FilterExpression, HeartbeatRequest,
     HeartbeatResponse, Message, MessageQueue, QueryRouteRequest, ReceiveMessageRequest, Resource,
-    SendMessageRequest, SendResultEntry, Status, TelemetryCommand,
+    SendMessageRequest, Status, TelemetryCommand,
 };
 #[double]
 use crate::session::SessionManager;
@@ -207,7 +207,7 @@ impl Client {
     pub(crate) fn topic_route_from_cache(&self, topic: &str) -> Option<Arc<Route>> {
         self.route_table.lock().get(topic).and_then(|route_status| {
             if let RouteStatus::Found(route) = route_status {
-                debug!(self.logger, "get route for topic={} from cache", topic);
+                // debug!(self.logger, "get route for topic={} from cache", topic);
                 Some(Arc::clone(route))
             } else {
                 None
@@ -359,7 +359,7 @@ impl Client {
         &self,
         endpoints: &Endpoints,
         messages: Vec<Message>,
-    ) -> Result<Vec<SendResultEntry>, ClientError> {
+    ) -> Result<Vec<SendReceipt>, ClientError> {
         self.send_message_inner(
             self.get_session_with_endpoints(endpoints).await.unwrap(),
             messages,
@@ -371,7 +371,7 @@ impl Client {
         &self,
         mut rpc_client: T,
         messages: Vec<Message>,
-    ) -> Result<Vec<SendResultEntry>, ClientError> {
+    ) -> Result<Vec<SendReceipt>, ClientError> {
         let message_count = messages.len();
         let request = SendMessageRequest { messages };
         let response = rpc_client.send_message(request).await?;
@@ -381,7 +381,11 @@ impl Client {
             error!(self.logger, "server do not return illegal send result, this may be a bug. except result count: {}, found: {}", response.entries.len(), message_count);
         }
 
-        Ok(response.entries)
+        Ok(response
+            .entries
+            .iter()
+            .map(SendReceipt::from_pb_send_result)
+            .collect())
     }
 
     #[allow(dead_code)]
@@ -431,6 +435,9 @@ impl Client {
         for response in responses {
             match response.content.unwrap() {
                 Content::Status(status) => {
+                    if status.code() == Code::MessageNotFound {
+                        return Ok(vec![]);
+                    }
                     Self::handle_response_status(Some(status), OPERATION_RECEIVE_MESSAGE)?;
                 }
                 Content::Message(message) => {
@@ -445,7 +452,7 @@ impl Client {
     #[allow(dead_code)]
     pub(crate) async fn ack_message<T: AckMessageEntry + 'static>(
         &self,
-        ack_entry: T,
+        ack_entry: &T,
     ) -> Result<AckMessageResultEntry, ClientError> {
         let result = self
             .ack_message_inner(
diff --git a/rust/src/error.rs b/rust/src/error.rs
index 9eb3d519..59d1eeb4 100644
--- a/rust/src/error.rs
+++ b/rust/src/error.rs
@@ -14,10 +14,14 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
+//! Error data model of RocketMQ rust client.
+
 use std::error::Error;
 use std::fmt;
 use std::fmt::{Debug, Display, Formatter};
 
+/// Error type using by [`ClientError`].
 #[derive(thiserror::Error, Debug, PartialEq, Eq)]
 pub enum ErrorKind {
     #[error("Failed to parse config")]
@@ -51,6 +55,7 @@ pub enum ErrorKind {
     Unknown,
 }
 
+/// Error returned by producer or consumer.
 pub struct ClientError {
     pub(crate) kind: ErrorKind,
     pub(crate) message: String,
@@ -62,7 +67,7 @@ pub struct ClientError {
 impl Error for ClientError {}
 
 impl ClientError {
-    pub fn new(kind: ErrorKind, message: &str, operation: &'static str) -> Self {
+    pub(crate) fn new(kind: ErrorKind, message: &str, operation: &'static str) -> Self {
         Self {
             kind,
             message: message.to_string(),
@@ -72,7 +77,32 @@ impl ClientError {
         }
     }
 
-    pub fn with_operation(mut self, operation: &'static str) -> Self {
+    /// Error type
+    pub fn kind(&self) -> &ErrorKind {
+        &self.kind
+    }
+
+    /// Error message
+    pub fn message(&self) -> &str {
+        &self.message
+    }
+
+    /// Name of operation that produced this error
+    pub fn operation(&self) -> &str {
+        self.operation
+    }
+
+    /// Error context, formatted in key-value pairs
+    pub fn context(&self) -> &Vec<(&'static str, String)> {
+        &self.context
+    }
+
+    /// Source error
+    pub fn source(&self) -> Option<&anyhow::Error> {
+        self.source.as_ref()
+    }
+
+    pub(crate) fn with_operation(mut self, operation: &'static str) -> Self {
         if !self.operation.is_empty() {
             self.context.push(("called", self.operation.to_string()));
         }
@@ -81,12 +111,12 @@ impl ClientError {
         self
     }
 
-    pub fn with_context(mut self, key: &'static str, value: impl Into<String>) -> Self {
+    pub(crate) fn with_context(mut self, key: &'static str, value: impl Into<String>) -> Self {
         self.context.push((key, value.into()));
         self
     }
 
-    pub fn set_source(mut self, src: impl Into<anyhow::Error>) -> Self {
+    pub(crate) fn set_source(mut self, src: impl Into<anyhow::Error>) -> Self {
         debug_assert!(self.source.is_none(), "the source error has been set");
 
         self.source = Some(src.into());
diff --git a/rust/src/lib.rs b/rust/src/lib.rs
index 1a13a5a9..fcac3202 100644
--- a/rust/src/lib.rs
+++ b/rust/src/lib.rs
@@ -64,7 +64,7 @@
 //!     debug_assert!(result.is_ok(), "send message failed: {:?}", result);
 //!     println!(
 //!         "send message success, message_id={}",
-//!         result.unwrap().message_id
+//!         result.unwrap().message_id()
 //!     );
 //! }
 //! ```
@@ -108,7 +108,7 @@
 //!     for message in messages {
 //!         println!("receive message: {:?}", message);
 //!         // ack message to rocketmq proxy
-//!         let ack_result = consumer.ack(message).await;
+//!         let ack_result = consumer.ack(&message).await;
 //!         debug_assert!(
 //!             ack_result.is_ok(),
 //!             "ack message failed: {:?}",
@@ -121,7 +121,7 @@
 
 #[allow(dead_code)]
 pub mod conf;
-mod error;
+pub mod error;
 #[allow(dead_code)]
 mod log;
 
diff --git a/rust/src/model/common.rs b/rust/src/model/common.rs
index 4023b959..deaa1235 100644
--- a/rust/src/model/common.rs
+++ b/rust/src/model/common.rs
@@ -228,6 +228,32 @@ impl FilterExpression {
     }
 }
 
+/// Send result returned by producer.
+#[derive(Clone, Debug)]
+pub struct SendReceipt {
+    pub(crate) message_id: String,
+    pub(crate) transaction_id: String,
+}
+
+impl SendReceipt {
+    pub(crate) fn from_pb_send_result(entry: &pb::SendResultEntry) -> Self {
+        SendReceipt {
+            message_id: entry.message_id.clone(),
+            transaction_id: entry.transaction_id.clone(),
+        }
+    }
+
+    /// Get message id
+    pub fn message_id(&self) -> &str {
+        &self.message_id
+    }
+
+    /// Get transaction id
+    pub fn transaction_id(&self) -> &str {
+        &self.transaction_id
+    }
+}
+
 #[cfg(test)]
 mod tests {
     use crate::error::ErrorKind;
diff --git a/rust/src/producer.rs b/rust/src/producer.rs
index 697b70e3..8de941c8 100644
--- a/rust/src/producer.rs
+++ b/rust/src/producer.rs
@@ -25,9 +25,9 @@ use slog::{info, Logger};
 use crate::client::Client;
 use crate::conf::{ClientOption, ProducerOption};
 use crate::error::{ClientError, ErrorKind};
-use crate::model::common::ClientType;
+use crate::model::common::{ClientType, SendReceipt};
 use crate::model::message;
-use crate::pb::{Encoding, Resource, SendResultEntry, SystemProperties};
+use crate::pb::{Encoding, Resource, SystemProperties};
 use crate::util::{
     build_endpoints_by_message_queue, build_producer_settings, select_message_queue,
     select_message_queue_by_message_group, HOST_NAME,
@@ -185,7 +185,7 @@ impl Producer {
     pub async fn send_one(
         &self,
         message: impl message::Message,
-    ) -> Result<SendResultEntry, ClientError> {
+    ) -> Result<SendReceipt, ClientError> {
         let results = self.send(vec![message]).await?;
         Ok(results[0].clone())
     }
@@ -198,7 +198,7 @@ impl Producer {
     pub async fn send(
         &self,
         messages: Vec<impl message::Message>,
-    ) -> Result<Vec<SendResultEntry>, ClientError> {
+    ) -> Result<Vec<SendReceipt>, ClientError> {
         let (topic, message_group, mut pb_messages) =
             self.transform_messages_to_protobuf(messages)?;
 
@@ -222,12 +222,13 @@ impl Producer {
 
 #[cfg(test)]
 mod tests {
+    use std::sync::Arc;
+
     use crate::error::ErrorKind;
     use crate::log::terminal_logger;
     use crate::model::common::Route;
     use crate::model::message::{MessageBuilder, MessageImpl};
-    use crate::pb::{Broker, Code, MessageQueue, Status};
-    use std::sync::Arc;
+    use crate::pb::{Broker, MessageQueue};
 
     use super::*;
 
@@ -389,14 +390,9 @@ mod tests {
             }))
         });
         producer.client.expect_send_message().returning(|_, _| {
-            Ok(vec![SendResultEntry {
-                status: Some(Status {
-                    code: Code::Ok as i32,
-                    message: "".to_string(),
-                }),
+            Ok(vec![SendReceipt {
                 message_id: "".to_string(),
                 transaction_id: "".to_string(),
-                offset: 0,
             }])
         });
         producer
diff --git a/rust/src/session.rs b/rust/src/session.rs
index 6ef9b97d..0660931f 100644
--- a/rust/src/session.rs
+++ b/rust/src/session.rs
@@ -22,7 +22,7 @@ use slog::{debug, error, info, o, Logger};
 use tokio::sync::{mpsc, Mutex};
 use tokio_stream::wrappers::ReceiverStream;
 use tokio_stream::StreamExt;
-use tonic::metadata::AsciiMetadataValue;
+use tonic::metadata::{AsciiMetadataValue, MetadataMap};
 use tonic::transport::{Channel, Endpoint};
 
 use crate::conf::ClientOption;
@@ -193,8 +193,15 @@ impl Session {
         self.endpoints.endpoint_url()
     }
 
-    fn sign<T>(&self, mut request: tonic::Request<T>) -> tonic::Request<T> {
+    fn sign<T>(&self, message: T) -> tonic::Request<T> {
+        let mut request = tonic::Request::new(message);
         let metadata = request.metadata_mut();
+        self.sign_without_timeout(metadata);
+        request.set_timeout(*self.option.timeout());
+        request
+    }
+
+    fn sign_without_timeout(&self, metadata: &mut MetadataMap) {
         let _ = AsciiMetadataValue::try_from(&self.client_id)
             .map(|v| metadata.insert("x-mq-client-id", v));
 
@@ -210,9 +217,6 @@ impl Session {
             "x-mq-protocol-version",
             AsciiMetadataValue::from_static(PROTOCOL_VERSION),
         );
-
-        request.set_timeout(*self.option.timeout());
-        request
     }
 
     pub(crate) async fn start(&mut self, settings: TelemetryCommand) -> Result<(), ClientError> {
@@ -226,7 +230,8 @@ impl Session {
             .set_source(e)
         })?;
 
-        let request = self.sign(tonic::Request::new(ReceiverStream::new(rx)));
+        let mut request = tonic::Request::new(ReceiverStream::new(rx));
+        self.sign_without_timeout(request.metadata_mut());
         let response = self.stub.telemetry(request).await.map_err(|e| {
             ClientError::new(
                 ErrorKind::ClientInternal,
@@ -299,7 +304,7 @@ impl RPCClient for Session {
         &mut self,
         request: QueryRouteRequest,
     ) -> Result<QueryRouteResponse, ClientError> {
-        let request = self.sign(tonic::Request::new(request));
+        let request = self.sign(request);
         let response = self.stub.query_route(request).await.map_err(|e| {
             ClientError::new(
                 ErrorKind::ClientInternal,
@@ -315,7 +320,7 @@ impl RPCClient for Session {
         &mut self,
         request: HeartbeatRequest,
     ) -> Result<HeartbeatResponse, ClientError> {
-        let request = self.sign(tonic::Request::new(request));
+        let request = self.sign(request);
         let response = self.stub.heartbeat(request).await.map_err(|e| {
             ClientError::new(
                 ErrorKind::ClientInternal,
@@ -331,7 +336,7 @@ impl RPCClient for Session {
         &mut self,
         request: SendMessageRequest,
     ) -> Result<SendMessageResponse, ClientError> {
-        let request = self.sign(tonic::Request::new(request));
+        let request = self.sign(request);
         let response = self.stub.send_message(request).await.map_err(|e| {
             ClientError::new(
                 ErrorKind::ClientInternal,
@@ -348,7 +353,7 @@ impl RPCClient for Session {
         request: ReceiveMessageRequest,
     ) -> Result<Vec<ReceiveMessageResponse>, ClientError> {
         let batch_size = request.batch_size;
-        let mut request = self.sign(tonic::Request::new(request));
+        let mut request = self.sign(request);
         request.set_timeout(*self.option.long_polling_timeout());
         let mut stream = self
             .stub
@@ -383,7 +388,7 @@ impl RPCClient for Session {
         &mut self,
         request: AckMessageRequest,
     ) -> Result<AckMessageResponse, ClientError> {
-        let request = self.sign(tonic::Request::new(request));
+        let request = self.sign(request);
         let response = self.stub.ack_message(request).await.map_err(|e| {
             ClientError::new(
                 ErrorKind::ClientInternal,
diff --git a/rust/src/simple_consumer.rs b/rust/src/simple_consumer.rs
index eefa93b1..cf74e820 100644
--- a/rust/src/simple_consumer.rs
+++ b/rust/src/simple_consumer.rs
@@ -48,6 +48,7 @@ pub struct SimpleConsumer {
 }
 
 impl SimpleConsumer {
+    const OPERATION_NEW_SIMPLE_CONSUMER: &'static str = "simple_consumer.new";
     const OPERATION_START_SIMPLE_CONSUMER: &'static str = "simple_consumer.start";
     const OPERATION_RECEIVE_MESSAGE: &'static str = "simple_consumer.receive_message";
 
@@ -56,6 +57,14 @@ impl SimpleConsumer {
         option: SimpleConsumerOption,
         client_option: ClientOption,
     ) -> Result<Self, ClientError> {
+        if option.consumer_group().is_empty() {
+            return Err(ClientError::new(
+                ErrorKind::Config,
+                "required option is missing: consumer group is empty",
+                Self::OPERATION_NEW_SIMPLE_CONSUMER,
+            ));
+        }
+
         let client_option = ClientOption {
             client_type: ClientType::SimpleConsumer,
             group: option.consumer_group().to_string(),
@@ -120,12 +129,12 @@ impl SimpleConsumer {
     /// * `invisible_duration` - set the invisible duration of messages that return from the server, these messages will not be visible to other consumers unless timeout
     pub async fn receive_with_batch_size(
         &self,
-        topic: &str,
+        topic: impl AsRef<str>,
         expression: &FilterExpression,
         batch_size: i32,
         invisible_duration: Duration,
     ) -> Result<Vec<MessageView>, ClientError> {
-        let route = self.client.topic_route(topic, true).await?;
+        let route = self.client.topic_route(topic.as_ref(), true).await?;
         let message_queue = select_message_queue(route);
         let endpoints =
             build_endpoints_by_message_queue(&message_queue, Self::OPERATION_RECEIVE_MESSAGE)?;
@@ -155,7 +164,10 @@ impl SimpleConsumer {
     /// # Arguments
     ///
     /// * `ack_entry` - special message view with handle want to ack
-    pub async fn ack(&self, ack_entry: impl AckMessageEntry + 'static) -> Result<(), ClientError> {
+    pub async fn ack(
+        &self,
+        ack_entry: &(impl AckMessageEntry + 'static),
+    ) -> Result<(), ClientError> {
         self.client.ack_message(ack_entry).await?;
         Ok(())
     }
@@ -238,7 +250,7 @@ mod tests {
         });
         client
             .expect_ack_message()
-            .returning(|_: MessageView| Ok(AckMessageResultEntry::default()));
+            .returning(|_: &MessageView| Ok(AckMessageResultEntry::default()));
         let simple_consumer = SimpleConsumer {
             option: SimpleConsumerOption::default(),
             logger: terminal_logger(),
@@ -253,7 +265,7 @@ mod tests {
             .await?;
         assert_eq!(messages.len(), 1);
         simple_consumer
-            .ack(messages.into_iter().next().unwrap())
+            .ack(&messages.into_iter().next().unwrap())
             .await?;
         Ok(())
     }

Reply via email to