Sorry for coming back to this so late.


On 11.12.2017 07:12, Colin McCabe wrote:
On Sun, Dec 10, 2017, at 22:10, Colin McCabe wrote:
On Fri, Dec 8, 2017, at 01:16, Jan Filipiak wrote:
Hi,

sorry for the late reply, busy times :-/

I would ask you one thing maybe. Since the timeout
argument seems to be settled, I have no further argument
from your side except the "i don't want to".

Can you see that connections.max.idle.ms is the exact time
that expresses "We expect the client to be away for this long,
and come back and continue"?
Hi Jan,

Sure, connections.max.idle.ms is the exact time that we want to keep
around a TCP session.  TCP sessions are relatively cheap, so we can
afford to keep them around for 10 minutes by default.  Incremental fetch
state is less cheap, so we want to set a shorter timeout for it.  We
also want new TCP sessions to be able to reuse an existing incremental
fetch session rather than creating a new one and waiting for the old one
to time out.

also clarified some stuff inline

Best Jan




On 05.12.2017 23:14, Colin McCabe wrote:
On Tue, Dec 5, 2017, at 13:13, Jan Filipiak wrote:
Hi Colin

Addressing the topic of how to manage slots from the other thread.
With tcp connections all this comes for free essentially.
Hi Jan,

I don't think that it's accurate to say that cache management "comes for
free" by coupling the incremental fetch session with the TCP session.
When a new TCP session is started by a fetch request, you still have to
decide whether to grant that request an incremental fetch session or
not.  If your answer is that you always grant the request, I would argue
that you do not have cache management.
First I would say, the client has a big say in this. If the client
is not going to issue incremental fetches, it shouldn't ask for a cache.
When the client asks for the cache, we still have all options to deny.
To put it simply, we have to have some cache management above and beyond
just giving out an incremental fetch session to anyone who has a TCP
session.  Therefore, caching does not become simpler if you couple the
fetch session to the TCP session.
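The cache management Colin describes (a bounded number of slots plus an idle timeout that is independent of the TCP connection) could be sketched roughly as follows. This is only an illustration: the class name, the method names, and the 120-second timeout are assumptions, not anything taken from the KIP. The one detail borrowed from the thread is that a session ID of 0 means no session was granted.

```python
import itertools


class FetchSessionCache:
    """Hypothetical broker-side cache: bounded slots plus an idle timeout."""

    def __init__(self, max_slots, idle_timeout_s):
        self.max_slots = max_slots
        self.idle_timeout_s = idle_timeout_s
        self._last_used = {}             # session id -> last-used timestamp
        self._ids = itertools.count(1)   # 0 is reserved for "no session"

    def _expire(self, now):
        # Drop every session that has been idle for the full timeout.
        for sid, ts in list(self._last_used.items()):
            if now - ts >= self.idle_timeout_s:
                del self._last_used[sid]

    def try_create(self, now):
        """Return a new session id, or 0 if no slot is free."""
        self._expire(now)
        if len(self._last_used) >= self.max_slots:
            return 0
        sid = next(self._ids)
        self._last_used[sid] = now
        return sid

    def touch(self, sid, now):
        """Refresh a session; return False if it is unknown or expired."""
        self._expire(now)
        if sid not in self._last_used:
            return False
        self._last_used[sid] = now
        return True
```

Nothing in this sketch refers to a connection, which is the point being argued: the slot limit and the timeout have to exist whether or not the session is tied to a TCP session.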
Simply giving out a fetch session to everyone with a connection is too
simple, but I think it plays well into the idea of consumers choosing to
use the feature, and therefore only enabling it where it brings maximum
gains (replicas, MirrorMakers).

I guess you could argue that timeouts are cache management, but I don't
find that argument persuasive.  Anyone could just create a lot of TCP
sessions and use a lot of resources, in that case.  So there is
essentially no limit on memory use.  In any case, TCP sessions don't
help us implement fetch session timeouts.
We still have every option to deny the request to keep the state.
What you want seems like a max connections / IP safeguard.
I can currently take down a broker with too many connections easily.


I still would argue we disable it by default and make a flag in the
broker to ask the leader to maintain the cache while replicating and also only
have it optional in consumers (default to off) so one can turn it on
where it really hurts.  MirrorMaker and audit consumers prominently.
I agree with Jason's point from earlier in the thread.  Adding extra
configuration knobs that aren't really necessary can harm usability.
Certainly asking people to manually turn on a feature "where it really
hurts" seems to fall in that category, when we could easily enable it
automatically for them.
This doesn't make much sense to me.
There are no tradeoffs to think about from the client's point of view:
it always wants an incremental fetch session.  So there is no benefit to
making the clients configure an extra setting.  Updating and managing
client configurations is also more difficult than managing broker
configurations for most users.

You also wanted to implement
a "turn off in case of bug" knob. Having the client indicate whether the
feature will be used seems reasonable to me.
True.  However, if there is a bug, we could also roll back the client,
so having this configuration knob is not strictly required.

Otherwise I left a few remarks in-line, which should help to understand
my view of the situation better

Best Jan


On 05.12.2017 08:06, Colin McCabe wrote:
On Mon, Dec 4, 2017, at 02:27, Jan Filipiak wrote:
On 03.12.2017 21:55, Colin McCabe wrote:
On Sat, Dec 2, 2017, at 23:21, Becket Qin wrote:
Thanks for the explanation, Colin. A few more questions.

The session epoch is not complex.  It's just a number which increments
on each incremental fetch.  The session epoch is also useful for
debugging-- it allows you to match up requests and responses when
looking at log files.
Currently each request in Kafka has a correlation id to help match the
requests and responses. Is epoch doing something differently?
Hi Becket,

The correlation ID is used within a single TCP session, to uniquely
associate a request with a response.  The correlation ID is not unique
(and has no meaning) outside the context of that single TCP session.

Keep in mind, NetworkClient is in charge of TCP sessions, and generally
tries to hide that information from the upper layers of the code.  So
when you submit a request to NetworkClient, you don't know if that
request creates a TCP session, or reuses an existing one.
Unfortunately, this doesn't work.  Imagine the client misses an
incremental fetch response about a partition.  And then the partition is
never updated after that.  The client has no way to know about the
partition, since it won't be included in any future incremental fetch
responses.  And there are no offsets to compare, since the partition is
simply omitted from the response.
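The failure mode described here can be made concrete with a toy model. The partition names and offsets below are made up for illustration, and the "response" is just a dictionary of changed partitions, not the real FetchResponse format.

```python
def apply_incremental(view, response):
    """Merge an incremental response (changed partitions only) into the view."""
    view.update(response)
    return view


leader = {"t-0": 10, "t-1": 5}   # partition -> offset known on the leader
follower = dict(leader)          # follower's view after a full fetch

# t-1 advances once, but the response carrying that change is lost.
leader["t-1"] = 6
lost_response = {"t-1": 6}       # never applied by the follower

# t-0 advances later. t-1 has not changed again, so it is omitted.
leader["t-0"] = 11
apply_incremental(follower, {"t-0": 11})

# The follower is now stale on t-1 and nothing in later responses reveals it.
stale = {p for p in leader if follower[p] != leader[p]}
```

After this runs, `stale` contains only `t-1`. Every response the follower did receive was internally consistent, so comparing offsets inside those responses cannot detect the gap.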
I am curious about in which situation would the follower miss a response
of a partition. If the entire FetchResponse is lost (e.g. timeout), the
follower would disconnect and retry. That will result in sending a full
FetchRequest.
Basically, you are proposing that we rely on TCP for reliable delivery
in a distributed system.  That isn't a good idea for a bunch of
different reasons.  First of all, TCP timeouts tend to be very long.  So
if the TCP session timing out is your error detection mechanism, you
have to wait minutes for messages to timeout.  Of course, we add a
timeout on top of that after which we declare the connection bad and
manually close it.  But just because the session is closed on one end
doesn't mean that the other end knows that it is closed.  So the leader
may have to wait quite a long time before TCP decides that yes,
connection X from the follower is dead and not coming back, even though
gremlins ate the FIN packet which the follower attempted to transmit.
If the cache state is tied to that TCP session, we have to keep that
cache around for a much longer time than we should.
Hi,

I see this from a different perspective. The cache expiry time
has the same semantic as idle connection time in this scenario.
It is the time range in which we expect the client to come back and reuse
its broker-side state. I would argue that on close we would get an
extra shot at cleaning up the session state early, as opposed to
always waiting for that duration for expiry to happen.
Hi Jan,

The idea here is that the incremental fetch cache expiry time can be
much shorter than the TCP session timeout.  In general the TCP session
timeout is common to all TCP connections, and very long.  To make these
numbers a little more concrete, the TCP session timeout is often
configured to be 2 hours on Linux.  (See
https://www.cyberciti.biz/tips/linux-increasing-or-decreasing-tcp-sockets-timeouts.html
)  The timeout I was proposing for incremental fetch sessions was one or
two minutes at most.
Currently this is taken care of by
connections.max.idle.ms on the broker, which defaults to a few
minutes.
It is 10 minutes by default, which is longer than what we want the
incremental fetch session timeout to be.  There's no reason to couple
these two things.

Also something we could let the client change if we really wanted to.
So there is no need to worry about coupling our implementation to some
timeouts given by the OS, with TCP one always has full control over the worst
times + one gets the extra shot cleaning up early when the close comes through.
Which is the majority of the cases.
In the majority of cases, the TCP session will be re-established.  In
that case, we have to send a full fetch request rather than an
incremental fetch request.
I actually have a hard time believing this. Do you have any numbers from
any existing production system? Is it the virtualisation layer cutting
all the connections?
We see this only on application crashes and restarts, where the app needs
to do the full fetch anyway, as it probably continues from stored offsets.
Yes, TCP connections get dropped.  It happens very often in production
clusters, actually.  When I was working on Hadoop, one of the most
common questions I heard from newcomers was "why do I see so many
EOFException messages in the logs"?  The other thing that happens a lot
is DNS outages or slowness.  Public clouds seem to have even more
unstable networks than the on-premise clusters.  I am not sure why that
is.
Hadoop has a wiki page on exactly this
https://wiki.apache.org/hadoop/EOFException

Besides user errors, they have servers crashing and actual loss of
connection high on their list. In the case of "server goes away" the cache
goes with it, so there is nothing to argue about the cache being reused by
a new connection.

Can you make an argument for the point at which the epoch would be updated
broker side to maximise re-use of the cache on lost connections? In many
cases the epoch would go out of sync and we would need a full fetch anyway.
Am I mistaken here?





Secondly, from a software engineering perspective, it's not a good idea
to try to tightly tie together TCP and our code.  We would have to
rework how we interact with NetworkClient so that we are aware of things
like TCP sessions closing or opening.  We would have to be careful to
preserve the ordering of incoming messages when doing things like
putting incoming requests on to a queue to be processed by multiple
threads.  It's just a lot of complexity to add, and there's no upside.
I see the point here. And I had a small chat with Dong Lin already
making me aware of this. I tried out the approaches and propose the
following:

The client starts and does a full fetch. It then does incremental fetches.
The connection to the broker dies and is re-established by NetworkClient
under the hood.
The broker sees an incremental fetch without having state => returns
error:
Client sees the error, does a full fetch and goes back to incrementally
fetching.

Having this one additional error round trip is essentially the same as
when something with the session or epoch changed unexpectedly for the
client (say expiry).
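The flow proposed above (full fetch, incremental fetches, reconnect, one error round trip, full fetch again) might look like this in miniature. It models only Jan's connection-scoped variant, and the class, function, and result names are hypothetical.

```python
class Broker:
    """Hypothetical broker that keeps fetch state per connection."""

    def __init__(self):
        self.state = {}   # connection id -> set of partitions in the session

    def fetch(self, conn, partitions=None):
        if partitions is not None:      # full fetch: (re)create the state
            self.state[conn] = set(partitions)
            return "FULL_OK"
        if conn not in self.state:      # incremental fetch without state
            return "NO_SESSION_ERROR"
        return "INCREMENTAL_OK"

    def connection_closed(self, conn):
        self.state.pop(conn, None)      # early cleanup when the close arrives


def client_fetch(broker, conn, partitions, log):
    """Try an incremental fetch first; fall back to a full fetch on error."""
    result = broker.fetch(conn)
    log.append(result)
    if result == "NO_SESSION_ERROR":
        log.append(broker.fetch(conn, partitions))
```

A client that reconnects on a new connection ID pays exactly one error round trip before it is back to incremental fetches. The sketch also assumes the broker learns about the close promptly, which is the assumption Colin disputes in his reply.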

So it's nothing extra added, but the conditions are easier to evaluate,
especially since we do everything with NetworkClient. Other implementers
of the protocol are free to optimize this and skip the erroneous
round trip on the new connection.
It's a great plus that the client can know when the error is going to
happen, instead of the server always having to report back if something
changes unexpectedly for the client.
You are assuming that the leader and the follower agree that the TCP
session drops at the same time.  When there are network problems, this
may not be true.  The leader may still think the previous TCP session is
active.  In that case, we have to keep the incremental fetch session
state around until we learn otherwise (which could be up to that 2 hour
timeout I mentioned).  And if we get a new incoming incremental fetch
request, we can't assume that it replaces the previous one, because the
IDs will be different (the new one starts a new session).
As mentioned, no reason to fear some time-outs out of our control
Imagine that I made an argument that client IDs are "complex" and should
be removed from our APIs.  After all, we can just look at the remote IP
address and TCP port of each connection.  Would you think that was a
good idea?  The client ID is useful when looking at logs.  For example,
if a rebalance is having problems, you want to know what clients were
having a problem.  So having the client ID field to guide you is
actually much less "complex" in practice than not having an ID.
I still can't follow why the correlation ID will not help here.
Correlating logs with it usually works great, even with primitive tools
like grep.
The correlation ID does help somewhat, but certainly not as much as a
unique 64-bit ID.  The correlation ID is not unique in the broker, just
unique to a single NetworkClient.  Similarly, the correlation ID is not
unique on the client side, if there are multiple Consumers, etc.
We can always bump entropy in correlation IDs; I never had a problem
with finding too many duplicates. That would be a different KIP though.
Similarly, if metadata responses had epoch numbers (simple incrementing
numbers), we would not have to debug problems like clients accidentally
getting old metadata from servers that had been partitioned off from the
network for a while.  Clients would know the difference between old and
new metadata.  So putting epochs in to the metadata request is much less
"complex" operationally, even though it's an extra field in the request.
     This has been discussed before on the mailing list.

So I think the bottom line for me is that having the session ID and
session epoch, while it adds two extra fields, reduces operational
complexity and increases debuggability.  It avoids tightly coupling us
to assumptions about reliable ordered delivery which tend to be violated
in practice in multiple layers of the stack.  Finally, it  avoids the
necessity of refactoring NetworkClient.
So there are stacks out there that violate TCP guarantees? And software
still works? How can this be? Can you elaborate a little where this
can be violated? I am not very familiar with virtualized environments
but they can't really violate TCP contracts.
TCP's guarantees of reliable, in-order transmission certainly can be
violated.  For example, I once had to debug a cluster where a certain
node had a network card which corrupted its transmissions occasionally.
With all the layers of checksums, you would think that this was not
possible, but it happened.  We occasionally got corrupted data written
to disk on the other end because of it.  Even more frustrating, the data
was not corrupted on disk on the sending node-- it was a bug in the
network card driver that was injecting the errors.
True, but your broker might as well read a corrupted 600 GB as the size
from the network and die with OOM instantly.
If you read 600 GB as the size from the network, you will not "die with
OOM instantly."  That would be a bug.  Instead, you will notice that 600
GB is greater than max.message.bytes, and close the connection.
We only check max.message.bytes too late to guard against consumer
stalling.
We don't have a notion of max.networkpacket.size before we allocate the
ByteBuffer to read it into.
"network packets" are not the same thing as "kafka RPCs."  One Kafka RPC
could take up multiple ethernet packets.

Also, max.message.bytes has nothing to do with "consumer stalling" --
you are probably thinking about some of the fetch request
configurations.  max.message.bytes is used by the RPC system to figure
out whether to read the full incoming RP
Whoops, this is incorrect.  I was thinking about
"socket.request.max.bytes" rather than "max.message.bytes."  Sorry about
that.  See Ismael's email as well.
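The behaviour Colin describes, rejecting an implausible declared size before allocating a buffer for it, can be sketched like this. It is an illustration only: the 4-byte big-endian size prefix, the limit value, and all names are assumptions here, and this is not the broker's actual code.

```python
import io
import struct

MAX_REQUEST_BYTES = 100 * 1024 * 1024   # assumed limit, for illustration only


class InvalidRequestSize(Exception):
    pass


def read_request(stream, max_bytes=MAX_REQUEST_BYTES):
    """Read one size-prefixed request, validating the size before allocating."""
    header = stream.read(4)
    if len(header) < 4:
        raise EOFError("connection closed before the size prefix was read")
    (size,) = struct.unpack(">i", header)   # 4-byte big-endian signed int
    if size < 0 or size > max_bytes:
        # A corrupted or hostile size is rejected here, so the caller can
        # close the connection instead of trying to allocate the buffer.
        raise InvalidRequestSize(f"declared size {size} is outside 0..{max_bytes}")
    payload = stream.read(size)
    if len(payload) < size:
        raise EOFError("connection closed mid-request")
    return payload


# Usage with an in-memory stream standing in for a socket:
payload = read_request(io.BytesIO(struct.pack(">i", 3) + b"abc"))
```

The check runs before any payload read, so a garbage size costs one closed connection rather than an out-of-memory crash.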

best,
Colin

best,
Colin

Optimizing for still having functional
software under these circumstances is not reasonable.
You want to get rid of such a
node ASAP and pray that ZooKeeper's ticks get corrupted often enough
that it finally drops out of the cluster.

There is a good reason that these kinda things
https://issues.apache.org/jira/browse/MESOS-4105
don't end up as kafka Jiras. In the end you can't run any software in
these containers anymore. Application layer checksums are a neat thing to
fail fast but trying to cope with this probably causes more bad than
good.  So I would argue that we shouldn't try this for the fetch requests.
One of the goals of Apache Kafka is to be "a streaming platform...
[that] lets you store streams of records in a fault-tolerant way."  For
more information, see https://kafka.apache.org/intro .  Fault-tolerance
is explicitly part of the goal of Kafka.  Prayer should be optional, not
required, when running the software.
Yes, we need to fail ASAP when we read corrupted packets. It seemed
to me like you tried to make the case for pray and try to stay alive.
Fault tolerance here means: I am a fishy box, so I am going to let a
good box handle it and be silent until I get fixed up.
Crashing because someone sent you a bad packet is not reasonable
behavior.  It is a bug.  Similarly, bringing down the whole cluster,
which could be a hundred nodes, because someone had a bad network adapter
is not reasonable behavior.  It is perhaps reasonable for the cluster to
perform worse when hardware is having problems.  But that's a different
discussion.
See above.
best,
Colin

However, my point was not about TCP's guarantees being violated.  My
point is that TCP's guarantees are only one small building block to
build a robust distributed system.  TCP basically just says that if you
get any bytes from the stream, you will get the ones that were sent by
the sender, in the order they were sent.  TCP does not guarantee that
the bytes you send will get there.  It does not guarantee that if you
close the connection, the other end will know about it in a timely
fashion.
These are very powerful guarantees, and since we use TCP we should
piggyback everything that is reasonable onto it. IMO there is no
need to reimplement correct sequencing again if you get that from
your transport layer. It saves you the complexity, it makes
your application behave way more naturally and your API easier to
understand.

There is literally nothing the kernel won't let you decide,
especially not any timings. The only noticeable exception is TIME_WAIT,
usually 240 seconds, but that already has little to do with the broker
itself, and if we are running out of usable ports because of this then
expiring fetch requests won't help much anyway.

I hope I could strengthen the trust you have in userland TCP connection
management. It is really powerful and can be exploited for maximum gains
without much risk in my opinion.



It does not guarantee that the bytes will be received in a
certain timeframe, and certainly doesn't guarantee that if you send a
byte on connection X and then on connection Y, that the remote end will
read a byte on X before reading a byte on Y.
No one expects this from two independent paths of any kind.

best,
Colin

Hope this made my view clearer, especially the first part.

Best Jan


best,
Colin


If an error such as NotLeaderForPartition is
returned for some partitions, the follower can always send a full
FetchRequest. Is there a scenario in which only some of the partitions in a
FetchResponse are lost?

Thanks,

Jiangjie (Becket) Qin


On Sat, Dec 2, 2017 at 2:37 PM, Colin McCabe<cmcc...@apache.org>  wrote:

On Fri, Dec 1, 2017, at 11:46, Dong Lin wrote:
On Thu, Nov 30, 2017 at 9:37 AM, Colin McCabe<cmcc...@apache.org>
wrote:
On Wed, Nov 29, 2017, at 18:59, Dong Lin wrote:
Hey Colin,

Thanks much for the update. I have a few questions below:

1. I am not very sure that we need Fetch Session Epoch. It seems that
Fetch Session Epoch is only needed to help leader distinguish between
"a full fetch request" and "a full fetch request and request a new
incremental fetch session". Alternatively, follower can also indicate
"a full fetch request and request a new incremental fetch session" by
setting Fetch Session ID to -1 without using Fetch Session Epoch. Does
this make sense?
Hi Dong,

The fetch session epoch is very important for ensuring correctness.  It
prevents corrupted or incomplete fetch data due to network reordering or
loss.

For example, consider a scenario where the follower sends a fetch
request to the leader.  The leader responds, but the response is lost
because of network problems which affected the TCP session.  In that
case, the follower must establish a new TCP session and re-send the
incremental fetch request.  But the leader does not know that the
follower didn't receive the previous incremental fetch response.  It is
only the incremental fetch epoch which lets the leader know that it
needs to resend that data, and not data which comes afterwards.

You could construct similar scenarios with message reordering,
duplication, etc.  Basically, this is a stateful protocol on an
unreliable network, and you need to know whether the follower got the
previous data you sent before you move on.  And you need to handle
issues like duplicated or delayed requests.  These issues do not affect
the full fetch request, because it is not stateful-- any full fetch
request can be understood and properly responded to in isolation.
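A minimal sketch of the leader-side epoch check follows. It shows one possible handling, where the leader rejects any incremental request whose epoch is not the one it expects so that the follower has to resynchronise. The names and the rejection policy are assumptions made for illustration. The thread says only that the epoch lets the leader tell whether the follower received the previous response.

```python
class FetchSession:
    """Hypothetical leader-side state for one incremental fetch session."""

    def __init__(self, session_id):
        self.session_id = session_id
        self.next_epoch = 1   # epoch expected on the next incremental fetch

    def handle_incremental(self, epoch):
        if epoch != self.next_epoch:
            # Duplicate, delayed, or retried-after-loss request: the two
            # sides disagree about what was delivered, so force a resync.
            return "EPOCH_MISMATCH"
        self.next_epoch += 1
        return "OK"
```

If the response to epoch 2 is lost and the follower retries with epoch 2 on a new TCP session, the leader already expects epoch 3 and can detect the mismatch. A correlation ID could not do this, because it has no meaning across TCP sessions.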

Thanks for the explanation. This makes sense. On the other hand I would
be interested in learning more about whether Becket's solution can help
simplify the protocol by not having the epoch field and whether that is
worth doing.
Hi Dong,

I commented about this in the other thread.  A solution which doesn't
maintain session information doesn't work here.

2. It is said that Incremental FetchRequest will include partitions whose
fetch offset or maximum number of fetch bytes has been changed. If
follower's logStartOffset of a partition has changed, should this
partition also be included in the next FetchRequest to the leader?
Otherwise, it may affect the handling of DeleteRecordsRequest because
leader may not know the corresponding data has been deleted on the
follower.
Yeah, the follower should include the partition if the logStartOffset
has changed.  That should be spelled out on the KIP.  Fixed.

3. In the section "Per-Partition Data", a partition is not considered
dirty if its log start offset has changed. Later in the section
"FetchRequest Changes", it is said that incremental fetch responses will
include a partition if its logStartOffset has changed. It seems
inconsistent. Can you update the KIP to clarify it?

In the "Per-Partition Data" section, it does say that logStartOffset
changes make a partition dirty, though, right?  The first bullet point
is:

* The LogCleaner deletes messages, and this changes the log start
offset
of the partition on the leader., or

Ah I see. I think I didn't notice this because the statement assumes that
the LogStartOffset in the leader only changes due to LogCleaner. In fact the
LogStartOffset can change on the leader due to either log retention or
DeleteRecordsRequest. I haven't verified whether LogCleaner can change
LogStartOffset though. It may be a bit better to just say that a
partition is considered dirty if LogStartOffset changes.
I agree.  It should be straightforward to just resend the partition if
logStartOffset changes.
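The simplified rule agreed on here, that a partition is dirty whenever its logStartOffset changes regardless of what caused the change, could be expressed roughly as below. The `PartitionState` fields and the function name are hypothetical, and the KIP's full dirtiness criteria may include more than these two fields.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class PartitionState:
    high_watermark: int
    log_start_offset: int


def dirty_partitions(last_sent, current):
    """Return the partitions whose state differs from what was last sent."""
    return sorted(p for p, state in current.items() if last_sent.get(p) != state)
```

Comparing the whole state means the check does not need to know whether LogCleaner, log retention, or a DeleteRecordsRequest moved the offset.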

4. In "Fetch Session Caching" section, it is said that each broker has a
limited number of slots. How is this number determined? Does this require
a new broker config for this number?
Good point.  I added two broker configuration parameters to control this
number.

I am curious to see whether we can avoid some of these new configs. For
example, incremental.fetch.session.cache.slots.per.broker is probably not
necessary because if a leader knows that a FetchRequest comes from a
follower, we probably want the leader to always cache the information
from that follower. Does this make sense?
Yeah, maybe we can avoid having
incremental.fetch.session.cache.slots.per.broker.

Maybe we can discuss the config later after there is agreement on what the
protocol would look like.


What is the error code if broker does
not have new log for the incoming FetchRequest?
Hmm, is there a typo in this question?  Maybe you meant to ask what
happens if there is no new cache slot for the incoming FetchRequest?
That's not an error-- the incremental fetch session ID just gets set to
0, indicating no incremental fetch session was created.

Yeah there is a typo. You have answered my question.


5. Can you clarify what happens if follower adds a partition to the
ReplicaFetcherThread after receiving LeaderAndIsrRequest? Does the leader
need to generate a new session for this ReplicaFetcherThread or does it
re-use the existing session?  If it uses a new session, is the old session
actively deleted from the slot?
The basic idea is that you can't make changes, except by sending a full
fetch request.  However, perhaps we can allow the client to re-use its
existing session ID.  If the client sets sessionId = id, epoch = 0, it
could re-initialize the session.

Yeah I agree with the basic idea. We probably want to understand more
detail about how this works later.
Sounds good.  I updated the KIP with this information.  A
re-initialization should be exactly the same as an initialization,
except that it reuses an existing ID.
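The re-initialization idea (sessionId = id, epoch = 0 on a full fetch) might be handled along these lines. This is a hedged sketch: the dictionary layout and the function name are invented for illustration, and how a brand-new session gets requested is deliberately left out because the thread does not settle it. The only details taken from the discussion are that epoch 0 with an existing ID re-initializes the session, and that a session ID of 0 in the response means no session.

```python
def handle_full_fetch(sessions, session_id, epoch, partitions):
    """Handle a full fetch that may re-initialize an existing session.

    Returns the session id the response would carry; 0 means "no session".
    """
    if session_id in sessions and epoch == 0:
        # Re-initialization: same ID, fresh contents, as if newly created.
        sessions[session_id] = {"partitions": set(partitions), "next_epoch": 1}
        return session_id
    return 0
```

Reusing the ID lets a follower that gained a partition after a LeaderAndIsrRequest keep its cache slot instead of leaving the old session to time out.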

best,
Colin


BTW, I think it may be useful if the KIP can include the example workflow
of how this feature will be used in case of partition change and so on.
Yeah, that might help.

best,
Colin

Thanks,
Dong


On Wed, Nov 29, 2017 at 12:13 PM, Colin McCabe<cmcc...@apache.org>
wrote:

I updated the KIP with the ideas we've been discussing.

best,
Colin

On Tue, Nov 28, 2017, at 08:38, Colin McCabe wrote:
On Mon, Nov 27, 2017, at 22:30, Jan Filipiak wrote:
Hi Colin, thank you for this KIP, it can become a really useful thing.
I just scanned through the discussion so far and wanted to start a
thread to make a decision about keeping the
cache with the Connection / Session or having some sort of UUID-indexed
global Map.

Sorry if that has been settled already and I missed it. In this case
could anyone point me to the discussion?
Hi Jan,

I don't think anyone has discussed the idea of tying the cache to an
individual TCP session yet.  I agree that since the cache is intended to
be used only by a single follower or client, it's an interesting thing
to think about.

I guess the obvious disadvantage is that whenever your TCP session
drops, you have to make a full fetch request rather than an incremental
one.  It's not clear to me how often this happens in practice -- it
probably depends a lot on the quality of the network.  From a code
perspective, it might also be a bit difficult to access data associated
with the Session from classes like KafkaApis (although we could refactor
it to make this easier).

It's also clear that even if we tie the cache to the session, we still
have to have limits on the number of caches we're willing to create.
And probably we should reserve some cache slots for each follower, so
that clients don't take all of them.

I'd rather see a protocol in which the client is hinting to the broker
that it is going to use the feature, instead of a client
realizing that the broker just offered the feature (regardless of
protocol version, which should only indicate that the feature
would be usable).
Hmm.  I'm not sure what you mean by "hinting."  I do think that the
server should have the option of not accepting incremental requests from
specific clients, in order to save memory space.

This seems to work better with per-connection/session attached
metadata than with a Map, and could allow for
easier client implementations.
It would also make client-side code easier, as there wouldn't be any
cache-miss error messages to handle.
It is nice not to have to handle cache-miss responses, I agree.
However, TCP sessions aren't exposed to most of our client-side code.
For example, when the Producer creates a message and hands it off to the
NetworkClient, the NC will transparently re-connect and re-send a
message if the first send failed.  The higher-level code will not be
informed about whether the TCP session was re-established, whether an
existing TCP session was used, and so on.  So overall I would still lean
towards not coupling this to the TCP session...

best,
Colin

Thank you again for the KIP. And again, if this was clarified already
please drop me a hint where I could read about it.

Best Jan





On 21.11.2017 22:02, Colin McCabe wrote:
Hi all,

I created a KIP to improve the scalability and latency of
FetchRequest:
https://cwiki.apache.org/confluence/display/KAFKA/KIP-227%3A+Introduce+Incremental+FetchRequests+to+Increase+Partition+Scalability
Please take a look.

cheers,
Colin
