slbotbm commented on code in PR #3046:
URL: https://github.com/apache/iggy/pull/3046#discussion_r3214985434
##########
foreign/cpp/src/client.rs:
##########
@@ -127,6 +139,132 @@ impl Client {
// })
// }
+ #[allow(clippy::too_many_arguments)]
+ pub fn send_messages(
+ &self,
+ stream_id: ffi::Identifier,
+ topic_id: ffi::Identifier,
+ partitioning_kind: String,
+ partitioning_value: Vec<u8>,
+ messages: Vec<ffi::Message>,
+ ) -> Result<(), String> {
+ let rust_stream_id = RustIdentifier::try_from(stream_id)
+ .map_err(|error| format!("Could not send messages: {error}"))?;
+ let rust_topic_id = RustIdentifier::try_from(topic_id)
+ .map_err(|error| format!("Could not send messages: {error}"))?;
+
+ let partitioning = match partitioning_kind.as_str() {
+ "balanced" => Partitioning::balanced(),
+ "partition_id" => {
+ if partitioning_value.len() < 4 {
+ return Err(
+ "Could not send messages: partition_id requires 4
bytes".to_string()
+ );
+ }
+ let id =
u32::from_le_bytes(partitioning_value[..4].try_into().map_err(|_| {
+ "Could not send messages: invalid partition_id
value".to_string()
+ })?);
+ Partitioning::partition_id(id)
+ }
+ "messages_key" =>
Partitioning::messages_key(&partitioning_value).map_err(|error| {
+ format!("Could not send messages: invalid messages key:
{error}")
+ })?,
+ _ => {
+ return Err(format!(
+ "Could not send messages: invalid partitioning kind:
{partitioning_kind}"
+ ));
+ }
+ };
+
+ let mut iggy_messages: Vec<IggyMessage> = messages
+ .into_iter()
+ .map(IggyMessage::try_from)
+ .collect::<Result<Vec<_>, _>>()?;
+
+ RUNTIME.block_on(async {
+ self.inner
+ .send_messages(
+ &rust_stream_id,
+ &rust_topic_id,
+ &partitioning,
+ &mut iggy_messages,
+ )
+ .await
+ .map_err(|error| format!("Could not send messages: {error}"))?;
+ Ok(())
+ })
+ }
+
+ #[allow(clippy::too_many_arguments)]
+ pub fn poll_messages(
+ &self,
+ stream_id: ffi::Identifier,
+ topic_id: ffi::Identifier,
+ partition_id: u32,
+ consumer_kind: String,
+ consumer_id: ffi::Identifier,
+ polling_strategy_kind: String,
+ polling_strategy_value: u64,
+ count: u32,
+ auto_commit: bool,
+ ) -> Result<ffi::PolledMessages, String> {
+ let rust_stream_id = RustIdentifier::try_from(stream_id)
+ .map_err(|error| format!("Could not poll messages: {error}"))?;
+ let rust_topic_id = RustIdentifier::try_from(topic_id)
+ .map_err(|error| format!("Could not poll messages: {error}"))?;
+ let rust_consumer_id = RustIdentifier::try_from(consumer_id)
+ .map_err(|error| format!("Could not poll messages: {error}"))?;
+
+ let consumer = Consumer {
+ kind: match consumer_kind.as_str() {
+ "consumer" => ConsumerKind::Consumer,
+ "consumer_group" => ConsumerKind::ConsumerGroup,
+ _ => {
+ return Err(format!(
+ "Could not poll messages: invalid consumer kind:
{consumer_kind}"
+ ));
+ }
+ },
+ id: rust_consumer_id,
+ };
+
+ let strategy = match polling_strategy_kind.as_str() {
+ "offset" => PollingStrategy::offset(polling_strategy_value),
+ "timestamp" =>
PollingStrategy::timestamp(IggyTimestamp::from(polling_strategy_value)),
+ "first" => PollingStrategy::first(),
+ "last" => PollingStrategy::last(),
+ "next" => PollingStrategy::next(),
+ _ => {
+ return Err(format!(
+ "Could not poll messages: invalid polling strategy:
{polling_strategy_kind}"
+ ));
+ }
+ };
+
+ let opt_partition = if partition_id == u32::MAX {
Review Comment:
Since `u32::MAX` is used only once here, I would rather not define it as a
separate constant. The magic-number approach seems fine to me, since this is
only the low-level API. Once the high-level API is created, its meaning can be
documented explicitly there. What do you think, @hubcio?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]