The bucketing "error" is likely related to your windowing strategy or pipeline shape. Have you tried running your SDF in an otherwise empty pipeline, possibly followed by a ParDo that logs the records you are seeing?

On Tue, Mar 3, 2020 at 3:34 AM Kjetil Halvorsen <[email protected]> wrote:

> Thanks for the willingness to help out. The general context is that we
> are developing a set of new Beam-based connectors/readers.
>
> I had hoped that SDF was ready for use with Dataflow--just because the
> interface is nice to work with. In general, would you recommend that we
> look at the legacy source APIs for building our connectors/readers?
>

I would not. A few contributors have been making rapid progress over the past few months to finish SDFs: Python is done from an API standpoint (there is some additional integration/scaling testing going on); Java is missing progress reporting and watermark estimation from the API, but I was hoping to finish those API pieces this month; and Go has started on the batch API implementation.

>
> Anyways, I have pasted the skeleton of the SDF below (I apologize for the
> bad formatting--still learning the ropes of communicating code via e-mail).
> We have used the overall pattern from the file watcher, i.e. the SDF
> creates "poll requests" at regular intervals which a downstream ParDo
> executes. The SDF uses the built-in OffsetRange as the basis for the range
> tracker.
>
> I am happy to receive any pointers on improvements, changes, debugging
> paths.
>
> /**
>  * This function generates an unbounded stream of source queries.
>  */
> @DoFn.UnboundedPerElement
> public class GenerateTsPointRequestsUnboundFn extends
>     DoFn<RequestParameters, RequestParameters> {
>
>   @Setup
>   public void setup() {
>     validate();
>   }
>
>   @ProcessElement
>   public ProcessContinuation processElement(@Element Element inputElement,
>       RestrictionTracker<OffsetRange, Long> tracker,
>       OutputReceiver<outputElement> out,
>       ProcessContext context) throws Exception {
>
>     long startRange = tracker.currentRestriction().getFrom();
>     long endRange = tracker.currentRestriction().getTo();
>
>     while (startRange < (System.currentTimeMillis() -
>         readerConfig.getPollOffset().get().toMillis())) {
>       // Set the query's max end to current time - offset.
>       if (endRange > (System.currentTimeMillis() -
>           readerConfig.getPollOffset().get().toMillis())) {
>         endRange = (System.currentTimeMillis() -
>             readerConfig.getPollOffset().get().toMillis());
>       }
>
>       if (tracker.tryClaim(endRange - 1)) {
>

Why do you try to claim all the way to endRange here? Shouldn't you claim subranges, i.e. [start, start+pollsize), [start+pollsize, start+pollsize*2), ..., [start+pollsize*N, end)? Also, if start is significantly smaller than the current time, you could implement the @SplitRestriction method:
https://github.com/apache/beam/blob/4a7eb329734131e1ef90419f405986de94a30846/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java#L990

>
>         context.updateWatermark(org.joda.time.Instant.ofEpochMilli(startRange));
>         out.outputWithTimestamp(buildOutputElement(inputElement, startRange, endRange),
>             org.joda.time.Instant.ofEpochMilli(startRange));
>
>         // Update the start and end range for the next iteration
>         startRange = endRange;
>         endRange = tracker.currentRestriction().getTo();
>       } else {
>         LOG.info(localLoggingPrefix + "Stopping work due to checkpointing or splitting.");
>         return ProcessContinuation.stop();
>       }
>
>       if (startRange >= tracker.currentRestriction().getTo()) {
>         LOG.info(localLoggingPrefix + "Completed the request time range. Will stop the reader.");
>         return ProcessContinuation.stop();
>       }
>
>       return ProcessContinuation.resume().withResumeDelay(org.joda.time.Duration.millis(
>           readerConfig.getPollInterval().get().toMillis()));
>     }
>
>     return ProcessContinuation.resume().withResumeDelay(org.joda.time.Duration.millis(
>         readerConfig.getPollInterval().get().toMillis()));
>   }
>
>   private OutputElement buildOutputElement(Element element,
>       long start,
>       long end) {
>     return outputElement
>         .withParameter(START_KEY, start)
>         .withParameter(END_KEY, end);
>   }
>
>   @GetInitialRestriction
>   public OffsetRange getInitialRestriction(Element element) throws Exception {
>     return new OffsetRange(startTimestamp, endTimestamp);
>   }
> }
>
>
> On Mon, Mar 2, 2020 at 11:21 PM Luke Cwik <[email protected]> wrote:
>
>> SplittableDoFn has experimental support within Dataflow, so the way you
>> may be using it could be correct but unsupported.
>>
>> Can you provide snippets/details of your splittable DoFn implementation?
>>
>> On Mon, Mar 2, 2020 at 11:50 AM Kjetil Halvorsen <[email protected]> wrote:
>>
>>>
>>> Hi,
>>>
>>> I am looking for pointers to a Dataflow runner error message: Workflow
>>> failed. Causes: Step s22 has conflicting bucketing functions,
>>>
>>> This happens at the very startup of the job execution, and I am unable
>>> to find any pointer as to where in the code/job definition the origin of
>>> the conflict is. The same job runs just fine in the DirectRunner.
>>>
>>> The job contains a splittable DoFn (unbounded) and I have tried it with
>>> both a windowing transform and without a windowing transform--both fail
>>> with the same result on Dataflow.
>>>
>>> This is my first foray into splittable DoFn territory so I am sure I
>>> have just made some basic missteps.
>>>
>>> Cheers,
>>> Kjetil
>>>
>>> --
>>>
>>> *Kjetil Halvorsen*
>>> Chief Architect, Enterprise Integration
>>> +47 48 01 13 75 | [email protected]
>>> www.cognite.com | LIBERATE YOUR DATA™
>>>
>
> --
>
> *Kjetil Halvorsen*
> Chief Architect, Enterprise Integration
> +47 48 01 13 75 | [email protected]
> www.cognite.com | LIBERATE YOUR DATA™
>
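
To make the subrange suggestion in the inline reply concrete, here is a minimal sketch of just the arithmetic, in plain Java with no Beam dependency. It splits [start, end) into [start, start+pollsize), [start+pollsize, start+pollsize*2), ..., [start+pollsize*N, end). The class name PollSubranges and the method split are hypothetical, chosen only for illustration, and pollSize is assumed to be a positive number of milliseconds. In the actual DoFn each subrange would presumably be claimed through the restriction tracker before its poll request is emitted; that wiring is not shown here.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper for illustration only; not part of the Beam API.
final class PollSubranges {
    private PollSubranges() {}

    /**
     * Splits the half-open range [start, end) into consecutive half-open
     * subranges of length pollSize; the last one may be shorter.
     */
    static List<long[]> split(long start, long end, long pollSize) {
        if (pollSize <= 0) {
            throw new IllegalArgumentException("pollSize must be positive");
        }
        List<long[]> subranges = new ArrayList<>();
        long subStart = start;
        while (subStart < end) {
            // Cap the final subrange at end.
            long subEnd = (end - subStart > pollSize) ? subStart + pollSize : end;
            subranges.add(new long[] {subStart, subEnd});
            subStart = subEnd;
        }
        return subranges;
    }

    public static void main(String[] args) {
        // Prints [0, 4), [4, 8) and [8, 10), one per line.
        for (long[] r : split(0L, 10L, 4L)) {
            System.out.println("[" + r[0] + ", " + r[1] + ")");
        }
    }
}
```

Each subrange starts exactly where the previous one ends, so the emitted poll requests cover the restriction without gaps or overlap, and an empty restriction (start >= end) yields no requests.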
