Let's see if I understood everything correctly:
1)
Let's say that metadata contains N records.
The UI output metrics indicate that /metadata/ sends N records.
The UI input metrics for /join/ and /late-join/ each include N
records (i.e. N + whatever other data they receive).
You expected the output of /metadata/ to be 2*N, since the records are
broadcast to 2 operators.
If so, then the metrics work as intended; they count the number of
records that the operator emits; the duplication happens behind the
scenes /somewhere/ outside the operator. In other words, the metric
counts the number of /Collector#collect()/ calls.
2)
Let's say that /join/ emits M records via the main output, and S records
via the side-output.
The UI input metrics for /late-join/ indicate that M records have been
received.
You expected the input for /late-join/ to be S + N instead, i.e. the
side-output + broadcast data (see 1)).
If so, then yeah that's weird and shouldn't happen.
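To illustrate what I mean in 1), here is a toy model in plain Scala. This is not Flink code: Upstream, Downstream and the counter fields are made-up names that only roughly mimic the behavior described above, where the output counter goes up once per collect() call and the fan-out to both consumers happens outside the operator.

```scala
// Toy model, NOT Flink code. All class and field names are hypothetical.

// A downstream operator that counts every record it receives.
final class Downstream(val name: String) {
  var numRecordsIn: Long = 0
  def receive(record: String): Unit = numRecordsIn += 1
}

// An upstream operator. Its output counter is incremented once per
// collect() call; the copy to each target is made after the count.
final class Upstream(targets: Seq[Downstream]) {
  var numRecordsOut: Long = 0
  def collect(record: String): Unit = {
    numRecordsOut += 1                 // counted once per emitted record
    targets.foreach(_.receive(record)) // duplication happens "behind the scenes"
  }
}

object BroadcastMetricsSketch {
  // Emits n metadata records to two consumers and returns
  // (metadata out, join in, late-join in).
  def run(n: Int): (Long, Long, Long) = {
    val join = new Downstream("join")
    val lateJoin = new Downstream("late-join")
    val metadata = new Upstream(Seq(join, lateJoin))
    (1 to n).foreach(i => metadata.collect(s"meta-$i"))
    (metadata.numRecordsOut, join.numRecordsIn, lateJoin.numRecordsIn)
  }
}
```

With N = 3, BroadcastMetricsSketch.run(3) gives (3, 3, 3): the sender reports N, not 2*N, while each receiver still sees N.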
For clarification:
You use the broadcast variable for the /join/ operator, but
KeyedBroadcastMetadataJoin.broadcast(metadata) for the /late-join/.
Is this intended, or just a copy & paste error?
On 03.07.2018 04:16, Cliff Resnick wrote:
Our topology has a metadata source that we push via Broadcast. Because
this metadata source is critical, but sometimes late, we added a
buffering mechanism via a SideOutput. We call the initial look-up from
Broadcast "join" and the secondary, state-backed buffered lookup
"late-join".
Today I noticed that if we implement the late join using a
KeyedBroadcastProcessFunction (so we can set TTL timers while using
broadcast), everything seems to work. However, even though our
internal metrics show the correct numbers, the numbers in the Flink UI
falsely indicate that:
1) No broadcast data is sent to the late join, meaning the Flink metrics
for the metadata operator do not indicate any extra records sent.
2) Primary Join's main stream (not Side Output) is indicated as being
sent to Late Join, meaning the Flink metrics input record number from
Primary Join matches Late Join's, even though our logs and internal
metrics might show zero traffic.
If I do the late join via CoProcessFunction using a metadata keyed
stream instead of broadcast, then the Flink UI shows the correct
numbers (unfortunately there is another side issue when we take that
tack but I won't go into that here).
I hope this was not too confusing. Again the issue is not that this
does not work -- it just looks like it does not work in the Flink UI.
Below is the approximate code. Perhaps I'm doing something wrong that
causes the weird reporting?
val metadata = MetadataTable
  .streamFromKafka(env)

val broadcast = createBroadcast(metadata)

val metadataJoined = sourceTables
  .union(source1Tables)
  .union(source2Tables)
  .connect(broadcast)
  // this operator will send side output data using Metadata.sideOutputTag
  .process(BroadcastMetadataJoin())
  .name("join")

val lateJoined = metadataJoined
  .getSideOutput(Metadata.sideOutputTag)
  .keyBy(_.primaryKey.getMetadataId)
  .connect(KeyedBroadcastMetadataJoin.broadcast(metadata))
  .process(KeyedBroadcastMetadataJoin())
  .name("late-join")