You're stretching my brain, John!

I prefer STRATEGY 1 because it solves the problem in a simple way, and
allows us to deprecate support for older message types as we go (i.e., we
only support the previous 3 versions, so V5, V4, V3, but not V2 or V1).

STRATEGY 2 is akin to Avro schemas between two microservices - there are
indeed cases where a breaking change must be made, and in those cases
forward compatibility leaves us no way out other than a full stop and a
full upgrade of all nodes, which shifts us back towards STRATEGY 1.

My preference is STRATEGY 1 with instructions as an ENUM, and we can
certainly include a version. Would it make sense to include a version
number in SubscriptionResponseWrapper as well? Currently we don't have any
instructions in there, as I removed the boolean, but it is certainly
plausible that it could happen in the future. I don't *think* we'll need
it, but I also didn't think we'd need it for SubscriptionWrapper and here
we are.
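
To make the idea concrete, here is a minimal sketch of a version byte plus
an enum instruction on the wire. Everything in it is an assumption made for
illustration: the class name, the instruction names and codes, the
supported version range, and the byte layout are hypothetical and are not
the format in the KIP or the PR. Unknown versions and unknown instruction
codes both fail loudly instead of being processed incorrectly.

```java
import java.nio.ByteBuffer;

// Hypothetical sketch: class names, codes, and wire layout are illustrative assumptions.
final class VersionedSubscriptionSketch {
    static final byte OLDEST_SUPPORTED_VERSION = 1; // anything older is deprecated
    static final byte CURRENT_VERSION = 2;          // newest version this build understands

    // Instruction codes are append-only: a code is never reused or removed.
    enum Instruction {
        DELETE_KEY((byte) 0),
        PROPAGATE((byte) 1);

        final byte code;

        Instruction(byte code) {
            this.code = code;
        }

        static Instruction fromCode(byte code) {
            for (Instruction instruction : values()) {
                if (instruction.code == code) {
                    return instruction;
                }
            }
            // Default case: choke on unknown instructions rather than guess.
            throw new IllegalArgumentException("Unknown instruction code " + code);
        }
    }

    static final class UnknownWireVersionException extends RuntimeException {
        UnknownWireVersionException(byte version) {
            super("Cannot process wire version " + version);
        }
    }

    static final class Decoded {
        final byte version;
        final Instruction instruction;
        final byte[] hash;
        final byte[] primaryKey;

        Decoded(byte version, Instruction instruction, byte[] hash, byte[] primaryKey) {
            this.version = version;
            this.instruction = instruction;
            this.hash = hash;
            this.primaryKey = primaryKey;
        }
    }

    // Assumed layout: [version:1][instruction:1][hashLength:4][hash][primaryKey:rest]
    static byte[] serialize(Instruction instruction, byte[] hash, byte[] primaryKey) {
        ByteBuffer buf = ByteBuffer.allocate(2 + 4 + hash.length + primaryKey.length);
        buf.put(CURRENT_VERSION).put(instruction.code).putInt(hash.length).put(hash).put(primaryKey);
        return buf.array();
    }

    static Decoded deserialize(byte[] payload) {
        ByteBuffer buf = ByteBuffer.wrap(payload);
        byte version = buf.get();
        if (version < OLDEST_SUPPORTED_VERSION || version > CURRENT_VERSION) {
            // The caller is expected to stop the task instead of processing incorrectly.
            throw new UnknownWireVersionException(version);
        }
        Instruction instruction = Instruction.fromCode(buf.get());
        byte[] hash = new byte[buf.getInt()];
        buf.get(hash);
        byte[] primaryKey = new byte[buf.remaining()];
        buf.get(primaryKey);
        return new Decoded(version, instruction, hash, primaryKey);
    }
}
```

Dropping support for an old version would then just mean raising
OLDEST_SUPPORTED_VERSION, and any per-version decoding differences would
branch on the version byte inside deserialize.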

Thanks for the thoughts, and the info on the right-key. That was
enlightening, though I can't think of a use-case for it *at this point in
time*. :)

Adam



On Thu, Jun 27, 2019 at 12:29 PM John Roesler <j...@confluent.io> wrote:

> I think I agree with you, right joins (and therefore full outer joins)
> don't make sense here, because the result is a keyed table, where the
> key is the PK of the left-hand side. So, when you have a
> right-hand-side record with no incoming FK references, you would want
> to produce a join result like `nullKey: (null, rhsValue)`, but we
> don't currently allow null keys in Streams. It actually is possible to
> define them, and therefore to add right- and full-outer foreign-key
> joins later, but it's non-trivial in a streaming context with
> continuously updated results. (See the PS if you're curious what I'm
> thinking). You're correct, right- and full-outer joins are trivial on
> our current 1:1 table joins because they are equi-joins.
>
> Regarding the transition, it sounds like what you're proposing is that
> we would say, "adding a foreign-key join to your topology requires a
> full application reset (or a new application id)". This is also an
> acceptable constraint to place on the feature, but not strictly
> necessary. Since 2.3, it's now possible to give all the state in your
> application stable names. This means that it's no longer true that
> adding a node to your topology graph would break its structure, and it
> does become possible to add new operators and simply restart the app.
> Revisiting my prior thought, though, I think the problem is not
> specific to your feature. For example, adding a new grouped
> aggregation would produce a new repartition topic, but the repartition
> topic partitions might get assigned to old nodes in the middle of a
> rolling bounce, and they would need to just ignore them. This
> requirement is the same for the repartition topics in the FK join, so
> it's orthogonal to your design.
>
> Back to the first concern, though, I'm not sure I followed the
> explanation. As a thought experiment, let's imagine that Joe hadn't
> taken the time to experiment with your feature branch. We wouldn't
> have noticed the problem until the feature was already released in
> 2.4. So the wire protocol on that PK->FK subscription topic would have
> been V1: "FK,PK,HASH,BOOLEAN". Then, Joe would have let us know the
> problem once they picked up the feature, so we would want to implement
> your proposed fix and change the wire protocol to V2:
> "FK,PK,HASH,INSTRUCTIONS" in 2.5. Upon rolling out the update, we
> would see both 2.4 nodes encountering V2 messages and 2.5 nodes
> encountering V1 messages. How can they both detect that they are
> attempting to process a newer or older protocol? If they can detect
> it, then what should they do?
>
> From experience, there are two basic solutions to this problem:
>
> STRATEGY1. Add a protocol version to the message (could be a number at
> the start of the message payload, or it could be a number in the
> message headers, not sure if it matters much. Payload is probably more
> compact, since the header would need a name.) In this case, the 2.4
> worker would know that its max protocol version is V1, and when it
> sees the V2 message, it knows that it can't handle it properly. Rather
> than doing something wrong, it would just not do anything. This means
> it would stop the task, if not shut down the whole instance. On the
> other hand, a 2.5 worker would have some defined logic for how to
> handle all versions (V1 and V2), so once the upgrade is complete, all
> messages can be processed.
>
> STRATEGY2. Make the schema forward-compatible. Basically, we ensure
> that new fields can only be appended to the message schema, and that
> older workers using only a prefix of the full message would still
> behave correctly. Using the example above, we'd instead evolve the
> schema to V2': "FK,PK,HASH,BOOLEAN,INSTRUCTIONS", and continue to set
> the boolean field to true for the "new" foreign key. Then, 2.4 workers
> encountering a "new FK" message would just see the prefix of the
> payload that makes sense to them, and they would still continue
> processing the messages as they always have. Only after the 2.5 code
> is fully rolled out to the cluster would we be sure to see the desired
> behavior. Note: in the reverse case, a 2.5 worker knows how to fully
> parse the new message format, even if it plans to ignore the BOOLEAN
> field.
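
A rough sketch of the append-only idea in STRATEGY2, under simplifying
assumptions: the payload here carries only a length-prefixed PK and HASH
followed by the BOOLEAN, and V2' appends a single INSTRUCTIONS byte. The
class name, method names, and layout are hypothetical. The point is only
that an old reader consumes the prefix it knows and never looks at the
appended bytes, while a new reader falls back to a default when the
appended field is absent.

```java
import java.nio.ByteBuffer;

// Hypothetical sketch: layouts and names are illustrative assumptions.
final class PrefixCompatibleSketch {
    // Assumed V1 layout:  [pkLength:4][pk][hashLength:4][hash][flag:1]
    // Assumed V2' layout: the V1 layout followed by [instruction:1]

    static byte[] writeV1(byte[] pk, byte[] hash, boolean flag) {
        ByteBuffer buf = ByteBuffer.allocate(4 + pk.length + 4 + hash.length + 1);
        buf.putInt(pk.length).put(pk).putInt(hash.length).put(hash).put((byte) (flag ? 1 : 0));
        return buf.array();
    }

    static byte[] writeV2Prime(byte[] pk, byte[] hash, boolean flag, byte instruction) {
        byte[] v1 = writeV1(pk, hash, flag);
        // New fields are only ever appended after the existing prefix.
        return ByteBuffer.allocate(v1.length + 1).put(v1).put(instruction).array();
    }

    // What an old (V1-only) reader does: parse the known prefix, ignore trailing bytes.
    static boolean oldReaderFlag(byte[] payload) {
        ByteBuffer buf = ByteBuffer.wrap(payload);
        skipField(buf); // pk
        skipField(buf); // hash
        return buf.get() != 0;
    }

    // What a new reader does: use the appended field if present, else a default.
    static byte newReaderInstruction(byte[] payload, byte defaultInstruction) {
        ByteBuffer buf = ByteBuffer.wrap(payload);
        skipField(buf); // pk
        skipField(buf); // hash
        buf.get();      // legacy flag, still present on the wire but ignored here
        return buf.hasRemaining() ? buf.get() : defaultInstruction;
    }

    private static void skipField(ByteBuffer buf) {
        int length = buf.getInt();
        buf.position(buf.position() + length);
    }
}
```

This also shows the cost noted in the tradeoffs: the legacy flag byte can
never be removed from the payload once old readers depend on its position.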
>
> There are some tradeoffs between these strategies: STRATEGY1 ensures
> that all messages are only handled by workers that can properly handle
> them, although it results in processing stalls while there are still
> old nodes in the cluster. STRATEGY2 ensures that all messages can be
> processed by all nodes, so there are no stalls, but we can never
> remove fields from the message, so if there are a lot of revisions in
> the future, the payloads will become bloated. Also, it's not clear
> that you can actually pull off STRATEGY2 in all cases. If there's some
> new kind of message you want to send that has no way to be correctly
> processed at all under the 2.4 code paths, the prefix thing simply
> doesn't work. Etc.
>
> Also, note that you can modify the above strategies by instead
> designing the message fields for extensibility. E.g., if you make the
> instructions field an enum, then you can make sure that the default
> case is handled sensibly (probably similarly to STRATEGY1, just choke
> on unknown instructions) and that you never remove an instruction type
> from the enum in future versions.
>
> Does this make sense?
> -John
>
>
>
>
> PS:
> We can define null keys for streaming tables, but it's tricky.
>
> Specifically, you'd want to define some concept of null keys that
> allows all null keys to be unique, but _also_ to have a fixed
> identity, so that a particular null-key can be updated later. One
> example could be to union the existing keyspace with a new
> null-keyspace, where normal keys are like "key" and null-keys are like
> "null(identity)". Then given a query like
> "KTable<String,Integer>.rightJoin(KTable<Integer,Boolean>)", and
> inputs like:
> LHS:
> "a": 1
> "b": 2
>
> RHS:
> 1: true
> 3: false
>
> a full outer join would produce:
> "a": (1, true)
> "b": (2, null)
> null(3): (null, false)
>
> which can be correctly updated later if we get an update on the LHS:
> PUT("c": 3)
>
> We'd emit for the results:
> DELETE(null(3))
> EMIT("c": (3, false))
>
> Resulting in the correct result table of:
> "a": (1, true)
> "b": (2, null)
> "c": (3, false)
>
> As mentioned, this is tricky, and I would avoid it until we have
> evidence that it's actually useful to cover this part of the design
> space. Certainly, it would be a separate KIP if it came to that.
>
> On Wed, Jun 26, 2019 at 8:57 PM Adam Bellemare <adam.bellem...@gmail.com>
> wrote:
> >
> > Hi John
> >
> > Good thinking with regards to upgrade path between versions regarding
> > over-the-wire instructions in SubscriptionWrapper. At this point in time I
> > can't think of any new wire message instructions, but I would appreciate as
> > many eyes on it as possible. I have just included the LEFT join in the last
> > commit (about 10 min ago) along with INNER join. I do not think that RIGHT
> > join and OUTER are possible given that there is no LHS key available, so
> > LHSTable.outerJoinOnForeignKey(RHSTable) wouldn't even make sense. This is
> > in contrast to the current LHSTable.outerJoin(RHSTable), as they are both
> > keyed on the same key. I have buffed up the Integration tests and have
> > tried to make them more readable to ensure that we're covering all the
> > scenarios. I think that if we can get more eyes on the workflow showing the
> > various LHS and RHS events and outputs then that may help us validate that
> > we have all the scenarios covered.
> >
> > With regards to the 2.3->2.4 scenario you described, I'm not entirely sure
> > I follow. If they want to add a FK-join, they will need to rework their
> > code in the KStreams app and make a new release, since the underlying
> > topology would be different and new internal topics would need to be
> > created. In other words, I don't think a rolling upgrade where the user
> > introduces a FK join would be possible since their topology would
> > necessitate a full KStreams reset. Is this what you meant?
> >
> >
> >
> > On Wed, Jun 26, 2019 at 4:10 PM John Roesler <j...@confluent.io> wrote:
> >
> > > Thanks, Adam!
> > >
> > > One unrelated thought that has just now occurred to me is that (unlike
> > > the equi-joins we currently have), this join logic is potentially
> > > spread over multiple Streams instances, which in general means that
> > > the instances may be running different versions of Kafka Streams.
> > >
> > > This means that if we discover a bug that requires us to again change
> > > the wire message (as you did in this proposal update), we need to
> > > consider what should happen if the PK instance is newer than the FK
> > > instance, or vice-versa, during a rolling upgrade. We should think
> > > ahead to this condition and make sure the logic is forward compatible.
> > >
> > > > Related: what about the initial case, when we release this feature
> > > > (let's say in 2.4)? What will happen if I decide to adopt 2.4 and add
> > > > a FK join together in one upgrade? Then the 2.4 member of the cluster
> > > > is producing the SubscriptionWrapper messages, and some 2.3 members
> > > > get the subscription topic assigned to them, but they have no idea
> > > > what to do with it. I'm not sure this is a problem; hopefully they
> > > > just do nothing. If it is a problem, it would be fine to say you have
> > > > to upgrade completely to 2.4 before deploying a FK join.
> > >
> > > Just want to make sure we anticipate these issues in case it affects
> > > the design at all.
> > >
> > > Thanks,
> > > -John
> > >
> > > > On Wed, Jun 26, 2019 at 2:38 PM Adam Bellemare <adam.bellem...@gmail.com>
> > > > wrote:
> > > >
> > > > Sigh... Forgot the link:
> > > >
> > >
> https://cwiki.apache.org/confluence/pages/diffpagesbyversion.action?pageId=74684836&selectedPageVersions=78&selectedPageVersions=74
> > > >
> > > > I'll update it when I validate that there are no issues with
> removing the
> > > > SubscriptionResponseWrapper boolean.
> > > >
> > > > > On Wed, Jun 26, 2019 at 3:37 PM Adam Bellemare <adam.bellem...@gmail.com>
> > > > > wrote:
> > > >
> > > > > >Maybe just call it as (k, leftval, null) or (k, null, rightval)?
> > > > > Done.
> > > > >
> > > > > > > if you update the KIP, you might want to send a new "diff link" to
> > > > > > > this thread
> > > > > > Here it is:
> > > > > >
> > > > > > > Looking closely at the proposal, can you explain more about the
> > > > > > > propagateIfNull field in SubscriptionResponseWrapper? It sort of looks
> > > > > > > like it's always going to be equal to (RHS-result != null).
> > > > > > I believe you are correct, and I missed the forest for the trees. They
> > > > > > are effectively the same thing, and I can simply remove the flag. I will
> > > > > > code it up and try it out locally just to be sure.
> > > > >
> > > > > Thanks again for your help, it is greatly appreciated!
> > > > >
> > > > > On Wed, Jun 26, 2019 at 2:54 PM John Roesler <j...@confluent.io>
> > > > > > wrote:
> > > > >
> > > > > >> I think the "scenario trace" is very nice, but has one point that I
> > > > > >> found confusing:
> > > > > >>
> > > > > >> You indicate a retraction in the join output as (k,null) and a join
> > > > > >> result as (k, leftval, rightval), but confusingly, you also write a
> > > > > >> join result as (k, JoinResult) when one side is null. Maybe just call
> > > > > >> it as (k, leftval, null) or (k, null, rightval)? That way the readers
> > > > > >> can more easily determine if the results meet their expectations for
> > > > > >> each join type.
> > > > > >>
> > > > > >> (procedural note: if you update the KIP, you might want to send a new
> > > > > >> "diff link" to this thread, since the one I posted at the beginning
> > > > > >> would not automatically show your latest changes)
> > > > >>
> > > > > >> I was initially concerned that the proposed algorithm would wind up
> > > > > >> propagating something that looks like a left join (k, leftval, null)
> > > > > >> under the case that Joe pointed out, but after reviewing your
> > > > > >> scenario, I see that it will emit a tombstone (k, null) instead. This
> > > > > >> is appropriate, and unavoidable, since we have to retract the join
> > > > > >> result from the logical view (the join result is a logical Table).
> > > > > >>
> > > > > >> Looking closely at the proposal, can you explain more about the
> > > > > >> propagateIfNull field in SubscriptionResponseWrapper?
> > > > > >> It sort of looks like it's always going to be equal to (RHS-result !=
> > > > > >> null).
> > > > > >>
> > > > > >> In other words, can we drop that field and just send back RHS-result
> > > > > >> or null, and then handle it on the left-hand side like:
> > > > >> if (rhsOriginalValueHash doesn't match) {
> > > > >>     emit nothing, just drop the update
> > > > >> } else if (joinType==inner && rhsValue == null) {
> > > > >>     emit tombstone
> > > > >> } else {
> > > > >>     emit joiner(lhsValue, rhsValue)
> > > > >> }
> > > > >>
> > > > > >> To your concern about emitting extra tombstones, personally, I think
> > > > > >> it's fine. Clearly, we should try to avoid unnecessary tombstones, but
> > > > > >> all things considered, it's not harmful to emit some unnecessary
> > > > > >> tombstones: their payload is small, and they are trivial to handle
> > > > > >> downstream. If users want to, they can materialize the join result to
> > > > > >> suppress any extra tombstones, so there's a way out.
> > > > >>
> > > > >> Thanks for the awesome idea. It's better than what I was thinking.
> > > > >> -john
> > > > >>
> > > > >> On Wed, Jun 26, 2019 at 11:37 AM Adam Bellemare
> > > > >> <adam.bellem...@gmail.com> wrote:
> > > > >> >
> > > > >> > Thanks John.
> > > > >> >
> > > > > >> > I'm looking forward to any feedback on this. In the meantime I will
> > > > > >> > work on the unit tests to ensure that we have well-defined and readable
> > > > > >> > coverage.
> > > > > >> >
> > > > > >> > At the moment I cannot see a way around emitting (k,null) whenever we
> > > > > >> > emit an event that lacks a matching foreign key on the RHS, except in the
> > > > > >> > (k,null) -> (k,fk) case.
> > > > > >> > If this LHS oldValue=null, we know we would have emitted a deletion and
> > > > > >> > so (k,null) would be emitted out of the join. In this case we don't need to
> > > > > >> > send another null.
> > > > >> >
> > > > >> > Adam
> > > > >> >
> > > > > >> > On Wed, Jun 26, 2019 at 11:53 AM John Roesler <j...@confluent.io>
> > > > > >> > wrote:
> > > > >> >
> > > > >> > > Hi Adam,
> > > > >> > >
> > > > > >> > > Thanks for the proposed revision to your KIP
> > > > > >> > > (
> > > > > >> > > https://cwiki.apache.org/confluence/pages/diffpagesbyversion.action?pageId=74684836&selectedPageVersions=77&selectedPageVersions=74
> > > > > >> > > )
> > > > > >> > >
> > > > > >> > > in response to the concern pointed out during code review
> > > > > >> > > (https://github.com/apache/kafka/pull/5527#issuecomment-505137962)
> > > > > >> > >
> > > > > >> > > We should have a brief discussion thread (here) in the mailing list to
> > > > > >> > > make sure everyone who wants to gets a chance to consider the
> > > > > >> > > modification to the design.
> > > > >> > >
> > > > >> > > Thanks,
> > > > >> > > -John
> > > > >> > >
> > > > >>
> > > > >
> > >
>
