Hi Matthias

Yes, thanks for the questions - I know it's hard to keep up with all of the
various KIPs and everything.

The instructions are not stored anywhere, but are simply a way of letting
the RHS know how to handle the subscription and reply accordingly.

The only case where we send an unnecessary tombstone (as far as I can
tell) is the following:
RHS:
(1, bar)

LHS
(K,1)  -> Results in (K, 1, bar) being output
(K,1) -> (K,2) ->  Results in (K, null) being output for INNER (no matching
element on RHS)
(K,2) -> (K,3) ->  Results in (K, null) being output for INNER (because we
don't maintain state to know we already output the tombstone on the
previous transition).
(K,3) -> (K,9000) ->  Results in (K, null)... etc.
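
To make that trace concrete, here is a rough Java sketch. It is only a toy
model for illustration, not the code in the PR: the class InnerFkJoinTrace
and its methods are hypothetical names, and the whole subscription
round-trip is collapsed into a single map lookup. The point it shows is
that, with no record of what was last emitted for a key, every LHS update
whose foreign key is missing on the RHS yields another tombstone.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical toy model of the INNER foreign-key join output (not the PR code).
class InnerFkJoinTrace {
    // RHS table: foreign key -> value.
    private final Map<Integer, String> rhs = new HashMap<>();
    // Everything the join has emitted, in order.
    private final List<String> output = new ArrayList<>();

    void putRhs(int fk, String value) {
        rhs.put(fk, value);
    }

    // Apply an LHS update (key, newFk). Nothing is remembered about what was
    // previously emitted for this key, so a missing RHS match always emits
    // a tombstone, even if the previous output was already a tombstone.
    void updateLhs(String key, int newFk) {
        String rhsValue = rhs.get(newFk);
        if (rhsValue == null) {
            output.add("(" + key + ", null)");
        } else {
            output.add("(" + key + ", " + newFk + ", " + rhsValue + ")");
        }
    }

    List<String> output() {
        return output;
    }

    public static void main(String[] args) {
        InnerFkJoinTrace trace = new InnerFkJoinTrace();
        trace.putRhs(1, "bar");
        trace.updateLhs("K", 1);    // joined result
        trace.updateLhs("K", 2);    // necessary tombstone
        trace.updateLhs("K", 3);    // redundant tombstone
        trace.updateLhs("K", 9000); // redundant tombstone
        trace.output().forEach(System.out::println);
    }
}
```

Running main prints (K, 1, bar) followed by three (K, null) lines; only the
first of those tombstones actually retracts anything.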

Byte versioning is going in today, then I hope to get back to addressing a
number of John's previous questions in the PR.
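
For anyone following along, a minimal sketch of what "one version byte
plus a one-byte instruction" could look like on the wire. Everything here
is an assumption for illustration: the class name, the two instruction
names, the byte values, and the layout (version, instruction, then the
rest of the payload) are hypothetical and not what the PR necessarily
does. It follows the behaviour discussed in the thread: refuse versions
newer than the worker understands, and choke on unknown instruction bytes
rather than guess.

```java
import java.nio.ByteBuffer;

// Hypothetical sketch of a versioned subscription message (not the PR code).
class SubscriptionWireSketch {
    // Highest wire version this (hypothetical) worker knows how to handle.
    static final byte MAX_SUPPORTED_VERSION = 0;

    // Instruction names and byte codes are made up for this example.
    enum Instruction {
        PROPAGATE_IF_PRESENT((byte) 0),
        PROPAGATE_ALWAYS((byte) 1);

        final byte code;

        Instruction(byte code) {
            this.code = code;
        }

        static Instruction fromCode(byte code) {
            for (Instruction i : values()) {
                if (i.code == code) {
                    return i;
                }
            }
            // Unknown instruction: fail rather than do something wrong.
            throw new IllegalArgumentException("Unknown instruction byte: " + code);
        }
    }

    // Assumed layout: [version:1][instruction:1][rest of payload].
    static byte[] encode(byte version, Instruction instruction, byte[] rest) {
        return ByteBuffer.allocate(2 + rest.length)
                .put(version)
                .put(instruction.code)
                .put(rest)
                .array();
    }

    static Instruction decodeInstruction(byte[] message) {
        ByteBuffer buf = ByteBuffer.wrap(message);
        byte version = buf.get();
        if (version < 0 || version > MAX_SUPPORTED_VERSION) {
            // An older worker seeing a newer message stops instead of misreading it.
            throw new IllegalStateException("Unsupported version: " + version);
        }
        return Instruction.fromCode(buf.get());
    }
}
```

The overhead is two bytes per message, and an old worker fails loudly on
a message it cannot interpret.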

Adam


On Thu, Jun 27, 2019 at 5:47 PM Matthias J. Sax <matth...@confluent.io>
wrote:

> Thanks for bringing this issue to our attention. Great find @Joe!
>
> Adding the instruction field to the `subscription` sounds like a good
> solution. What I don't understand atm: for which case would we need to
> send an unnecessary tombstone? I thought that the `instruction` field helps
> to avoid any unnecessary tombstones? Seems I am missing a case?
>
> Also for my own understanding: the `instruction` is only part of the
> message? It is not necessary to store it in the RHS auxiliary store, right?
>
> About right/full-outer joins. Agreed. Getting left-joins would be awesome!
>
> About upgrading: Good call John! Adding a version byte for subscription
> and response is good forward thinking. I personally prefer version
> numbers, too, as they carry more information.
>
> Thanks for all the hard work to everybody involved!
>
>
> -Matthias
>
> On 6/27/19 1:44 PM, John Roesler wrote:
> > Hi Adam,
> >
> > Hah! Yeah, I felt a headache coming on myself when I realized this
> > would be a concern.
> >
> > For what it's worth, I'd also lean toward versioning. It seems more
> > explicit and more likely to keep us all sane in the long run. Since we
> > don't _think_ our wire protocol will be subject to a lot of revisions,
> > we can just use one byte. The worst case is that we run out of numbers
> > and reserve the last one to mean, "consult another field for the
> > actual version number". It seems like a single byte on each message
> > isn't too much to pay.
> >
> > Since you point it out, we might as well put a version number on the
> > SubscriptionResponseWrapper as well. It may not be needed, but if we
> > ever need it, even just once, we'll be glad we have it.
> >
> > Regarding the instructions field, we can also serialize the enum very
> > compactly as a single byte (which is the same size a boolean takes
> > anyway), so it seems like an Enum in Java-land and a byte on the wire
> > is a good choice.
> >
> > Agreed on the right and full outer joins, it doesn't seem necessary
> > right now, although I am happy to see the left join "join" the party,
> > since as you said, we were so close to it anyway. Can you also add it
> > to the KIP?
> >
> > Thanks as always for your awesome efforts on this,
> > -John
> >
> > On Thu, Jun 27, 2019 at 3:04 PM Adam Bellemare <adam.bellem...@gmail.com> wrote:
> >>
> >> You're stretching my brain, John!
> >>
> >> I prefer STRATEGY 1 because it solves the problem in a simple way, and
> >> allows us to deprecate support for older message types as we go (i.e., we
> >> only support the previous 3 versions, so V5, V4, V3, but not V2 or V1).
> >>
> >> STRATEGY 2 is akin to Avro schemas between two microservices - there are
> >> indeed cases where a breaking change must be made, and forward
> >> compatibility will provide us with no out other than requiring a full stop
> >> and full upgrade for all nodes, shifting us back towards STRATEGY 1.
> >>
> >> My preference is STRATEGY 1 with instructions as an ENUM, and we can
> >> certainly include a version. Would it make sense to include a version
> >> number in SubscriptionResponseWrapper as well? Currently we don't have any
> >> instructions in there, as I removed the boolean, but it is certainly
> >> plausible that it could happen in the future. I don't *think* we'll need
> >> it, but I also didn't think we'd need it for SubscriptionWrapper and here
> >> we are.
> >>
> >> Thanks for the thoughts, and the info on the right-key. That was
> >> enlightening, though I can't think of a use-case for it *at this point in
> >> time*. :)
> >>
> >> Adam
> >>
> >>
> >>
> >> On Thu, Jun 27, 2019 at 12:29 PM John Roesler <j...@confluent.io> wrote:
> >>
> >>> I think I agree with you, right joins (and therefore full outer joins)
> >>> don't make sense here, because the result is a keyed table, where the
> >>> key is the PK of the left-hand side. So, when you have a
> >>> right-hand-side record with no incoming FK references, you would want
> >>> to produce a join result like `nullKey: (null, rhsValue)`, but we
> >>> don't currently allow null keys in Streams. It actually is possible to
> >>> define them, and therefore to add right- and full-outer foreign-key
> >>> joins later, but it's non-trivial in a streaming context with
> >>> continuously updated results. (See the PS if you're curious what I'm
> >>> thinking). You're correct, right- and full-outer joins are trivial on
> >>> our current 1:1 table joins because they are equi-joins.
> >>>
> >>> Regarding the transition, it sounds like what you're proposing is that
> >>> we would say, "adding a foreign-key join to your topology requires a
> >>> full application reset (or a new application id)". This is also an
> >>> acceptable constraint to place on the feature, but not strictly
> >>> necessary. Since 2.3, it's now possible to give all the state in your
> >>> application stable names. This means that it's no longer true that
> >>> adding a node to your topology graph would break its structure, and it
> >>> does become possible to add new operators and simply restart the app.
> >>> Revisiting my prior thought, though, I think the problem is not
> >>> specific to your feature. For example, adding a new grouped
> >>> aggregation would produce a new repartition topic, but the repartition
> >>> topic partitions might get assigned to old nodes in the middle of a
> >>> rolling bounce, and they would need to just ignore them. This
> >>> requirement is the same for the repartition topics in the FK join, so
> >>> it's orthogonal to your design.
> >>>
> >>> Back to the first concern, though, I'm not sure I followed the
> >>> explanation. As a thought experiment, let's imagine that Joe hadn't
> >>> taken the time to experiment with your feature branch. We wouldn't
> >>> have noticed the problem until the feature was already released in
> >>> 2.4. So the wire protocol on that PK->FK subscription topic would have
> >>> been V1: "FK,PK,HASH,BOOLEAN". Then, Joe would have let us know the
> >>> problem once they picked up the feature, so we would want to implement
> >>> your proposed fix and change the wire protocol to V2:
> >>> "FK,PK,HASH,INSTRUCTIONS" in 2.5. Upon rolling out the update, we
> >>> would see both 2.4 nodes encountering V2 messages and 2.5 nodes
> >>> encountering V1 messages. How can they both detect that they are
> >>> attempting to process a newer or older protocol? If they can detect
> >>> it, then what should they do?
> >>>
> >>> From experience, there are two basic solutions to this problem:
> >>>
> >>> STRATEGY1. Add a protocol version to the message (could be a number at
> >>> the start of the message payload, or it could be a number in the
> >>> message headers, not sure if it matters much. Payload is probably more
> >>> compact, since the header would need a name.) In this case, the 2.4
> >>> worker would know that its max protocol version is V1, and when it
> >>> sees the V2 message, it knows that it can't handle it properly. Rather
> >>> than doing something wrong, it would just not do anything. This means
> >>> it would stop the task, if not shut down the whole instance. On the
> >>> other hand, a 2.5 worker would have some defined logic for how to
> >>> handle all versions (V1 and V2), so once the upgrade is complete, all
> >>> messages can be processed.
> >>>
> >>> STRATEGY2. Make the schema forward-compatible. Basically, we ensure
> >>> that new fields can only be appended to the message schema, and that
> >>> older workers using only a prefix of the full message would still
> >>> behave correctly. Using the example above, we'd instead evolve the
> >>> schema to V2': "FK,PK,HASH,BOOLEAN,INSTRUCTIONS", and continue to set
> >>> the boolean field to true for the "new" foreign key. Then, 2.4 workers
> >>> encountering a "new FK" message would just see the prefix of the
> >>> payload that makes sense to them, and they would still continue
> >>> processing the messages as they always have. Only after the 2.5 code
> >>> is fully rolled out to the cluster would we be sure to see the desired
> >>> behavior. Note: in the reverse case, a 2.5 worker knows how to fully
> >>> parse the new message format, even if it plans to ignore the BOOLEAN
> >>> field.
> >>>
> >>> There are some tradeoffs between these strategies: STRATEGY1 ensures
> >>> that all messages are only handled by workers that can properly handle
> >>> them, although it results in processing stalls while there are still
> >>> old nodes in the cluster. STRATEGY2 ensures that all messages can be
> >>> processed by all nodes, so there are no stalls, but we can never
> >>> remove fields from the message, so if there are a lot of revisions in
> >>> the future, the payloads will become bloated. Also, it's not clear
> >>> that you can actually pull off STRATEGY2 in all cases. If there's some
> >>> new kind of message you want to send that has no way to be correctly
> >>> processed at all under the 2.4 code paths, the prefix thing simply
> >>> doesn't work. Etc.
> >>>
> >>> Also, note that you can modify the above strategies by instead
> >>> designing the message fields for extensibility. E.g., if you make the
> >>> instructions field an enum, then you can make sure that the default
> >>> case is handled sensibly (probably similarly to STRATEGY1, just choke
> >>> on unknown instructions) and that you never remove an instruction type
> >>> from the enum in future versions.
> >>>
> >>> Does this make sense?
> >>> -John
> >>>
> >>>
> >>>
> >>>
> >>> PS:
> >>> We can define null keys for streaming tables, but it's tricky.
> >>>
> >>> Specifically, you'd want to define some concept of null keys that
> >>> allows all null keys to be unique, but _also_ to have a fixed
> >>> identity, so that a particular null-key can be updated later. One
> >>> example could be to union the existing keyspace with a new
> >>> null-keyspace, where normal keys are like "key" and null-keys are like
> >>> "null(identity)". Then given a query like
> >>> "KTable<String,Integer>.rightJoin(KTable<Integer,Boolean>)", and
> >>> inputs like:
> >>> LHS:
> >>> "a": 1
> >>> "b": 2
> >>>
> >>> RHS:
> >>> 1: true
> >>> 3: false
> >>>
> >>> a full outer join would produce:
> >>> "a": (1, true)
> >>> "b": (2, null)
> >>> null(3): (null, false)
> >>>
> >>> which can be correctly updated later if we get an update on the LHS:
> >>> PUT("c": 3)
> >>>
> >>> We'd emit for the results:
> >>> DELETE(null(3))
> >>> EMIT("c": (3, false))
> >>>
> >>> Resulting in the correct result table of:
> >>> "a": (1, true)
> >>> "b": (2, null)
> >>> "c": (3, false)
> >>>
> >>> As mentioned, this is tricky, and I would avoid it until we have
> >>> evidence that it's actually useful to cover this part of the design
> >>> space. Certainly, it would be a separate KIP if it came to that.
> >>>
> >>> On Wed, Jun 26, 2019 at 8:57 PM Adam Bellemare <adam.bellem...@gmail.com> wrote:
> >>>>
> >>>> Hi John
> >>>>
> >>>> Good thinking with regards to upgrade path between versions regarding
> >>>> over-the-wire instructions in SubscriptionWrapper. At this point in time I
> >>>> can't think of any new wire message instructions, but I would appreciate as
> >>>> many eyes on it as possible. I have just included the LEFT join in the last
> >>>> commit (about 10 min ago) along with INNER join. I do not think that RIGHT
> >>>> join and OUTER are possible given that there is no LHS key available, so
> >>>> LHSTable.outerJoinOnForeignKey(RHSTable) wouldn't even make sense. This is
> >>>> in contrast to the current LHSTable.outerJoin(RHSTable), as they are both
> >>>> keyed on the same key. I have buffed up the Integration tests and have
> >>>> tried to make them more readable to ensure that we're covering all the
> >>>> scenarios. I think that if we can get more eyes on the workflow showing the
> >>>> various LHS and RHS events and outputs then that may help us validate that
> >>>> we have all the scenarios covered.
> >>>>
> >>>> With regards to the 2.3->2.4 scenario you described, I'm not entirely sure
> >>>> I follow. If they want to add a FK-join, they will need to rework their
> >>>> code in the KStreams app and make a new release, since the underlying
> >>>> topology would be different and new internal topics would need to be
> >>>> created. In other words, I don't think a rolling upgrade where the user
> >>>> introduces a FK join would be possible since their topology would
> >>>> necessitate a full KStreams reset. Is this what you meant?
> >>>>
> >>>>
> >>>>
> >>>> On Wed, Jun 26, 2019 at 4:10 PM John Roesler <j...@confluent.io> wrote:
> >>>>
> >>>>> Thanks, Adam!
> >>>>>
> >>>>> One unrelated thought that has just now occurred to me is that (unlike
> >>>>> the equi-joins we currently have), this join logic is potentially
> >>>>> spread over multiple Streams instances, which in general means that
> >>>>> the instances may be running different versions of Kafka Streams.
> >>>>>
> >>>>> This means that if we discover a bug that requires us to again change
> >>>>> the wire message (as you did in this proposal update), we need to
> >>>>> consider what should happen if the PK instance is newer than the FK
> >>>>> instance, or vice-versa, during a rolling upgrade. We should think
> >>>>> ahead to this condition and make sure the logic is forward compatible.
> >>>>>
> >>>>> Related: what about the initial case, when we release this feature
> >>>>> (let's say in 2.4)? What will happen if I decide to adopt 2.4 and add
> >>>>> a FK join together in one upgrade? In that case, the 2.4 member of the
> >>>>> cluster is producing the SubscriptionWrapper messages, and some 2.3
> >>>>> members get the subscription topic assigned to them, but they have no
> >>>>> idea what to do with it. I'm not sure this is a problem; hopefully they
> >>>>> just do nothing. If it is a problem, it would be fine to say you have
> >>>>> to upgrade completely to 2.4 before deploying a FK join.
> >>>>>
> >>>>> Just want to make sure we anticipate these issues in case it affects
> >>>>> the design at all.
> >>>>>
> >>>>> Thanks,
> >>>>> -John
> >>>>>
> >>>>> On Wed, Jun 26, 2019 at 2:38 PM Adam Bellemare <adam.bellem...@gmail.com> wrote:
> >>>>>>
> >>>>>> Sigh... Forgot the link:
> >>>>>>
> >>>>>> https://cwiki.apache.org/confluence/pages/diffpagesbyversion.action?pageId=74684836&selectedPageVersions=78&selectedPageVersions=74
> >>>>>>
> >>>>>> I'll update it when I validate that there are no issues with removing the
> >>>>>> SubscriptionResponseWrapper boolean.
> >>>>>>
> >>>>>> On Wed, Jun 26, 2019 at 3:37 PM Adam Bellemare <adam.bellem...@gmail.com> wrote:
> >>>>>>
> >>>>>>>> Maybe just call it as (k, leftval, null) or (k, null, rightval)?
> >>>>>>> Done.
> >>>>>>>
> >>>>>>>> if you update the KIP, you might want to send a new "diff link" to this
> >>>>>>>> thread
> >>>>>>> Here it is:
> >>>>>>>
> >>>>>>>> Looking closely at the proposal, can you explain more about the
> >>>>>>>> propagateIfNull field in SubscriptionResponseWrapper? It sort of looks like
> >>>>>>>> it's always going to be equal to (RHS-result != null).
> >>>>>>> I believe you are correct, and I missed the forest for the trees. They are
> >>>>>>> effectively the same thing, and I can simply remove the flag. I will code
> >>>>>>> it up and try it out locally just to be sure.
> >>>>>>>
> >>>>>>> Thanks again for your help, it is greatly appreciated!
> >>>>>>>
> >>>>>>> On Wed, Jun 26, 2019 at 2:54 PM John Roesler <j...@confluent.io> wrote:
> >>>>>>>
> >>>>>>>> I think the "scenario trace" is very nice, but has one point that I
> >>>>>>>> found confusing:
> >>>>>>>>
> >>>>>>>> You indicate a retraction in the join output as (k,null) and a join
> >>>>>>>> result as (k, leftval, rightval), but confusingly, you also write a
> >>>>>>>> join result as (k, JoinResult) when one side is null. Maybe just call
> >>>>>>>> it as (k, leftval, null) or (k, null, rightval)? That way the readers
> >>>>>>>> can more easily determine if the results meet their expectations for
> >>>>>>>> each join type.
> >>>>>>>>
> >>>>>>>> (procedural note: if you update the KIP, you might want to send a new
> >>>>>>>> "diff link" to this thread, since the one I posted at the beginning
> >>>>>>>> would not automatically show your latest changes)
> >>>>>>>>
> >>>>>>>> I was initially concerned that the proposed algorithm would wind up
> >>>>>>>> propagating something that looks like a left join (k, leftval, null)
> >>>>>>>> under the case that Joe pointed out, but after reviewing your
> >>>>>>>> scenario, I see that it will emit a tombstone (k, null) instead. This
> >>>>>>>> is appropriate, and unavoidable, since we have to retract the join
> >>>>>>>> result from the logical view (the join result is a logical Table).
> >>>>>>>>
> >>>>>>>> Looking closely at the proposal, can you explain more about the
> >>>>>>>> propagateIfNull field in SubscriptionResponseWrapper?
> >>>>>>>> It sort of looks like it's always going to be equal to
> >>>>>>>> (RHS-result != null).
> >>>>>>>>
> >>>>>>>> In other words, can we drop that field and just send back RHS-result
> >>>>>>>> or null, and then handle it on the left-hand side like:
> >>>>>>>> if (rhsOriginalValueHash doesn't match) {
> >>>>>>>>     emit nothing, just drop the update
> >>>>>>>> } else if (joinType==inner && rhsValue == null) {
> >>>>>>>>     emit tombstone
> >>>>>>>> } else {
> >>>>>>>>     emit joiner(lhsValue, rhsValue)
> >>>>>>>> }
> >>>>>>>>
> >>>>>>>> To your concern about emitting extra tombstones, personally, I think
> >>>>>>>> it's fine. Clearly, we should try to avoid unnecessary tombstones, but
> >>>>>>>> all things considered, it's not harmful to emit some unnecessary
> >>>>>>>> tombstones: their payload is small, and they are trivial to handle
> >>>>>>>> downstream. If users want to, they can materialize the join result to
> >>>>>>>> suppress any extra tombstones, so there's a way out.
> >>>>>>>>
> >>>>>>>> Thanks for the awesome idea. It's better than what I was thinking.
> >>>>>>>> -john
> >>>>>>>>
> >>>>>>>> On Wed, Jun 26, 2019 at 11:37 AM Adam Bellemare
> >>>>>>>> <adam.bellem...@gmail.com> wrote:
> >>>>>>>>>
> >>>>>>>>> Thanks John.
> >>>>>>>>>
> >>>>>>>>> I'm looking forward to any feedback on this. In the meantime I will work on
> >>>>>>>>> the unit tests to ensure that we have well-defined and readable coverage.
> >>>>>>>>>
> >>>>>>>>> At the moment I cannot see a way around emitting (k,null) whenever we emit
> >>>>>>>>> an event that lacks a matching foreign key on the RHS, except in the
> >>>>>>>>> (k,null) -> (k,fk) case.
> >>>>>>>>> If this LHS oldValue=null, we know we would have emitted a deletion and so
> >>>>>>>>> (k,null) would be emitted out of the join. In this case we don't need to
> >>>>>>>>> send another null.
> >>>>>>>>>
> >>>>>>>>> Adam
> >>>>>>>>>
> >>>>>>>>> On Wed, Jun 26, 2019 at 11:53 AM John Roesler <j...@confluent.io> wrote:
> >>>>>>>>>
> >>>>>>>>>> Hi Adam,
> >>>>>>>>>>
> >>>>>>>>>> Thanks for the proposed revision to your KIP
> >>>>>>>>>> (https://cwiki.apache.org/confluence/pages/diffpagesbyversion.action?pageId=74684836&selectedPageVersions=77&selectedPageVersions=74)
> >>>>>>>>>>
> >>>>>>>>>> in response to the concern pointed out during code review
> >>>>>>>>>> (https://github.com/apache/kafka/pull/5527#issuecomment-505137962)
> >>>>>>>>>>
> >>>>>>>>>> We should have a brief discussion thread (here) in the mailing list to
> >>>>>>>>>> make sure everyone who wants to gets a chance to consider the
> >>>>>>>>>> modification to the design.
> >>>>>>>>>>
> >>>>>>>>>> Thanks,
> >>>>>>>>>> -John
> >>>>>>>>>>