Hi community!

Our team is working on the SparkReceiverIO connector for Apache Beam. We have 
published the SparkReceiverIO.Read PR [1].


  *   We are working with the Spark Receiver class [2]. Ideally, a Receiver 
should implement the HasOffset interface [3], which provides the setStartOffset() 
method, so that we can start multiple Receivers, each from the start offset of 
its own restriction.

  *   Also, a SplittableDoFn requires a RestrictionTracker, which has a 
tryClaim(offset) method. So the Receiver must be able to report the offset of 
the current record.
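To make sure we read these two requirements the same way, here is a simplified model in plain Java, with no Beam or Spark dependencies. SimpleOffsetRange, SimpleRangeTracker and OffsetAwareReceiver are hypothetical stand-ins, not the Beam or Spark APIs. Each restriction [from, to) gets its own receiver started at `from`, which is the role setStartOffset() plays. Every record's offset then goes through tryClaim() until a claim fails.

```java
import java.util.ArrayList;
import java.util.List;

final class RestrictionReadDemo {
  // Reads one restriction: start a receiver at range.from and claim each record's
  // offset until tryClaim fails, which is where a real SDF would stop or checkpoint.
  static List<Long> readRestriction(SimpleOffsetRange range) {
    OffsetAwareReceiver receiver = new OffsetAwareReceiver();
    receiver.setStartOffset(range.from);
    SimpleRangeTracker tracker = new SimpleRangeTracker(range);
    List<Long> emitted = new ArrayList<>();
    while (true) {
      long offset = receiver.nextOffset();
      if (!tracker.tryClaim(offset)) {
        break;
      }
      emitted.add(offset);
    }
    return emitted;
  }

  public static void main(String[] args) {
    // Two restrictions read independently, each by its own receiver.
    System.out.println(readRestriction(new SimpleOffsetRange(0, 3))); // [0, 1, 2]
    System.out.println(readRestriction(new SimpleOffsetRange(3, 5))); // [3, 4]
  }
}

// Hypothetical restriction [from, to); not Beam's OffsetRange class.
final class SimpleOffsetRange {
  final long from; // inclusive
  final long to;   // exclusive

  SimpleOffsetRange(long from, long to) {
    this.from = from;
    this.to = to;
  }
}

// Hypothetical tracker mimicking the decision a RestrictionTracker makes in tryClaim.
final class SimpleRangeTracker {
  private final SimpleOffsetRange range;
  private long lastClaimed;

  SimpleRangeTracker(SimpleOffsetRange range) {
    this.range = range;
    this.lastClaimed = range.from - 1;
  }

  // Succeeds only for an offset inside [from, to) that is larger than the last claim.
  boolean tryClaim(long offset) {
    if (offset <= lastClaimed || offset >= range.to) {
      return false;
    }
    lastClaimed = offset;
    return true;
  }
}

// Hypothetical receiver that, like one implementing HasOffset, can be told where to start.
final class OffsetAwareReceiver {
  private long next;

  void setStartOffset(long startOffset) {
    this.next = startOffset;
  }

  // Offset of the record this receiver would emit next.
  long nextOffset() {
    return next++;
  }
}
```

A Receiver without HasOffset breaks both halves of this loop: nothing plays the role of setStartOffset(), and nothing supplies the offset passed to tryClaim().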


Let’s imagine we are dealing with a simple Receiver that doesn’t implement the 
HasOffset interface [3], and we are thinking about using the SDF approach for 
this case as well. There are some unresolved questions:

  1.  Since we can’t start multiple Receivers from different offsets, we need 
to start only one main Receiver covering [0, +inf). What is the best place to do 
this? (Currently, it’s the constructor of the SDF.)

  2.  All records coming from the Receiver should be stored in some buffer. 
Since SDF objects are serialized, what is the best way to give all of them a 
reference to a single shared buffer?

  3.  How do we correctly stop our main Receiver? (This is also a serialization 
problem.)

  4.  There are no tangible benefits from using SDF: we can’t parallelize 
reading, because we are limited to a single-threaded Receiver.

  5.  What if we are dealing with a Receiver that can’t determine the offset of 
the current record?
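For questions 1, 2, 3 and 5, one pattern we are considering is a static, per-JVM registry keyed by a serializable id. This is our own sketch, not a confirmed Beam recommendation. The SDF keeps only the id, and its constructor starts nothing, because it runs on the launcher and Java deserialization does not rerun it. Shared state is resolved lazily in setup, and users are reference-counted so that the last teardown stops the Receiver. Records are stamped with in-memory arrival counters so tryClaim has something to claim. All class names are hypothetical, and setup()/teardown() stand in for methods annotated with @DoFn.Setup and @DoFn.Teardown.

The caveats are significant:
  *   The state is per worker JVM, so a runner with several workers would start one Receiver per worker.
  *   Beam calls @Teardown on a best-effort basis only.
  *   The synthetic offsets cannot be mapped back to a position in the source after a restart, so buffered records would be lost.

```java
import java.io.Serializable;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

final class SharedReceiverDemo {
  public static void main(String[] args) {
    // Two copies of the same DoFn, as a runner may create after deserialization.
    ReadFnSketch first = new ReadFnSketch("receiver-1");
    ReadFnSketch second = new ReadFnSketch("receiver-1");
    first.setup();  // starts the shared receiver lazily (question 1)
    second.setup(); // reuses it
    // What the receiver's store(...) path would call for each incoming record.
    ReceiverRegistry.store("receiver-1", "a");
    ReceiverRegistry.store("receiver-1", "b");
    System.out.println(second.poll()); // 0:a  (both copies drain one buffer, question 2)
    System.out.println(first.poll());  // 1:b
    first.teardown();
    second.teardown(); // the last user stops the receiver (question 3)
    System.out.println(ReceiverRegistry.isRunning("receiver-1")); // false
  }
}

// Hypothetical buffered record with a synthetic offset (question 5).
final class BufferedRecord {
  final long offset; // arrival order in this JVM only; not a position in the source
  final String record;

  BufferedRecord(long offset, String record) {
    this.offset = offset;
    this.record = record;
  }

  @Override
  public String toString() {
    return offset + ":" + record;
  }
}

// Hypothetical per-JVM state for one receiver: buffer, arrival counter, user count.
final class ReceiverState {
  final BlockingQueue<BufferedRecord> buffer = new LinkedBlockingQueue<>();
  long nextOffset; // guarded by ReceiverRegistry's lock
  int users;       // guarded by ReceiverRegistry's lock

  void start() { /* a real Spark Receiver would be started here */ }

  void stop() { /* ...and stopped here */ }
}

// Hypothetical registry keyed by a serializable id; only the id travels with the DoFn.
final class ReceiverRegistry {
  private static final Map<String, ReceiverState> STATES = new HashMap<>();

  static synchronized ReceiverState acquire(String id) {
    ReceiverState state = STATES.get(id);
    if (state == null) {
      state = new ReceiverState();
      state.start(); // on first use in this JVM, not in the DoFn constructor
      STATES.put(id, state);
    }
    state.users++;
    return state;
  }

  static synchronized void release(String id) {
    ReceiverState state = STATES.get(id);
    if (state != null && --state.users == 0) {
      STATES.remove(id);
      state.stop();
    }
  }

  // Stamps each record with the next arrival counter; a reader would tryClaim(offset).
  static synchronized void store(String id, String record) {
    ReceiverState state = STATES.get(id);
    if (state != null) {
      state.buffer.add(new BufferedRecord(state.nextOffset++, record));
    }
  }

  static synchronized boolean isRunning(String id) {
    return STATES.containsKey(id);
  }
}

// Hypothetical DoFn-like class; its constructor only records the id.
final class ReadFnSketch implements Serializable {
  private static final long serialVersionUID = 1L;
  private final String receiverId;
  private transient ReceiverState state; // not serialized; resolved again on each worker

  ReadFnSketch(String receiverId) {
    this.receiverId = receiverId;
  }

  void setup() {
    state = ReceiverRegistry.acquire(receiverId);
  }

  BufferedRecord poll() {
    return state.buffer.poll(); // null when the shared buffer is empty
  }

  void teardown() {
    if (state != null) { // guards against releasing twice
      ReceiverRegistry.release(receiverId);
      state = null;
    }
  }
}
```

Even if this works, it does not address question 4: all reading still goes through one Receiver per JVM.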


A possible solution that we see is to use the UnboundedSource approach, as we 
did earlier in our PoC branch for reading from a Spark Receiver via 
UnboundedSource [4]. It looks like implementing it would resolve all the 
questions above. However, UnboundedSource is deprecated.


Could someone advise us on how to handle Receivers without offsets in our 
case?

Any ideas or comments would be greatly appreciated.


Thanks for your attention!

Elizaveta


[1] [BEAM-14378] [CdapIO] SparkReceiverIO Read via SDF #17828 – 
https://github.com/apache/beam/pull/17828

[2] Spark Streaming Custom Receivers – 
https://spark.apache.org/docs/latest/streaming-custom-receivers.html

[3] HasOffset interface – 
https://github.com/apache/beam/blob/0581c49575eeba9df8be2a166c6923209fa8f7a5/sdks/java/io/sparkreceiver/src/main/java/org/apache/beam/sdk/io/sparkreceiver/HasOffset.java

[4] SparkReceiverIO: Read via UnboundedSource – 
https://github.com/apache/beam/pull/17360/files#diff-795caf376b2257e6669096a9048490d4935aff573e636617eb431d379e330db0

