This is an automated email from the ASF dual-hosted git repository.

spetz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iggy.git


The following commit(s) were added to refs/heads/master by this push:
     new 5fbeddca4 fix(connectors): reject duplicate iggy_sink_open and 
iggy_source_open (#3179)
5fbeddca4 is described below

commit 5fbeddca49375c35096a38dadde3c164f175820a
Author: Mukunda Rao Katta <[email protected]>
AuthorDate: Fri May 1 05:29:43 2026 -0700

    fix(connectors): reject duplicate iggy_sink_open and iggy_source_open 
(#3179)
    
    The `sink_connector!` and `source_connector!` macros silently overwrote
    an existing `INSTANCES` entry when `iggy_sink_open` or
    `iggy_source_open` was called twice with the same id. Any in-flight
    buffered data and the prior task were lost without any signal to the
    caller. Returning -1 on a duplicate id surfaces the mistake so callers
    drain or close explicitly first.
    Closes #3168
---
 core/connectors/sdk/src/sink.rs   | 7 +++++++
 core/connectors/sdk/src/source.rs | 7 +++++++
 2 files changed, 14 insertions(+)

diff --git a/core/connectors/sdk/src/sink.rs b/core/connectors/sdk/src/sink.rs
index 9d21f05ee..6e3d83bb1 100644
--- a/core/connectors/sdk/src/sink.rs
+++ b/core/connectors/sdk/src/sink.rs
@@ -236,6 +236,13 @@ macro_rules! sink_connector {
             config_len: usize,
             log_callback: LogCallback,
         ) -> i32 {
+            if INSTANCES.contains_key(&id) {
+                // Duplicate id: caller did not close before reopening. Without
+                // this guard the existing entry would be silently overwritten,
+                // discarding any in-flight buffered data and orphaning tasks.
+                return -1;
+            }
+
             let mut container = SinkContainer::new(id);
             let result = container.open(id, config_ptr, config_len, 
log_callback, <$type>::new);
             INSTANCES.insert(id, container);
diff --git a/core/connectors/sdk/src/source.rs 
b/core/connectors/sdk/src/source.rs
index d5486f24f..0209027e0 100644
--- a/core/connectors/sdk/src/source.rs
+++ b/core/connectors/sdk/src/source.rs
@@ -226,6 +226,13 @@ macro_rules! source_connector {
             state_len: usize,
             log_callback: LogCallback,
         ) -> i32 {
+            if INSTANCES.contains_key(&id) {
+                // Duplicate id: caller did not close before reopening. Without
+                // this guard the existing entry would be silently overwritten,
+                // discarding any in-flight buffered data and orphaning tasks.
+                return -1;
+            }
+
             let mut container = SourceContainer::new(id);
             let result = container.open(
                 id,

Reply via email to