keksmd opened a new pull request, #52218:
URL: https://github.com/apache/spark/pull/52218
What changes were proposed in this pull request?
This PR introduces a new DataSource V2 streaming connector for RabbitMQ
Streams while keeping the existing legacy V1 implementation for backward
compatibility.
Key changes:
• Added RmqSourceProvider (TableProvider) with schema inference
and table instantiation.
• Implemented RmqTable with declared MICRO_BATCH_READ capability.
• Added RmqScanBuilder and RmqScan to construct streaming scans.
• Implemented RmqMicroBatchStream that manages offsets, plans
input partitions, and creates partition readers.
• Introduced RmqInputPartition and RmqReaderFactory for
converting RabbitMQ messages into Spark InternalRows.
• Defined RmqLongOffset as a simple JSON-serializable offset
representation.
• Added INFO/DEBUG logging for debugging offsets and message
handling.
• Left the old V1 Source implementation in the codebase to avoid
breaking existing jobs.
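To make the offset-to-partition step above more concrete, here is a minimal sketch of how a micro-batch stream might turn an offset range into reader tasks. It is deliberately free of Spark dependencies so it runs standalone. `OffsetRange`, `planPartitions` and `maxPerPartition` are hypothetical names that do not come from this PR, and the PR description does not say whether RmqMicroBatchStream plans one partition per batch or several, so the splitting shown here is an assumption.

```scala
// Hypothetical, Spark-free model of micro-batch partition planning.
// None of these names are taken from the PR; they only illustrate the idea
// behind MicroBatchStream.planInputPartitions(start, end).
object PlanningSketch {

  /** Half-open range [start, end) of stream offsets handled by one reader. */
  final case class OffsetRange(start: Long, end: Long) {
    require(start <= end, s"start ($start) must not exceed end ($end)")
    def size: Long = end - start
  }

  /**
   * Split the batch [start, end) into consecutive ranges holding at most
   * maxPerPartition offsets each. An empty batch yields no partitions.
   */
  def planPartitions(start: Long, end: Long, maxPerPartition: Long): List[OffsetRange] = {
    require(maxPerPartition > 0, "maxPerPartition must be positive")
    require(start <= end, s"start ($start) must not exceed end ($end)")
    Iterator
      .iterate(start)(_ + maxPerPartition) // candidate range starts
      .takeWhile(_ < end)                  // stop once the batch is covered
      .map(s => OffsetRange(s, math.min(s + maxPerPartition, end)))
      .toList
  }
}
```

In a real connector each such range would presumably be wrapped in an InputPartition and handed to a reader created by the PartitionReaderFactory; the sketch only models the range arithmetic.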
Why are the changes needed?
• The V1 API (Source) is deprecated in Spark 3.5.x and no longer
recommended for new connectors.
• Migrating to DataSource V2 provides:
  • Compatibility with Spark 3.5.x and future versions.
  • Cleaner integration with Structured Streaming (offset
    management, checkpointing).
  • An easier path to future features such as RabbitMQ super streams
    (partitioned parallelism).
• Keeping the V1 implementation allows a smoother migration path
for existing users.
Does this PR introduce any user-facing change?
Yes.
Users can now use new sources:
```
spark.readStream
  .format("org.apache.spark.sql.v2.rabbitmq.RmqStreamingSourceProviderV2")
  .option("rmq.host", "rabbitmq-service")
  .option("rmq.queuename", "events-stream")
  .load()
```
OR
```
spark.readStream
  .format("org.apache.spark.sql.rabbitmq.RmqStreamingSourceProvider")
  .option("rmq.host", "rabbitmq-service")
  .option("rmq.queuename", "events-stream")
  .load()
```
This runs on the V2 API and works with Spark 3.5.x micro-batch streaming.
The old V1 implementation is still available for backward compatibility.
How was this patch tested?
• No new automated tests have been added yet.
• Manual verification was done with a RabbitMQ broker:
  • Produced messages into a stream queue.
  • Consumed them using the new V2 connector in a Spark Structured
    Streaming query.
  • Validated offset persistence and basic restart behavior.
• Test coverage for V2 integration should be added in a follow-up
PR.
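As a possible starting point for that follow-up coverage, a round-trip check of the offset's JSON form is cheap to write and guards the checkpoint format. The sketch below is Spark-free and uses a hypothetical `LongOffset` stand-in; the actual JSON layout of RmqLongOffset is not shown in this description, so the `{"offset":N}` shape is an assumption.

```scala
// Hypothetical stand-in for a JSON-serializable long offset.
// The {"offset":N} layout is assumed for illustration, not taken from the PR.
object OffsetSketch {

  final case class LongOffset(value: Long) {
    /** Compact JSON form, as it might be written to the checkpoint. */
    def json: String = s"""{"offset":$value}"""
  }

  // Matches exactly the compact form produced by LongOffset.json.
  private val Pattern = """\{"offset":(-?\d+)\}""".r

  /** Parse the JSON form back; returns None for malformed or out-of-range input. */
  def fromJson(json: String): Option[LongOffset] = json.trim match {
    case Pattern(digits) => scala.util.Try(digits.toLong).toOption.map(LongOffset(_))
    case _               => None
  }
}
```

A real test would exercise the connector's own offset class and its deserializeOffset path instead of this stand-in.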
Was this patch authored or co-authored using generative AI tooling?
Generated-by: ChatGPT-5