I found the problem and, of course, it was between my desk and my chair.
For side outputs, the Flink UI correctly reports (S+N) * Slots. Since the
CoProcessFunction late-join was hashed, that factored down to S+N. Thanks for
the help!
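
For anyone reading this thread later, the difference between a hashed and a
broadcast connection can be pictured with a toy model. This is plain Scala, not
Flink code, and every name in it (FanOutSketch, CountingOutput, hashed,
broadcast) is made up for illustration. It assumes only what is discussed in
the quoted messages: the operator-side count goes up once per collect() call,
and any replication to downstream subtasks happens after that count.

```scala
// Toy model only: no Flink dependency, all names are hypothetical.
object FanOutSketch {
  // Stand-in for an operator output. `route` decides which downstream
  // subtasks (by index) receive a record.
  final class CountingOutput[T](route: T => Seq[Int], parallelism: Int) {
    var recordsOut: Long = 0L                                // operator-side count
    val received: Array[Long] = Array.fill(parallelism)(0L)  // per downstream subtask

    def collect(record: T): Unit = {
      recordsOut += 1                                 // one per collect() call
      route(record).foreach(ch => received(ch) += 1)  // fan-out happens after the count
    }
  }

  // Hash partitioning: each record goes to exactly one downstream subtask.
  def hashed[T](parallelism: Int): T => Seq[Int] =
    r => Seq(Math.floorMod(r.hashCode, parallelism))

  // Broadcast: each record goes to every downstream subtask.
  def broadcast[T](parallelism: Int): T => Seq[Int] =
    _ => 0 until parallelism

  def main(args: Array[String]): Unit = {
    val parallelism = 4
    val n = 10 // N metadata records

    val viaHash = new CountingOutput[Int](hashed[Int](parallelism), parallelism)
    val viaBroadcast = new CountingOutput[Int](broadcast[Int](parallelism), parallelism)
    (1 to n).foreach { r => viaHash.collect(r); viaBroadcast.collect(r) }

    // prints "hashed:    out=10, received=10"
    println(s"hashed:    out=${viaHash.recordsOut}, received=${viaHash.received.sum}")
    // prints "broadcast: out=10, received=40"
    println(s"broadcast: out=${viaBroadcast.recordsOut}, received=${viaBroadcast.received.sum}")
  }
}
```

In this sketch the sender's count is N in both cases; only the total seen by
the receiving subtasks differs (N when hashed, N times the parallelism when
broadcast).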

On Tue, Jul 3, 2018 at 3:51 AM, Chesnay Schepler <ches...@apache.org> wrote:

> Let's see if I understood everything correctly:
>
> 1)
> Let's say that metadata contains N records.
>
> The UI output metrics indicate that *metadata* sends N records.
> The UI input metrics for *join* and *late-join* do each include N records
> (i.e., N + whatever other data they receive).
>
> You expected the output of *metadata* to be 2*N, since the records are
> broadcast to 2 operators.
>
> If so, then the metrics work as intended; they count the number of records
> that the operator emits; the duplication happens behind the scenes
> *somewhere* outside the operator. In other words, the metric counts the
> number of *Collector#collect()* calls.
>
> 2)
> Let's say that *join* emits M records via the main output, and S records
> via the side-output.
>
> The UI input metrics for *late-join* indicate that M records have been
> received.
>
> You expected the input for *late-join* to be S + N instead, i.e. the
> side-output + broadcast data (see 1).
>
> If so, then yeah that's weird and shouldn't happen.
>
> For clarification:
> You use the broadcast variable for the *join* operator, but
> KeyedBroadcastMetadataJoin.broadcast(metadata) for the *late-join*.
> Is this intended, or just a copy&paste error?
>
>
> On 03.07.2018 04:16, Cliff Resnick wrote:
>
> Our topology has a metadata source that we push via Broadcast. Because
> this metadata source is critical, but sometimes late, we added a buffering
> mechanism via a SideOutput. We call the initial look-up from Broadcast
> "join" and the secondary, state-backed buffered lookup "late-join".
>
> Today I noticed that if we implement the late join using a
> KeyedBroadcastProcessFunction (so we can set TTL timers while using
> broadcast), everything seems to work. However, even though our
> internal metrics show the correct numbers, the numbers in the Flink UI
> falsely indicate that:
>
> 1) No broadcast data is sent to the late join, meaning Flink metrics
> for the metadata operator do not indicate any extra records sent.
> 2) Primary Join's main stream (not Side Output) is indicated as being sent
> to Late Join, meaning the Flink metrics input record number from Primary
> Join matches Late Join's, even though our logs and internal metrics might
> show zero traffic.
>
> If I do the late join via CoProcessFunction using a metadata keyed stream
> instead of broadcast, then the Flink UI shows the correct numbers
> (unfortunately there is another side issue when we take that tack but I
> won't go into that here).
>
> I hope this was not too confusing. Again the issue is not that this does
> not work -- it just looks like it does not work in the Flink UI.
>
> Below is the approximate code. Perhaps I'm doing something wrong that
> causes the weird reporting?
>
> val metadata = MetadataTable
>   .streamFromKafka(env)
>
> val broadcast = createBroadcast(metadata)
>
> val metadataJoined = sourceTables
>   .union(source1Tables)
>   .union(source2Tables)
>   .connect(broadcast)
>   .process(BroadcastMetadataJoin())
>   // this operator will send side output data using Metadata.sideOutputTag
>   .name("join")
>
> val lateJoined = metadataJoined
>   .getSideOutput(Metadata.sideOutputTag)
>   .keyBy(_.primaryKey.getMetadataId)
>   .connect(KeyedBroadcastMetadataJoin.broadcast(metadata))
>   .process(KeyedBroadcastMetadataJoin())
>   .name("late-join")
>
