I found the problem and, of course, it was between my desk and my chair. For side outputs, the Flink UI correctly reports (S+N) * Slots. Since the CoProcessFunction late-join was hashed, that factored down to S+N. Thanks for the help!
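
For anyone finding this thread later, here is a rough sketch of the counting rule from point 1 of Chesnay's reply quoted below: the output metric goes up once per collect() call, and the copying to each broadcast consumer happens afterwards. This is a toy model in plain Scala, not Flink code. `BroadcastMetricsSketch`, `CountingCollector` and `run` are names made up for illustration, and the two consumers stand in for join and late-join.

```scala
// Toy model only: no Flink classes are used and all names here are hypothetical.
object BroadcastMetricsSketch {

  // Mimics an operator output whose metric counts collect() calls.
  final class CountingCollector[T](consumers: Seq[T => Unit]) {
    var emitted: Long = 0L

    def collect(record: T): Unit = {
      emitted += 1                      // counted once per collect() call
      consumers.foreach(c => c(record)) // duplication happens after counting
    }
  }

  /** Returns (emitted by metadata, received by join, received by late-join). */
  def run(n: Int): (Long, Long, Long) = {
    var joinIn = 0L
    var lateJoinIn = 0L
    val consumers: Seq[Int => Unit] = Seq(
      (_: Int) => joinIn += 1,     // stands in for the join operator's input
      (_: Int) => lateJoinIn += 1  // stands in for the late-join operator's input
    )
    val out = new CountingCollector[Int](consumers)
    (1 to n).foreach(r => out.collect(r))
    (out.emitted, joinIn, lateJoinIn)
  }
}
```

Calling `BroadcastMetricsSketch.run(n)` gives N emitted by the sender and N received by each consumer, which is why the metadata operator shows N rather than 2*N in the UI.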

On Tue, Jul 3, 2018 at 3:51 AM, Chesnay Schepler <ches...@apache.org> wrote:

> Let's see if i understood everything correctly:
>
> 1)
> Let's say that metadata contains N records.
>
> The UI output metrics indicate that *metadata* sends N records.
> The UI input metrics for *join* and *late-join* do each include N records
> (i.e N + whatever other data they receive).
>
> You expected that the output of *metadata* be 2*N since they are
> broadcasted to 2 operators.
>
> If so, then the metrics work as intended; they count the number of records
> that the operator emits; the duplication happens behind the scenes
> *somewhere* outside the operator. In other words, the metric counts the
> number of *Collector#collect()* calls.
>
> 2)
> Let's say that *join* emits M records via the main output, and S records
> via the side-output.
>
> The UI input metrics for *late-join* indicate that M records have been
> received.
>
> You expected the input for *late-join* to be S + N instead, the
> side-output + broadcast data (see 1) ).
>
> If so, then yeah that's weird and shouldn't happen.
>
> For clarification:
> You use the broadcast variable for the *join* operator, but
> KeyedBroadcastMetadataJoin.broadcast(metadata) for the *late-join*.
> Is this intended, or just a copy&paste error?
>
>
> On 03.07.2018 04:16, Cliff Resnick wrote:
>
> Our topology has a metadata source that we push via Broadcast. Because
> this metadata source is critical, but sometimes late, we added a buffering
> mechanism via a SideOutput. We call the initial look-up from Broadcast
> "join" and the secondary, state-backed buffered lookup, "late-join"
>
> Today I noticed that if we implement the late join using a
> KeyedBroadcastProcessFunction, (so we can set TTL timers while using
> broadcast) everything seems to work. However, even though our
> internal metrics show the correct numbers, the numbers in the Flink UI
> falsely indicates that:
>
> 1) No broadcast data is sent to the late join, meaning Flink metrics
> for the metadata operator does not indicate any extra records sent.
> 2) Primary Join's main stream (not Side Output) is indicated as being sent
> to Late Join, meaning the Flink metrics input record number from Primary
> Join matches Late Join's, even though our logs and internal metrics might
> show zero traffic.
>
> If I do the late join via CoProcessFunction using a metadata keyed stream
> instead of broadcast, then the Flink UI shows the correct numbers
> (unfortunately there is another side issue when we take that tack but I
> won't go into that here).
>
> I hope this was not too confusing. Again the issue is not that this does
> not work -- it just looks like it does not work in the Flink UI.
>
> Below is the approximate code. Perhaps I'm doing something wrong that
> causes the weird reporting?
>
> val metadata = MetadataTable
>   .streamFromKafka(env)
>
> val broadcast = createBroadcast(metadata)
>
> val metadataJoined = sourceTables
>   .union(source1Tables)
>   .union(source2Tables)
>   .connect(broadcast)
>   .process(BroadcastMetadataJoin())
>   // this operator will send side output data using Metadata.sideOutputTag
>   .name("join")
>
> val lateJoined = metadataJoined
>   .getSideOutput(Metadata.sideOutputTag)
>   .keyBy(_.primaryKey.getMetadataId)
>   .connect(KeyedBroadcastMetadataJoin.broadcast(metadata))
>   .process(KeyedBroadcastMetadataJoin())
>   .name("late-join")