On Wed, Aug 3, 2022 at 10:03 AM Chamikara Jayalath <chamik...@google.com>
wrote:

>
>
> On Wed, Aug 3, 2022 at 8:57 AM Elizaveta Lomteva <
> elizaveta.lomt...@akvelon.com> wrote:
>
>> Hi community!
>>
>> Our team is working on the SparkReceiverIO connector for Apache Beam. We
>> have published SparkReceiverIO.Read PR [1].
>>
>>
>>    - We are working with the Spark Receiver class [2]. A Receiver should
>>      ideally implement the HasOffset interface [3] with the setStartOffset()
>>      method, so that we can start multiple Receivers from some offset for
>>      a specific Restriction.
>>
>>    - Also, SplittableDoFn implies the presence of a RestrictionTracker,
>>      which has a .tryClaim(offset) method. So there should be a way to get
>>      the offset of the current record from the Receiver.
>>
>>
>> Let’s imagine we are dealing with a simple Receiver that doesn’t
>> implement the HasOffset interface [3], and we are thinking about using the
>> SDF approach for this case as well. There are some unresolved questions:
>>
>>    1. Since we don’t have the ability to start multiple Receivers from
>>       different offsets, we need to start only one main Receiver
>>       [0 ; +inf). What is the best place for doing this? (Currently, it’s
>>       the constructor of the SDF).
>>
>>
> When you provide the initial restriction, you can simply provide a tracker
> (OffsetRangeTracker) that only contains one offset, so the range will be
> [0, 1].
>

Sorry, that should be [0, 1).
Also, it probably makes sense to develop an UnsplittableRangeTracker that can
be reused in other contexts. We have something similar for Python, built on
the old RangeTracker interface:
https://github.com/apache/beam/blob/8e217ea0d1f383ef5033ef507b14d01edf9c67e6/sdks/python/apache_beam/io/range_trackers.py#L301
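
To make the suggestion concrete, here is a minimal sketch of the idea in plain
Python. It does not use Beam's API: OffsetRange, UnsplittableOffsetTracker and
split_restriction are hypothetical stand-ins that only illustrate a single
restriction [0, 1) that is never split, either initially or dynamically.

```python
from dataclasses import dataclass
from typing import Iterator, Optional


@dataclass(frozen=True)
class OffsetRange:
    """Half-open range [start, stop). A stand-in, not Beam's own class."""
    start: int
    stop: int


class UnsplittableOffsetTracker:
    """Hypothetical tracker that claims offsets but refuses every split."""

    def __init__(self, restriction: OffsetRange) -> None:
        self._restriction = restriction
        self._last_claimed: Optional[int] = None

    def try_claim(self, offset: int) -> bool:
        # Claims must move forward and must not precede the range start.
        if offset < self._restriction.start:
            raise ValueError("offset is before the start of the restriction")
        if self._last_claimed is not None and offset <= self._last_claimed:
            raise ValueError("offsets must be claimed in increasing order")
        # An offset at or past the end means the restriction is finished.
        if offset >= self._restriction.stop:
            return False
        self._last_claimed = offset
        return True

    def try_split(self, fraction_of_remainder: float) -> None:
        # Always decline, so dynamic work rebalancing never splits the range.
        return None


def split_restriction(restriction: OffsetRange) -> Iterator[OffsetRange]:
    # Initial splitting returns the same restriction unchanged.
    yield restriction


# The single restriction covering the one main Receiver.
initial = OffsetRange(0, 1)
tracker = UnsplittableOffsetTracker(initial)
```

In a real connector the same behaviour would have to be expressed through
Beam's own RestrictionTracker and splitRestriction() hooks; the sketch only
shows the intended semantics.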



> Within the splitRestriction() method, return the same restriction.
>
> This will result in a SDF that will not be split either through initial
> splitting or dynamic work rebalancing.
>
> This will not be very efficient, but hopefully this will generalize your
> solution for all receivers, whether they support offsets or not.
>
>
>>
>>    2. All records coming from the Receiver should be stored in some
>>       buffer. Since SDF objects are serialized, what is the best way to
>>       provide a link to the shared buffer for all of them?
>>
>>    3. How do we correctly stop our main Receiver? (This is also a
>>       serialization problem.)
>>
>>    4. There are no tangible benefits from using SDF: we can’t parallelize
>>       reading, because there will be a single-thread Receiver limitation.
>>
>>    5. What if we are dealing with a Receiver that doesn’t have the
>>       ability to determine the offset of the current record?
>>
>>
> Could you clarify why these questions could be addressed for
> UnboundedSource but not for SDF? AFAICT these should be addressed with
> either source framework (probably in a similar way). For example, both
> types of source objects get serialized by runners.
>
> Thanks,
> Cham
>
>
>>
>>
>> A possible solution that we see is to use the UnboundedSource approach,
>> as we did earlier in Read from Spark Receiver via the UnboundedSource PoC
>> branch [4]. It looks like we can resolve all the questions above by
>> implementing it. But the UnboundedSource is deprecated.
>>
>> Could someone give us advice on how we can manage working with Receivers
>> without offsets in our case?
>>
>> Any ideas or comments would be greatly appreciated.
>>
>> Thanks for your attention to it!
>>
>> Elizaveta
>>
>> [1] [BEAM-14378] [CdapIO] SparkReceiverIO Read via SDF #17828 –
>> https://github.com/apache/beam/pull/17828
>>
>> [2] Spark Streaming Custom Receivers –
>> https://spark.apache.org/docs/latest/streaming-custom-receivers.html
>>
>> [3] HasOffset interface –
>> https://github.com/apache/beam/blob/0581c49575eeba9df8be2a166c6923209fa8f7a5/sdks/java/io/sparkreceiver/src/main/java/org/apache/beam/sdk/io/sparkreceiver/HasOffset.java
>>
>> [4] SparkReceiverIO: Read via UnboundedSource –
>> https://github.com/apache/beam/pull/17360/files#diff-795caf376b2257e6669096a9048490d4935aff573e636617eb431d379e330db0
>>
>>
>>
