keksmd opened a new pull request, #52218:
URL: https://github.com/apache/spark/pull/52218

   ### What changes were proposed in this pull request?
   
   This PR introduces a new DataSource V2 streaming connector for RabbitMQ 
Streams while keeping the existing legacy V1 implementation for backward 
compatibility.
   
   Key changes:
   - Added `RmqSourceProvider` (a `TableProvider`) with schema inference and table instantiation.
   - Implemented `RmqTable`, which declares the `MICRO_BATCH_READ` capability.
   - Added `RmqScanBuilder` and `RmqScan` to construct streaming scans.
   - Implemented `RmqMicroBatchStream`, which manages offsets, plans input partitions, and creates partition readers.
   - Introduced `RmqInputPartition` and `RmqReaderFactory` for converting RabbitMQ messages into Spark `InternalRow`s.
   - Defined `RmqLongOffset` as a simple JSON-serializable offset representation.
   - Added INFO/DEBUG logging for debugging offsets and message handling.
   - Kept the old V1 `Source` implementation in the codebase to avoid breaking existing jobs.
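
   As a rough illustration of the offset and partition-planning pieces above, here is a minimal pure-Scala sketch with no Spark or RabbitMQ dependencies. Everything beyond the `RmqLongOffset` name is an assumption for illustration: the bare-long JSON form mirrors Spark's built-in `LongOffset`, and `planPartitions` is a hypothetical helper, not this PR's actual code.

```scala
// Sketch: a JSON-serializable long offset, assuming it serializes as a
// bare number string (like Spark's built-in LongOffset does).
case class RmqLongOffset(offset: Long) {
  def json: String = offset.toString
}

object RmqLongOffset {
  // Recover the offset from its JSON form (used on checkpoint restart).
  def fromJson(json: String): RmqLongOffset = RmqLongOffset(json.trim.toLong)
}

object Planning {
  // Hypothetical helper: split the half-open range [start, end) into at
  // most numPartitions contiguous, non-overlapping sub-ranges, one per
  // input partition.
  def planPartitions(start: Long, end: Long, numPartitions: Int): Seq[(Long, Long)] = {
    val total = end - start
    if (total <= 0) Seq.empty
    else {
      val n = math.min(numPartitions.toLong, total).toInt
      val step = math.ceil(total.toDouble / n).toLong
      (0 until n)
        .map { i =>
          val lo = start + i * step
          (lo, math.min(lo + step, end))
        }
        .filter { case (lo, hi) => hi > lo }
    }
  }
}
```

   Keeping the planned ranges contiguous and non-overlapping is what lets each partition reader consume its slice independently without any message being read twice within a micro-batch.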
   
   ### Why are the changes needed?
   - The V1 API (`Source`) is deprecated in Spark 3.5.x and no longer recommended for new connectors.
   - Migrating to DataSource V2 provides:
     - Compatibility with Spark 3.5.x and future versions.
     - Cleaner integration with Structured Streaming (offset management, checkpointing).
     - An easier path to future features such as RabbitMQ super streams (partitioned parallelism).
   - Keeping the V1 implementation allows a smoother migration path for existing users.
   
   ### Does this PR introduce any user-facing change?

   Yes.
   Users can create a stream with either the new V2 source or the legacy V1 source:
   
   ```scala
   spark.readStream
     .format("org.apache.spark.sql.v2.rabbitmq.RmqStreamingSourceProviderV2")
     .option("rmq.host", "rabbitmq-service")
     .option("rmq.queuename", "events-stream")
     .load()
   ```
   or:
   ```scala
   spark.readStream
     .format("org.apache.spark.sql.rabbitmq.RmqStreamingSourceProvider")
     .option("rmq.host", "rabbitmq-service")
     .option("rmq.queuename", "events-stream")
     .load()
   ```
   
   The first form runs on the V2 API and works with Spark 3.5.x micro-batch
   streaming; the second uses the old V1 implementation, which remains available
   for backward compatibility.
   
   ### How was this patch tested?
   No new automated tests have been added yet. Manual verification was done
   against a RabbitMQ broker:
   - Produced messages into a stream queue.
   - Consumed them with the new V2 connector in a Spark Structured Streaming query.
   - Validated offset persistence and basic restart behavior.

   Test coverage for the V2 integration should be added in a follow-up PR.
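
   As a concrete starting point for that follow-up, the serialize/deserialize round trip that checkpoint recovery depends on can be unit-tested without a broker. This is a sketch only: `RmqLongOffset` is re-stubbed here so the snippet stands alone, and the bare-long JSON form is an assumption, not confirmed by this PR.

```scala
// Stand-in stub for the connector's RmqLongOffset (assumed bare-long JSON form).
case class RmqLongOffset(offset: Long) {
  def json: String = offset.toString
}

// The property a follow-up test should pin down: deserializing a
// serialized offset yields the original offset.
def roundTrip(o: RmqLongOffset): RmqLongOffset = RmqLongOffset(o.json.toLong)

assert(roundTrip(RmqLongOffset(0L)) == RmqLongOffset(0L))
assert(roundTrip(RmqLongOffset(987654321L)) == RmqLongOffset(987654321L))
```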
   
   ### Was this patch authored or co-authored using generative AI tooling?
   
   Generated-by: ChatGPT-5


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

