The bucketing "error" is likely related to the windowing strategy or
pipeline shape you have. Have you tried running your SDF in an otherwise
empty pipeline, possibly followed by a ParDo that logs the records you see?

On Tue, Mar 3, 2020 at 3:34 AM Kjetil Halvorsen <
[email protected]> wrote:

> Thanks for the willingness to help out. The general context is that we
> are developing a set of new Beam-based connectors/readers.
>
> I had hoped that SDF was ready for use with Dataflow--just because the
> interface is nice to work with. In general, would you recommend that we
> look at the legacy source APIs for building our connectors/readers?
>

I would not. A few contributors have been making rapid progress over the
past few months to finish SDFs. Python is done from an API standpoint
(some additional integration/scaling testing is going on). Java is missing
progress reporting and watermark estimation from the API, but I was hoping
to finish those API pieces this month. Go has started on the batch API
implementation.


>
> Anyways, I have pasted the skeleton of the SDF below (I apologize for the
> bad formatting--still learning the ropes of communicating code via e-mail).
> We have used the overall pattern from the file watcher, i.e. the SDF
> creates "poll requests" at regular intervals which a downstream ParDo
> executes. The SDF uses the built-in OffsetRange as the basis for the range
> tracker.
>
> I am happy to receive any pointers on improvements, changes, debugging
> paths.
>
> /**
>  * This function generates an unbounded stream of source queries.
>  */
> @DoFn.UnboundedPerElement
> public class GenerateTsPointRequestsUnboundFn
>         extends DoFn<RequestParameters, RequestParameters> {
>
>     @Setup
>     public void setup() {
>         validate();
>     }
>
>     @ProcessElement
>     public ProcessContinuation processElement(
>             @Element Element inputElement,
>             RestrictionTracker<OffsetRange, Long> tracker,
>             OutputReceiver<outputElement> out,
>             ProcessContext context) throws Exception {
>
>         long startRange = tracker.currentRestriction().getFrom();
>         long endRange = tracker.currentRestriction().getTo();
>
>         while (startRange < (System.currentTimeMillis()
>                 - readerConfig.getPollOffset().get().toMillis())) {
>             // Set the query's max end to current time - offset.
>             if (endRange > (System.currentTimeMillis()
>                     - readerConfig.getPollOffset().get().toMillis())) {
>                 endRange = (System.currentTimeMillis()
>                         - readerConfig.getPollOffset().get().toMillis());
>             }
>
>             if (tracker.tryClaim(endRange - 1)) {
>

Why do you try to claim all the way up to endRange here? Shouldn't you claim
subranges, i.e. [start, start+pollsize), [start+pollsize, start+pollsize*2),
..., [start+pollsize*N, end)?
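
To make the subrange idea concrete, here is a minimal sketch in plain Java
with no Beam dependency. SubrangePlanner, subranges and claimPosition are
hypothetical names used only for illustration. It assumes half-open ranges
and an offset-based tracker where claiming offset p covers everything up to
and including p, which is why the position to claim for [from, to) is to - 1.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical helper (not part of Beam): plans poll-sized subranges of [start, end). */
final class SubrangePlanner {
    private SubrangePlanner() {}

    /** Returns half-open ranges {from, to}, each at most pollSize long, covering [start, end). */
    static List<long[]> subranges(long start, long end, long pollSize) {
        if (pollSize <= 0) {
            throw new IllegalArgumentException("pollSize must be positive");
        }
        List<long[]> ranges = new ArrayList<>();
        long from = start;
        while (from < end) {
            // Compare against the remaining length to avoid overflow in from + pollSize.
            long to = (end - from > pollSize) ? from + pollSize : end;
            ranges.add(new long[] {from, to});
            from = to;
        }
        return ranges;
    }

    /** Last offset inside a half-open range; the position one would claim for that range. */
    static long claimPosition(long[] range) {
        return range[1] - 1;
    }
}
```

Inside processElement, each planned range would get its own claim and its
own output, so a checkpoint or split can land between two poll requests
instead of only at the very end of the restriction.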

Also, if start is significantly smaller than the current time, you could
implement the @SplitRestriction method so the backlog is read in parallel.
https://github.com/apache/beam/blob/4a7eb329734131e1ef90419f405986de94a30846/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java#L990
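
As a rough illustration of what such a split could compute, the sketch
below cuts the historical part of the range into fixed-size chunks and keeps
the not-yet-reached part as a single range. It is plain Java, not Beam code:
BacklogSplitter, split, now and chunk are hypothetical names, and in a real
SDF this logic would sit in the method annotated with @SplitRestriction and
emit OffsetRange values (that method's exact signature depends on the Beam
version, so it is not reproduced here).

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical helper (not part of Beam): splits the backlog of [start, end) into chunks. */
final class BacklogSplitter {
    private BacklogSplitter() {}

    /**
     * Splits [start, min(now, end)) into half-open chunks of at most chunk offsets,
     * then appends the remaining [min(now, end), end) as one range if it is non-empty.
     */
    static List<long[]> split(long start, long end, long now, long chunk) {
        if (chunk <= 0) {
            throw new IllegalArgumentException("chunk must be positive");
        }
        List<long[]> ranges = new ArrayList<>();
        // The backlog ends at "now", clamped so that it stays inside [start, end].
        long backlogEnd = Math.max(start, Math.min(now, end));
        long from = start;
        while (from < backlogEnd) {
            long to = (backlogEnd - from > chunk) ? from + chunk : backlogEnd;
            ranges.add(new long[] {from, to});
            from = to;
        }
        if (backlogEnd < end) {
            // Everything at or after "now" stays together and is polled incrementally.
            ranges.add(new long[] {backlogEnd, end});
        }
        return ranges;
    }
}
```

For example, a range of [0, 100) with now = 35 and chunk = 10 yields four
backlog ranges ending at 35, plus one remaining range [35, 100).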


>
>
>                 context.updateWatermark(org.joda.time.Instant.ofEpochMilli(startRange));
>                 out.outputWithTimestamp(
>                         buildOutputElement(inputElement, startRange, endRange),
>                         org.joda.time.Instant.ofEpochMilli(startRange));
>
>                 // Update the start and end range for the next iteration
>                 startRange = endRange;
>                 endRange = tracker.currentRestriction().getTo();
>             } else {
>                 LOG.info(localLoggingPrefix
>                         + "Stopping work due to checkpointing or splitting.");
>                 return ProcessContinuation.stop();
>             }
>
>             if (startRange >= tracker.currentRestriction().getTo()) {
>                 LOG.info(localLoggingPrefix
>                         + "Completed the request time range. Will stop the reader.");
>                 return ProcessContinuation.stop();
>             }
>
>             return ProcessContinuation.resume().withResumeDelay(
>                     org.joda.time.Duration.millis(
>                             readerConfig.getPollInterval().get().toMillis()));
>         }
>
>         return ProcessContinuation.resume().withResumeDelay(
>                 org.joda.time.Duration.millis(
>                         readerConfig.getPollInterval().get().toMillis()));
>     }
>
>     private OutputElement buildOutputElement(Element element,
>                                                      long start,
>                                                      long end) {
>         return outputElement
>                 .withParameter(START_KEY, start)
>                 .withParameter(END_KEY, end);
>     }
>
>     @GetInitialRestriction
>     public OffsetRange getInitialRestriction(Element element)
>             throws Exception {
>         return new OffsetRange(startTimestamp, endTimestamp);
>     }
> }
>
>
> On Mon, Mar 2, 2020 at 11:21 PM Luke Cwik <[email protected]> wrote:
>
>> SplittableDoFn has experimental support within Dataflow so the way you
>> may be using it could be correct but unsupported.
>>
>> Can you provide snippets/details of your splittable dofn implementation?
>>
>> On Mon, Mar 2, 2020 at 11:50 AM Kjetil Halvorsen <
>> [email protected]> wrote:
>>
>>>
>>> Hi,
>>>
>>> I am looking for pointers to a Dataflow runner error message: Workflow
>>> failed. Causes: Step s22 has conflicting bucketing functions,
>>>
>>> This happens at the very startup of the job execution, and I am unable
>>> to find any pointer as to where in the code/job definition the origin of
>>> the conflict is. The same job runs just fine in the DirectRunner.
>>>
>>> The job contains a splittable DoFn (unbound) and I have tried it with
>>> both a windowing transform and without a windowing transform--both fail
>>> with the same result on Dataflow.
>>>
>>> This is my first foray into splittable DoFn territory so I am sure I
>>> have just made some basic missteps.
>>>
>>> Cheers,
>>> Kjetil
>>>
>>>
>>>
>>>
>>> --
>>>
>>> *Kjetil Halvorsen*
>>> Chief Architect, Enterprise Integration
>>> +47 48 01 13 75 | [email protected]
>>> www.cognite.com | LIBERATE YOUR DATA™
>>>
>>>
>
> --
>
> *Kjetil Halvorsen*
> Chief Architect, Enterprise Integration
> +47 48 01 13 75 | [email protected]
> www.cognite.com | LIBERATE YOUR DATA™
>
>
