withNumShards(5) generates 5 random shards. It turns out that, statistically, when you generate 5 random shards and you have 5 workers, the probability is reasonably high that some workers will get more than one shard (and, as a result, not all workers will participate). Are you able to set the number of shards larger than 5?
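To make that concrete: if 5 shards are assigned uniformly at random across 5 workers, the chance that every worker ends up with exactly one shard is only 5!/5^5 ≈ 3.8%, so roughly 96% of the time at least one worker gets nothing. A quick back-of-the-envelope sketch in plain Scala (no Beam involved, names are illustrative):

```scala
object ShardSpreadMath {
  def main(args: Array[String]): Unit = {
    val n = 5 // number of shards == number of workers
    // P(all n shards land on distinct workers) = n! / n^n
    //   = (n/n) * ((n-1)/n) * ... * (1/n)
    val pAllDistinct = (1 to n).map(_.toDouble / n).product
    println(f"P(every worker gets exactly one shard) = $pAllDistinct%.4f")
    println(f"P(at least one worker gets no shard)   = ${1 - pAllDistinct}%.4f")
  }
}
```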
On Wed, Oct 24, 2018 at 12:28 AM Jozef Vilcek <[email protected]> wrote:

> cc (dev)
>
> I tried to run the example with FlinkRunner in batch mode and received
> again bad data spread among the workers.
>
> When I tried to remove the number of shards for batch mode in the above
> example, the pipeline crashed before launch:
>
> Caused by: java.lang.IllegalStateException: Inputs to Flatten had
> incompatible triggers:
> AfterWatermark.pastEndOfWindow()
>   .withEarlyFirings(AfterPane.elementCountAtLeast(40000))
>   .withLateFirings(AfterFirst.of(
>     Repeatedly.forever(AfterPane.elementCountAtLeast(10000)),
>     Repeatedly.forever(AfterProcessingTime.pastFirstElementInPane().plusDelayOf(1 hour)))),
> AfterWatermark.pastEndOfWindow()
>   .withEarlyFirings(AfterPane.elementCountAtLeast(1))
>   .withLateFirings(AfterFirst.of(
>     Repeatedly.forever(AfterPane.elementCountAtLeast(1)),
>     Repeatedly.forever(AfterSynchronizedProcessingTime.pastFirstElementInPane())))
>
> On Tue, Oct 23, 2018 at 12:01 PM Jozef Vilcek <[email protected]> wrote:
>
>> Hi Max,
>>
>> I forgot to mention that the example is run in streaming mode, therefore
>> I cannot do writes without specifying shards. FileIO explicitly asks for
>> them.
>>
>> I am not sure where the problem is. FlinkRunner is the only one I used.
>>
>> On Tue, Oct 23, 2018 at 11:27 AM Maximilian Michels <[email protected]> wrote:
>>
>>> Hi Jozef,
>>>
>>> This does not look like a FlinkRunner-related problem, but is caused by
>>> the `WriteFiles` sharding logic. It assigns keys and does a Reshuffle
>>> which apparently does not lead to a good data spread in your case.
>>>
>>> Do you see the same behavior without `withNumShards(5)`?
>>>
>>> Thanks,
>>> Max
>>>
>>> On 22.10.18 11:57, Jozef Vilcek wrote:
>>> > Hello,
>>> >
>>> > I am having some trouble getting a balanced write via FileIO. Workers at
>>> > the shuffle side, where data per window firing are written to the
>>> > filesystem, receive an unbalanced number of events.
>>> >
>>> > Here is a naive code example:
>>> >
>>> > val read = KafkaIO.read()
>>> >   .withTopic("topic")
>>> >   .withBootstrapServers("kafka1:9092")
>>> >   .withKeyDeserializer(classOf[ByteArrayDeserializer])
>>> >   .withValueDeserializer(classOf[ByteArrayDeserializer])
>>> >   .withProcessingTime()
>>> >
>>> > pipeline
>>> >   .apply(read)
>>> >   .apply(MapElements.via(
>>> >     new SimpleFunction[KafkaRecord[Array[Byte], Array[Byte]], String]() {
>>> >       override def apply(input: KafkaRecord[Array[Byte], Array[Byte]]): String = {
>>> >         new String(input.getKV.getValue, "UTF-8")
>>> >       }
>>> >     }))
>>> >   .apply(Window.into[String](FixedWindows.of(Duration.standardHours(1)))
>>> >     .triggering(AfterWatermark.pastEndOfWindow()
>>> >       .withEarlyFirings(AfterPane.elementCountAtLeast(40000))
>>> >       .withLateFirings(AfterFirst.of(Lists.newArrayList[Trigger](
>>> >         Repeatedly.forever(AfterPane.elementCountAtLeast(10000)),
>>> >         Repeatedly.forever(AfterProcessingTime.pastFirstElementInPane()
>>> >           .plusDelayOf(Duration.standardHours(1)))))))
>>> >     .discardingFiredPanes()
>>> >     .withAllowedLateness(Duration.standardDays(7)))
>>> >   .apply(FileIO.write()
>>> >     .via(TextIO.sink())
>>> >     .withNaming(new SafeFileNaming(outputPath, ".txt"))
>>> >     .withTempDirectory(tempLocation)
>>> >     .withNumShards(5))
>>> >
>>> > If I run this on Beam 2.6.0 with Flink 1.5.0 on 5 workers (equal to the
>>> > number of shards), I would expect that each worker will participate in
>>> > persisting shards, and equally so, since the code uses a fixed number of
>>> > shards (and random shard assignment?). But reality is different (see the
>>> > 2 attachments: statistics from the Flink task reading from Kafka and the
>>> > task writing to files).
>>> >
>>> > What am I missing? How do I achieve balanced writes?
>>> >
>>> > Thanks,
>>> > Jozef
>>
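For what it's worth, the imbalance reported above is consistent with the random shard-keying that Max describes, and it also suggests why raising the shard count above the worker count helps. Under a simplifying assumption (not Beam's actual code: each shard key lands on one of 5 workers uniformly at random), a quick Monte Carlo sketch in plain Scala compares how often some worker is left idle with 5 shards versus 20:

```scala
import scala.util.Random

object ShardCountSim {
  val workers = 5
  val trials = 100000
  val rnd = new Random(7) // fixed seed for reproducibility

  // Fraction of trials in which at least one of the 5 workers
  // receives none of the numShards shard keys.
  def idleFraction(numShards: Int): Double = {
    val idle = (1 to trials).count { _ =>
      Seq.fill(numShards)(rnd.nextInt(workers)).toSet.size < workers
    }
    idle.toDouble / trials
  }

  def main(args: Array[String]): Unit = {
    println(f"numShards=5:  P(some idle worker) = ${idleFraction(5)}%.4f")
    println(f"numShards=20: P(some idle worker) = ${idleFraction(20)}%.4f")
  }
}
```

With 5 shards, some worker sits idle in roughly 96% of trials; with 20 shards, that drops to about 6%, at the cost of more (smaller) output files per pane.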
