Hm, which runner is this? Does this reproduce with the direct runner? The NullPointerException is particularly worrying; I'd like to investigate it.
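
For context on that NPE (a reading of the 2.4.0 source rather than a confirmed diagnosis): OffsetRangeTracker.checkDone compares the last attempted offset, stored as a boxed Long, against the end of the range, so it can throw a NullPointerException when @ProcessElement returns without a single tryClaim attempt, for instance on an empty sub-range, or when the iterable turns out to be shorter than the restriction. A defensive variant of the claim loop quoted later in this thread could look like the following sketch; checkState here is Guava's Preconditions.checkState, and the other imports are the same ones the original DoFn already needs.

    // A sketch only: same shape as the SDF quoted below, with two guards
    // added. Assumes Beam 2.4.0's OffsetRange/OffsetRangeTracker and
    // Guava's Iterators/Preconditions are imported as in the original DoFn.
    @ProcessElement
    public void processElement(ProcessContext context, OffsetRangeTracker tracker) {
      final OffsetRange range = tracker.currentRestriction();
      final Iterator<Object> it = context.element().getValue().iterator();
      Iterators.advance(it, Math.toIntExact(range.getFrom()));

      for (long index = range.getFrom(); index < range.getTo(); ++index) {
        // Claim first: if the claim fails, a checkpoint/split owns the rest.
        if (!tracker.tryClaim(index)) {
          return;
        }
        // If the iterable ends before the restriction does, fail loudly
        // instead of returning with offsets in [index, range.getTo())
        // never attempted, which checkDone would reject.
        checkState(it.hasNext(), "Iterable ended before restriction %s", range);
        final Object o1 = it.next();
        // ... nested cross-join logic as in the original DoFn ...
      }
    }

It may also be worth checking what range.split(1000000 / size, 10) returns when a group holds more than a million values, since the desired offsets-per-split then rounds down to zero; empty sub-ranges from that path would never see a tryClaim call.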
On Thu, Apr 12, 2018 at 6:49 PM Jiayuan Ma <[email protected]> wrote:

> Hi Eugene,
>
> Thanks for your reply. I'm no longer seeing the previous error; I think it
> might have been because I didn't do a clean build after upgrading the SDK
> from 2.3.0 to 2.4.0.
>
> However, I'm now hitting other exceptions with my SDF:
>
> java.lang.OutOfMemoryError: unable to create new native thread
> java.lang.Thread.start0(Native Method)
> java.lang.Thread.start(Thread.java:714)
> java.util.concurrent.ThreadPoolExecutor.addWorker(ThreadPoolExecutor.java:950)
> java.util.concurrent.ThreadPoolExecutor.ensurePrestart(ThreadPoolExecutor.java:1587)
> java.util.concurrent.ScheduledThreadPoolExecutor.delayedExecute(ScheduledThreadPoolExecutor.java:334)
> java.util.concurrent.ScheduledThreadPoolExecutor.schedule(ScheduledThreadPoolExecutor.java:533)
> java.util.concurrent.Executors$DelegatedScheduledExecutorService.schedule(Executors.java:729)
> org.apache.beam.runners.core.OutputAndTimeBoundedSplittableProcessElementInvoker$ProcessContext.onClaimed(OutputAndTimeBoundedSplittableProcessElementInvoker.java:265)
> org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker.tryClaim(RestrictionTracker.java:75)
>
> and
>
> java.lang.NullPointerException
> org.apache.beam.sdk.transforms.splittabledofn.OffsetRangeTracker.checkDone(OffsetRangeTracker.java:96)
> org.apache.beam.runners.core.OutputAndTimeBoundedSplittableProcessElementInvoker.invokeProcessElement(OutputAndTimeBoundedSplittableProcessElementInvoker.java:216)
> org.apache.beam.runners.core.SplittableParDoViaKeyedWorkItems$ProcessFn.processElement(SplittableParDoViaKeyedWorkItems.java:369)
>
> The old pipeline I'm trying to optimize looks like this:
>
> .apply(GroupByKey.create())
> .apply(ParDo.of(
>     new DoFn<KV<String, Iterable<Object>>, KV<String, KV<String, String>>>() {
>       @ProcessElement
>       public void process(...) {
>         Iterable<Object> groupedValues = context.element().getValue();
>         for (final Object o1 : groupedValues) {
>           for (final Object o2 : groupedValues) {
>             // ...
>           }
>         }
>       }
>     }))
>
> The optimization I'm doing right now with the SDF is roughly:
>
> @ProcessElement
> public void processElement(ProcessContext context, OffsetRangeTracker tracker) {
>   final Iterable<Object> groupedValues = context.element().getValue();
>   final Iterator<Object> it = groupedValues.iterator();
>
>   long index = tracker.currentRestriction().getFrom();
>   Iterators.advance(it, Math.toIntExact(index));
>
>   for (; it.hasNext() && tracker.tryClaim(index); ++index) {
>     final Object o1 = it.next();
>     for (final Object o2 : groupedValues) {
>       // ... same old logic ...
>     }
>   }
> }
>
> @GetInitialRestriction
> public OffsetRange getInitialRestriction(
>     final KV<String, Iterable<Object>> groupedValues) {
>   final long size = Iterables.size(groupedValues.getValue());
>   return new OffsetRange(0, size);
> }
>
> @SplitRestriction
> public void splitRestriction(
>     final KV<String, Iterable<Object>> groupedValues,
>     final OffsetRange range,
>     final OutputReceiver<OffsetRange> receiver) {
>   final long size = Iterables.size(groupedValues.getValue());
>   for (final OffsetRange p : range.split(1000000 / size, 10)) {
>     receiver.output(p);
>   }
> }
>
> @NewTracker
> public OffsetRangeTracker newTracker(OffsetRange range) {
>   return new OffsetRangeTracker(range);
> }
>
> Jiayuan
>
> On Wed, Apr 11, 2018 at 3:54 PM, Eugene Kirpichov <[email protected]>
> wrote:
>
>> Hi! This looks concerning. Can you show a full code example please? Does
>> it run in direct runner?
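
A note on the first trace: "unable to create new native thread" is typically an OS thread-limit problem rather than heap exhaustion, and the trace shows OutputAndTimeBoundedSplittableProcessElementInvoker scheduling checkpoint work on a ScheduledThreadPoolExecutor as offsets are claimed, so many concurrently active SDF elements multiply the worker's thread count. One possible mitigation, an assumption rather than a confirmed fix, is to cap the number of worker harness threads on Dataflow:

    // Sketch: limit concurrent work items per Dataflow worker so fewer SDF
    // invokers (and their scheduler threads) are alive at once. The option
    // exists on DataflowPipelineDebugOptions; the value 50 is illustrative.
    import org.apache.beam.runners.dataflow.options.DataflowPipelineDebugOptions;
    import org.apache.beam.sdk.Pipeline;
    import org.apache.beam.sdk.options.PipelineOptionsFactory;

    public class CappedThreadsMain {
      public static void main(String[] args) {
        DataflowPipelineDebugOptions options =
            PipelineOptionsFactory.fromArgs(args)
                .withValidation()
                .as(DataflowPipelineDebugOptions.class);
        options.setNumberOfWorkerHarnessThreads(50); // illustrative cap
        Pipeline p = Pipeline.create(options);
        // ... build the GroupByKey + SDF pipeline from this thread ...
        p.run();
      }
    }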
>>
>> On Tue, Apr 10, 2018 at 3:13 PM Jiayuan Ma <[email protected]> wrote:
>>
>>> Hi all,
>>>
>>> I'm trying to use the ReplicateFn mentioned in this doc
>>> <https://s.apache.org/splittable-do-fn> in my pipeline to speed up a
>>> nested for loop. The use case is exactly the same as the "Counting
>>> friends in common (cross join by key)" section. However, I'm having
>>> trouble making it work with the Beam 2.4.0 SDK.
>>>
>>> I'm implementing @SplitRestriction as follows:
>>>
>>> @SplitRestriction
>>> public void splitRestriction(A element, OffsetRange range,
>>>                              OutputReceiver<OffsetRange> out) {
>>>   for (final OffsetRange p : range.split(1000, 10)) {
>>>     out.output(p);
>>>   }
>>> }
>>>
>>> The Dataflow runner throws an exception like this:
>>>
>>> java.util.NoSuchElementException
>>> com.google.cloud.dataflow.worker.repackaged.com.google.common.collect.MultitransformedIterator.next(MultitransformedIterator.java:63)
>>> com.google.cloud.dataflow.worker.repackaged.com.google.common.collect.TransformedIterator.next(TransformedIterator.java:47)
>>> com.google.cloud.dataflow.worker.repackaged.com.google.common.collect.Iterators.getOnlyElement(Iterators.java:308)
>>> com.google.cloud.dataflow.worker.repackaged.com.google.common.collect.Iterables.getOnlyElement(Iterables.java:294)
>>> com.google.cloud.dataflow.worker.DataflowProcessFnRunner.getUnderlyingWindow(DataflowProcessFnRunner.java:97)
>>> com.google.cloud.dataflow.worker.DataflowProcessFnRunner.placeIntoElementWindow(DataflowProcessFnRunner.java:71)
>>> com.google.cloud.dataflow.worker.DataflowProcessFnRunner.processElement(DataflowProcessFnRunner.java:61)
>>> com.google.cloud.dataflow.worker.SimpleParDoFn.processElement(SimpleParDoFn.java:323)
>>> com.google.cloud.dataflow.worker.util.common.worker.ParDoOperation.process(ParDoOperation.java:43)
>>> com.google.cloud.dataflow.worker.util.common.worker.OutputReceiver.process(OutputReceiver.java:48)
>>> com.google.cloud.dataflow.worker.util.common.worker.ReadOperation.runReadLoop(ReadOperation.java:200)
>>> com.google.cloud.dataflow.worker.util.common.worker.ReadOperation.start(ReadOperation.java:158)
>>> com.google.cloud.dataflow.worker.util.common.worker.MapTaskExecutor.execute(MapTaskExecutor.java:75)
>>> com.google.cloud.dataflow.worker.StreamingDataflowWorker.process(StreamingDataflowWorker.java:1211)
>>> com.google.cloud.dataflow.worker.StreamingDataflowWorker.access$1000(StreamingDataflowWorker.java:137)
>>> com.google.cloud.dataflow.worker.StreamingDataflowWorker$6.run(StreamingDataflowWorker.java:959)
>>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>>> java.lang.Thread.run(Thread.java:745)
>>>
>>> I also tried the following, as suggested by the javadoc
>>> <https://beam.apache.org/documentation/sdks/javadoc/2.4.0/org/apache/beam/sdk/transforms/DoFn.SplitRestriction.html>,
>>> but it fails during pipeline construction:
>>>
>>> @SplitRestriction
>>> public List<OffsetRange> splitRestriction(A element, OffsetRange range) {
>>>   return range.split(1000, 10);
>>> }
>>>
>>> Without implementing @SplitRestriction, my pipeline runs without any
>>> errors. However, I suspect the SDF is not actually split by default,
>>> which defeats the purpose of improving performance.
>>>
>>> Does anyone know whether @SplitRestriction is currently supported by the
>>> Dataflow runner? How can I write a working version with the latest SDK?
>>>
>>> Thanks,
>>> Jiayuan
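
To close the loop on "does it run in direct runner": a minimal, self-contained harness for checking that might look like the sketch below. The class name, test data, and split parameters are illustrative rather than taken from the thread. It keeps the same GroupByKey-then-SDF shape and the 2.4.0-style OffsetRangeTracker parameter, and it deliberately keeps the thread's hasNext-then-tryClaim loop so any checkDone problem would surface. With no --runner flag, Beam uses the direct runner by default.

    import com.google.common.collect.Iterables;
    import com.google.common.collect.Iterators;
    import java.util.Iterator;
    import org.apache.beam.sdk.Pipeline;
    import org.apache.beam.sdk.io.range.OffsetRange;
    import org.apache.beam.sdk.options.PipelineOptionsFactory;
    import org.apache.beam.sdk.transforms.Create;
    import org.apache.beam.sdk.transforms.DoFn;
    import org.apache.beam.sdk.transforms.GroupByKey;
    import org.apache.beam.sdk.transforms.ParDo;
    import org.apache.beam.sdk.transforms.splittabledofn.OffsetRangeTracker;
    import org.apache.beam.sdk.values.KV;

    public class SdfDirectRunnerRepro {

      // Illustrative SDF with the same structure as the one in this thread:
      // the restriction is an offset range over the grouped iterable.
      static class CrossJoinFn extends DoFn<KV<String, Iterable<String>>, String> {
        @ProcessElement
        public void process(ProcessContext c, OffsetRangeTracker tracker) {
          final Iterable<String> values = c.element().getValue();
          final Iterator<String> it = values.iterator();
          long index = tracker.currentRestriction().getFrom();
          Iterators.advance(it, Math.toIntExact(index));
          while (it.hasNext() && tracker.tryClaim(index)) {
            final String o1 = it.next();
            for (String o2 : values) {
              c.output(c.element().getKey() + ":" + o1 + ":" + o2);
            }
            ++index;
          }
        }

        @GetInitialRestriction
        public OffsetRange getInitialRestriction(KV<String, Iterable<String>> e) {
          return new OffsetRange(0, Iterables.size(e.getValue()));
        }

        @SplitRestriction
        public void splitRestriction(KV<String, Iterable<String>> e,
            OffsetRange range, OutputReceiver<OffsetRange> out) {
          // Tiny splits to force the splitting code path even on small data.
          for (OffsetRange p : range.split(2, 1)) {
            out.output(p);
          }
        }

        @NewTracker
        public OffsetRangeTracker newTracker(OffsetRange range) {
          return new OffsetRangeTracker(range);
        }
      }

      public static void main(String[] args) {
        Pipeline p = Pipeline.create(PipelineOptionsFactory.fromArgs(args).create());
        p.apply(Create.of(
                KV.of("k", "a"), KV.of("k", "b"), KV.of("k", "c"), KV.of("k", "d")))
         .apply(GroupByKey.create())
         .apply(ParDo.of(new CrossJoinFn()));
        p.run().waitUntilFinish();
      }
    }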
