withNumShards(5) generates 5 random shards. It turns out that statistically
when you generate 5 random shards for 5 workers, the probability is
reasonably high that some workers will get more than one shard (and as a
result not all workers will participate). Are you able to set the number of
shards larger than 5?
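
The balls-into-bins effect described above can be checked directly: with S shards assigned uniformly at random to W workers, the probability that every worker receives at least one shard follows from inclusion-exclusion. The sketch below is purely illustrative (the class and method names are made up, not part of any Beam API):

```java
public class ShardSpread {

    // Exact probability that `shards` shards, assigned uniformly at random,
    // land on all `workers` workers (inclusion-exclusion over empty workers).
    static double coverageProbability(int workers, int shards) {
        double total = 0;
        for (int k = 0; k <= workers; k++) {
            double sign = (k % 2 == 0) ? 1 : -1;
            total += sign * binomial(workers, k) * Math.pow(workers - k, shards);
        }
        return total / Math.pow(workers, shards);
    }

    // Binomial coefficient C(n, k), computed multiplicatively.
    static double binomial(int n, int k) {
        double r = 1;
        for (int i = 1; i <= k; i++) r = r * (n - i + 1) / i;
        return r;
    }

    public static void main(String[] args) {
        // With 5 shards on 5 workers, only 5!/5^5 = 0.0384 of runs keep every
        // worker busy, so idle workers are the expected outcome, not bad luck.
        System.out.printf("5 shards / 5 workers:  %.4f%n", coverageProbability(5, 5));
        // Raising the shard count to 20 lifts coverage to roughly 94%.
        System.out.printf("20 shards / 5 workers: %.4f%n", coverageProbability(5, 20));
    }
}
```

In the pipeline quoted below, this would amount to something like `.withNumShards(20)` instead of 5, at the cost of more (and smaller) output files per window.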

On Wed, Oct 24, 2018 at 12:28 AM Jozef Vilcek <[email protected]> wrote:

> cc (dev)
>
> I tried to run the example with FlinkRunner in batch mode and received
> again bad data spread among the workers.
>
> When I tried to remove the number of shards for batch mode in the above
> example, the pipeline crashed before launch:
>
> Caused by: java.lang.IllegalStateException: Inputs to Flatten had
> incompatible triggers:
> AfterWatermark.pastEndOfWindow().withEarlyFirings(AfterPane.elementCountAtLeast(40000)).withLateFirings(AfterFirst.of(Repeatedly.forever(AfterPane.elementCountAtLeast(10000)),
> Repeatedly.forever(AfterProcessingTime.pastFirstElementInPane().plusDelayOf(1 hour)))),
> AfterWatermark.pastEndOfWindow().withEarlyFirings(AfterPane.elementCountAtLeast(1)).withLateFirings(AfterFirst.of(Repeatedly.forever(AfterPane.elementCountAtLeast(1)),
> Repeatedly.forever(AfterSynchronizedProcessingTime.pastFirstElementInPane())))
>
>
>
>
>
> On Tue, Oct 23, 2018 at 12:01 PM Jozef Vilcek <[email protected]>
> wrote:
>
>> Hi Max,
>>
>> I forgot to mention that the example is run in streaming mode, therefore I
>> cannot do writes without specifying shards. FileIO explicitly asks for
>> them.
>>
>> I am not sure where the problem is. FlinkRunner is only one I used.
>>
>> On Tue, Oct 23, 2018 at 11:27 AM Maximilian Michels <[email protected]>
>> wrote:
>>
>>> Hi Jozef,
>>>
>>> This does not look like a FlinkRunner-related problem, but is caused by
>>> the `WriteFiles` sharding logic. It assigns keys and does a Reshuffle
>>> which apparently does not lead to good data spread in your case.
>>>
>>> Do you see the same behavior without `withNumShards(5)`?
>>>
>>> Thanks,
>>> Max
>>>
>>> On 22.10.18 11:57, Jozef Vilcek wrote:
>>> > Hello,
>>> >
>>> > I am having some trouble getting a balanced write via FileIO. Workers
>>> > on the shuffle side, where data per window are fired and written to the
>>> > filesystem, receive an unbalanced number of events.
>>> >
>>> > Here is a naive code example:
>>> >
>>> >      val read = KafkaIO.read()
>>> >          .withTopic("topic")
>>> >          .withBootstrapServers("kafka1:9092")
>>> >          .withKeyDeserializer(classOf[ByteArrayDeserializer])
>>> >          .withValueDeserializer(classOf[ByteArrayDeserializer])
>>> >          .withProcessingTime()
>>> >
>>> >      pipeline
>>> >          .apply(read)
>>> >          .apply(MapElements.via(new SimpleFunction[KafkaRecord[Array[Byte], Array[Byte]], String]() {
>>> >            override def apply(input: KafkaRecord[Array[Byte], Array[Byte]]): String = {
>>> >              new String(input.getKV.getValue, "UTF-8")
>>> >            }
>>> >          }))
>>> >
>>> >          .apply(Window.into[String](FixedWindows.of(Duration.standardHours(1)))
>>> >              .triggering(AfterWatermark.pastEndOfWindow()
>>> >                  .withEarlyFirings(AfterPane.elementCountAtLeast(40000))
>>> >                  .withLateFirings(AfterFirst.of(Lists.newArrayList[Trigger](
>>> >                      Repeatedly.forever(AfterPane.elementCountAtLeast(10000)),
>>> >                      Repeatedly.forever(AfterProcessingTime.pastFirstElementInPane().plusDelayOf(Duration.standardHours(1)))))))
>>> >              .discardingFiredPanes()
>>> >              .withAllowedLateness(Duration.standardDays(7)))
>>> >
>>> >          .apply(FileIO.write()
>>> >              .via(TextIO.sink())
>>> >              .withNaming(new SafeFileNaming(outputPath, ".txt"))
>>> >              .withTempDirectory(tempLocation)
>>> >              .withNumShards(5))
>>> >
>>> >
>>> > If I run this on Beam 2.6.0 with Flink 1.5.0 on 5 workers (equal to the
>>> > number of shards), I would expect each worker to participate equally in
>>> > persisting shards, since the code uses a fixed number of shards (and
>>> > random shard assignment?). But the reality is different (see the 2
>>> > attachments: statistics from the Flink task reading from Kafka and the
>>> > task writing to files).
>>> >
>>> > What am I missing? How to achieve balanced writes?
>>> >
>>> > Thanks,
>>> > Jozef
>>> >
>>> >
>>>
>>
