seokjin0414 commented on code in PR #3046:
URL: https://github.com/apache/iggy/pull/3046#discussion_r3173004926
##########
foreign/cpp/Cargo.toml:
##########
@@ -27,6 +27,7 @@ ignored = ["cxx-build"]
crate-type = ["staticlib"]
[dependencies]
+bytes = "1.11.1"
Review Comment:
as @slbotbm noted — cpp crate is workspace-excluded (same as python sdk), so
the pin stays.
##########
foreign/cpp/src/client.rs:
##########
@@ -127,6 +139,132 @@ impl Client {
// })
// }
+ #[allow(clippy::too_many_arguments)]
+ pub fn send_messages(
+ &self,
+ stream_id: ffi::Identifier,
+ topic_id: ffi::Identifier,
+ partitioning_kind: String,
+ partitioning_value: Vec<u8>,
+ messages: Vec<ffi::Message>,
+ ) -> Result<(), String> {
+ let rust_stream_id = RustIdentifier::try_from(stream_id)
+ .map_err(|error| format!("Could not send messages: {error}"))?;
+ let rust_topic_id = RustIdentifier::try_from(topic_id)
+ .map_err(|error| format!("Could not send messages: {error}"))?;
+
+ let partitioning = match partitioning_kind.as_str() {
+ "balanced" => Partitioning::balanced(),
+ "partition_id" => {
+ if partitioning_value.len() < 4 {
Review Comment:
tightened to `!= 4` — 8-byte payloads now error explicitly.
##########
foreign/cpp/src/client.rs:
##########
@@ -127,6 +139,132 @@ impl Client {
// })
// }
+ #[allow(clippy::too_many_arguments)]
+ pub fn send_messages(
+ &self,
+ stream_id: ffi::Identifier,
+ topic_id: ffi::Identifier,
+ partitioning_kind: String,
+ partitioning_value: Vec<u8>,
+ messages: Vec<ffi::Message>,
+ ) -> Result<(), String> {
+ let rust_stream_id = RustIdentifier::try_from(stream_id)
+ .map_err(|error| format!("Could not send messages: {error}"))?;
+ let rust_topic_id = RustIdentifier::try_from(topic_id)
+ .map_err(|error| format!("Could not send messages: {error}"))?;
+
+ let partitioning = match partitioning_kind.as_str() {
+ "balanced" => Partitioning::balanced(),
+ "partition_id" => {
+ if partitioning_value.len() < 4 {
+ return Err(
+ "Could not send messages: partition_id requires 4
bytes".to_string()
+ );
+ }
+ let id =
u32::from_le_bytes(partitioning_value[..4].try_into().map_err(|_| {
+ "Could not send messages: invalid partition_id
value".to_string()
+ })?);
+ Partitioning::partition_id(id)
+ }
+ "messages_key" =>
Partitioning::messages_key(&partitioning_value).map_err(|error| {
Review Comment:
rejected at the ffi boundary now.
##########
foreign/cpp/src/client.rs:
##########
@@ -127,6 +139,132 @@ impl Client {
// })
// }
+ #[allow(clippy::too_many_arguments)]
+ pub fn send_messages(
+ &self,
+ stream_id: ffi::Identifier,
+ topic_id: ffi::Identifier,
+ partitioning_kind: String,
+ partitioning_value: Vec<u8>,
+ messages: Vec<ffi::Message>,
+ ) -> Result<(), String> {
+ let rust_stream_id = RustIdentifier::try_from(stream_id)
+ .map_err(|error| format!("Could not send messages: {error}"))?;
+ let rust_topic_id = RustIdentifier::try_from(topic_id)
+ .map_err(|error| format!("Could not send messages: {error}"))?;
+
+ let partitioning = match partitioning_kind.as_str() {
+ "balanced" => Partitioning::balanced(),
+ "partition_id" => {
+ if partitioning_value.len() < 4 {
+ return Err(
+ "Could not send messages: partition_id requires 4
bytes".to_string()
+ );
+ }
+ let id =
u32::from_le_bytes(partitioning_value[..4].try_into().map_err(|_| {
+ "Could not send messages: invalid partition_id
value".to_string()
+ })?);
+ Partitioning::partition_id(id)
+ }
+ "messages_key" =>
Partitioning::messages_key(&partitioning_value).map_err(|error| {
+ format!("Could not send messages: invalid messages key:
{error}")
+ })?,
+ _ => {
+ return Err(format!(
+ "Could not send messages: invalid partitioning kind:
{partitioning_kind}"
+ ));
+ }
+ };
+
+ let mut iggy_messages: Vec<IggyMessage> = messages
+ .into_iter()
+ .map(IggyMessage::try_from)
+ .collect::<Result<Vec<_>, _>>()?;
+
+ RUNTIME.block_on(async {
+ self.inner
+ .send_messages(
+ &rust_stream_id,
+ &rust_topic_id,
+ &partitioning,
+ &mut iggy_messages,
+ )
+ .await
+ .map_err(|error| format!("Could not send messages: {error}"))?;
+ Ok(())
+ })
+ }
+
+ #[allow(clippy::too_many_arguments)]
+ pub fn poll_messages(
+ &self,
+ stream_id: ffi::Identifier,
+ topic_id: ffi::Identifier,
+ partition_id: u32,
+ consumer_kind: String,
+ consumer_id: ffi::Identifier,
+ polling_strategy_kind: String,
+ polling_strategy_value: u64,
+ count: u32,
+ auto_commit: bool,
+ ) -> Result<ffi::PolledMessages, String> {
+ let rust_stream_id = RustIdentifier::try_from(stream_id)
+ .map_err(|error| format!("Could not poll messages: {error}"))?;
+ let rust_topic_id = RustIdentifier::try_from(topic_id)
+ .map_err(|error| format!("Could not poll messages: {error}"))?;
+ let rust_consumer_id = RustIdentifier::try_from(consumer_id)
+ .map_err(|error| format!("Could not poll messages: {error}"))?;
+
+ let consumer = Consumer {
Review Comment:
switched to `Consumer::new()` / `Consumer::group()`.
##########
foreign/cpp/src/client.rs:
##########
@@ -127,6 +139,132 @@ impl Client {
// })
// }
+ #[allow(clippy::too_many_arguments)]
+ pub fn send_messages(
+ &self,
+ stream_id: ffi::Identifier,
+ topic_id: ffi::Identifier,
+ partitioning_kind: String,
+ partitioning_value: Vec<u8>,
+ messages: Vec<ffi::Message>,
+ ) -> Result<(), String> {
+ let rust_stream_id = RustIdentifier::try_from(stream_id)
+ .map_err(|error| format!("Could not send messages: {error}"))?;
+ let rust_topic_id = RustIdentifier::try_from(topic_id)
+ .map_err(|error| format!("Could not send messages: {error}"))?;
+
+ let partitioning = match partitioning_kind.as_str() {
+ "balanced" => Partitioning::balanced(),
+ "partition_id" => {
+ if partitioning_value.len() < 4 {
+ return Err(
+ "Could not send messages: partition_id requires 4
bytes".to_string()
+ );
+ }
+ let id =
u32::from_le_bytes(partitioning_value[..4].try_into().map_err(|_| {
+ "Could not send messages: invalid partition_id
value".to_string()
+ })?);
+ Partitioning::partition_id(id)
+ }
+ "messages_key" =>
Partitioning::messages_key(&partitioning_value).map_err(|error| {
+ format!("Could not send messages: invalid messages key:
{error}")
+ })?,
+ _ => {
+ return Err(format!(
+ "Could not send messages: invalid partitioning kind:
{partitioning_kind}"
+ ));
+ }
+ };
+
+ let mut iggy_messages: Vec<IggyMessage> = messages
+ .into_iter()
+ .map(IggyMessage::try_from)
+ .collect::<Result<Vec<_>, _>>()?;
+
+ RUNTIME.block_on(async {
+ self.inner
+ .send_messages(
+ &rust_stream_id,
+ &rust_topic_id,
+ &partitioning,
+ &mut iggy_messages,
+ )
+ .await
+ .map_err(|error| format!("Could not send messages: {error}"))?;
+ Ok(())
+ })
+ }
+
+ #[allow(clippy::too_many_arguments)]
+ pub fn poll_messages(
+ &self,
+ stream_id: ffi::Identifier,
+ topic_id: ffi::Identifier,
+ partition_id: u32,
+ consumer_kind: String,
+ consumer_id: ffi::Identifier,
+ polling_strategy_kind: String,
+ polling_strategy_value: u64,
+ count: u32,
+ auto_commit: bool,
+ ) -> Result<ffi::PolledMessages, String> {
+ let rust_stream_id = RustIdentifier::try_from(stream_id)
+ .map_err(|error| format!("Could not poll messages: {error}"))?;
+ let rust_topic_id = RustIdentifier::try_from(topic_id)
+ .map_err(|error| format!("Could not poll messages: {error}"))?;
+ let rust_consumer_id = RustIdentifier::try_from(consumer_id)
+ .map_err(|error| format!("Could not poll messages: {error}"))?;
+
+ let consumer = Consumer {
+ kind: match consumer_kind.as_str() {
+ "consumer" => ConsumerKind::Consumer,
+ "consumer_group" => ConsumerKind::ConsumerGroup,
+ _ => {
+ return Err(format!(
+ "Could not poll messages: invalid consumer kind:
{consumer_kind}"
+ ));
+ }
+ },
+ id: rust_consumer_id,
+ };
+
+ let strategy = match polling_strategy_kind.as_str() {
+ "offset" => PollingStrategy::offset(polling_strategy_value),
+ "timestamp" =>
PollingStrategy::timestamp(IggyTimestamp::from(polling_strategy_value)),
+ "first" => PollingStrategy::first(),
+ "last" => PollingStrategy::last(),
+ "next" => PollingStrategy::next(),
+ _ => {
+ return Err(format!(
+ "Could not poll messages: invalid polling strategy:
{polling_strategy_kind}"
+ ));
+ }
+ };
+
+ let opt_partition = if partition_id == u32::MAX {
Review Comment:
exposed as `ANY_PARTITION_ID` with a doc-comment.
##########
foreign/cpp/src/lib.rs:
##########
@@ -51,6 +52,36 @@ mod ffi {
partitions_count: u32,
}
+ struct Stream {
+ id: u32,
+ created_at: u64,
+ name: String,
+ size_bytes: u64,
+ messages_count: u64,
+ topics_count: u32,
+ }
+
+ struct Message {
Review Comment:
reverted to separate `IggyMessageToSend` / `IggyMessagePolled` per
@slbotbm's call. send path only exposes caller-settable fields now.
##########
foreign/cpp/src/lib.rs:
##########
@@ -129,6 +161,44 @@ mod ffi {
topic_id: Identifier,
group_id: Identifier,
) -> Result<()>;
+ fn join_consumer_group(
+ self: &Client,
+ stream_id: Identifier,
+ topic_id: Identifier,
+ group_id: Identifier,
+ ) -> Result<()>;
+ fn leave_consumer_group(
+ self: &Client,
+ stream_id: Identifier,
+ topic_id: Identifier,
+ group_id: Identifier,
+ ) -> Result<()>;
+
+ #[allow(clippy::too_many_arguments)]
+ fn poll_messages(
+ self: &Client,
+ stream_id: Identifier,
+ topic_id: Identifier,
+ partition_id: u32,
+ consumer_kind: String,
+ consumer_id: Identifier,
+ polling_strategy_kind: String,
+ polling_strategy_value: u64,
+ count: u32,
+ auto_commit: bool,
+ ) -> Result<PolledMessages>;
+
+ fn new_message(self: &mut Message, payload: Vec<u8>);
Review Comment:
resolved by the split — `make_message(payload) -> IggyMessageToSend` is a
free fn, caller no longer default-constructs and mutates.
##########
foreign/cpp/src/messages.rs:
##########
@@ -0,0 +1,84 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::ffi;
+use bytes::Bytes;
+use iggy::prelude::{IggyMessage as RustIggyMessage, PolledMessages as
RustPolledMessages};
+
+impl ffi::Message {
+ pub fn new_message(&mut self, payload: Vec<u8>) {
Review Comment:
dropped from the send struct. SDK builder recomputes it anyway.
##########
foreign/cpp/src/messages.rs:
##########
@@ -0,0 +1,84 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::ffi;
+use bytes::Bytes;
+use iggy::prelude::{IggyMessage as RustIggyMessage, PolledMessages as
RustPolledMessages};
+
+impl ffi::Message {
+ pub fn new_message(&mut self, payload: Vec<u8>) {
+ let payload_length = payload.len() as u32;
+ *self = Self {
+ checksum: 0,
+ id_lo: 0,
+ id_hi: 0,
+ offset: 0,
+ timestamp: 0,
+ origin_timestamp: 0,
+ user_headers_length: 0,
+ payload_length,
+ reserved: 0,
+ payload,
+ user_headers: Vec::new(),
+ };
+ }
+}
+
+impl From<RustIggyMessage> for ffi::Message {
+ fn from(m: RustIggyMessage) -> Self {
+ let id = m.header.id;
+ ffi::Message {
+ checksum: m.header.checksum,
+ id_lo: id as u64,
Review Comment:
swapped for `u128::to_le_bytes()` + split.
##########
foreign/cpp/src/messages.rs:
##########
@@ -0,0 +1,84 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::ffi;
+use bytes::Bytes;
+use iggy::prelude::{IggyMessage as RustIggyMessage, PolledMessages as
RustPolledMessages};
+
+impl ffi::Message {
+ pub fn new_message(&mut self, payload: Vec<u8>) {
+ let payload_length = payload.len() as u32;
+ *self = Self {
+ checksum: 0,
+ id_lo: 0,
+ id_hi: 0,
+ offset: 0,
+ timestamp: 0,
+ origin_timestamp: 0,
+ user_headers_length: 0,
+ payload_length,
+ reserved: 0,
+ payload,
+ user_headers: Vec::new(),
+ };
+ }
+}
+
+impl From<RustIggyMessage> for ffi::Message {
+ fn from(m: RustIggyMessage) -> Self {
+ let id = m.header.id;
+ ffi::Message {
+ checksum: m.header.checksum,
+ id_lo: id as u64,
+ id_hi: (id >> 64) as u64,
+ offset: m.header.offset,
+ timestamp: m.header.timestamp,
+ origin_timestamp: m.header.origin_timestamp,
+ user_headers_length: m.header.user_headers_length,
+ payload_length: m.header.payload_length,
+ reserved: m.header.reserved,
+ payload: m.payload.to_vec(),
+ user_headers: m.user_headers.map(|h|
h.to_vec()).unwrap_or_default(),
+ }
+ }
+}
+
+impl TryFrom<ffi::Message> for RustIggyMessage {
Review Comment:
rejected explicitly when non-empty. parsing bytes back into
`BTreeMap<HeaderKey, HeaderValue>` is more than fits here — follow-up PR.
##########
foreign/cpp/src/messages.rs:
##########
@@ -0,0 +1,84 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::ffi;
+use bytes::Bytes;
+use iggy::prelude::{IggyMessage as RustIggyMessage, PolledMessages as
RustPolledMessages};
+
+impl ffi::Message {
+ pub fn new_message(&mut self, payload: Vec<u8>) {
+ let payload_length = payload.len() as u32;
+ *self = Self {
+ checksum: 0,
+ id_lo: 0,
+ id_hi: 0,
+ offset: 0,
+ timestamp: 0,
+ origin_timestamp: 0,
+ user_headers_length: 0,
+ payload_length,
+ reserved: 0,
+ payload,
+ user_headers: Vec::new(),
+ };
+ }
+}
+
+impl From<RustIggyMessage> for ffi::Message {
+ fn from(m: RustIggyMessage) -> Self {
+ let id = m.header.id;
+ ffi::Message {
+ checksum: m.header.checksum,
+ id_lo: id as u64,
+ id_hi: (id >> 64) as u64,
+ offset: m.header.offset,
+ timestamp: m.header.timestamp,
+ origin_timestamp: m.header.origin_timestamp,
+ user_headers_length: m.header.user_headers_length,
+ payload_length: m.header.payload_length,
+ reserved: m.header.reserved,
+ payload: m.payload.to_vec(),
+ user_headers: m.user_headers.map(|h|
h.to_vec()).unwrap_or_default(),
+ }
+ }
+}
+
+impl TryFrom<ffi::Message> for RustIggyMessage {
+ type Error = String;
+
+ fn try_from(m: ffi::Message) -> Result<Self, Self::Error> {
+ let id = ((m.id_hi as u128) << 64) | (m.id_lo as u128);
+ let payload = Bytes::from(m.payload);
Review Comment:
removed. confirmed `IggyMessageBuilder` treats `Some(0)` and `None`
identically, so the branch was a no-op.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]