[ 
https://issues.apache.org/jira/browse/BEAM-14394?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17531353#comment-17531353
 ] 

Bruno Volpato da Cunha commented on BEAM-14394:
-----------------------------------------------

In short, yes. There is a for loop that was parallelized using the common 
ForkJoinPool.
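For reference, here is a small plain-Java sketch (no Beam involved) that records which threads actually run the body of a parallelStream().forEach. Assuming the default common pool, the set typically contains the calling thread plus some ForkJoinPool.commonPool-worker-N threads. The exact mix depends on core count and timing. The class name ParallelThreadsDemo is just for illustration.

{code:java}
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

final class ParallelThreadsDemo {

    // Returns the names of all threads that executed the forEach body.
    static Set<String> threadsUsedBy(List<Integer> values) {
        // Concurrent set: the lambda below may run on several threads at once.
        Set<String> threadNames = ConcurrentHashMap.newKeySet();
        values.parallelStream()
              .forEach(i -> threadNames.add(Thread.currentThread().getName()));
        return threadNames;
    }

    public static void main(String[] args) {
        List<Integer> values = IntStream.range(0, 32).boxed().collect(Collectors.toList());
        System.out.println("Caller:       " + Thread.currentThread().getName());
        // Typically the caller plus some ForkJoinPool.commonPool-worker-N threads.
        System.out.println("Threads used: " + threadsUsedBy(values));
    }
}
{code}

Any context.output call placed inside such a forEach therefore runs on those worker threads too, concurrently with the thread that invoked processElement.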

There are different ways to approach such a workload, but I am not sure it
reaches the point where this usage should be considered illegal. Let me know if
you think differently.

Maybe Beam could either synchronize outputs within the same bundle, or somehow
fail gracefully if output is called from a thread other than the one processing
the element?
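A rough sketch of what those two options could look like, modeled in plain Java with a Consumer standing in for the output receiver. OutputGuards, synchronizedOutput and sameThreadOutput are hypothetical names, not Beam APIs, and this is not how Beam's runners are actually structured. The fail-fast variant assumes the wrapper is created on the thread that processes the bundle.

{code:java}
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import java.util.stream.IntStream;

// Hypothetical model of an output receiver; NOT a Beam API.
final class OutputGuards {

    // Option 1: serialize every output call made for a bundle through one lock.
    static <T> Consumer<T> synchronizedOutput(Consumer<T> downstream) {
        Object lock = new Object();
        return element -> {
            synchronized (lock) {
                downstream.accept(element);
            }
        };
    }

    // Option 2: fail fast with a clear message if output is called from another thread.
    // Assumes this wrapper is created on the bundle-processing thread.
    static <T> Consumer<T> sameThreadOutput(Consumer<T> downstream) {
        Thread owner = Thread.currentThread();
        return element -> {
            if (Thread.currentThread() != owner) {
                throw new IllegalStateException(
                    "output() called from thread " + Thread.currentThread().getName()
                        + ", but the bundle is processed on " + owner.getName());
            }
            downstream.accept(element);
        };
    }

    public static void main(String[] args) {
        List<Integer> sink = new ArrayList<>(); // not thread-safe on its own
        Consumer<Integer> out = synchronizedOutput(sink::add);
        IntStream.range(0, 1000).parallel().boxed().forEach(out);
        System.out.println(sink.size()); // prints 1000
    }
}
{code}

Synchronizing would keep such pipelines working at the cost of contention, while failing fast would at least turn an intermittent ConcurrentModificationException deep inside a downstream transform into a clear error at the offending call site.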

To illustrate, the following sample code is flaky:
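{code:java}
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;

public class Concurrency {

    public static final int FAN_OUT_MULTIPLIER = 32;

    public static void main(String[] args) {

        Pipeline p = Pipeline.create();

        List<Integer> values = new ArrayList<>();
        for (int i = 0; i < FAN_OUT_MULTIPLIER; i++) {
            values.add(i);
        }

        p.apply(Create.of(values))
         .apply("Fan-Out", ParDo.of(new DoFn<Integer, Integer>() {

             @ProcessElement
             public void processElement(ProcessContext context) {
                 // Deliberately unsafe: output() is called from ForkJoinPool
                 // common-pool threads, concurrently with the calling thread.
                 values.parallelStream()
                       .forEach(i -> {
                           context.output(context.element() * FAN_OUT_MULTIPLIER + i);
                       });
             }
         }))
         .apply("Group", ParDo.of(new DoFn<Integer, Integer>() {
             // Per-bundle, non-thread-safe map, similar to the one in CreateTables.
             private Map<Integer, Integer> map;

             @StartBundle
             public void startBundle() {
                 this.map = new HashMap<>();
             }

             @ProcessElement
             public void processElement(ProcessContext context) {
                 // Reached concurrently through the outputs above, so this
                 // may throw ConcurrentModificationException.
                 this.map.computeIfAbsent(context.element() % FAN_OUT_MULTIPLIER, k -> context.element());
             }
         }));

        System.out.println("Finished with state " + p.run().waitUntilFinish());
    }
}
{code}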
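One way to restructure the Fan-Out step so it stays safe under the current behavior: do the parallel work, collect the results, and only then output them from the thread that called processElement. The sketch below models this in plain Java; the Consumer parameter stands in for context.output, and ParallelThenOutput/fanOut are hypothetical names. In a real DoFn, the loop would call context.output(r) instead.

{code:java}
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import java.util.stream.Collectors;

final class ParallelThenOutput {

    static final int FAN_OUT_MULTIPLIER = 32;

    // `output` is a stand-in for ProcessContext#output (illustrative assumption).
    static void fanOut(int element, List<Integer> values, Consumer<Integer> output) {
        // 1. Do the (potentially expensive) work in parallel, but only collect results.
        //    collect(toList()) keeps encounter order for an ordered source like ArrayList.
        List<Integer> results = values.parallelStream()
            .map(i -> element * FAN_OUT_MULTIPLIER + i)
            .collect(Collectors.toList());
        // 2. Emit sequentially from the calling thread, so output is never called concurrently.
        for (Integer r : results) {
            output.accept(r);
        }
    }

    public static void main(String[] args) {
        List<Integer> values = new ArrayList<>();
        for (int i = 0; i < FAN_OUT_MULTIPLIER; i++) {
            values.add(i);
        }
        List<Integer> emitted = new ArrayList<>(); // safe: only the calling thread appends
        fanOut(2, values, emitted::add);
        System.out.println(emitted.size()); // prints 32
    }
}
{code}

This keeps the parallelism for the computation itself and only gives up parallel emission, which is where the downstream thread-safety assumptions come into play.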

> BigQueryIO not thread safe, may fail with ConcurrentModificationException
> -------------------------------------------------------------------------
>
>                 Key: BEAM-14394
>                 URL: https://issues.apache.org/jira/browse/BEAM-14394
>             Project: Beam
>          Issue Type: Bug
>          Components: io-java-gcp
>    Affects Versions: 2.38.0
>            Reporter: Bruno Volpato da Cunha
>            Priority: P2
>          Time Spent: 1h 20m
>  Remaining Estimate: 0h
>
> I am using the Java SDK with a ParDo, in which the code has some parallelism 
> (basically, a parallel Stream over an Iterable).
> This ParDo is outputting to a side output that is redirected to BigQuery, 
> using Streaming Inserts, with the following parameters:
>  
> {code:java}
> return BigQueryIO.<T>write()
>     .to(dynamicDestinations)
>     .withMethod(Write.Method.STREAMING_INSERTS)
>     .withCreateDisposition(CreateDisposition.CREATE_NEVER)
>     .withWriteDisposition(WriteDisposition.WRITE_APPEND)
>     .withCustomGcsTempLocation(StaticValueProvider.of(gcsTempLocation))
>     .withFormatFunction(formatFunction)
>     .withFailedInsertRetryPolicy(InsertRetryPolicy.neverRetry())
>     .withExtendedErrorInfo(); {code}
>  
>  
> Sometimes, the following error is observed:
>  
> {code:java}
> org.apache.beam.sdk.util.UserCodeException: java.util.ConcurrentModificationException
>       at org.apache.beam.sdk.util.UserCodeException.wrap(UserCodeException.java:39)
>       at org.apache.beam.sdk.io.gcp.bigquery.CreateTables$CreateTablesFn$DoFnInvoker.invokeProcessElement(Unknown Source)
>       at org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SimpleDoFnRunner.invokeProcessElement(SimpleDoFnRunner.java:228)
>       at org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SimpleDoFnRunner.processElement(SimpleDoFnRunner.java:187)
>       at org.apache.beam.runners.dataflow.worker.SimpleParDoFn.processElement(SimpleParDoFn.java:340)
>       at org.apache.beam.runners.dataflow.worker.util.common.worker.ParDoOperation.process(ParDoOperation.java:44)
>       at org.apache.beam.runners.dataflow.worker.util.common.worker.OutputReceiver.process(OutputReceiver.java:49)
>       at org.apache.beam.runners.dataflow.worker.SimpleParDoFn$1.output(SimpleParDoFn.java:285)
>       at org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SimpleDoFnRunner.outputWindowedValue(SimpleDoFnRunner.java:268)
>       at org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SimpleDoFnRunner.access$900(SimpleDoFnRunner.java:84)
>       at org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SimpleDoFnRunner$DoFnProcessContext.output(SimpleDoFnRunner.java:416)
>       at org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SimpleDoFnRunner$DoFnProcessContext.output(SimpleDoFnRunner.java:404)
>       at org.apache.beam.sdk.io.gcp.bigquery.PrepareWrite$1.processElement(PrepareWrite.java:85)
>       at org.apache.beam.sdk.io.gcp.bigquery.PrepareWrite$1$DoFnInvoker.invokeProcessElement(Unknown Source)
>       at org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SimpleDoFnRunner.invokeProcessElement(SimpleDoFnRunner.java:228)
>       at org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SimpleDoFnRunner.processElement(SimpleDoFnRunner.java:184)
>       at org.apache.beam.runners.dataflow.worker.SimpleParDoFn.processElement(SimpleParDoFn.java:340)
>       at org.apache.beam.runners.dataflow.worker.util.common.worker.ParDoOperation.process(ParDoOperation.java:44)
>       at org.apache.beam.runners.dataflow.worker.util.common.worker.OutputReceiver.process(OutputReceiver.java:49)
>       at org.apache.beam.runners.dataflow.worker.SimpleParDoFn$1.output(SimpleParDoFn.java:285)
>       at org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SimpleDoFnRunner.outputWindowedValue(SimpleDoFnRunner.java:268)
>       at org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SimpleDoFnRunner.access$900(SimpleDoFnRunner.java:84)
>       at org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SimpleDoFnRunner$DoFnProcessContext.output(SimpleDoFnRunner.java:416)
>       at org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SimpleDoFnRunner$DoFnProcessContext.output(SimpleDoFnRunner.java:404)
>       at org.apache.beam.sdk.transforms.FlatMapElements$2.processElement(FlatMapElements.java:156)
>       at org.apache.beam.sdk.transforms.FlatMapElements$2$DoFnInvoker.invokeProcessElement(Unknown Source)
>       at org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SimpleDoFnRunner.invokeProcessElement(SimpleDoFnRunner.java:228)
>       at org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SimpleDoFnRunner.processElement(SimpleDoFnRunner.java:187)
>       at org.apache.beam.runners.dataflow.worker.SimpleParDoFn.processElement(SimpleParDoFn.java:340)
>       at org.apache.beam.runners.dataflow.worker.util.common.worker.ParDoOperation.process(ParDoOperation.java:44)
>       at org.apache.beam.runners.dataflow.worker.util.common.worker.OutputReceiver.process(OutputReceiver.java:49)
>       at org.apache.beam.runners.dataflow.worker.SimpleParDoFn$1.output(SimpleParDoFn.java:285)
>       at org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SimpleDoFnRunner.outputWindowedValue(SimpleDoFnRunner.java:268)
>       at org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SimpleDoFnRunner.access$900(SimpleDoFnRunner.java:84)
>       at org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SimpleDoFnRunner$DoFnProcessContext.output(SimpleDoFnRunner.java:416)
>       at <<Some Parallel Custom Code.java>>
>       at java.base/java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:195)
>       at java.base/java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1655)
>       at java.base/java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:484)
>       at java.base/java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:474)
>       at java.base/java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:913)
>       at java.base/java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
>       at java.base/java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:578)
>       at <<Some Parallel Custom Code.java>>
>       at java.base/java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:195)
>       at java.base/java.util.Spliterators$ArraySpliterator.forEachRemaining(Spliterators.java:948)
>       at java.base/java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:484)
>       at java.base/java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:474)
>       at java.base/java.util.stream.ReduceOps$ReduceTask.doLeaf(ReduceOps.java:952)
>       at java.base/java.util.stream.ReduceOps$ReduceTask.doLeaf(ReduceOps.java:926)
>       at java.base/java.util.stream.AbstractTask.compute(AbstractTask.java:327)
>       at java.base/java.util.concurrent.CountedCompleter.exec(CountedCompleter.java:746)
>       at java.base/java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:290)
>       at java.base/java.util.concurrent.ForkJoinTask.doInvoke(ForkJoinTask.java:408)
>       at java.base/java.util.concurrent.ForkJoinTask.invoke(ForkJoinTask.java:736)
>       at java.base/java.util.stream.ReduceOps$ReduceOp.evaluateParallel(ReduceOps.java:919)
>       at java.base/java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:233)
>       at java.base/java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:578)
>       at <<Some Parallel Custom Code.java>>
>       at org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SimpleDoFnRunner.invokeProcessElement(SimpleDoFnRunner.java:228)
>       at org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SimpleDoFnRunner.processElement(SimpleDoFnRunner.java:184)
>       at org.apache.beam.runners.dataflow.worker.SimpleParDoFn.processElement(SimpleParDoFn.java:340)
>       at org.apache.beam.runners.dataflow.worker.util.common.worker.ParDoOperation.process(ParDoOperation.java:44)
>       at org.apache.beam.runners.dataflow.worker.util.common.worker.OutputReceiver.process(OutputReceiver.java:49)
>       at org.apache.beam.runners.dataflow.worker.SimpleParDoFn$1.output(SimpleParDoFn.java:285)
>       at org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SimpleDoFnRunner.outputWindowedValue(SimpleDoFnRunner.java:268)
>       at org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SimpleDoFnRunner.access$900(SimpleDoFnRunner.java:84)
>       at org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SimpleDoFnRunner$DoFnProcessContext.output(SimpleDoFnRunner.java:416)
>       at org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SimpleDoFnRunner$DoFnProcessContext.output(SimpleDoFnRunner.java:404)
>       at <<Some Parallel Custom Code.java>>
>       at org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SimpleDoFnRunner.invokeProcessElement(SimpleDoFnRunner.java:228)
>       at org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SimpleDoFnRunner.processElement(SimpleDoFnRunner.java:187)
>       at org.apache.beam.runners.dataflow.worker.SimpleParDoFn.processElement(SimpleParDoFn.java:340)
>       at org.apache.beam.runners.dataflow.worker.util.common.worker.ParDoOperation.process(ParDoOperation.java:44)
>       at org.apache.beam.runners.dataflow.worker.util.common.worker.OutputReceiver.process(OutputReceiver.java:49)
>       at org.apache.beam.runners.dataflow.worker.GroupAlsoByWindowsParDoFn$1.output(GroupAlsoByWindowsParDoFn.java:185)
>       at org.apache.beam.runners.dataflow.worker.GroupAlsoByWindowFnRunner$1.outputWindowedValue(GroupAlsoByWindowFnRunner.java:108)
>       at org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.ReduceFnRunner.lambda$onTrigger$1(ReduceFnRunner.java:1058)
>       at org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.ReduceFnContextFactory$OnTriggerContextImpl.output(ReduceFnContextFactory.java:445)
>       at org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SystemReduceFn.onTrigger(SystemReduceFn.java:130)
>       at org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.ReduceFnRunner.onTrigger(ReduceFnRunner.java:1061)
>       at org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.ReduceFnRunner.emit(ReduceFnRunner.java:932)
>       at org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.ReduceFnRunner.processElements(ReduceFnRunner.java:373)
>       at org.apache.beam.runners.dataflow.worker.StreamingGroupAlsoByWindowViaWindowSetFn.processElement(StreamingGroupAlsoByWindowViaWindowSetFn.java:96)
>       at org.apache.beam.runners.dataflow.worker.StreamingGroupAlsoByWindowViaWindowSetFn.processElement(StreamingGroupAlsoByWindowViaWindowSetFn.java:43)
>       at org.apache.beam.runners.dataflow.worker.GroupAlsoByWindowFnRunner.invokeProcessElement(GroupAlsoByWindowFnRunner.java:121)
>       at org.apache.beam.runners.dataflow.worker.GroupAlsoByWindowFnRunner.processElement(GroupAlsoByWindowFnRunner.java:73)
>       at org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.LateDataDroppingDoFnRunner.processElement(LateDataDroppingDoFnRunner.java:80)
>       at org.apache.beam.runners.dataflow.worker.GroupAlsoByWindowsParDoFn.processElement(GroupAlsoByWindowsParDoFn.java:137)
>       at org.apache.beam.runners.dataflow.worker.util.common.worker.ParDoOperation.process(ParDoOperation.java:44)
>       at org.apache.beam.runners.dataflow.worker.util.common.worker.OutputReceiver.process(OutputReceiver.java:49)
>       at org.apache.beam.runners.dataflow.worker.util.common.worker.ReadOperation.runReadLoop(ReadOperation.java:212)
>       at org.apache.beam.runners.dataflow.worker.util.common.worker.ReadOperation.start(ReadOperation.java:163)
>       at org.apache.beam.runners.dataflow.worker.util.common.worker.MapTaskExecutor.execute(MapTaskExecutor.java:83)
>       at org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker.process(StreamingDataflowWorker.java:1445)
>       at org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker.access$1100(StreamingDataflowWorker.java:165)
>       at org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker$7.run(StreamingDataflowWorker.java:1120)
>       at org.apache.beam.runners.dataflow.worker.util.BoundedQueueExecutor.lambda$executeLockHeld$0(BoundedQueueExecutor.java:133)
>       at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
>       at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
>       at java.base/java.lang.Thread.run(Thread.java:834)
> Caused by: java.util.ConcurrentModificationException
>       at java.base/java.util.HashMap.computeIfAbsent(HashMap.java:1134)
>       at org.apache.beam.sdk.io.gcp.bigquery.CreateTables$CreateTablesFn.processElement(CreateTables.java:104)
> {code}
>  
>  
> Unfortunately, I tried to come up with a toy example to post here, but I could
> not reproduce the issue with it.
>  
> While investigating the class CreateTables, I found that it instantiates one
> _Map<DestinationT, TableDestination> destinations = Maps.newHashMap()_ per
> bundle, and that _processElement_ can be invoked concurrently when the user's
> code outputs from multiple threads.
> The current implementation of the Map is a HashMap. I believe that either
> synchronizing the _destinations.computeIfAbsent_ call or simply initializing
> the map with Maps.newConcurrentMap() would be enough.
>  
> I am open to implementing this change and am leaning towards simply switching
> to a ConcurrentHashMap; I just want to check that I am not missing any detail.
> (Edit: see [https://github.com/apache/beam/pull/17532] )
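Roughly what the proposed change amounts to, as a standalone sketch: a per-bundle cache that is recreated in a startBundle-like method and filled via computeIfAbsent, using ConcurrentHashMap instead of HashMap. PerBundleDestinationCache, lookup and the resolver function are hypothetical stand-ins, not the actual CreateTablesFn code.

{code:java}
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;
import java.util.stream.IntStream;

// Hypothetical stand-in for the per-bundle destinations map in CreateTablesFn.
final class PerBundleDestinationCache<K, V> {
    private final Function<K, V> resolver; // e.g. "resolve or create the table" (illustrative)
    private Map<K, V> destinations;

    PerBundleDestinationCache(Function<K, V> resolver) {
        this.resolver = resolver;
    }

    // Mirrors @StartBundle: a fresh map per bundle.
    void startBundle() {
        // ConcurrentHashMap instead of HashMap: computeIfAbsent is atomic, so concurrent
        // callers cannot corrupt the map and the resolver runs at most once per key.
        destinations = new ConcurrentHashMap<>();
    }

    // Mirrors @ProcessElement, which may be reached from several threads at once.
    V lookup(K key) {
        return destinations.computeIfAbsent(key, resolver);
    }

    int size() {
        return destinations.size();
    }

    public static void main(String[] args) {
        PerBundleDestinationCache<Integer, String> cache =
            new PerBundleDestinationCache<>(k -> "table_" + k);
        cache.startBundle();
        IntStream.range(0, 10_000).parallel().forEach(i -> cache.lookup(i % 32));
        System.out.println(cache.size()); // prints 32
    }
}
{code}

One caveat from the ConcurrentHashMap Javadoc: other updates to the map may block while a computeIfAbsent computation is in progress, so if the resolver does something slow, that cost is paid under contention. Wrapping a plain HashMap's computeIfAbsent in a synchronized block would also be correct, but it serializes every lookup.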



--
This message was sent by Atlassian Jira
(v8.20.7#820007)
