Hi community! Our team is working on the SparkReceiverIO connector for Apache Beam. We have published the SparkReceiverIO.Read PR [1].
* We are working with the Spark Receiver class [2]. A Receiver should ideally implement the HasOffset interface [3] with the setStartOffset() method, so that we can start multiple Receivers from a given offset for a specific Restriction.
* SplittableDoFn also implies the presence of a RestrictionTracker, which has the .tryClaim(offset) method. So there should be a way to get the offset of the current record from the Receiver.

Let’s imagine we are dealing with a simple Receiver that doesn’t implement the HasOffset interface [3], and we are thinking about using the SDF approach for this case as well. There are some unresolved questions:

1. Since we can’t start multiple Receivers from different offsets, we need to start only one main Receiver [0; +inf). What is the best place to do this? (Currently, it’s the constructor of the SDF.)
2. All records coming from the Receiver should be stored in some buffer. Since SDF objects are serialized, what is the best way to give all of them a link to the shared buffer?
3. How do we correctly stop our main Receiver? (This is also a serialization problem.)
4. There are no tangible benefits from using SDF: we can’t parallelize reading, because of the single-threaded Receiver limitation.
5. What if we are dealing with a Receiver that has no way to determine the offset of the current record?

A possible solution that we see is to use the UnboundedSource approach, as we did earlier in the "Read from Spark Receiver via the UnboundedSource" PoC branch [4]. It looks like we can resolve all the questions above by implementing it. But UnboundedSource is deprecated.

Could someone give us advice on how we can work with Receivers without offsets in our case? Any ideas or comments would be greatly appreciated.

Thanks for your attention!
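To make question 2 more concrete, here is a minimal sketch of one option we could imagine: a small serializable handle that carries only an ID, while the buffer itself lives in a static per-JVM registry. The class name SharedBufferHandle and its methods are hypothetical and are not part of Beam or of our PR; this only illustrates the serialization concern, it is not a proposed implementation.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.Queue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;

/**
 * Hypothetical serializable handle to a buffer shared by all copies within one JVM.
 * Only the ID is serialized; the queue is looked up in a static registry.
 */
final class SharedBufferHandle<T> implements Serializable {
  private static final long serialVersionUID = 1L;

  // Static, so it is never serialized and is shared by every handle in this JVM.
  private static final ConcurrentHashMap<String, Queue<Object>> REGISTRY =
      new ConcurrentHashMap<>();

  private final String id;

  SharedBufferHandle(String id) {
    this.id = id;
  }

  // Lazily creates the queue for this ID on first access.
  private Queue<Object> queue() {
    return REGISTRY.computeIfAbsent(id, key -> new ConcurrentLinkedQueue<>());
  }

  /** Called from the Receiver side to store an incoming record. */
  void add(T record) {
    queue().add(record);
  }

  /** Called from the reading side; returns null when the buffer is empty. */
  @SuppressWarnings("unchecked")
  T poll() {
    return (T) queue().poll();
  }

  /** Simulates what happens when the enclosing object is serialized and deserialized. */
  @SuppressWarnings("unchecked")
  SharedBufferHandle<T> copyViaSerialization() {
    try {
      ByteArrayOutputStream bytes = new ByteArrayOutputStream();
      try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
        out.writeObject(this);
      }
      try (ObjectInputStream in =
          new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
        return (SharedBufferHandle<T>) in.readObject();
      }
    } catch (IOException | ClassNotFoundException e) {
      throw new IllegalStateException("Serialization round trip failed", e);
    }
  }
}
```

A deserialized copy of the handle sees the same queue as the original, but only inside a single JVM. Each worker process would get its own empty registry, so this does not by itself solve the problem on a distributed runner.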
Elizaveta

[1] [BEAM-14378] [CdapIO] SparkReceiverIO Read via SDF #17828 – https://github.com/apache/beam/pull/17828
[2] Spark Streaming Custom Receivers – https://spark.apache.org/docs/latest/streaming-custom-receivers.html
[3] HasOffset interface – https://github.com/apache/beam/blob/0581c49575eeba9df8be2a166c6923209fa8f7a5/sdks/java/io/sparkreceiver/src/main/java/org/apache/beam/sdk/io/sparkreceiver/HasOffset.java
[4] SparkReceiverIO: Read via UnboundedSource – https://github.com/apache/beam/pull/17360/files#diff-795caf376b2257e6669096a9048490d4935aff573e636617eb431d379e330db0