Thanks for bringing this issue to our attention. Great find @Joe!

Adding the instruction field to the `subscription` sounds like a good
solution. What I don't understand atm: for which case would we need to
send an unnecessary tombstone? I thought that the `instruction` field
helps to avoid any unnecessary tombstones? It seems I am missing a case?

Also, for my own understanding: the `instruction` is only part of the
message? It is not necessary to store it in the RHS auxiliary store, right?

About right/full-outer joins. Agreed. Getting left-joins would be awesome!

About upgrading: Good call John! Adding a version byte for subscription
and response is good forward thinking. I personally prefer version
numbers, too, as they carry more information.
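
Just to illustrate how I picture the version byte (class and method names
here are hypothetical, not a proposal for the final API), the version
would simply lead the serialized payload so a reader can check it before
parsing anything else:

    import java.nio.ByteBuffer;

    // Hypothetical sketch only: a leading version byte lets an older
    // reader detect a newer wire format before it parses the payload.
    final class VersionedPayloadSketch {

        static byte[] serialize(final byte version, final byte[] payload) {
            final ByteBuffer buf = ByteBuffer.allocate(1 + payload.length);
            buf.put(version);   // e.g. (byte) 1 for the first release
            buf.put(payload);   // the actual SubscriptionWrapper fields
            return buf.array();
        }

        static byte readVersion(final byte[] bytes) {
            return bytes[0];    // inspected before any further parsing
        }
    }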

Thanks for all the hard work to everybody involved!


-Matthias

On 6/27/19 1:44 PM, John Roesler wrote:
> Hi Adam,
> 
> Hah! Yeah, I felt a headache coming on myself when I realized this
> would be a concern.
> 
> For what it's worth, I'd also lean toward versioning. It seems more
> explicit and more likely to keep us all sane in the long run. Since we
> don't _think_ our wire protocol will be subject to a lot of revisions,
> we can just use one byte. The worst case is that we run out of numbers
> and reserve the last one to mean, "consult another field for the
> actual version number". It seems like a single byte on each message
> isn't too much to pay.
> 
> Since you point it out, we might as well put a version number on the
> SubscriptionResponseWrapper as well. It may not be needed, but if we
> ever need it, even just once, we'll be glad we have it.
> 
> Regarding the instructions field, we can also serialize the enum very
> compactly as a single byte (which is the same size a boolean takes
> anyway), so it seems like an Enum in Java-land and a byte on the wire
> is a good choice.
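> 
> To sketch the enum/byte mapping I have in mind (the instruction names
> here are just placeholders, not the KIP's final set):
> 
>     // Hypothetical sketch only: each instruction is one byte on the wire.
>     enum Instruction {
>         DELETE_AND_PROPAGATE((byte) 0),
>         DELETE_NO_PROPAGATE((byte) 1),
>         PROPAGATE_IF_PRESENT((byte) 2);
> 
>         private final byte code;
> 
>         Instruction(final byte code) {
>             this.code = code;
>         }
> 
>         byte toByte() {
>             return code;
>         }
> 
>         static Instruction fromByte(final byte code) {
>             for (final Instruction instruction : values()) {
>                 if (instruction.code == code) {
>                     return instruction;
>                 }
>             }
>             // Unknown byte: fail loudly rather than guess at the intent.
>             throw new IllegalArgumentException("Unknown instruction: " + code);
>         }
>     }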
> 
> Agreed on the right and full outer joins, it doesn't seem necessary
> right now, although I am happy to see the left join "join" the party,
> since as you said, we were so close to it anyway. Can you also add it
> to the KIP?
> 
> Thanks as always for your awesome efforts on this,
> -John
> 
> On Thu, Jun 27, 2019 at 3:04 PM Adam Bellemare <adam.bellem...@gmail.com> 
> wrote:
>>
>> You're stretching my brain, John!
>>
>> I prefer STRATEGY 1 because it solves the problem in a simple way, and
>> allows us to deprecate support for older message types as we go (i.e., we
>> only support the previous 3 versions, so V5, V4, V3, but not V2 or V1).
>>
>> STRATEGY 2 is akin to Avro schemas between two microservices - there are
>> indeed cases where a breaking change must be made, and forward
>> compatibility will provide us with no out other than requiring a full stop
>> and full upgrade for all nodes, shifting us back towards STRATEGY 1.
>>
>> My preference is STRATEGY 1 with instructions as an ENUM, and we can
>> certainly include a version. Would it make sense to include a version
>> number in SubscriptionResponseWrapper as well? Currently we don't have any
>> instructions in there, as I removed the boolean, but it is certainly
>> plausible that it could happen in the future. I don't *think* we'll need
>> it, but I also didn't think we'd need it for SubscriptionWrapper and here
>> we are.
>>
>> Thanks for the thoughts, and the info on the right-key. That was
>> enlightening, though I can't think of a use-case for it *at this point in
>> time*. :)
>>
>> Adam
>>
>>
>>
>> On Thu, Jun 27, 2019 at 12:29 PM John Roesler <j...@confluent.io> wrote:
>>
>>> I think I agree with you, right joins (and therefore full outer joins)
>>> don't make sense here, because the result is a keyed table, where the
>>> key is the PK of the left-hand side. So, when you have a
>>> right-hand-side record with no incoming FK references, you would want
>>> to produce a join result like `nullKey: (null, rhsValue)`, but we
>>> don't currently allow null keys in Streams. It actually is possible to
>>> define them, and therefore to add right- and full-outer foreign-key
>>> joins later, but it's non-trivial in a streaming context with
>>> continuously updated results. (See the PS if you're curious what I'm
>>> thinking). You're correct, right- and full-outer joins are trivial on
>>> our current 1:1 table joins because they are equi-joins.
>>>
>>> Regarding the transition, it sounds like what you're proposing is that
>>> we would say, "adding a foreign-key join to your topology requires a
>>> full application reset (or a new application id)". This is also an
>>> acceptable constraint to place on the feature, but not strictly
>>> necessary. Since 2.3, it's now possible to give all the state in your
>>> application stable names. This means that it's no longer true that
>>> adding a node to your topology graph would break its structure, and it
>>> does become possible to add new operators and simply restart the app.
>>> Revisiting my prior thought, though, I think the problem is not
>>> specific to your feature. For example, adding a new grouped
>>> aggregation would produce a new repartition topic, but the repartition
>>> topic partitions might get assigned to old nodes in the middle of a
>>> rolling bounce, and they would need to just ignore them. This
>>> requirement is the same for the repartition topics in the FK join, so
>>> it's orthogonal to your design.
>>>
>>> Back to the first concern, though, I'm not sure I followed the
>>> explanation. As a thought experiment, let's imagine that Joe hadn't
>>> taken the time to experiment with your feature branch. We wouldn't
>>> have noticed the problem until the feature was already released in
>>> 2.4. So the wire protocol on that PK->FK subscription topic would have
>>> been V1: "FK,PK,HASH,BOOLEAN". Then, Joe would have let us know the
>>> problem once they picked up the feature, so we would want to implement
>>> your proposed fix and change the wire protocol to V2:
>>> "FK,PK,HASH,INSTRUCTIONS" in 2.5. Upon rolling out the update, we
>>> would see both 2.4 nodes encountering V2 messages and 2.5 nodes
>>> encountering V1 messages. How can they both detect that they are
>>> attempting to process a newer or older protocol? If they can detect
>>> it, then what should they do?
>>>
>>> From experience, there are two basic solutions to this problem:
>>>
>>> STRATEGY1. Add a protocol version to the message (could be a number at
>>> the start of the message payload, or it could be a number in the
>>> message headers, not sure if it matters much. Payload is probably more
>>> compact, since the header would need a name.) In this case, the 2.4
>>> worker would know that its max protocol version is V1, and when it
>>> sees the V2 message, it knows that it can't handle it properly. Rather
>>> than doing something wrong, it would just not do anything. This means
>>> it would stop the task, if not shut down the whole instance. On the
>>> other hand, a 2.5 worker would have some defined logic for how to
>>> handle all versions (V1 and V2), so once the upgrade is complete, all
>>> messages can be processed.
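>>>
>>> As a rough sketch of what the check could look like (hypothetical
>>> names, not actual Streams code):
>>>
>>>     // Hypothetical STRATEGY1 sketch: refuse to guess when the message
>>>     // version is newer than this worker understands.
>>>     final class VersionCheckSketch {
>>>         static final byte MAX_SUPPORTED_VERSION = 1;
>>>
>>>         static void process(final byte[] message) {
>>>             final byte version = message[0];
>>>             if (version > MAX_SUPPORTED_VERSION) {
>>>                 // Produced by a newer instance: stop the task rather
>>>                 // than process the record incorrectly.
>>>                 throw new IllegalStateException(
>>>                     "Unsupported subscription wire format v" + version);
>>>             }
>>>             // ... parse and handle the known format here ...
>>>         }
>>>     }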
>>>
>>> STRATEGY2. Make the schema forward-compatible. Basically, we ensure
>>> that new fields can only be appended to the message schema, and that
>>> older workers using only a prefix of the full message would still
>>> behave correctly. Using the example above, we'd instead evolve the
>>> schema to V2': "FK,PK,HASH,BOOLEAN,INSTRUCTIONS", and continue to set
>>> the boolean field to true for the "new" foreign key. Then, 2.4 workers
>>> encountering a "new FK" message would just see the prefix of the
>>> payload that makes sense to them, and they would still continue
>>> processing the messages as they always have. Only after the 2.5 code
>>> is fully rolled out to the cluster would we be sure to see the desired
>>> behavior. Note: in the reverse case, a 2.5 worker knows how to fully
>>> parse the new message format, even if it plans to ignore the BOOLEAN
>>> field.
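>>>
>>> A rough sketch of the prefix idea (again with hypothetical names, and
>>> assuming a fixed offset just for illustration):
>>>
>>>     import java.nio.ByteBuffer;
>>>
>>>     // Hypothetical STRATEGY2 sketch: an old reader consumes only the
>>>     // prefix of the payload it knows about; appended fields such as
>>>     // INSTRUCTIONS are simply never read by the old code path.
>>>     final class PrefixReaderSketch {
>>>         static boolean readOldBooleanField(final byte[] message,
>>>                                            final int booleanOffset) {
>>>             final ByteBuffer buf = ByteBuffer.wrap(message);
>>>             buf.position(booleanOffset);  // skip FK, PK, and HASH
>>>             return buf.get() != 0;        // the original BOOLEAN field
>>>         }
>>>     }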
>>>
>>> There are some tradeoffs between these strategies: STRATEGY1 ensures
>>> that all messages are only handled by workers that can properly handle
>>> them, although it results in processing stalls while there are still
>>> old nodes in the cluster. STRATEGY2 ensures that all messages can be
>>> processed by all nodes, so there are no stalls, but we can never
>>> remove fields from the message, so if there are a lot of revisions in
>>> the future, the payloads will become bloated. Also, it's not clear
>>> that you can actually pull off STRATEGY2 in all cases. If there's some
>>> new kind of message you want to send that has no way to be correctly
>>> processed at all under the 2.4 code paths, the prefix thing simply
>>> doesn't work. Etc.
>>>
>>> Also, note that you can modify the above strategies by instead
>>> designing the message fields for extensibility. E.g., if you make the
>>> instructions field an enum, then you can make sure that the default
>>> case is handled sensibly (probably similarly to STRATEGY1, just choke
>>> on unknown instructions) and that you never remove an instruction type
>>> from the enum in future versions.
>>>
>>> Does this make sense?
>>> -John
>>>
>>>
>>>
>>>
>>> PS:
>>> We can define null keys for streaming tables, but it's tricky.
>>>
>>> Specifically, you'd want to define some concept of null keys that
>>> allows all null keys to be unique, but _also_ to have a fixed
>>> identity, so that a particular null-key can be updated later. One
>>> example could be to union the existing keyspace with a new
>>> null-keyspace, where normal keys are like "key" and null-keys are like
>>> "null(identity)". Then given a query like
>>> "KTable<String,Integer>.rightJoin(KTable<Integer,Boolean>)", and
>>> inputs like:
>>> LHS:
>>> "a": 1
>>> "b": 2
>>>
>>> RHS:
>>> 1: true
>>> 3: false
>>>
>>> a full outer join would produce:
>>> "a": (1, true)
>>> "b": (2, null)
>>> null(3): (null, false)
>>>
>>> which can be correctly updated later if we get an update on the LHS:
>>> PUT("c": 3)
>>>
>>> We'd emit for the results:
>>> DELETE(null(3))
>>> EMIT("c": (3, false))
>>>
>>> Resulting in the correct result table of:
>>> "a": (1, true)
>>> "b": (2, null)
>>> "c": (3, false)
>>>
>>> As mentioned, this is tricky, and I would avoid it until we have
>>> evidence that it's actually useful to cover this part of the design
>>> space. Certainly, it would be a separate KIP if it came to that.
>>>
>>> On Wed, Jun 26, 2019 at 8:57 PM Adam Bellemare <adam.bellem...@gmail.com>
>>> wrote:
>>>>
>>>> Hi John
>>>>
>>>> Good thinking with regards to upgrade path between versions regarding
>>>> over-the-wire instructions in SubscriptionWrapper. At this point in time
>>> I
>>>> can't think of any new wire message instructions, but I would appreciate
>>> as
>>>> many eyes on it as possible. I have just included the LEFT join in the
>>> last
>>>> commit (about 10 min ago) along with INNER join. I do not think that
>>> RIGHT
>>>> join and OUTER are possible given that there is no LHS key available, so
>>>> LHSTable.outerJoinOnForeignKey(RHSTable) wouldn't even make sense. This
>>> is
>>>> in contrast to the current LHSTable.outerJoin(RHSTable), as they are both
>>>> keyed on the same key. I have buffed up the Integration tests and have
>>>> tried to make them more readable to ensure that we're covering all the
>>>> scenarios. I think that if we can get more eyes on the workflow showing
>>> the
>>>> various LHS and RHS events and outputs then that may help us validate
>>> that
>>>> we have all the scenarios covered.
>>>>
>>>> With regards to the 2.3->2.4 scenario you described, I'm not entirely
>>> sure
>>>> I follow. If they want to add a FK-join, they will need to rework their
>>>> code in the KStreams app and make a new release, since the underlying
>>>> topology would be different and new internal topics would need to be
>>>> created. In other words, I don't think a rolling upgrade where the user
>>>> introduces a FK join would be possible since their topology would
>>>> necessitate a full KStreams reset. Is this what you meant?
>>>>
>>>>
>>>>
>>>> On Wed, Jun 26, 2019 at 4:10 PM John Roesler <j...@confluent.io> wrote:
>>>>
>>>>> Thanks, Adam!
>>>>>
>>>>> One unrelated thought that has just now occurred to me is that (unlike
>>>>> the equi-joins we currently have), this join logic is potentially
>>>>> spread over multiple Streams instances, which in general means that
>>>>> the instances may be running different versions of Kafka Streams.
>>>>>
>>>>> This means that if we discover a bug that requires us to again change
>>>>> the wire message (as you did in this proposal update), we need to
>>>>> consider what should happen if the PK instance is newer than the FK
>>>>> instance, or vice-versa, during a rolling upgrade. We should think
>>>>> ahead to this condition and make sure the logic is forward compatible.
>>>>>
>>>>> Related: what about the initial case, when we release this feature
>>>>> (let's say in 2.4)? What will happen if I decide to adopt 2.4 and add
>>>>> a FK join together in one upgrade. Thus, the 2.4 member of the cluster
>>>>> is producing the SubscriptionWrapper messages, and some 2.3 members
>>>>> get the subscription topic assigned to them, but they have no idea
>>>>> what to do with it? I'm not sure this is a problem; hopefully they
>>>>> just do nothing. If it is a problem, it would be fine to say you have
>>>>> to upgrade completely to 2.4 before deploying a FK join.
>>>>>
>>>>> Just want to make sure we anticipate these issues in case it affects
>>>>> the design at all.
>>>>>
>>>>> Thanks,
>>>>> -John
>>>>>
>>>>> On Wed, Jun 26, 2019 at 2:38 PM Adam Bellemare <
>>> adam.bellem...@gmail.com>
>>>>> wrote:
>>>>>>
>>>>>> Sigh... Forgot the link:
>>>>>>
>>>>>
>>> https://cwiki.apache.org/confluence/pages/diffpagesbyversion.action?pageId=74684836&selectedPageVersions=78&selectedPageVersions=74
>>>>>>
>>>>>> I'll update it when I validate that there are no issues with
>>> removing the
>>>>>> SubscriptionResponseWrapper boolean.
>>>>>>
>>>>>> On Wed, Jun 26, 2019 at 3:37 PM Adam Bellemare <
>>> adam.bellem...@gmail.com
>>>>>>
>>>>>> wrote:
>>>>>>
>>>>>>>> Maybe just call it as (k, leftval, null) or (k, null, rightval)?
>>>>>>> Done.
>>>>>>>
>>>>>>>> if you update the KIP, you might want to send a new "diff link"
>>> to
>>>>> this
>>>>>>> thread
>>>>>>> Here it is:
>>>>>>>
>>>>>>>> Looking closely at the proposal, can you explain more about the
>>>>>>> propagateIfNull field in SubscriptionResponseWrapper? It sort of
>>> looks
>>>>> like
>>>>>>> it's always going to be equal to (RHS-result != null).
>>>>>>> I believe you are correct, and I missed the forest for the trees.
>>> They
>>>>> are
>>>>>>> effectively the same thing, and I can simply remove the flag. I
>>> will
>>>>> code
>>>>>>> it up and try it out locally just to be sure.
>>>>>>>
>>>>>>> Thanks again for your help, it is greatly appreciated!
>>>>>>>
>>>>>>> On Wed, Jun 26, 2019 at 2:54 PM John Roesler <j...@confluent.io>
>>>>> wrote:
>>>>>>>
>>>>>>>> I think the "scenario trace" is very nice, but has one point that
>>> I
>>>>>>>> found confusing:
>>>>>>>>
>>>>>>>> You indicate a retraction in the join output as (k,null) and a
>>> join
>>>>>>>> result as (k, leftval, rightval), but confusingly, you also write
>>> a
>>>>>>>> join result as (k, JoinResult) when one side is null. Maybe just
>>> call
>>>>>>>> it as (k, leftval, null) or (k, null, rightval)? That way the
>>> readers
>>>>>>>> can more easily determine if the results meet their expectations
>>> for
>>>>>>>> each join type.
>>>>>>>>
>>>>>>>> (procedural note: if you update the KIP, you might want to send a
>>> new
>>>>>>>> "diff link" to this thread, since the one I posted at the
>>> beginning
>>>>>>>> would not automatically show your latest changes)
>>>>>>>>
>>>>>>>> I was initially concerned that the proposed algorithm would wind
>>> up
>>>>>>>> propagating something that looks like a left join (k, leftval,
>>> null)
>>>>>>>> under the case that Joe pointed out, but after reviewing your
>>>>>>>> scenario, I see that it will emit a tombstone (k, null) instead.
>>> This
>>>>>>>> is appropriate, and unavoidable, since we have to retract the join
>>>>>>>> result from the logical view (the join result is a logical Table).
>>>>>>>>
>>>>>>>> Looking closely at the proposal, can you explain more about the
>>>>>>>> propagateIfNull field in SubscriptionResponseWrapper?
>>>>>>>> It sort of looks like it's always going to be equal to
>>> (RHS-result !=
>>>>>>>> null).
>>>>>>>>
>>>>>>>> In other words, can we drop that field and just send back
>>> RHS-result
>>>>>>>> or null, and then handle it on the left-hand side like:
>>>>>>>> if (rhsOriginalValueHash doesn't match) {
>>>>>>>>     emit nothing, just drop the update
>>>>>>>> } else if (joinType==inner && rhsValue == null) {
>>>>>>>>     emit tombstone
>>>>>>>> } else {
>>>>>>>>     emit joiner(lhsValue, rhsValue)
>>>>>>>> }
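>>>>>>>>
>>>>>>>> Spelled out as a rough Java sketch (hypothetical parameter and helper
>>>>>>>> names, just to make the branches explicit, not actual Streams code):
>>>>>>>>
>>>>>>>>     import java.util.function.BiConsumer;
>>>>>>>>     import java.util.function.BiFunction;
>>>>>>>>
>>>>>>>>     // Hypothetical sketch of the LHS handling described above.
>>>>>>>>     final class ResponseHandlerSketch {
>>>>>>>>         static <K, VL, VR, VOut> void handleResponse(
>>>>>>>>                 final K key,
>>>>>>>>                 final VL lhsValue,
>>>>>>>>                 final VR rhsValue,
>>>>>>>>                 final long responseHash,
>>>>>>>>                 final long currentHash,
>>>>>>>>                 final boolean innerJoin,
>>>>>>>>                 final BiFunction<VL, VR, VOut> joiner,
>>>>>>>>                 final BiConsumer<K, VOut> forward) {
>>>>>>>>             if (responseHash != currentHash) {
>>>>>>>>                 return;                     // stale response: drop it
>>>>>>>>             }
>>>>>>>>             if (innerJoin && rhsValue == null) {
>>>>>>>>                 forward.accept(key, null);  // emit tombstone
>>>>>>>>             } else {
>>>>>>>>                 forward.accept(key, joiner.apply(lhsValue, rhsValue));
>>>>>>>>             }
>>>>>>>>         }
>>>>>>>>     }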
>>>>>>>>
>>>>>>>> To your concern about emitting extra tombstones, personally, I
>>> think
>>>>>>>> it's fine. Clearly, we should try to avoid unnecessary
>>> tombstones, but
>>>>>>>> all things considered, it's not harmful to emit some unnecessary
>>>>>>>> tombstones: their payload is small, and they are trivial to handle
>>>>>>>> downstream. If users want to, they can materialize the join
>>> result to
>>>>>>>> suppress any extra tombstones, so there's a way out.
>>>>>>>>
>>>>>>>> Thanks for the awesome idea. It's better than what I was thinking.
>>>>>>>> -john
>>>>>>>>
>>>>>>>> On Wed, Jun 26, 2019 at 11:37 AM Adam Bellemare
>>>>>>>> <adam.bellem...@gmail.com> wrote:
>>>>>>>>>
>>>>>>>>> Thanks John.
>>>>>>>>>
>>>>>>>>> I'm looking forward to any feedback on this. In the meantime I
>>> will
>>>>>>>> work on
>>>>>>>>> the unit tests to ensure that we have well-defined and readable
>>>>>>>> coverage.
>>>>>>>>>
>>>>>>>>> At the moment I cannot see a way around emitting (k,null)
>>> whenever
>>>>> we
>>>>>>>> emit
>>>>>>>>> an event that lacks a matching foreign key on the RHS, except
>>> in the
>>>>>>>>> (k,null) -> (k,fk) case.
>>>>>>>>> If this LHS oldValue=null, we know we would have emitted a
>>> deletion
>>>>> and
>>>>>>>> so
>>>>>>>>> (k,null) would be emitted out of the join. In this case we don't
>>>>> need to
>>>>>>>>> send another null.
>>>>>>>>>
>>>>>>>>> Adam
>>>>>>>>>
>>>>>>>>> On Wed, Jun 26, 2019 at 11:53 AM John Roesler <
>>> j...@confluent.io>
>>>>>>>> wrote:
>>>>>>>>>
>>>>>>>>>> Hi Adam,
>>>>>>>>>>
>>>>>>>>>> Thanks for the proposed revision to your KIP
>>>>>>>>>> (
>>>>>>>>>>
>>>>>>>>
>>>>>
>>> https://cwiki.apache.org/confluence/pages/diffpagesbyversion.action?pageId=74684836&selectedPageVersions=77&selectedPageVersions=74
>>>>>>>>>> )
>>>>>>>>>>
>>>>>>>>>> in response to the concern pointed out during code review
>>>>>>>>>> (
>>> https://github.com/apache/kafka/pull/5527#issuecomment-505137962
>>>>> )
>>>>>>>>>>
>>>>>>>>>> We should have a brief discussion thread (here) in the mailing
>>>>> list to
>>>>>>>>>> make sure everyone who wants to gets a chance to consider the
>>>>>>>>>> modification to the design.
>>>>>>>>>>
>>>>>>>>>> Thanks,
>>>>>>>>>> -John
>>>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>
>>>
