On Wed, Aug 3, 2022 at 10:03 AM Chamikara Jayalath <chamik...@google.com> wrote:
>
> On Wed, Aug 3, 2022 at 8:57 AM Elizaveta Lomteva <
> elizaveta.lomt...@akvelon.com> wrote:
>
>> Hi community!
>>
>> Our team is working on the SparkReceiverIO connector for Apache Beam. We
>> have published the SparkReceiverIO.Read PR [1].
>>
>> - We are working with the Spark Receiver class [2]. The Receiver should
>>   ideally implement the HasOffset interface [3] with the setStartOffset()
>>   method, so that we can start multiple Receivers from some offset for a
>>   specific Restriction.
>>
>> - Also, a SplittableDoFn implies the presence of a RestrictionTracker,
>>   which has a .tryClaim(offset) method. So there should be a way to get
>>   the offset of the current record from the Receiver.
>>
>> Let's imagine we are dealing with a simple Receiver that doesn't
>> implement the HasOffset interface [3], and we are thinking about using
>> the SDF approach for this case as well. There are some unresolved
>> questions:
>>
>> 1. Since we can't start multiple Receivers from different offsets, we
>>    need to start only one main Receiver [0, +inf). What is the best place
>>    for doing this? (Currently, it's the constructor of the SDF.)
>>
>
> When you provide the initial restriction, you can simply provide a tracker
> (OffsetRangeTracker) that only contains one offset, so the range will be
> [0, 1].
>

Sorry, should be [0, 1). Also, it probably makes sense to develop an
UnsplittableRangeTracker that can also be used in other contexts. We have
something similar for Python with the old RangeTracker interface:
https://github.com/apache/beam/blob/8e217ea0d1f383ef5033ef507b14d01edf9c67e6/sdks/python/apache_beam/io/range_trackers.py#L301

> Within the splitRestriction() method, return the same restriction.
>
> This will result in an SDF that will not be split either through initial
> splitting or dynamic work rebalancing.
>
> This will not be very efficient, but hopefully this will generalize your
> solution for all receivers, whether they support offsets or not.
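A minimal, Beam-free sketch of the suggestion above: an initial restriction of [0, 1) (exactly one offset, 0), a splitRestriction() that hands back the same restriction, and a tracker that refuses every split. All class and method names here are hypothetical and only loosely mirror Beam's OffsetRange and RestrictionTracker; they are not Beam's API. In a real SDF the corresponding pieces would live in the @GetInitialRestriction, @SplitRestriction and @NewTracker methods.

```java
import java.util.Collections;
import java.util.List;

// Hypothetical, Beam-free model of the "one offset, never split" restriction.
// Names loosely mirror Beam's OffsetRange / RestrictionTracker but are NOT Beam's API.
public class UnsplittableSketch {

  /** Half-open offset range [from, to); [0, 1) contains exactly one offset, 0. */
  static final class OffsetRange {
    private final long from;
    private final long to;

    OffsetRange(long from, long to) {
      this.from = from;
      this.to = to;
    }

    long getFrom() { return from; }
    long getTo() { return to; }
  }

  /** Claims offsets in increasing order inside the range, but refuses every split. */
  static final class UnsplittableOffsetTracker {
    private final OffsetRange range;
    private long lastClaimed;

    UnsplittableOffsetTracker(OffsetRange range) {
      this.range = range;
      this.lastClaimed = range.getFrom() - 1; // nothing claimed yet
    }

    /** Returns true only for an offset inside the range and after the last claim. */
    boolean tryClaim(long offset) {
      if (offset <= lastClaimed || offset >= range.getTo()) {
        return false;
      }
      lastClaimed = offset;
      return true;
    }

    /** Always refuses: null means "no split", so no residual restriction is produced. */
    OffsetRange trySplit(double fractionOfRemainder) {
      return null;
    }

    OffsetRange currentRestriction() {
      return range;
    }
  }

  /** Initial restriction for a Receiver without offsets: a single logical offset. */
  static OffsetRange initialRestriction() {
    return new OffsetRange(0, 1);
  }

  /** Initial splitting: return the same restriction unchanged. */
  static List<OffsetRange> splitRestriction(OffsetRange restriction) {
    return Collections.singletonList(restriction);
  }

  public static void main(String[] args) {
    List<OffsetRange> splits = splitRestriction(initialRestriction());
    UnsplittableOffsetTracker tracker = new UnsplittableOffsetTracker(splits.get(0));

    System.out.println(splits.size());                 // 1
    System.out.println(tracker.tryClaim(0));           // true
    System.out.println(tracker.tryClaim(1));           // false: outside [0, 1)
    System.out.println(tracker.trySplit(0.5) == null); // true: split refused
  }
}
```

Because splitRestriction() never divides the range and trySplit() always refuses, neither initial splitting nor dynamic work rebalancing can break the work up. That is the trade-off described above: a single reader per Receiver, but the same code path whether or not the Receiver implements HasOffset.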
>
>> 2. All records coming from the Receiver should be stored in some buffer.
>>    Since SDF objects are serialized, what is the best way to provide a
>>    reference to a buffer shared by all of them?
>>
>> 3. How do we correctly stop our main Receiver? (This is also a
>>    serialization problem.)
>>
>> 4. There are no tangible benefits from using SDF: we can't parallelize
>>    reading, because of the single-threaded Receiver limitation.
>>
>> 5. What if we are dealing with a Receiver that can't determine the
>>    offset of the current record?
>>
>
> Could you clarify why these questions could be addressed for
> UnboundedSource but not for SDF? AFAICT these should be addressed with
> either source framework (probably in a similar way). For example, both
> types of source objects get serialized by runners.
>
> Thanks,
> Cham
>
>> A possible solution that we see is to use the UnboundedSource approach,
>> as we did earlier in the "Read from Spark Receiver via UnboundedSource"
>> PoC branch [4]. It looks like we could resolve all the questions above by
>> implementing it. But UnboundedSource is deprecated.
>>
>> Could someone advise us on how we can work with Receivers without
>> offsets in our case?
>>
>> Any ideas or comments would be greatly appreciated.
>>
>> Thanks for your attention!
>>
>> Elizaveta
>>
>> [1] [BEAM-14378] [CdapIO] SparkReceiverIO Read via SDF #17828 –
>> https://github.com/apache/beam/pull/17828
>>
>> [2] Spark Streaming Custom Receivers –
>> https://spark.apache.org/docs/latest/streaming-custom-receivers.html
>>
>> [3] HasOffset interface –
>> https://github.com/apache/beam/blob/0581c49575eeba9df8be2a166c6923209fa8f7a5/sdks/java/io/sparkreceiver/src/main/java/org/apache/beam/sdk/io/sparkreceiver/HasOffset.java
>>
>> [4] SparkReceiverIO: Read via UnboundedSource –
>> https://github.com/apache/beam/pull/17360/files#diff-795caf376b2257e6669096a9048490d4935aff573e636617eb431d379e330db0