Hi Matthias,

Yes, thanks for the questions - I know it's hard to keep up with all of the
various KIPs and everything.
The instructions are not stored anywhere, but are simply a way of letting
the RHS know how to handle the subscription and reply accordingly. The only
case where we send an unnecessary tombstone is (as far as I can tell) when
we do the following:

RHS: (1, bar)

LHS:
(K,1) -> Results in (K, 1, bar) being output
(K,1) -> (K,2) -> Results in (K, null) being output for INNER (no matching element on the RHS)
(K,2) -> (K,3) -> Results in (K, null) being output for INNER (because we don't maintain state to know we already output the tombstone on the previous transition)
(K,3) -> (K,9000) -> Results in (K, null)... etc.

Byte versioning is going in today, then I hope to get back to addressing a
number of John's previous questions in the PR.

Adam

On Thu, Jun 27, 2019 at 5:47 PM Matthias J. Sax <matth...@confluent.io> wrote:

> Thanks for bringing this issue to our attention. Great find @Joe!
>
> Adding the instruction field to the `subscription` sounds like a good
> solution. What I don't understand atm: for which case would we need to
> send an unnecessary tombstone? I thought that the `instruction` field
> helps to avoid any unnecessary tombstones? It seems I am missing a case?
>
> Also for my own understanding: the `instruction` is only part of the
> message? It is not necessary to store it in the RHS auxiliary store, right?
>
> About right/full-outer joins: agreed. Getting left joins would be awesome!
>
> About upgrading: good call, John! Adding a version byte for subscription
> and response is good forward thinking. I personally prefer version
> numbers, too, as they carry more information.
>
> Thanks for all the hard work to everybody involved!
>
>
> -Matthias
>
> On 6/27/19 1:44 PM, John Roesler wrote:
> > Hi Adam,
> >
> > Hah! Yeah, I felt a headache coming on myself when I realized this
> > would be a concern.
> >
> > For what it's worth, I'd also lean toward versioning. It seems more
> > explicit and more likely to keep us all sane in the long run.
> > Since we don't _think_ our wire protocol will be subject to a lot of
> > revisions, we can just use one byte. The worst case is that we run out
> > of numbers and reserve the last one to mean, "consult another field for
> > the actual version number". It seems like a single byte on each message
> > isn't too much to pay.
> >
> > Since you point it out, we might as well put a version number on the
> > SubscriptionResponseWrapper as well. It may not be needed, but if we
> > ever need it, even just once, we'll be glad we have it.
> >
> > Regarding the instructions field, we can also serialize the enum very
> > compactly as a single byte (which is the same size a boolean takes
> > anyway), so it seems like an Enum in Java-land and a byte on the wire
> > is a good choice.
> >
> > Agreed on the right and full outer joins; it doesn't seem necessary
> > right now, although I am happy to see the left join "join" the party,
> > since, as you said, we were so close to it anyway. Can you also add it
> > to the KIP?
> >
> > Thanks as always for your awesome efforts on this,
> > -John
> >
> > On Thu, Jun 27, 2019 at 3:04 PM Adam Bellemare <adam.bellem...@gmail.com> wrote:
> >>
> >> You're stretching my brain, John!
> >>
> >> I prefer STRATEGY 1 because it solves the problem in a simple way, and
> >> allows us to deprecate support for older message types as we go (i.e.,
> >> we only support the previous 3 versions, so V5, V4, V3, but not V2 or V1).
> >>
> >> STRATEGY 2 is akin to Avro schemas between two microservices - there are
> >> indeed cases where a breaking change must be made, and forward
> >> compatibility will provide us with no out other than requiring a full stop
> >> and full upgrade for all nodes, shifting us back towards STRATEGY 1.
> >>
> >> My preference is STRATEGY 1 with instructions as an ENUM, and we can
> >> certainly include a version. Would it make sense to include a version
> >> number in SubscriptionResponseWrapper as well?
> >> Currently we don't have any
> >> instructions in there, as I removed the boolean, but it is certainly
> >> plausible that it could happen in the future. I don't *think* we'll need
> >> it, but I also didn't think we'd need it for SubscriptionWrapper, and
> >> here we are.
> >>
> >> Thanks for the thoughts, and for the info on the right-key. That was
> >> enlightening, though I can't think of a use-case for it *at this point
> >> in time*. :)
> >>
> >> Adam
> >>
> >>
> >> On Thu, Jun 27, 2019 at 12:29 PM John Roesler <j...@confluent.io> wrote:
> >>
> >>> I think I agree with you: right joins (and therefore full outer joins)
> >>> don't make sense here, because the result is a keyed table, where the
> >>> key is the PK of the left-hand side. So, when you have a
> >>> right-hand-side record with no incoming FK references, you would want
> >>> to produce a join result like `nullKey: (null, rhsValue)`, but we
> >>> don't currently allow null keys in Streams. It actually is possible to
> >>> define them, and therefore to add right- and full-outer foreign-key
> >>> joins later, but it's non-trivial in a streaming context with
> >>> continuously updated results. (See the PS if you're curious what I'm
> >>> thinking.) You're correct that right- and full-outer joins are trivial
> >>> on our current 1:1 table joins because they are equi-joins.
> >>>
> >>> Regarding the transition, it sounds like what you're proposing is that
> >>> we would say, "adding a foreign-key join to your topology requires a
> >>> full application reset (or a new application id)". This is also an
> >>> acceptable constraint to place on the feature, but not strictly
> >>> necessary. Since 2.3, it's possible to give all the state in your
> >>> application stable names. This means that it's no longer true that
> >>> adding a node to your topology graph would break its structure, and it
> >>> does become possible to add new operators and simply restart the app.
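[Editor's note: John's single-byte versioning idea from earlier in this thread (one version byte at the front of the payload, with the last value reserved to mean "consult another field") could be sketched roughly as below. This is an illustrative toy, not code from the Kafka PR; the class and constant names are hypothetical.]

```java
import java.nio.ByteBuffer;

// Hypothetical sketch of a one-byte version prefix on a wire payload, with the
// last value (0xFF) reserved to mean "the real version follows in another field".
public class VersionedPayload {
    static final byte RESERVED_EXTENDED_VERSION = (byte) 0xFF;

    // Prepend the version byte to the payload bytes.
    static byte[] encode(byte version, byte[] payload) {
        return ByteBuffer.allocate(1 + payload.length)
                .put(version)
                .put(payload)
                .array();
    }

    // The version is always readable from the first byte, regardless of how
    // the rest of the payload evolves across releases.
    static byte version(byte[] wire) {
        return wire[0];
    }

    public static void main(String[] args) {
        byte[] wire = encode((byte) 1, new byte[] {42, 43});
        System.out.println("version=" + version(wire) + " length=" + wire.length);
    }
}
```

The cost is exactly one byte per message, as John notes, while keeping every future deserializer able to dispatch on the version before touching the rest of the payload.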
> >>> Revisiting my prior thought, though, I think the problem is not
> >>> specific to your feature. For example, adding a new grouped
> >>> aggregation would produce a new repartition topic, but the repartition
> >>> topic partitions might get assigned to old nodes in the middle of a
> >>> rolling bounce, and they would need to just ignore them. This
> >>> requirement is the same for the repartition topics in the FK join, so
> >>> it's orthogonal to your design.
> >>>
> >>> Back to the first concern, though: I'm not sure I followed the
> >>> explanation. As a thought experiment, let's imagine that Joe hadn't
> >>> taken the time to experiment with your feature branch. We wouldn't
> >>> have noticed the problem until the feature was already released in
> >>> 2.4. So the wire protocol on that PK->FK subscription topic would have
> >>> been V1: "FK,PK,HASH,BOOLEAN". Then, Joe would have let us know about
> >>> the problem once they picked up the feature, so we would want to
> >>> implement your proposed fix and change the wire protocol to V2:
> >>> "FK,PK,HASH,INSTRUCTIONS" in 2.5. Upon rolling out the update, we
> >>> would see both 2.4 nodes encountering V2 messages and 2.5 nodes
> >>> encountering V1 messages. How can they both detect that they are
> >>> attempting to process a newer or older protocol? And if they can
> >>> detect it, then what should they do?
> >>>
> >>> From experience, there are two basic solutions to this problem:
> >>>
> >>> STRATEGY1. Add a protocol version to the message (could be a number at
> >>> the start of the message payload, or it could be a number in the
> >>> message headers; not sure if it matters much. Payload is probably more
> >>> compact, since the header would need a name). In this case, the 2.4
> >>> worker would know that its max protocol version is V1, and when it
> >>> sees a V2 message, it knows that it can't handle it properly. Rather
> >>> than doing something wrong, it would just not do anything.
> >>> This means it would stop the task, if not shut down the whole
> >>> instance. On the other hand, a 2.5 worker would have some defined
> >>> logic for how to handle all versions (V1 and V2), so once the upgrade
> >>> is complete, all messages can be processed.
> >>>
> >>> STRATEGY2. Make the schema forward-compatible. Basically, we ensure
> >>> that new fields can only be appended to the message schema, and that
> >>> older workers using only a prefix of the full message would still
> >>> behave correctly. Using the example above, we'd instead evolve the
> >>> schema to V2': "FK,PK,HASH,BOOLEAN,INSTRUCTIONS", and continue to set
> >>> the boolean field to true for the "new" foreign key. Then, 2.4 workers
> >>> encountering a "new FK" message would just see the prefix of the
> >>> payload that makes sense to them, and they would still continue
> >>> processing the messages as they always have. Only after the 2.5 code
> >>> is fully rolled out to the cluster would we be sure to see the desired
> >>> behavior. Note: in the reverse case, a 2.5 worker knows how to fully
> >>> parse the new message format, even if it plans to ignore the BOOLEAN
> >>> field.
> >>>
> >>> There are some tradeoffs between these strategies: STRATEGY1 ensures
> >>> that all messages are only handled by workers that can properly handle
> >>> them, although it results in processing stalls while there are still
> >>> old nodes in the cluster. STRATEGY2 ensures that all messages can be
> >>> processed by all nodes, so there are no stalls, but we can never
> >>> remove fields from the message, so if there are a lot of revisions in
> >>> the future, the payloads will become bloated. Also, it's not clear
> >>> that you can actually pull off STRATEGY2 in all cases. If there's some
> >>> new kind of message you want to send that has no way to be correctly
> >>> processed at all under the 2.4 code paths, the prefix trick simply
> >>> doesn't work. Etc.
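[Editor's note: STRATEGY1 as described above might look roughly like the sketch below on the receiving side - a worker that only understands up to its own max version refuses anything newer rather than misinterpreting it. The class name, constant, and return value are hypothetical, not from the Kafka codebase.]

```java
// Sketch of STRATEGY1 version gating: an old worker fails fast on a message
// version it does not understand, instead of processing it incorrectly.
public class SubscriptionDeserializer {
    // e.g. a 2.4 worker that only knows the V1 wire format
    static final byte MAX_SUPPORTED_VERSION = 1;

    static String deserialize(byte[] wire) {
        byte version = wire[0];
        if (version > MAX_SUPPORTED_VERSION) {
            // Don't guess; stop the task (or the instance) rather than do
            // something wrong with a payload from a newer release.
            throw new UnsupportedOperationException(
                "Unsupported wrapper version " + version
                    + "; upgrade this instance before it can process this record");
        }
        // A real implementation would dispatch to a per-version parser here.
        return "parsed v" + version + " payload of " + (wire.length - 1) + " bytes";
    }

    public static void main(String[] args) {
        System.out.println(deserialize(new byte[] {1, 10, 20}));
    }
}
```

Once the rolling upgrade completes, every worker's `MAX_SUPPORTED_VERSION` covers all in-flight messages, so the stall John describes is bounded by the length of the upgrade.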
> >>>
> >>> Also, note that you can modify the above strategies by instead
> >>> designing the message fields for extensibility. E.g., if you make the
> >>> instructions field an enum, then you can make sure that the default
> >>> case is handled sensibly (probably similarly to STRATEGY1, just choke
> >>> on unknown instructions) and that you never remove an instruction type
> >>> from the enum in future versions.
> >>>
> >>> Does this make sense?
> >>> -John
> >>>
> >>>
> >>> PS:
> >>> We can define null keys for streaming tables, but it's tricky.
> >>>
> >>> Specifically, you'd want to define some concept of null keys that
> >>> allows all null keys to be unique, but _also_ to have a fixed
> >>> identity, so that a particular null-key can be updated later. One
> >>> example could be to union the existing keyspace with a new
> >>> null-keyspace, where normal keys are like "key" and null-keys are like
> >>> "null(identity)". Then given a query like
> >>> "KTable<String,Integer>.rightJoin(KTable<Integer,Boolean>)", and
> >>> inputs like:
> >>>
> >>> LHS:
> >>> "a": 1
> >>> "b": 2
> >>>
> >>> RHS:
> >>> 1: true
> >>> 3: false
> >>>
> >>> a full outer join would produce:
> >>> "a": (1, true)
> >>> "b": (2, null)
> >>> null(3): (null, false)
> >>>
> >>> which can be correctly updated later if we get an update on the LHS:
> >>> PUT("c": 3)
> >>>
> >>> We'd emit for the results:
> >>> DELETE(null(3))
> >>> EMIT("c": (3, false))
> >>>
> >>> Resulting in the correct result table of:
> >>> "a": (1, true)
> >>> "b": (2, null)
> >>> "c": (3, false)
> >>>
> >>> As mentioned, this is tricky, and I would avoid it until we have
> >>> evidence that it's actually useful to cover this part of the design
> >>> space. Certainly, it would be a separate KIP if it came to that.
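[Editor's note: the "enum with a byte on the wire, choking on unknown instructions" idea above might be sketched as follows. The instruction names here are hypothetical placeholders, not the final KIP-213 set.]

```java
// Illustrative sketch of an instruction enum serialized as a single byte,
// where an unknown byte fails fast instead of being silently misread.
public enum Instruction {
    DELETE_KEY_AND_PROPAGATE((byte) 0),
    PROPAGATE_ONLY_IF_FK_VAL_AVAILABLE((byte) 1),
    PROPAGATE_NULL_IF_NO_FK_VAL_AVAILABLE((byte) 2);

    private final byte wireByte;

    Instruction(byte wireByte) {
        this.wireByte = wireByte;
    }

    public byte toByte() {
        return wireByte;
    }

    public static Instruction fromByte(byte b) {
        for (Instruction i : values()) {
            if (i.wireByte == b) {
                return i;
            }
        }
        // Default case: choke on unknown instructions rather than guess, so
        // an old worker never misinterprets a message from a newer release.
        throw new IllegalArgumentException("Unknown instruction byte: " + b);
    }

    public static void main(String[] args) {
        System.out.println(fromByte((byte) 1));
    }
}
```

The invariant that makes this safe is the one John states: instruction values may be added in future versions, but never removed or renumbered.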
> >>>
> >>> On Wed, Jun 26, 2019 at 8:57 PM Adam Bellemare <adam.bellem...@gmail.com> wrote:
> >>>>
> >>>> Hi John
> >>>>
> >>>> Good thinking with regards to the upgrade path between versions for the
> >>>> over-the-wire instructions in SubscriptionWrapper. At this point in time
> >>>> I can't think of any new wire message instructions, but I would
> >>>> appreciate as many eyes on it as possible. I have just included the LEFT
> >>>> join in the last commit (about 10 min ago) along with the INNER join. I
> >>>> do not think that RIGHT join and OUTER are possible given that there is
> >>>> no LHS key available, so LHSTable.outerJoinOnForeignKey(RHSTable)
> >>>> wouldn't even make sense. This is in contrast to the current
> >>>> LHSTable.outerJoin(RHSTable), as they are both keyed on the same key. I
> >>>> have beefed up the integration tests and have tried to make them more
> >>>> readable to ensure that we're covering all the scenarios. I think that
> >>>> if we can get more eyes on the workflow showing the various LHS and RHS
> >>>> events and outputs then that may help us validate that we have all the
> >>>> scenarios covered.
> >>>>
> >>>> With regards to the 2.3->2.4 scenario you described, I'm not entirely
> >>>> sure I follow. If users want to add a FK-join, they will need to rework
> >>>> their code in the KStreams app and make a new release, since the
> >>>> underlying topology would be different and new internal topics would
> >>>> need to be created. In other words, I don't think a rolling upgrade
> >>>> where the user introduces a FK join would be possible, since their
> >>>> topology would necessitate a full KStreams reset. Is this what you meant?
> >>>>
> >>>>
> >>>> On Wed, Jun 26, 2019 at 4:10 PM John Roesler <j...@confluent.io> wrote:
> >>>>
> >>>>> Thanks, Adam!
> >>>>>
> >>>>> One unrelated thought that has just now occurred to me is that (unlike
> >>>>> the equi-joins we currently have), this join logic is potentially
> >>>>> spread over multiple Streams instances, which in general means that
> >>>>> the instances may be running different versions of Kafka Streams.
> >>>>>
> >>>>> This means that if we discover a bug that requires us to again change
> >>>>> the wire message (as you did in this proposal update), we need to
> >>>>> consider what should happen if the PK instance is newer than the FK
> >>>>> instance, or vice versa, during a rolling upgrade. We should think
> >>>>> ahead to this condition and make sure the logic is forward compatible.
> >>>>>
> >>>>> Related: what about the initial case, when we release this feature
> >>>>> (let's say in 2.4)? What will happen if I decide to adopt 2.4 and add
> >>>>> a FK join together in one upgrade? In that case, the 2.4 members of
> >>>>> the cluster are producing the SubscriptionWrapper messages, and some
> >>>>> 2.3 members get the subscription topic assigned to them, but they have
> >>>>> no idea what to do with it. I'm not sure this is a problem; hopefully
> >>>>> they just do nothing. If it is a problem, it would be fine to say you
> >>>>> have to upgrade completely to 2.4 before deploying a FK join.
> >>>>>
> >>>>> Just want to make sure we anticipate these issues in case it affects
> >>>>> the design at all.
> >>>>>
> >>>>> Thanks,
> >>>>> -John
> >>>>>
> >>>>> On Wed, Jun 26, 2019 at 2:38 PM Adam Bellemare <adam.bellem...@gmail.com> wrote:
> >>>>>>
> >>>>>> Sigh... Forgot the link:
> >>>>>> https://cwiki.apache.org/confluence/pages/diffpagesbyversion.action?pageId=74684836&selectedPageVersions=78&selectedPageVersions=74
> >>>>>>
> >>>>>> I'll update it when I validate that there are no issues with removing
> >>>>>> the SubscriptionResponseWrapper boolean.
> >>>>>>
> >>>>>> On Wed, Jun 26, 2019 at 3:37 PM Adam Bellemare <adam.bellem...@gmail.com> wrote:
> >>>>>>
> >>>>>>>> Maybe just call it as (k, leftval, null) or (k, null, rightval)?
> >>>>>>> Done.
> >>>>>>>
> >>>>>>>> if you update the KIP, you might want to send a new "diff link" to
> >>>>>>>> this thread
> >>>>>>> Here it is:
> >>>>>>>
> >>>>>>>> Looking closely at the proposal, can you explain more about the
> >>>>>>>> propagateIfNull field in SubscriptionResponseWrapper? It sort of
> >>>>>>>> looks like it's always going to be equal to (RHS-result != null).
> >>>>>>> I believe you are correct, and I missed the forest for the trees.
> >>>>>>> They are effectively the same thing, and I can simply remove the
> >>>>>>> flag. I will code it up and try it out locally just to be sure.
> >>>>>>>
> >>>>>>> Thanks again for your help, it is greatly appreciated!
> >>>>>>>
> >>>>>>> On Wed, Jun 26, 2019 at 2:54 PM John Roesler <j...@confluent.io> wrote:
> >>>>>>>
> >>>>>>>> I think the "scenario trace" is very nice, but has one point that I
> >>>>>>>> found confusing:
> >>>>>>>>
> >>>>>>>> You indicate a retraction in the join output as (k, null) and a join
> >>>>>>>> result as (k, leftval, rightval), but confusingly, you also write a
> >>>>>>>> join result as (k, JoinResult) when one side is null. Maybe just call
> >>>>>>>> it as (k, leftval, null) or (k, null, rightval)? That way the readers
> >>>>>>>> can more easily determine if the results meet their expectations for
> >>>>>>>> each join type.
> >>>>>>>>
> >>>>>>>> (procedural note: if you update the KIP, you might want to send a new
> >>>>>>>> "diff link" to this thread, since the one I posted at the beginning
> >>>>>>>> would not automatically show your latest changes)
> >>>>>>>>
> >>>>>>>> I was initially concerned that the proposed algorithm would wind up
> >>>>>>>> propagating something that looks like a left join (k, leftval, null)
> >>>>>>>> under the case that Joe pointed out, but after reviewing your
> >>>>>>>> scenario, I see that it will emit a tombstone (k, null) instead. This
> >>>>>>>> is appropriate, and unavoidable, since we have to retract the join
> >>>>>>>> result from the logical view (the join result is a logical Table).
> >>>>>>>>
> >>>>>>>> Looking closely at the proposal, can you explain more about the
> >>>>>>>> propagateIfNull field in SubscriptionResponseWrapper?
> >>>>>>>> It sort of looks like it's always going to be equal to
> >>>>>>>> (RHS-result != null).
> >>>>>>>>
> >>>>>>>> In other words, can we drop that field and just send back RHS-result
> >>>>>>>> or null, and then handle it on the left-hand side like:
> >>>>>>>>
> >>>>>>>> if (rhsOriginalValueHash doesn't match) {
> >>>>>>>>     emit nothing, just drop the update
> >>>>>>>> } else if (joinType == inner && rhsValue == null) {
> >>>>>>>>     emit tombstone
> >>>>>>>> } else {
> >>>>>>>>     emit joiner(lhsValue, rhsValue)
> >>>>>>>> }
> >>>>>>>>
> >>>>>>>> To your concern about emitting extra tombstones, personally, I think
> >>>>>>>> it's fine. Clearly, we should try to avoid unnecessary tombstones,
> >>>>>>>> but all things considered, it's not harmful to emit some unnecessary
> >>>>>>>> tombstones: their payload is small, and they are trivial to handle
> >>>>>>>> downstream. If users want to, they can materialize the join result
> >>>>>>>> to suppress any extra tombstones, so there's a way out.
> >>>>>>>>
> >>>>>>>> Thanks for the awesome idea.
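[Editor's note: John's pseudocode above could be fleshed out roughly as below. This is a self-contained sketch, not the actual Streams implementation: the method signature and string-formatted outputs are invented for illustration, and the hash parameters stand in for comparing the returned hash against the hash of the current LHS record's original value.]

```java
// Runnable sketch of the proposed left-hand-side response handling: drop
// stale responses, emit a tombstone for a non-matching INNER join, otherwise
// emit the joined result.
public class LhsResponseHandler {
    // Returns the record to emit, or null to emit nothing (drop the update).
    static String handle(long expectedHash, long rhsOriginalValueHash,
                         boolean innerJoin, String lhsValue, String rhsValue) {
        if (rhsOriginalValueHash != expectedHash) {
            return null;               // stale response: emit nothing, drop the update
        } else if (innerJoin && rhsValue == null) {
            return "(k, null)";        // emit tombstone
        } else {
            return "(k, " + lhsValue + ", " + rhsValue + ")"; // joiner(lhsValue, rhsValue)
        }
    }

    public static void main(String[] args) {
        System.out.println(handle(7, 7, true, "lhs", null));   // inner join, no RHS match: tombstone
        System.out.println(handle(7, 7, true, "lhs", "rhs"));  // matching response: join result
        System.out.println(handle(7, 8, true, "lhs", "rhs"));  // stale hash: dropped
    }
}
```

This mirrors the argument in the thread: once the stale-response check and the inner-join null check are in place, the separate propagateIfNull flag carries no extra information.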
> >>>>>>>> It's better than what I was thinking.
> >>>>>>>> -john
> >>>>>>>>
> >>>>>>>> On Wed, Jun 26, 2019 at 11:37 AM Adam Bellemare
> >>>>>>>> <adam.bellem...@gmail.com> wrote:
> >>>>>>>>>
> >>>>>>>>> Thanks John.
> >>>>>>>>>
> >>>>>>>>> I'm looking forward to any feedback on this. In the meantime I will
> >>>>>>>>> work on the unit tests to ensure that we have well-defined and
> >>>>>>>>> readable coverage.
> >>>>>>>>>
> >>>>>>>>> At the moment I cannot see a way around emitting (k, null) whenever
> >>>>>>>>> we emit an event that lacks a matching foreign key on the RHS,
> >>>>>>>>> except in the (k, null) -> (k, fk) case.
> >>>>>>>>> If the LHS oldValue = null, we know we would have emitted a
> >>>>>>>>> deletion, and so (k, null) would have been emitted out of the join.
> >>>>>>>>> In this case we don't need to send another null.
> >>>>>>>>>
> >>>>>>>>> Adam
> >>>>>>>>>
> >>>>>>>>> On Wed, Jun 26, 2019 at 11:53 AM John Roesler <j...@confluent.io> wrote:
> >>>>>>>>>
> >>>>>>>>>> Hi Adam,
> >>>>>>>>>>
> >>>>>>>>>> Thanks for the proposed revision to your KIP
> >>>>>>>>>> (https://cwiki.apache.org/confluence/pages/diffpagesbyversion.action?pageId=74684836&selectedPageVersions=77&selectedPageVersions=74)
> >>>>>>>>>> in response to the concern pointed out during code review
> >>>>>>>>>> (https://github.com/apache/kafka/pull/5527#issuecomment-505137962).
> >>>>>>>>>>
> >>>>>>>>>> We should have a brief discussion thread (here) in the mailing
> >>>>>>>>>> list to make sure everyone who wants to gets a chance to consider
> >>>>>>>>>> the modification to the design.
> >>>>>>>>>>
> >>>>>>>>>> Thanks,
> >>>>>>>>>> -John