This is an automated email from the ASF dual-hosted git repository.
yukon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-clients.git
The following commit(s) were added to refs/heads/master by this push:
new f54b8b77 feat: add some basic message model(id,view,message) (#462)
f54b8b77 is described below
commit f54b8b779d380263828d28ddb56782e489004b3a
Author: Lei Zhiyuan <[email protected]>
AuthorDate: Wed Apr 5 09:57:16 2023 +0800
feat: add some basic message model(id,view,message) (#462)
* feat: add some basic message model(id,view,message)
* feat: add some basic message model(id,view,message)
---
rust/Cargo.toml | 7 ++
rust/src/lib.rs | 2 +
rust/src/models/message.rs | 47 ++++++++++
rust/src/models/message_id.rs | 140 ++++++++++++++++++++++++++++
rust/src/{lib.rs => models/message_view.rs} | 28 ++----
rust/src/{lib.rs => models/mod.rs} | 22 +----
6 files changed, 208 insertions(+), 38 deletions(-)
diff --git a/rust/Cargo.toml b/rust/Cargo.toml
index 1850b95d..d4154951 100644
--- a/rust/Cargo.toml
+++ b/rust/Cargo.toml
@@ -42,6 +42,13 @@ slog-json = "2.6.1"
opentelemetry = { version = "0.19.0", features = ["metrics", "rt-tokio"] }
opentelemetry-otlp = { version = "0.12.0", features = ["metrics",
"grpc-tonic"] }
+byteorder = "1"
+mac_address = "1.1.4"
+hex = "0.4.3"
+time = "0.3.19"
+once_cell = "1.9.0"
+tokio-stream="0.1.12"
+
minitrace = "0.4.1"
mockall = "0.11.4"
diff --git a/rust/src/lib.rs b/rust/src/lib.rs
index 35d0d907..17b9bd0d 100644
--- a/rust/src/lib.rs
+++ b/rust/src/lib.rs
@@ -33,3 +33,5 @@ pub(crate) mod producer;
// Export structs that are part of crate API.
pub use producer::Producer;
+
+pub mod models;
diff --git a/rust/src/models/message.rs b/rust/src/models/message.rs
new file mode 100644
index 00000000..f8366aa1
--- /dev/null
+++ b/rust/src/models/message.rs
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+use std::{
+ collections::HashMap,
+ io::Write,
+ mem, process,
+ sync::Arc,
+ sync::{atomic::AtomicUsize, Weak},
+};
+/// Crate-internal message model: payload bytes plus the string metadata set at creation.
+///
+/// Derives `Debug` so it can be logged the same way as `MessageView`.
+#[derive(Debug)]
+pub(crate) struct MessageImpl {
+    pub(crate) keys: Vec<String>,
+    pub(crate) body: Vec<u8>,
+    pub(crate) topic: String,
+    pub(crate) tags: String,
+    pub(crate) message_group: String,
+    pub(crate) delivery_timestamp: i64,
+    pub(crate) properties: HashMap<String, String>,
+}
+
+impl MessageImpl {
+    /// Creates a message for `topic` with the given `tags`, `keys` and textual `body`.
+    ///
+    /// `body` is stored as its UTF-8 bytes. The remaining fields start out empty:
+    /// `message_group` is an empty string, `delivery_timestamp` is `0` and
+    /// `properties` is an empty map.
+    pub fn new(topic: &str, tags: &str, keys: Vec<String>, body: &str) -> Self {
+        MessageImpl {
+            keys,
+            body: body.as_bytes().to_vec(),
+            topic: topic.to_string(),
+            tags: tags.to_string(),
+            message_group: String::new(),
+            delivery_timestamp: 0,
+            properties: HashMap::new(),
+        }
+    }
+}
diff --git a/rust/src/models/message_id.rs b/rust/src/models/message_id.rs
new file mode 100644
index 00000000..92d4e446
--- /dev/null
+++ b/rust/src/models/message_id.rs
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+use byteorder::{BigEndian, WriteBytesExt};
+use once_cell::sync::Lazy;
+use parking_lot::Mutex;
+use std::io::Write;
+use std::process;
+use std::time::SystemTime;
+use time::{Date, OffsetDateTime, PrimitiveDateTime, Time};
+
+/**
+ * The codec for the message-id.
+ *
+ * <p>Codec here provides the following two functions:
+ * 1. Provide decoding function of message-id of all versions above v0.
+ * 2. Provide a generator of message-id of v1 version.
+ *
+ * <p>The message-id of versions above V1 consists of 17 bytes in total. The first byte
+ * (two hex characters) represents the version number. For V1, this byte is 0x01.
+ *
+ * <h3>V1 message id example</h3>
+ *
+ * <pre>
+ * ┌──┬────────────┬────┬────────┬────────┐
+ * │01│56F7E71C361B│21BC│024CCDBE│00000000│
+ * └──┴────────────┴────┴────────┴────────┘
+ * </pre>
+ *
+ * <h3>V1 version message id generation rules</h3>
+ *
+ * <pre>
+ * process id(lower 2bytes)
+ * ▲
+ * mac address(lower 6bytes) │ sequence number(big endian)
+ * ▲ │ ▲ (4bytes)
+ * │ │ │
+ * ┌─────┴─────┐ ┌┴┐ ┌───┐ ┌─┴─┐
+ * 0x01+ │ 6 │ │2│ │ 4 │ │ 4 │
+ * └───────────┘ └─┘ └─┬─┘ └───┘
+ * │
+ * ▼
+ * seconds since 2021-01-01 00:00:00(UTC+0)
+ * (lower 4bytes)
+ * </pre>
+ */
+
+// inspired by https://github.com/messense/rocketmq-rs
+/// Process-wide V1 message-id generator, created on first use and guarded by a mutex.
+///
+/// Initialisation builds the fixed 9-byte prefix (version byte `0x01`, the 6-byte MAC
+/// address, the lower 2 bytes of the process id) and stores it as upper-case hex.
+///
+/// # Panics
+///
+/// The first access panics when the MAC address lookup fails or yields no address.
+pub(crate) static UNIQ_ID_GENERATOR: Lazy<Mutex<UniqueIdGenerator>> = Lazy::new(|| {
+    // 1 byte version + 6 bytes MAC address + 2 bytes process id.
+    let mut prefix = Vec::with_capacity(9);
+    prefix.write_u8(1).unwrap();
+    let mac = match mac_address::get_mac_address().unwrap() {
+        Some(mac) => mac,
+        None => panic!("mac address is none"),
+    };
+    prefix.write_all(&mac.bytes()).unwrap();
+    // Truncation is intended: only the lower two bytes of the pid are encoded.
+    prefix
+        .write_u16::<BigEndian>(process::id() as u16)
+        .unwrap();
+
+    // Custom epoch of the V1 format: 2021-01-01 00:00:00 (UTC+0).
+    let custom_epoch: OffsetDateTime = PrimitiveDateTime::new(
+        Date::from_calendar_date(2021, time::Month::January, 1).unwrap(),
+        Time::from_hms(0, 0, 0).unwrap(),
+    )
+    .assume_utc();
+
+    Mutex::new(UniqueIdGenerator {
+        counter: 0,
+        prefix: hex::encode_upper(prefix),
+        start_timestamp: custom_epoch.unix_timestamp(),
+    })
+});
+
+/// Generator of V1 message ids: a fixed per-process prefix followed by a 4-byte
+/// timestamp and a 4-byte sequence number, all rendered as upper-case hex.
+pub struct UniqueIdGenerator {
+    /// Sequence number, incremented (wrapping on overflow) for every generated id.
+    counter: u32,
+    /// Upper-case hex of the version byte, MAC address and process id.
+    prefix: String,
+    /// Unix timestamp, in seconds, of the custom epoch 2021-01-01 00:00:00 (UTC+0).
+    start_timestamp: i64,
+}
+
+impl UniqueIdGenerator {
+    /// Returns the next message id as 34 upper-case hex characters (17 bytes).
+    ///
+    /// Layout after the fixed prefix: the lower 4 bytes of the number of seconds elapsed
+    /// since the custom epoch, then the 4-byte big-endian sequence number.
+    ///
+    /// # Panics
+    ///
+    /// Panics when the system clock reports a time before the Unix epoch.
+    pub fn generate(&mut self) -> String {
+        let now_secs = SystemTime::now()
+            .duration_since(SystemTime::UNIX_EPOCH)
+            .expect("system clock is set before the Unix epoch")
+            .as_secs() as i64;
+        let delta_seconds = now_secs - self.start_timestamp;
+
+        self.counter = self.counter.wrapping_add(1);
+
+        let mut suffix = Vec::with_capacity(8);
+        // Truncation is intended: the format keeps only the lower 4 bytes of the delta.
+        suffix.write_u32::<BigEndian>(delta_seconds as u32).unwrap();
+        suffix.write_u32::<BigEndian>(self.counter).unwrap();
+
+        // Upper case keeps the suffix consistent with the upper-case prefix.
+        let mut id = self.prefix.clone();
+        id.push_str(&hex::encode_upper(suffix));
+        id
+    }
+}
+
+#[cfg(test)]
+mod test {
+    use std::collections::HashSet;
+
+    use super::UNIQ_ID_GENERATOR;
+
+    /// Generated ids must start with the V1 version marker and never repeat.
+    #[test]
+    fn test_generate_uniq_id() {
+        let mut seen = HashSet::new();
+        for i in 0..10 {
+            let uid = UNIQ_ID_GENERATOR.lock().generate();
+            println!("i: {}, uid: {}", i, uid);
+            assert!(uid.starts_with("01"), "unexpected version prefix in {}", uid);
+            assert!(seen.insert(uid), "duplicate message id generated");
+        }
+    }
+}
diff --git a/rust/src/lib.rs b/rust/src/models/message_view.rs
similarity index 73%
copy from rust/src/lib.rs
copy to rust/src/models/message_view.rs
index 35d0d907..f0849b0b 100644
--- a/rust/src/lib.rs
+++ b/rust/src/models/message_view.rs
@@ -14,22 +14,12 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-#[allow(dead_code)]
-mod conf;
-#[allow(dead_code)]
-mod error;
-#[allow(dead_code)]
-mod log;
-
-mod client;
-mod model;
-
-#[allow(clippy::all)]
-#[path = "pb/apache.rocketmq.v2.rs"]
-mod pb;
-mod session;
-
-pub(crate) mod producer;
-
-// Export structs that are part of crate API.
-pub use producer::Producer;
+/// Crate-internal record pairing a message body with string-valued metadata.
+///
+/// NOTE(review): nothing in this commit constructs or reads this type; the field
+/// descriptions below are inferred from the field names and should be confirmed
+/// against the code that will use it.
+#[derive(Debug)]
+pub(crate) struct MessageView {
+ /// Message payload as bytes.
+ pub(crate) body: Vec<u8>,
+ /// Presumably the id assigned to the message — TODO confirm.
+ pub(crate) message_id: String,
+ /// Presumably the topic the message belongs to — TODO confirm.
+ pub(crate) topic: String,
+ /// Presumably the consumer group that received the message — TODO confirm.
+ pub(crate) consume_group: String,
+ /// Presumably the endpoint the message was received from — TODO confirm.
+ pub(crate) endpoint: String,
+ /// Presumably the handle used to acknowledge the message — TODO confirm.
+ pub(crate) receipt_handle: String,
+}
diff --git a/rust/src/lib.rs b/rust/src/models/mod.rs
similarity index 73%
copy from rust/src/lib.rs
copy to rust/src/models/mod.rs
index 35d0d907..8496998d 100644
--- a/rust/src/lib.rs
+++ b/rust/src/models/mod.rs
@@ -14,22 +14,6 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-#[allow(dead_code)]
-mod conf;
-#[allow(dead_code)]
-mod error;
-#[allow(dead_code)]
-mod log;
-
-mod client;
-mod model;
-
-#[allow(clippy::all)]
-#[path = "pb/apache.rocketmq.v2.rs"]
-mod pb;
-mod session;
-
-pub(crate) mod producer;
-
-// Export structs that are part of crate API.
-pub use producer::Producer;
+pub(crate) mod message;
+pub(crate) mod message_id;
+pub(crate) mod message_view;