This is an automated email from the ASF dual-hosted git repository. maciej pushed a commit to branch wasm in repository https://gitbox.apache.org/repos/asf/iggy.git
commit fc921efe0d9907e9d75b3ec24eb3a93b0f584a2d Author: Maciej Modzelewski <[email protected]> AuthorDate: Thu Apr 2 12:31:57 2026 +0200 wasm support --- Cargo.lock | 4 ++ Cargo.toml | 2 +- core/common/Cargo.toml | 25 ++++++--- core/common/src/alloc/buffer.rs | 65 ++++++++++++++++------ core/common/src/alloc/mod.rs | 9 +++ core/common/src/error/client_error.rs | 3 +- core/common/src/lib.rs | 8 +++ core/common/src/locking/mod.rs | 6 ++ core/common/src/locking/{mod.rs => wasm_lock.rs} | 45 +++++++++------ core/common/src/traits/binary_client.rs | 3 +- core/common/src/traits/binary_impls/cluster.rs | 3 +- .../src/traits/binary_impls/consumer_groups.rs | 3 +- .../src/traits/binary_impls/consumer_offsets.rs | 3 +- core/common/src/traits/binary_impls/messages.rs | 3 +- core/common/src/traits/binary_impls/partitions.rs | 3 +- .../traits/binary_impls/personal_access_tokens.rs | 3 +- core/common/src/traits/binary_impls/segments.rs | 3 +- core/common/src/traits/binary_impls/streams.rs | 3 +- core/common/src/traits/binary_impls/system.rs | 3 +- core/common/src/traits/binary_impls/topics.rs | 3 +- core/common/src/traits/binary_impls/users.rs | 3 +- core/common/src/traits/binary_transport.rs | 3 +- core/common/src/traits/client.rs | 3 +- core/common/src/traits/cluster_client.rs | 3 +- core/common/src/traits/consumer_group_client.rs | 3 +- core/common/src/traits/consumer_offset_client.rs | 3 +- core/common/src/traits/message_client.rs | 3 +- core/common/src/traits/partition_client.rs | 3 +- .../src/traits/personal_access_token_client.rs | 3 +- core/common/src/traits/segment_client.rs | 3 +- core/common/src/traits/stream_client.rs | 3 +- core/common/src/traits/system_client.rs | 3 +- core/common/src/traits/topic_client.rs | 3 +- core/common/src/traits/user_client.rs | 3 +- .../websocket_config/websocket_client_config.rs | 17 ++++++ .../common/src/types/message/messages_batch_mut.rs | 15 ++++- core/common/src/types/mod.rs | 1 + .../src/types/permissions/permissions_global.rs | 5 ++ .../common/src/types/personal_access_tokens/mod.rs | 9 ++- core/sdk/Cargo.toml | 18 ++++-- core/sdk/src/http/cluster.rs | 3 +- core/sdk/src/http/consumer_groups.rs | 3 +- core/sdk/src/http/consumer_offsets.rs | 3 +- core/sdk/src/http/http_client.rs | 28 +++++++++- core/sdk/src/http/http_transport.rs | 3 +- core/sdk/src/http/messages.rs | 3 +- core/sdk/src/http/partitions.rs | 3 +- core/sdk/src/http/personal_access_tokens.rs | 3 +- core/sdk/src/http/segments.rs | 3 +- core/sdk/src/http/streams.rs | 3 +- core/sdk/src/http/system.rs | 3 +- core/sdk/src/http/topics.rs | 3 +- core/sdk/src/http/users.rs | 3 +- core/sdk/src/lib.rs | 34 ++++++++--- core/sdk/src/prelude.rs | 51 ++++++++++------- 55 files changed, 330 insertions(+), 126 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 4be27b7fb..aac971bf5 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5220,6 +5220,8 @@ dependencies = [ "flume 0.12.0", "futures", "futures-util", + "getrandom 0.2.17", + "getrandom 0.3.4", "iggy_common", "mockall", "quinn", @@ -5442,6 +5444,8 @@ dependencies = [ "crossbeam", "derive_more", "err_trail", + "getrandom 0.2.17", + "getrandom 0.3.4", "human-repr", "humantime", "iggy_binary_protocol", diff --git a/Cargo.toml b/Cargo.toml index 877491457..c1c369f96 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -233,7 +233,7 @@ rand_xoshiro = "0.8.0" rayon = "1.11.0" rcgen = "0.14.7" regex = "1.12.3" -reqwest = { version = "0.13.2", default-features = false, features = ["json", "rustls"] } +reqwest = { version = "0.13.2", default-features = false, features = ["json", "query", "rustls"] } reqwest-middleware = { version = "0.5.1", features = ["json", "query"] } reqwest-retry = "0.9.1" reqwest-tracing = "0.7.0" diff --git a/core/common/Cargo.toml b/core/common/Cargo.toml index e600fc4b0..6eda54c14 100644 --- a/core/common/Cargo.toml +++ b/core/common/Cargo.toml @@ -40,9 +40,6 @@ bytemuck = { workspace = true } bytes = { workspace = true } chrono = { workspace = true } clap = { workspace = true } -comfy-table = { workspace = true } -compio = { workspace = true } -crossbeam = { workspace = true } derive_more = { workspace = true } err_trail = { workspace = true } human-repr = { workspace = true } @@ -50,25 +47,35 @@ humantime = { workspace = true } iggy_binary_protocol = { workspace = true } iobuf = { workspace = true } lending-iterator = { workspace = true } -moka = { workspace = true } once_cell = { workspace = true } papaya = { workspace = true } -rcgen = { workspace = true } -ring = { workspace = true } -rustls = { workspace = true } secrecy = { workspace = true } serde = { workspace = true } serde_json = { workspace = true } serde_with = { workspace = true, features = ["base64"] } strum = { workspace = true } thiserror = { workspace = true } -tokio = { workspace = true } tracing = { workspace = true } -tungstenite = { workspace = true } twox-hash = { workspace = true } ulid = { workspace = true } uuid = { workspace = true } +[target.'cfg(not(target_arch = "wasm32"))'.dependencies] +comfy-table = { workspace = true } +compio = { workspace = true } +crossbeam = { workspace = true } +moka = { workspace = true } +rcgen = { workspace = true } +ring = { workspace = true } +rustls = { workspace = true } +tokio = { workspace = true } +tungstenite = { workspace = true } + +[target.'cfg(target_arch = "wasm32")'.dependencies] +getrandom_02 = { package = "getrandom", version = "0.2", features = ["js"] } +getrandom_03 = { package = "getrandom", version = "0.3", features = ["wasm_js"] } +uuid = { workspace = true, features = ["js"] } + [target.'cfg(unix)'.dependencies] nix = { workspace = true } diff --git a/core/common/src/alloc/buffer.rs b/core/common/src/alloc/buffer.rs index 1291febb4..bad1b2d16 100644 --- a/core/common/src/alloc/buffer.rs +++ b/core/common/src/alloc/buffer.rs @@ -16,15 +16,15 @@ * under the License. */ +#[cfg(not(target_arch = "wasm32"))] use crate::alloc::memory_pool::{ALIGNMENT, AlignedBuffer}; +#[cfg(target_arch = "wasm32")] +use crate::alloc::wasm_alloc_types::{ALIGNMENT, AlignedBuffer}; +#[cfg(not(target_arch = "wasm32"))] use super::memory_pool::{AlignedBufferExt, memory_pool}; use bytes::Bytes; -use compio::buf::{IoBuf, IoBufMut, SetLen}; -use std::{ - mem::MaybeUninit, - ops::{Deref, DerefMut}, -}; +use std::ops::{Deref, DerefMut}; /// A buffer wrapper that participates in memory pooling. /// @@ -51,6 +51,7 @@ impl PooledBuffer { /// # Arguments /// /// * `capacity` - The capacity of the buffer + #[cfg(not(target_arch = "wasm32"))] pub fn with_capacity(capacity: usize) -> Self { let (buffer, was_pool_allocated) = memory_pool().acquire_buffer(capacity.max(ALIGNMENT)); let original_capacity = buffer.capacity(); @@ -75,6 +76,20 @@ impl PooledBuffer { } } + /// Creates a new buffer with the specified capacity (no pooling on WASM). + #[cfg(target_arch = "wasm32")] + pub fn with_capacity(capacity: usize) -> Self { + let capacity = capacity.max(ALIGNMENT); + let aligned_capacity = capacity.next_multiple_of(ALIGNMENT).max(ALIGNMENT); + let buffer = AlignedBuffer::with_capacity(ALIGNMENT, aligned_capacity); + Self { + from_pool: false, + original_capacity: buffer.capacity(), + original_bucket_idx: None, + inner: buffer, + } + } + /// Creates a new pooled buffer from an existing `AlignedBuffer`. /// /// # Arguments @@ -101,6 +116,7 @@ impl PooledBuffer { /// Checks if the buffer needs to be resized and updates the memory pool accordingly. /// This shall be called after operations that might cause a resize. + #[cfg(not(target_arch = "wasm32"))] pub fn check_for_resize(&mut self) { if !self.from_pool { return; @@ -129,6 +145,10 @@ impl PooledBuffer { } } + /// No-op on WASM (no memory pool). + #[cfg(target_arch = "wasm32")] + pub fn check_for_resize(&mut self) {} + /// Wrapper for reserve which might cause resize pub fn reserve(&mut self, additional: usize) { let before_cap = self.inner.capacity(); @@ -269,6 +289,7 @@ impl PooledBuffer { let buf = std::mem::replace(&mut self.inner, AlignedBuffer::new(ALIGNMENT)); // Update pool accounting + #[cfg(not(target_arch = "wasm32"))] if self.from_pool && let Some(bucket_idx) = self.original_bucket_idx { @@ -298,6 +319,7 @@ impl DerefMut for PooledBuffer { } } +#[cfg(not(target_arch = "wasm32"))] impl Drop for PooledBuffer { fn drop(&mut self) { if self.from_pool { @@ -327,22 +349,29 @@ impl From<AlignedBuffer> for PooledBuffer { } } -impl SetLen for PooledBuffer { - unsafe fn set_len(&mut self, len: usize) { - unsafe { self.inner.set_len(len) }; +#[cfg(not(target_arch = "wasm32"))] +mod compio_impls { + use super::PooledBuffer; + use compio::buf::{IoBuf, IoBufMut, SetLen}; + use std::mem::MaybeUninit; + + impl SetLen for PooledBuffer { + unsafe fn set_len(&mut self, len: usize) { + unsafe { self.inner.set_len(len) }; + } } -} -impl IoBuf for PooledBuffer { - fn as_init(&self) -> &[u8] { - &self.inner[..] + impl IoBuf for PooledBuffer { + fn as_init(&self) -> &[u8] { + &self.inner[..] + } } -} -impl IoBufMut for PooledBuffer { - fn as_uninit(&mut self) -> &mut [MaybeUninit<u8>] { - let ptr = self.inner.as_mut_ptr().cast::<MaybeUninit<u8>>(); - let cap = self.inner.capacity(); - unsafe { std::slice::from_raw_parts_mut(ptr, cap) } + impl IoBufMut for PooledBuffer { + fn as_uninit(&mut self) -> &mut [MaybeUninit<u8>] { + let ptr = self.inner.as_mut_ptr().cast::<MaybeUninit<u8>>(); + let cap = self.inner.capacity(); + unsafe { std::slice::from_raw_parts_mut(ptr, cap) } + } } } diff --git a/core/common/src/alloc/mod.rs b/core/common/src/alloc/mod.rs index 3d80f4dff..6c585f6ff 100644 --- a/core/common/src/alloc/mod.rs +++ b/core/common/src/alloc/mod.rs @@ -17,4 +17,13 @@ */ pub(crate) mod buffer; +#[cfg(not(target_arch = "wasm32"))] pub(crate) mod memory_pool; + +// On WASM, provide the core alloc types that buffer.rs needs without the pool. +#[cfg(target_arch = "wasm32")] +pub(crate) mod wasm_alloc_types { + use aligned_vec::{AVec, ConstAlign}; + pub const ALIGNMENT: usize = 4096; + pub type AlignedBuffer = AVec<u8, ConstAlign<4096>>; +} diff --git a/core/common/src/error/client_error.rs b/core/common/src/error/client_error.rs index 31fe8122a..d846ad75f 100644 --- a/core/common/src/error/client_error.rs +++ b/core/common/src/error/client_error.rs @@ -16,8 +16,9 @@ * under the License. */ +use std::io; + use thiserror::Error; -use tokio::io; use crate::IggyError; diff --git a/core/common/src/lib.rs b/core/common/src/lib.rs index 3e9b20bdf..ebaf47ac4 100644 --- a/core/common/src/lib.rs +++ b/core/common/src/lib.rs @@ -18,11 +18,14 @@ */ mod alloc; +#[cfg(not(target_arch = "wasm32"))] mod certificates; +#[cfg(not(target_arch = "wasm32"))] mod deduplication; mod error; pub mod http; mod macros; +#[cfg(not(target_arch = "wasm32"))] mod sender; pub mod sharding; mod traits; @@ -35,9 +38,12 @@ pub use error::iggy_error::{IggyError, IggyErrorDiscriminants}; // Locking is feature gated, thus only mod level re-export. pub mod locking; pub use alloc::buffer::PooledBuffer; +#[cfg(not(target_arch = "wasm32"))] pub use alloc::memory_pool::{MEMORY_POOL, MemoryPool, MemoryPoolConfigOther, memory_pool}; +#[cfg(not(target_arch = "wasm32"))] pub use certificates::generate_self_signed_certificate; pub use chrono::{DateTime, Duration as ChronoDuration, Utc}; +#[cfg(not(target_arch = "wasm32"))] pub use deduplication::MessageDeduplicator; pub use http::consumer_groups::*; pub use http::consumer_offsets::*; @@ -49,6 +55,7 @@ pub use http::streams::*; pub use http::system::*; pub use http::topics::*; pub use http::users::*; +#[cfg(not(target_arch = "wasm32"))] pub use sender::{ QuicSender, Sender, SenderKind, TcpSender, TcpTlsSender, WebSocketSender, WebSocketTlsSender, }; @@ -110,6 +117,7 @@ pub use types::permissions::permissions_global::*; pub use types::permissions::personal_access_token::*; pub use types::personal_access_tokens::*; pub use types::segment::Segment; +#[cfg(not(target_arch = "wasm32"))] pub use types::segment_storage::*; pub use types::send_messages2; pub use types::send_messages2::*; diff --git a/core/common/src/locking/mod.rs b/core/common/src/locking/mod.rs index 2670f1e6b..af82aa51f 100644 --- a/core/common/src/locking/mod.rs +++ b/core/common/src/locking/mod.rs @@ -18,9 +18,15 @@ use std::ops::{Deref, DerefMut}; +#[cfg(not(target_arch = "wasm32"))] mod tokio_lock; +#[cfg(target_arch = "wasm32")] +mod wasm_lock; +#[cfg(not(target_arch = "wasm32"))] pub type IggyRwLock<T> = tokio_lock::IggyTokioRwLock<T>; +#[cfg(target_arch = "wasm32")] +pub type IggyRwLock<T> = wasm_lock::IggyWasmRwLock<T>; #[allow(async_fn_in_trait)] pub trait IggyRwLockFn<T> { diff --git a/core/common/src/locking/mod.rs b/core/common/src/locking/wasm_lock.rs similarity index 61% copy from core/common/src/locking/mod.rs copy to core/common/src/locking/wasm_lock.rs index 2670f1e6b..6b0d58c26 100644 --- a/core/common/src/locking/mod.rs +++ b/core/common/src/locking/wasm_lock.rs @@ -16,32 +16,43 @@ * under the License. */ -use std::ops::{Deref, DerefMut}; +use crate::locking::IggyRwLockFn; +use std::sync::{Arc, RwLock, RwLockReadGuard, RwLockWriteGuard}; -mod tokio_lock; +#[derive(Debug)] +pub struct IggyWasmRwLock<T>(Arc<RwLock<T>>); -pub type IggyRwLock<T> = tokio_lock::IggyTokioRwLock<T>; - -#[allow(async_fn_in_trait)] -pub trait IggyRwLockFn<T> { - type ReadGuard<'a>: Deref<Target = T> +impl<T> IggyRwLockFn<T> for IggyWasmRwLock<T> { + type ReadGuard<'a> + = RwLockReadGuard<'a, T> where - T: 'a, - Self: 'a; - type WriteGuard<'a>: DerefMut<Target = T> + T: 'a; + type WriteGuard<'a> + = RwLockWriteGuard<'a, T> where - T: 'a, - Self: 'a; + T: 'a; - fn new(data: T) -> Self - where - Self: Sized; + fn new(data: T) -> Self { + IggyWasmRwLock(Arc::new(RwLock::new(data))) + } async fn read<'a>(&'a self) -> Self::ReadGuard<'a> where - T: 'a; + T: 'a, + { + self.0.read().expect("RwLock poisoned") + } async fn write<'a>(&'a self) -> Self::WriteGuard<'a> where - T: 'a; + T: 'a, + { + self.0.write().expect("RwLock poisoned") + } +} + +impl<T> Clone for IggyWasmRwLock<T> { + fn clone(&self) -> Self { + Self(Arc::clone(&self.0)) + } } diff --git a/core/common/src/traits/binary_client.rs b/core/common/src/traits/binary_client.rs index 94f97c06f..9a0f7941d 100644 --- a/core/common/src/traits/binary_client.rs +++ b/core/common/src/traits/binary_client.rs @@ -20,5 +20,6 @@ use crate::{BinaryTransport, Client}; use async_trait::async_trait; /// A client that can send and receive binary messages. -#[async_trait] +#[cfg_attr(not(target_arch = "wasm32"), async_trait)] +#[cfg_attr(target_arch = "wasm32", async_trait(?Send))] pub trait BinaryClient: BinaryTransport + Client {} diff --git a/core/common/src/traits/binary_impls/cluster.rs b/core/common/src/traits/binary_impls/cluster.rs index c51f17906..ab555b5c1 100644 --- a/core/common/src/traits/binary_impls/cluster.rs +++ b/core/common/src/traits/binary_impls/cluster.rs @@ -24,7 +24,8 @@ use iggy_binary_protocol::codes::GET_CLUSTER_METADATA_CODE; use iggy_binary_protocol::requests::system::GetClusterMetadataRequest; use iggy_binary_protocol::responses::system::get_cluster_metadata::ClusterMetadataResponse; -#[async_trait::async_trait] +#[cfg_attr(not(target_arch = "wasm32"), async_trait::async_trait)] +#[cfg_attr(target_arch = "wasm32", async_trait::async_trait(?Send))] impl<B: BinaryClient> ClusterClient for B { async fn get_cluster_metadata(&self) -> Result<ClusterMetadata, IggyError> { fail_if_not_authenticated(self).await?; diff --git a/core/common/src/traits/binary_impls/consumer_groups.rs b/core/common/src/traits/binary_impls/consumer_groups.rs index 25e21ab75..a45114b71 100644 --- a/core/common/src/traits/binary_impls/consumer_groups.rs +++ b/core/common/src/traits/binary_impls/consumer_groups.rs @@ -34,7 +34,8 @@ use iggy_binary_protocol::requests::consumer_groups::{ use iggy_binary_protocol::responses::consumer_groups::get_consumer_group::ConsumerGroupDetailsResponse; use iggy_binary_protocol::responses::consumer_groups::get_consumer_groups::GetConsumerGroupsResponse; -#[async_trait::async_trait] +#[cfg_attr(not(target_arch = "wasm32"), async_trait::async_trait)] +#[cfg_attr(target_arch = "wasm32", async_trait::async_trait(?Send))] impl<B: BinaryClient> ConsumerGroupClient for B { async fn get_consumer_group( &self, diff --git a/core/common/src/traits/binary_impls/consumer_offsets.rs b/core/common/src/traits/binary_impls/consumer_offsets.rs index af665a513..b37057f2e 100644 --- a/core/common/src/traits/binary_impls/consumer_offsets.rs +++ b/core/common/src/traits/binary_impls/consumer_offsets.rs @@ -30,7 +30,8 @@ use iggy_binary_protocol::requests::consumer_offsets::{ }; use iggy_binary_protocol::responses::consumer_offsets::get_consumer_offset::ConsumerOffsetResponse; -#[async_trait::async_trait] +#[cfg_attr(not(target_arch = "wasm32"), async_trait::async_trait)] +#[cfg_attr(target_arch = "wasm32", async_trait::async_trait(?Send))] impl<B: BinaryClient> ConsumerOffsetClient for B { async fn store_consumer_offset( &self, diff --git a/core/common/src/traits/binary_impls/messages.rs b/core/common/src/traits/binary_impls/messages.rs index 783e5b693..99ae9c037 100644 --- a/core/common/src/traits/binary_impls/messages.rs +++ b/core/common/src/traits/binary_impls/messages.rs @@ -33,7 +33,8 @@ use iggy_binary_protocol::requests::messages::{ FlushUnsavedBufferRequest, PollMessagesRequest, RawMessage, SendMessagesEncoder, }; -#[async_trait::async_trait] +#[cfg_attr(not(target_arch = "wasm32"), async_trait::async_trait)] +#[cfg_attr(target_arch = "wasm32", async_trait::async_trait(?Send))] impl<B: BinaryClient> MessageClient for B { async fn poll_messages( &self, diff --git a/core/common/src/traits/binary_impls/partitions.rs b/core/common/src/traits/binary_impls/partitions.rs index 04ea31237..278bb4a55 100644 --- a/core/common/src/traits/binary_impls/partitions.rs +++ b/core/common/src/traits/binary_impls/partitions.rs @@ -25,7 +25,8 @@ use iggy_binary_protocol::requests::partitions::{ CreatePartitionsRequest, DeletePartitionsRequest, }; -#[async_trait::async_trait] +#[cfg_attr(not(target_arch = "wasm32"), async_trait::async_trait)] +#[cfg_attr(target_arch = "wasm32", async_trait::async_trait(?Send))] impl<B: BinaryClient> PartitionClient for B { async fn create_partitions( &self, diff --git a/core/common/src/traits/binary_impls/personal_access_tokens.rs b/core/common/src/traits/binary_impls/personal_access_tokens.rs index f85f34d36..9947c11d1 100644 --- a/core/common/src/traits/binary_impls/personal_access_tokens.rs +++ b/core/common/src/traits/binary_impls/personal_access_tokens.rs @@ -37,7 +37,8 @@ use iggy_binary_protocol::responses::personal_access_tokens::create_personal_acc use iggy_binary_protocol::responses::personal_access_tokens::get_personal_access_tokens::GetPersonalAccessTokensResponse; use iggy_binary_protocol::responses::users::login_user::IdentityResponse; -#[async_trait::async_trait] +#[cfg_attr(not(target_arch = "wasm32"), async_trait::async_trait)] +#[cfg_attr(target_arch = "wasm32", async_trait::async_trait(?Send))] impl<B: BinaryClient> PersonalAccessTokenClient for B { async fn get_personal_access_tokens(&self) -> Result<Vec<PersonalAccessTokenInfo>, IggyError> { fail_if_not_authenticated(self).await?; diff --git a/core/common/src/traits/binary_impls/segments.rs b/core/common/src/traits/binary_impls/segments.rs index 413f3f03f..fdcf30f4a 100644 --- a/core/common/src/traits/binary_impls/segments.rs +++ b/core/common/src/traits/binary_impls/segments.rs @@ -23,7 +23,8 @@ use iggy_binary_protocol::codec::WireEncode; use iggy_binary_protocol::codes::DELETE_SEGMENTS_CODE; use iggy_binary_protocol::requests::segments::DeleteSegmentsRequest; -#[async_trait::async_trait] +#[cfg_attr(not(target_arch = "wasm32"), async_trait::async_trait)] +#[cfg_attr(target_arch = "wasm32", async_trait::async_trait(?Send))] impl<B: BinaryClient> SegmentClient for B { async fn delete_segments( &self, diff --git a/core/common/src/traits/binary_impls/streams.rs b/core/common/src/traits/binary_impls/streams.rs index bbfb015d3..df62ebd2b 100644 --- a/core/common/src/traits/binary_impls/streams.rs +++ b/core/common/src/traits/binary_impls/streams.rs @@ -32,7 +32,8 @@ use iggy_binary_protocol::requests::streams::{ use iggy_binary_protocol::responses::streams::get_stream::GetStreamResponse; use iggy_binary_protocol::responses::streams::get_streams::GetStreamsResponse; -#[async_trait::async_trait] +#[cfg_attr(not(target_arch = "wasm32"), async_trait::async_trait)] +#[cfg_attr(target_arch = "wasm32", async_trait::async_trait(?Send))] impl<B: BinaryClient> StreamClient for B { async fn get_stream(&self, stream_id: &Identifier) -> Result<Option<StreamDetails>, IggyError> { fail_if_not_authenticated(self).await?; diff --git a/core/common/src/traits/binary_impls/system.rs b/core/common/src/traits/binary_impls/system.rs index 365f2b0b8..5f7977ccd 100644 --- a/core/common/src/traits/binary_impls/system.rs +++ b/core/common/src/traits/binary_impls/system.rs @@ -35,7 +35,8 @@ use iggy_binary_protocol::responses::clients::get_client::ClientDetailsResponse; use iggy_binary_protocol::responses::clients::get_clients::GetClientsResponse; use iggy_binary_protocol::responses::system::get_stats::StatsResponse; -#[async_trait::async_trait] +#[cfg_attr(not(target_arch = "wasm32"), async_trait::async_trait)] +#[cfg_attr(target_arch = "wasm32", async_trait::async_trait(?Send))] impl<B: BinaryClient> SystemClient for B { async fn get_stats(&self) -> Result<Stats, IggyError> { let response = self diff --git a/core/common/src/traits/binary_impls/topics.rs b/core/common/src/traits/binary_impls/topics.rs index 4c5c4ddbb..66cfa0061 100644 --- a/core/common/src/traits/binary_impls/topics.rs +++ b/core/common/src/traits/binary_impls/topics.rs @@ -35,7 +35,8 @@ use iggy_binary_protocol::requests::topics::{ use iggy_binary_protocol::responses::topics::get_topic::GetTopicResponse; use iggy_binary_protocol::responses::topics::get_topics::GetTopicsResponse; -#[async_trait::async_trait] +#[cfg_attr(not(target_arch = "wasm32"), async_trait::async_trait)] +#[cfg_attr(target_arch = "wasm32", async_trait::async_trait(?Send))] impl<B: BinaryClient> TopicClient for B { async fn get_topic( &self, diff --git a/core/common/src/traits/binary_impls/users.rs b/core/common/src/traits/binary_impls/users.rs index 24a877a4c..74a04420a 100644 --- a/core/common/src/traits/binary_impls/users.rs +++ b/core/common/src/traits/binary_impls/users.rs @@ -35,7 +35,8 @@ use iggy_binary_protocol::requests::users::{ use iggy_binary_protocol::responses::users::login_user::IdentityResponse; use iggy_binary_protocol::responses::users::{GetUsersResponse, UserDetailsResponse}; -#[async_trait::async_trait] +#[cfg_attr(not(target_arch = "wasm32"), async_trait::async_trait)] +#[cfg_attr(target_arch = "wasm32", async_trait::async_trait(?Send))] impl<B: BinaryClient> UserClient for B { async fn get_user(&self, user_id: &Identifier) -> Result<Option<UserInfoDetails>, IggyError> { fail_if_not_authenticated(self).await?; diff --git a/core/common/src/traits/binary_transport.rs b/core/common/src/traits/binary_transport.rs index 4f6af3ba8..baea855b8 100644 --- a/core/common/src/traits/binary_transport.rs +++ b/core/common/src/traits/binary_transport.rs @@ -20,7 +20,8 @@ use crate::{ClientState, DiagnosticEvent, IggyDuration, IggyError}; use async_trait::async_trait; use bytes::Bytes; -#[async_trait] +#[cfg_attr(not(target_arch = "wasm32"), async_trait)] +#[cfg_attr(target_arch = "wasm32", async_trait(?Send))] pub trait BinaryTransport { /// Gets the state of the client. async fn get_state(&self) -> ClientState; diff --git a/core/common/src/traits/client.rs b/core/common/src/traits/client.rs index 5c43523ad..d0624e4f1 100644 --- a/core/common/src/traits/client.rs +++ b/core/common/src/traits/client.rs @@ -28,7 +28,8 @@ use std::fmt::Debug; /// The client trait which is the main interface to the Iggy server. /// It consists of multiple modules, each of which is responsible for a specific set of commands. /// Except the ping, login and get me, all the other methods require authentication. -#[async_trait] +#[cfg_attr(not(target_arch = "wasm32"), async_trait)] +#[cfg_attr(target_arch = "wasm32", async_trait(?Send))] pub trait Client: ClusterClient + SystemClient diff --git a/core/common/src/traits/cluster_client.rs b/core/common/src/traits/cluster_client.rs index 2212925a8..e2d7b5d51 100644 --- a/core/common/src/traits/cluster_client.rs +++ b/core/common/src/traits/cluster_client.rs @@ -20,7 +20,8 @@ use crate::{ClusterMetadata, IggyError}; use async_trait::async_trait; /// This trait defines the methods to interact with the cluster module. -#[async_trait] +#[cfg_attr(not(target_arch = "wasm32"), async_trait)] +#[cfg_attr(target_arch = "wasm32", async_trait(?Send))] pub trait ClusterClient { /// Get the metadata of the cluster including node information, roles, and status. /// diff --git a/core/common/src/traits/consumer_group_client.rs b/core/common/src/traits/consumer_group_client.rs index ef470983f..493db7de8 100644 --- a/core/common/src/traits/consumer_group_client.rs +++ b/core/common/src/traits/consumer_group_client.rs @@ -20,7 +20,8 @@ use crate::{ConsumerGroup, ConsumerGroupDetails, Identifier, IggyError}; use async_trait::async_trait; /// This trait defines the methods to interact with the consumer group module. -#[async_trait] +#[cfg_attr(not(target_arch = "wasm32"), async_trait)] +#[cfg_attr(target_arch = "wasm32", async_trait(?Send))] pub trait ConsumerGroupClient { /// Get the info about a specific consumer group by unique ID or name for the given stream and topic by unique IDs or names. /// diff --git a/core/common/src/traits/consumer_offset_client.rs b/core/common/src/traits/consumer_offset_client.rs index a3673ff3a..f5df17fd1 100644 --- a/core/common/src/traits/consumer_offset_client.rs +++ b/core/common/src/traits/consumer_offset_client.rs @@ -20,7 +20,8 @@ use crate::{Consumer, ConsumerOffsetInfo, Identifier, IggyError}; use async_trait::async_trait; /// This trait defines the methods to interact with the consumer offset module. -#[async_trait] +#[cfg_attr(not(target_arch = "wasm32"), async_trait)] +#[cfg_attr(target_arch = "wasm32", async_trait(?Send))] pub trait ConsumerOffsetClient { /// Store the consumer offset for a specific consumer or consumer group for the given stream and topic by unique IDs or names. /// diff --git a/core/common/src/traits/message_client.rs b/core/common/src/traits/message_client.rs index 62ebf4aec..5fea28aa1 100644 --- a/core/common/src/traits/message_client.rs +++ b/core/common/src/traits/message_client.rs @@ -21,7 +21,8 @@ use crate::{ use async_trait::async_trait; /// This trait defines the methods to interact with the messaging module. -#[async_trait] +#[cfg_attr(not(target_arch = "wasm32"), async_trait)] +#[cfg_attr(target_arch = "wasm32", async_trait(?Send))] pub trait MessageClient { /// Poll given amount of messages using the specified consumer and strategy from the specified stream and topic by unique IDs or names. /// diff --git a/core/common/src/traits/partition_client.rs b/core/common/src/traits/partition_client.rs index c3c2f540a..d24497948 100644 --- a/core/common/src/traits/partition_client.rs +++ b/core/common/src/traits/partition_client.rs @@ -20,7 +20,8 @@ use crate::{Identifier, IggyError}; use async_trait::async_trait; /// This trait defines the methods to interact with the partition module. -#[async_trait] +#[cfg_attr(not(target_arch = "wasm32"), async_trait)] +#[cfg_attr(target_arch = "wasm32", async_trait(?Send))] pub trait PartitionClient { /// Create new N partitions for a topic by unique ID or name. /// diff --git a/core/common/src/traits/personal_access_token_client.rs b/core/common/src/traits/personal_access_token_client.rs index 603352b93..687c41cb5 100644 --- a/core/common/src/traits/personal_access_token_client.rs +++ b/core/common/src/traits/personal_access_token_client.rs @@ -23,7 +23,8 @@ use crate::{ use async_trait::async_trait; /// This trait defines the methods to interact with the personal access token module. -#[async_trait] +#[cfg_attr(not(target_arch = "wasm32"), async_trait)] +#[cfg_attr(target_arch = "wasm32", async_trait(?Send))] pub trait PersonalAccessTokenClient { /// Get the info about all the personal access tokens of the currently authenticated user. async fn get_personal_access_tokens(&self) -> Result<Vec<PersonalAccessTokenInfo>, IggyError>; diff --git a/core/common/src/traits/segment_client.rs b/core/common/src/traits/segment_client.rs index c70964e4a..57b4422f8 100644 --- a/core/common/src/traits/segment_client.rs +++ b/core/common/src/traits/segment_client.rs @@ -20,7 +20,8 @@ use crate::{Identifier, IggyError}; use async_trait::async_trait; /// This trait defines the methods to interact with the partition module. -#[async_trait] +#[cfg_attr(not(target_arch = "wasm32"), async_trait)] +#[cfg_attr(target_arch = "wasm32", async_trait(?Send))] pub trait SegmentClient { /// Delete last N segments for a partition by unique ID or name. /// diff --git a/core/common/src/traits/stream_client.rs b/core/common/src/traits/stream_client.rs index 5d3ef3564..66ffa50d1 100644 --- a/core/common/src/traits/stream_client.rs +++ b/core/common/src/traits/stream_client.rs @@ -20,7 +20,8 @@ use crate::{Identifier, IggyError, Stream, StreamDetails}; use async_trait::async_trait; /// This trait defines the methods to interact with the stream module. -#[async_trait] +#[cfg_attr(not(target_arch = "wasm32"), async_trait)] +#[cfg_attr(target_arch = "wasm32", async_trait(?Send))] pub trait StreamClient { /// Get the info about a specific stream by unique ID or name. /// diff --git a/core/common/src/traits/system_client.rs b/core/common/src/traits/system_client.rs index a674139be..6015801e2 100644 --- a/core/common/src/traits/system_client.rs +++ b/core/common/src/traits/system_client.rs @@ -23,7 +23,8 @@ use crate::{ use async_trait::async_trait; /// This trait defines the methods to interact with the system module. -#[async_trait] +#[cfg_attr(not(target_arch = "wasm32"), async_trait)] +#[cfg_attr(target_arch = "wasm32", async_trait(?Send))] pub trait SystemClient { /// Get the stats of the system such as PID, memory usage, streams count etc. /// diff --git a/core/common/src/traits/topic_client.rs b/core/common/src/traits/topic_client.rs index bf3406ebe..b51db5d74 100644 --- a/core/common/src/traits/topic_client.rs +++ b/core/common/src/traits/topic_client.rs @@ -23,7 +23,8 @@ use async_trait::async_trait; /// This trait defines the methods to interact with the topic module. #[allow(clippy::too_many_arguments)] -#[async_trait] +#[cfg_attr(not(target_arch = "wasm32"), async_trait)] +#[cfg_attr(target_arch = "wasm32", async_trait(?Send))] pub trait TopicClient { /// Get the info about a specific topic by unique ID or name. /// diff --git a/core/common/src/traits/user_client.rs b/core/common/src/traits/user_client.rs index 5964a093b..7ed207a58 100644 --- a/core/common/src/traits/user_client.rs +++ b/core/common/src/traits/user_client.rs @@ -22,7 +22,8 @@ use crate::{ use async_trait::async_trait; /// This trait defines the methods to interact with the user module. -#[async_trait] +#[cfg_attr(not(target_arch = "wasm32"), async_trait)] +#[cfg_attr(target_arch = "wasm32", async_trait(?Send))] pub trait UserClient { /// Get the info about a specific user by unique ID or username. /// diff --git a/core/common/src/types/configuration/websocket_config/websocket_client_config.rs b/core/common/src/types/configuration/websocket_config/websocket_client_config.rs index d158fc52e..c39e080f2 100644 --- a/core/common/src/types/configuration/websocket_config/websocket_client_config.rs +++ b/core/common/src/types/configuration/websocket_config/websocket_client_config.rs @@ -21,6 +21,7 @@ use crate::types::configuration::websocket_config::websocket_connection_string_o use crate::{AutoLogin, IggyDuration, WebSocketClientReconnectionConfig}; use std::fmt::{Display, Formatter}; use std::str::FromStr; +#[cfg(not(target_arch = "wasm32"))] use tungstenite::protocol::WebSocketConfig as TungsteniteConfig; /// Configuration for the WebSocket client. @@ -80,6 +81,7 @@ impl Default for WebSocketClientConfig { } } +#[cfg(not(target_arch = "wasm32"))] impl Default for WebSocketConfig { fn default() -> Self { // Use tungstenite defaults @@ -95,8 +97,23 @@ impl Default for WebSocketConfig { } } +#[cfg(target_arch = "wasm32")] +impl Default for WebSocketConfig { + fn default() -> Self { + WebSocketConfig { + read_buffer_size: Some(8192), + write_buffer_size: Some(8192), + max_write_buffer_size: Some(usize::MAX), + max_message_size: Some(64 << 20), + max_frame_size: Some(16 << 20), + accept_unmasked_frames: false, + } + } +} + impl WebSocketConfig { /// Convert to tungstenite WebSocketConfig so we can use tungstenite defaults + #[cfg(not(target_arch = "wasm32"))] pub fn to_tungstenite_config(&self) -> TungsteniteConfig { let mut config = TungsteniteConfig::default(); diff --git a/core/common/src/types/message/messages_batch_mut.rs b/core/common/src/types/message/messages_batch_mut.rs index ee0333bc4..694564a3b 100644 --- a/core/common/src/types/message/messages_batch_mut.rs +++ b/core/common/src/types/message/messages_batch_mut.rs @@ -19,16 +19,24 @@ use super::indexes_mut::IggyIndexesMut; use super::message_boundaries::IggyMessageBoundaries; use super::message_view_mut::IggyMessageViewMutIterator; +#[cfg(not(target_arch = "wasm32"))] +use crate::MessageDeduplicator; +use crate::PooledBuffer; use crate::{ IGGY_MESSAGE_HEADER_SIZE, INDEX_SIZE, IggyByteSize, IggyError, IggyIndexView, IggyMessage, - IggyMessageView, IggyMessageViewIterator, IggyMessagesBatch, IggyTimestamp, MAX_PAYLOAD_SIZE, + IggyMessageView, IggyMessageViewIterator, IggyMessagesBatch, MAX_PAYLOAD_SIZE, MAX_USER_HEADERS_SIZE, Sizeable, Validatable, }; -use crate::{MessageDeduplicator, PooledBuffer, random_id}; +#[cfg(not(target_arch = "wasm32"))] +use crate::{IggyTimestamp, random_id}; +#[cfg(not(target_arch = "wasm32"))] use lending_iterator::prelude::*; use std::ops::Index; +#[cfg(not(target_arch = "wasm32"))] use std::sync::Arc; -use tracing::{error, warn}; +use tracing::error; +#[cfg(not(target_arch = "wasm32"))] +use tracing::warn; /// A container for mutable messages that are being prepared for persistence. /// @@ -135,6 +143,7 @@ impl IggyMessagesBatchMut { /// # Returns /// /// An immutable `IggyMessagesBatch` ready for persistence + #[cfg(not(target_arch = "wasm32"))] pub async fn prepare_for_persistence( &mut self, start_offset: u64, diff --git a/core/common/src/types/mod.rs b/core/common/src/types/mod.rs index 3e7321a80..56448b382 100644 --- a/core/common/src/types/mod.rs +++ b/core/common/src/types/mod.rs @@ -30,6 +30,7 @@ pub(crate) mod partition; pub(crate) mod permissions; pub(crate) mod personal_access_tokens; pub(crate) mod segment; +#[cfg(not(target_arch = "wasm32"))] pub(crate) mod segment_storage; pub mod send_messages2; pub(crate) mod snapshot; diff --git a/core/common/src/types/permissions/permissions_global.rs b/core/common/src/types/permissions/permissions_global.rs index 42f670715..b765e6de0 100644 --- a/core/common/src/types/permissions/permissions_global.rs +++ b/core/common/src/types/permissions/permissions_global.rs @@ -16,7 +16,9 @@ * under the License. */ +#[cfg(not(target_arch = "wasm32"))] use comfy_table::Table; +#[cfg(not(target_arch = "wasm32"))] use comfy_table::presets::ASCII_NO_BORDERS; use serde::{Deserialize, Serialize}; use std::collections::BTreeMap; @@ -216,6 +218,7 @@ impl Display for Permissions { } } +#[cfg(not(target_arch = "wasm32"))] impl From<GlobalPermissions> for Table { fn from(value: GlobalPermissions) -> Self { let mut table = Self::new(); @@ -261,6 +264,7 @@ impl From<GlobalPermissions> for Table { } } +#[cfg(not(target_arch = "wasm32"))] impl From<&TopicPermissions> for Table { fn from(value: &TopicPermissions) -> Self { let mut table = Self::new(); @@ -285,6 +289,7 @@ impl From<&TopicPermissions> for Table { } } +#[cfg(not(target_arch = "wasm32"))] impl From<&StreamPermissions> for Table { fn from(value: &StreamPermissions) -> Self { let mut table = Self::new(); diff --git a/core/common/src/types/personal_access_tokens/mod.rs b/core/common/src/types/personal_access_tokens/mod.rs index 0e1e33391..01364162f 100644 --- a/core/common/src/types/personal_access_tokens/mod.rs +++ b/core/common/src/types/personal_access_tokens/mod.rs @@ -18,11 +18,15 @@ use crate::IggyExpiry; use crate::IggyTimestamp; use crate::UserId; -use crate::text::as_base64; use crate::utils::hash; -use ring::rand::SecureRandom; use std::sync::Arc; +#[cfg(not(target_arch = "wasm32"))] +use crate::text::as_base64; +#[cfg(not(target_arch = "wasm32"))] +use ring::rand::SecureRandom; + +#[cfg(not(target_arch = "wasm32"))] const SIZE: usize = 50; #[derive(Clone, Debug)] @@ -35,6 +39,7 @@ pub struct PersonalAccessToken { impl PersonalAccessToken { // Raw token is generated and returned only once + #[cfg(not(target_arch = "wasm32"))] pub fn new( user_id: UserId, name: &str, diff --git a/core/sdk/Cargo.toml b/core/sdk/Cargo.toml index 3627c87db..2158e8da1 100644 --- a/core/sdk/Cargo.toml +++ b/core/sdk/Cargo.toml @@ -30,30 +30,36 @@ readme = "../../README.md" [dependencies] async-broadcast = { workspace = true } -async-dropper = { workspace = true } async-trait = { workspace = true } +iggy_common = { workspace = true } +reqwest = { workspace = true } +secrecy = { workspace = true } +serde = { workspace = true } +tracing = { workspace = true } + +[target.'cfg(not(target_arch = "wasm32"))'.dependencies] +async-dropper = { workspace = true } bon = { workspace = true } bytes = { workspace = true } dashmap = { workspace = true } flume = { workspace = true } futures = { workspace = true } futures-util = { workspace = true } -iggy_common = { workspace = true } quinn = { workspace = true } -reqwest = { workspace = true } reqwest-middleware = { workspace = true } reqwest-retry = { workspace = true } reqwest-tracing = { workspace = true } rustls = { workspace = true } -secrecy = { workspace = true } -serde = { workspace = true } tokio = { workspace = true } tokio-rustls = { workspace = true } tokio-tungstenite = { workspace = true } -tracing = { workspace = true } trait-variant = { workspace = true } tungstenite = { workspace = true } webpki-roots = { workspace = true } +[target.'cfg(target_arch = "wasm32")'.dependencies] +getrandom_02 = { package = "getrandom", version = "0.2", features = ["js"] } +getrandom_03 = { package = "getrandom", version = "0.3", features = ["wasm_js"] } + [dev-dependencies] mockall = { workspace = true } diff --git a/core/sdk/src/http/cluster.rs b/core/sdk/src/http/cluster.rs index cec3a2e3c..90d1cf38d 100644 --- a/core/sdk/src/http/cluster.rs +++ b/core/sdk/src/http/cluster.rs @@ -22,7 +22,8 @@ use async_trait::async_trait; use iggy_common::ClusterClient; use iggy_common::{ClusterMetadata, IggyError}; -#[async_trait] +#[cfg_attr(not(target_arch = "wasm32"), async_trait)] +#[cfg_attr(target_arch = "wasm32", async_trait(?Send))] impl ClusterClient for HttpClient { async fn get_cluster_metadata(&self) -> Result<ClusterMetadata, IggyError> { let response = self.get("/cluster/metadata").await?; diff --git a/core/sdk/src/http/consumer_groups.rs b/core/sdk/src/http/consumer_groups.rs index c309cbd27..519820908 100644 --- a/core/sdk/src/http/consumer_groups.rs +++ b/core/sdk/src/http/consumer_groups.rs @@ -25,7 +25,8 @@ use iggy_common::Identifier; use iggy_common::create_consumer_group::CreateConsumerGroup; use iggy_common::{ConsumerGroup, ConsumerGroupDetails}; -#[async_trait] +#[cfg_attr(not(target_arch = "wasm32"), async_trait)] +#[cfg_attr(target_arch = "wasm32", async_trait(?Send))] impl ConsumerGroupClient for HttpClient { async fn get_consumer_group( &self, diff --git a/core/sdk/src/http/consumer_offsets.rs b/core/sdk/src/http/consumer_offsets.rs index fe7435506..bbf30669d 100644 --- a/core/sdk/src/http/consumer_offsets.rs +++ b/core/sdk/src/http/consumer_offsets.rs @@ -26,7 +26,8 @@ use iggy_common::get_consumer_offset::GetConsumerOffset; use iggy_common::store_consumer_offset::StoreConsumerOffset; use iggy_common::{Consumer, ConsumerOffsetInfo}; -#[async_trait] +#[cfg_attr(not(target_arch = "wasm32"), async_trait)] +#[cfg_attr(target_arch = "wasm32", async_trait(?Send))] impl ConsumerOffsetClient for HttpClient { async fn store_consumer_offset( &self, diff --git a/core/sdk/src/http/http_client.rs b/core/sdk/src/http/http_client.rs index a6ddcaa65..94bd07e58 100644 --- a/core/sdk/src/http/http_client.rs +++ b/core/sdk/src/http/http_client.rs @@ -26,8 +26,11 @@ use iggy_common::{ IdentityInfo, TransportProtocol, }; use reqwest::{Response, StatusCode, Url}; +#[cfg(not(target_arch = "wasm32"))] use reqwest_middleware::{ClientBuilder, ClientWithMiddleware}; +#[cfg(not(target_arch = "wasm32"))] use reqwest_retry::{RetryTransientMiddleware, policies::ExponentialBackoff}; +#[cfg(not(target_arch = "wasm32"))] use reqwest_tracing::{SpanBackendWithUrl, TracingMiddleware}; use serde::Serialize; use std::ops::Deref; @@ -50,12 +53,16 @@ pub struct HttpClient { /// The URL of the Iggy API. pub api_url: Url, pub(crate) heartbeat_interval: IggyDuration, + #[cfg(not(target_arch = "wasm32"))] client: ClientWithMiddleware, + #[cfg(target_arch = "wasm32")] + client: reqwest::Client, access_token: IggyRwLock<String>, events: (Sender<DiagnosticEvent>, Receiver<DiagnosticEvent>), } -#[async_trait] +#[cfg_attr(not(target_arch = "wasm32"), async_trait)] +#[cfg_attr(target_arch = "wasm32", async_trait(?Send))] impl Client for HttpClient { async fn connect(&self) -> Result<(), IggyError> { HttpClient::connect(self).await @@ -83,7 +90,8 @@ impl Default for HttpClient { } } -#[async_trait] +#[cfg_attr(not(target_arch = "wasm32"), async_trait)] +#[cfg_attr(target_arch = "wasm32", async_trait(?Send))] impl HttpTransport for HttpClient { /// Get full URL for the provided path. fn get_url(&self, path: &str) -> Result<Url, IggyError> { @@ -265,6 +273,7 @@ impl HttpClient { } /// Create a new HTTP client for interacting with the Iggy API using the provided configuration. + #[cfg(not(target_arch = "wasm32"))] pub fn create(config: Arc<HttpClientConfig>) -> Result<Self, IggyError> { let api_url = Url::parse(&config.api_url); if api_url.is_err() { @@ -286,6 +295,21 @@ impl HttpClient { }) } + /// Create a new HTTP client for WASM (no middleware, uses browser fetch). + #[cfg(target_arch = "wasm32")] + pub fn create(config: Arc<HttpClientConfig>) -> Result<Self, IggyError> { + let api_url = Url::parse(&config.api_url).map_err(|_| IggyError::CannotParseUrl)?; + let client = reqwest::Client::new(); + + Ok(Self { + api_url, + client, + heartbeat_interval: IggyDuration::from_str("5s").unwrap(), + access_token: IggyRwLock::new("".to_string()), + events: broadcast(1000), + }) + } + /// Create a new HttpClient from a connection string. pub fn from_connection_string(connection_string: &str) -> Result<Self, IggyError> { if ConnectionStringUtils::parse_protocol(connection_string)? != TransportProtocol::Http { diff --git a/core/sdk/src/http/http_transport.rs b/core/sdk/src/http/http_transport.rs index eb49f41af..57d75ef43 100644 --- a/core/sdk/src/http/http_transport.rs +++ b/core/sdk/src/http/http_transport.rs @@ -20,7 +20,8 @@ use iggy_common::{IdentityInfo, IggyError}; use reqwest::{Response, Url}; use serde::Serialize; -#[async_trait] +#[cfg_attr(not(target_arch = "wasm32"), async_trait)] +#[cfg_attr(target_arch = "wasm32", async_trait(?Send))] pub trait HttpTransport { /// Get full URL for the provided path. fn get_url(&self, path: &str) -> Result<Url, IggyError>; diff --git a/core/sdk/src/http/messages.rs b/core/sdk/src/http/messages.rs index 50153f004..4c429c67d 100644 --- a/core/sdk/src/http/messages.rs +++ b/core/sdk/src/http/messages.rs @@ -27,7 +27,8 @@ use iggy_common::IggyMessagesBatch; use iggy_common::MessageClient; use iggy_common::flush_unsaved_buffer::FlushUnsavedBuffer; -#[async_trait] +#[cfg_attr(not(target_arch = "wasm32"), async_trait)] +#[cfg_attr(target_arch = "wasm32", async_trait(?Send))] impl MessageClient for HttpClient { async fn poll_messages( &self, diff --git a/core/sdk/src/http/partitions.rs b/core/sdk/src/http/partitions.rs index f74e41544..67dfc76be 100644 --- a/core/sdk/src/http/partitions.rs +++ b/core/sdk/src/http/partitions.rs @@ -24,7 +24,8 @@ use iggy_common::PartitionClient; use iggy_common::create_partitions::CreatePartitions; use iggy_common::delete_partitions::DeletePartitions; -#[async_trait] +#[cfg_attr(not(target_arch = "wasm32"), async_trait)] +#[cfg_attr(target_arch = "wasm32", async_trait(?Send))] impl PartitionClient for HttpClient { async fn create_partitions( &self, diff --git a/core/sdk/src/http/personal_access_tokens.rs b/core/sdk/src/http/personal_access_tokens.rs index 902f0baf4..29d8fb80a 100644 --- a/core/sdk/src/http/personal_access_tokens.rs +++ b/core/sdk/src/http/personal_access_tokens.rs @@ -30,7 +30,8 @@ use secrecy::SecretString; const PATH: &str = "/personal-access-tokens"; -#[async_trait] +#[cfg_attr(not(target_arch = "wasm32"), async_trait)] +#[cfg_attr(target_arch = "wasm32", async_trait(?Send))] impl PersonalAccessTokenClient for HttpClient { async fn get_personal_access_tokens(&self) -> Result<Vec<PersonalAccessTokenInfo>, IggyError> { let response = self.get(PATH).await?; diff --git a/core/sdk/src/http/segments.rs b/core/sdk/src/http/segments.rs index 27b245eb8..23d89f351 100644 --- a/core/sdk/src/http/segments.rs +++ b/core/sdk/src/http/segments.rs @@ -22,7 +22,8 @@ use async_trait::async_trait; use iggy_common::SegmentClient; use iggy_common::delete_segments::DeleteSegments; -#[async_trait] +#[cfg_attr(not(target_arch = "wasm32"), async_trait)] +#[cfg_attr(target_arch = "wasm32", async_trait(?Send))] impl SegmentClient for HttpClient { async fn delete_segments( &self, diff --git a/core/sdk/src/http/streams.rs b/core/sdk/src/http/streams.rs index b2a5afb17..86823b3e4 100644 --- a/core/sdk/src/http/streams.rs +++ b/core/sdk/src/http/streams.rs @@ -28,7 +28,8 @@ use iggy_common::{Stream, StreamDetails}; const PATH: &str = "/streams"; -#[async_trait] +#[cfg_attr(not(target_arch = "wasm32"), async_trait)] +#[cfg_attr(target_arch = "wasm32", async_trait(?Send))] impl StreamClient for HttpClient { async fn get_stream(&self, stream_id: &Identifier) -> Result<Option<StreamDetails>, IggyError> { let response = self.get(&get_details_path(&stream_id.as_cow_str())).await; diff --git a/core/sdk/src/http/system.rs b/core/sdk/src/http/system.rs index 295925c7a..8428e955a 100644 --- a/core/sdk/src/http/system.rs +++ b/core/sdk/src/http/system.rs @@ -32,7 +32,8 @@ const CLIENTS: &str = "/clients"; const STATS: &str = "/stats"; const SNAPSHOT: &str = "/snapshot"; -#[async_trait] +#[cfg_attr(not(target_arch = "wasm32"), async_trait)] +#[cfg_attr(target_arch = "wasm32", async_trait(?Send))] impl SystemClient for HttpClient { async fn get_stats(&self) -> Result<Stats, IggyError> { let response = self.get(STATS).await?; diff --git a/core/sdk/src/http/topics.rs b/core/sdk/src/http/topics.rs index 9ef0227d8..5e122ec23 100644 --- a/core/sdk/src/http/topics.rs +++ b/core/sdk/src/http/topics.rs @@ -25,7 +25,8 @@ use iggy_common::create_topic::CreateTopic; use iggy_common::update_topic::UpdateTopic; use iggy_common::{Topic, TopicDetails}; -#[async_trait] +#[cfg_attr(not(target_arch = "wasm32"), async_trait)] +#[cfg_attr(target_arch = "wasm32", async_trait(?Send))] impl TopicClient for HttpClient { async fn get_topic( &self, diff --git a/core/sdk/src/http/users.rs b/core/sdk/src/http/users.rs index 08ad3b504..aa7b3a26d 100644 --- a/core/sdk/src/http/users.rs +++ b/core/sdk/src/http/users.rs @@ -31,7 +31,8 @@ use secrecy::SecretString; const PATH: &str = "/users"; -#[async_trait] +#[cfg_attr(not(target_arch = "wasm32"), async_trait)] +#[cfg_attr(target_arch = "wasm32", async_trait(?Send))] impl UserClient for HttpClient { async fn get_user(&self, user_id: &Identifier) -> Result<Option<UserInfoDetails>, IggyError> { let response = self.get(&format!("{PATH}/{user_id}")).await; diff --git a/core/sdk/src/lib.rs b/core/sdk/src/lib.rs index 9fa07df91..ee643f06f 100644 --- a/core/sdk/src/lib.rs +++ b/core/sdk/src/lib.rs @@ -16,15 +16,31 @@ * under the License. */ +/// Declares modules that are only available on non-WASM targets. +macro_rules! native_modules { + ($($(#[$meta:meta])* $vis:vis mod $name:ident;)*) => { + $( + #[cfg(not(target_arch = "wasm32"))] + $(#[$meta])* + $vis mod $name; + )* + }; +} + +// Cross-platform modules. pub mod binary; -pub mod client_provider; -pub mod client_wrappers; -pub mod clients; -pub mod consumer_ext; pub mod http; -mod leader_aware; pub mod prelude; -pub mod quic; -pub mod stream_builder; -pub mod tcp; -pub mod websocket; + +// Native-only modules (require tokio, native networking, etc.). +native_modules! { + pub mod client_provider; + pub mod client_wrappers; + pub mod clients; + pub mod consumer_ext; + mod leader_aware; + pub mod quic; + pub mod stream_builder; + pub mod tcp; + pub mod websocket; +} diff --git a/core/sdk/src/prelude.rs b/core/sdk/src/prelude.rs index 3cb66f07a..8f6c967ec 100644 --- a/core/sdk/src/prelude.rs +++ b/core/sdk/src/prelude.rs @@ -27,27 +27,36 @@ //! use iggy::prelude::*; //! ``` -pub use crate::client_provider; -pub use crate::client_provider::ClientProviderConfig; -pub use crate::client_wrappers::client_wrapper::ClientWrapper; -pub use crate::client_wrappers::connection_info::ConnectionInfo; -pub use crate::clients::client::IggyClient; -pub use crate::clients::client_builder::IggyClientBuilder; -pub use crate::clients::consumer::{ - AutoCommit, AutoCommitAfter, AutoCommitWhen, IggyConsumer, ReceivedMessage, -}; -pub use crate::clients::consumer_builder::IggyConsumerBuilder; -pub use crate::clients::producer::IggyProducer; -pub use crate::clients::producer_builder::IggyProducerBuilder; -pub use crate::clients::producer_config::{BackgroundConfig, DirectConfig}; -pub use crate::clients::producer_sharding::{BalancedSharding, OrderedSharding, Sharding}; -pub use crate::consumer_ext::IggyConsumerMessageExt; -pub use crate::stream_builder::IggyConsumerConfig; -pub use crate::stream_builder::IggyStreamConsumer; -pub use crate::stream_builder::{IggyProducerConfig, IggyStreamProducer}; -pub use crate::stream_builder::{IggyStream, IggyStreamConfig}; -pub use crate::tcp::tcp_client::TcpClient; -pub use crate::websocket::websocket_client::WebSocketClient; +// Native-only re-exports (transports, high-level client, producer/consumer). +#[cfg(not(target_arch = "wasm32"))] +mod native { + pub use crate::client_provider; + pub use crate::client_provider::ClientProviderConfig; + pub use crate::client_wrappers::client_wrapper::ClientWrapper; + pub use crate::client_wrappers::connection_info::ConnectionInfo; + pub use crate::clients::client::IggyClient; + pub use crate::clients::client_builder::IggyClientBuilder; + pub use crate::clients::consumer::{ + AutoCommit, AutoCommitAfter, AutoCommitWhen, IggyConsumer, ReceivedMessage, + }; + pub use crate::clients::consumer_builder::IggyConsumerBuilder; + pub use crate::clients::producer::IggyProducer; + pub use crate::clients::producer_builder::IggyProducerBuilder; + pub use crate::clients::producer_config::{BackgroundConfig, DirectConfig}; + pub use crate::clients::producer_sharding::{BalancedSharding, OrderedSharding, Sharding}; + pub use crate::consumer_ext::IggyConsumerMessageExt; + pub use crate::stream_builder::IggyConsumerConfig; + pub use crate::stream_builder::IggyStreamConsumer; + pub use crate::stream_builder::{IggyProducerConfig, IggyStreamProducer}; + pub use crate::stream_builder::{IggyStream, IggyStreamConfig}; + pub use crate::tcp::tcp_client::TcpClient; + pub use crate::websocket::websocket_client::WebSocketClient; +} +#[cfg(not(target_arch = "wasm32"))] +pub use native::*; + +// Cross-platform re-exports (HTTP client, common types, traits, constants). +pub use crate::http::http_client::HttpClient; pub use iggy_common::{ Aes256GcmEncryptor, Args, ArgsOptional, AutoLogin, CacheMetrics, CacheMetricsKey, ClientError, ClientInfoDetails, ClusterMetadata, ClusterNode, ClusterNodeRole, ClusterNodeStatus,
