This is an automated email from the ASF dual-hosted git repository.
spetz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iggy.git
The following commit(s) were added to refs/heads/master by this push:
new 5fbeddca4 fix(connectors): reject duplicate iggy_sink_open and
iggy_source_open (#3179)
5fbeddca4 is described below
commit 5fbeddca49375c35096a38dadde3c164f175820a
Author: Mukunda Rao Katta <[email protected]>
AuthorDate: Fri May 1 05:29:43 2026 -0700
fix(connectors): reject duplicate iggy_sink_open and iggy_source_open
(#3179)
The `sink_connector!` and `source_connector!` macros silently overwrote
an existing `INSTANCES` entry when `iggy_sink_open` or
`iggy_source_open` was called twice with the same id. Any in-flight
buffered data and the prior task were lost without any signal to the
caller. Returning -1 on a duplicate id surfaces the mistake so callers
drain or close explicitly first.
Closes #3168
---
core/connectors/sdk/src/sink.rs | 7 +++++++
core/connectors/sdk/src/source.rs | 7 +++++++
2 files changed, 14 insertions(+)
diff --git a/core/connectors/sdk/src/sink.rs b/core/connectors/sdk/src/sink.rs
index 9d21f05ee..6e3d83bb1 100644
--- a/core/connectors/sdk/src/sink.rs
+++ b/core/connectors/sdk/src/sink.rs
@@ -236,6 +236,13 @@ macro_rules! sink_connector {
config_len: usize,
log_callback: LogCallback,
) -> i32 {
+ if INSTANCES.contains_key(&id) {
+ // Duplicate id: caller did not close before reopening. Without
+ // this guard the existing entry would be silently overwritten,
+ // discarding any in-flight buffered data and orphaning tasks.
+ return -1;
+ }
+
let mut container = SinkContainer::new(id);
let result = container.open(id, config_ptr, config_len,
log_callback, <$type>::new);
INSTANCES.insert(id, container);
diff --git a/core/connectors/sdk/src/source.rs
b/core/connectors/sdk/src/source.rs
index d5486f24f..0209027e0 100644
--- a/core/connectors/sdk/src/source.rs
+++ b/core/connectors/sdk/src/source.rs
@@ -226,6 +226,13 @@ macro_rules! source_connector {
state_len: usize,
log_callback: LogCallback,
) -> i32 {
+ if INSTANCES.contains_key(&id) {
+ // Duplicate id: caller did not close before reopening. Without
+ // this guard the existing entry would be silently overwritten,
+ // discarding any in-flight buffered data and orphaning tasks.
+ return -1;
+ }
+
let mut container = SourceContainer::new(id);
let result = container.open(
id,