Re: Low-latency, high message size variance

2015-12-16 Thread Ismael Juma
Jason, maybe it would be useful to update KAFKA-2986 with this information if
we ever decide to do this?

Ismael
On 16 Dec 2015 04:42, "Jason Gustafson"  wrote:

> I was talking with Jay this afternoon about this use case. The tricky thing
> about adding a ping() or heartbeat() API is that you have to deal with the
> potential for rebalancing. This means either allowing it to block while a
> rebalance completes or having it raise an exception indicating that a
> rebalance is needed. In code, the latter might look like this:
>
> while (running) {
>   ConsumerRecords records = consumer.poll(1000);
>   try {
> for (ConsumerRecord record : records) {
>   process(record);
>   consumer.heartbeat();
> }
>   } catch (RebalanceException e) {
> continue;
>   }
> }
>
> Unfortunately, this wouldn't work with auto-commit since it would tend to
> break message processing early which would let the committed position get
> ahead of the last offset processed. The alternative blocking approach
> wouldn't be any better in this regard. Overall, it seems like this might
> introduce a bigger problem than it solves.
>
> Perhaps the simpler solution is to provide a way to set the maximum number
> of messages returned. This could either be a new configuration option or a
> second argument in poll, but it would let you handle messages one-by-one if
> you needed to. You'd then be able to set the session timeout according to
> the expected time to handle a single message. It'd be a bit more work to
> implement this, but if the use case is common enough, it might be
> worthwhile.
>
> -Jason
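
A rough sketch of how the max-records idea above could look from the
application side, assuming a hypothetical "max.poll.records"-style setting
that caps how many records a single poll() returns (no such setting exists in
the 0.9 consumer; all names and values here are illustrative):

import java.util.Arrays;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");  // placeholder
props.put("group.id", "slow-processor");           // placeholder
props.put("enable.auto.commit", "false");          // commit manually, after processing
props.put("max.poll.records", "1");                // hypothetical cap: one record per poll
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList("my-topic"));
while (running) {
  ConsumerRecords<String, String> records = consumer.poll(1000);
  for (ConsumerRecord<String, String> record : records)
    process(record);     // session timeout only needs to cover one record
  consumer.commitSync(); // committed position never passes what was processed
}

With such a cap in place, the session timeout can be sized to the expected
processing time of a single message rather than a whole batch.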
>
> On Tue, Dec 15, 2015 at 10:31 AM, Jason Gustafson wrote:
>
> > Hey Jens,
> >
> > The purpose of pause() is to stop fetches for a set of partitions. This
> > lets you continue calling poll() to send heartbeats. Also note that poll()
> > generally only blocks for rebalances. In code, something like this is what
> > I was thinking:
> >
> > while (running) {
> >   ConsumerRecords records = consumer.poll(1000);
> >   if (queue.offer(records))
> > continue;
> >
> >   TopicPartition[] assignment = toArray(consumer.assignment());
> >   consumer.pause(assignment);
> >   while (!queue.offer(records, heartbeatIntervalMs, TimeUnit.MILLISECONDS))
> > consumer.poll(0);
> >   consumer.resume(assignment);
> > }
> >
> > The tricky thing is handling rebalances since they might occur in either
> > call to poll(). In a rebalance, you have to 1) drain the queue, 2) commit
> > current offsets, and 3) maybe break from the inner poll loop. If the
> > processing thread is busy when the rebalance is triggered, then you may
> > have to discard the results when it's finished. It's also a little
> > difficult communicating completion to the poll loop, which is where the
> > offset commit needs to take place. I suppose another queue would work,
> > sigh.
> >
> > Well, I think you can make that work, but I tend to agree that it's pretty
> > complicated. Perhaps instead of a queue, you should just submit the
> > processor to an executor service for each record set returned and await its
> > completion directly. For example:
> >
> > while (running) {
> >   ConsumerRecords records = consumer.poll(1000);
> >   Future future = executor.submit(new Processor(records));
> >
> >   TopicPartition[] assignment = toArray(consumer.assignment());
> >   consumer.pause(assignment);
> >   while (!complete(future, heartbeatIntervalMs, TimeUnit.MILLISECONDS))
> > consumer.poll(0);
> >   consumer.resume(assignment);
> >   consumer.commitSync();
> > }
> >
> > This seems closer to the spirit of the poll loop, and it makes handling
> > commits a lot easier. You still have to deal with the rebalance problem,
> > but at least you don't have to deal with the queue. It's still a little
> > complex though. Maybe the consumer needs a ping() API which does the same
> > thing as poll() but doesn't send or return any fetches. That would simplify
> > things a little more:
> >
> > while (running) {
> >   ConsumerRecords records = consumer.poll(1000);
> >   Future future = executor.submit(new Processor(records));
> >   while (!complete(future, heartbeatIntervalMs, TimeUnit.MILLISECONDS))
> > consumer.ping();
> >   consumer.commitSync();
> > }
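
The complete(...) helper used in the two loops above is not defined anywhere
in the thread; a minimal version, assuming its job is just to wait up to the
heartbeat interval for the processing future, could look like this:

import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Wait up to the given timeout; true once processing has finished.
static boolean complete(Future<?> future, long timeout, TimeUnit unit) {
  try {
    future.get(timeout, unit);
    return true;
  } catch (TimeoutException e) {
    return false; // not done yet; the caller polls/pings and tries again
  } catch (InterruptedException e) {
    Thread.currentThread().interrupt();
    throw new RuntimeException(e);
  } catch (ExecutionException e) {
    throw new RuntimeException(e.getCause()); // processing itself failed
  }
}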
> >
> > Anyway, I'll think about it a little more and see if any other approaches
> > come to mind. I do agree that we should have a way to handle this case
> > without too much extra work.
> >
> >
> > -Jason
> >
> >
> > On Tue, Dec 15, 2015 at 5:09 AM, Jens Rantil wrote:
> >
> >> Hi Jason,
> >>
> >> Thanks for your response. See replies inline:
> >>
> >> On Tuesday, December 15, 2015, Jason Gustafson wrote:
> >>
> >> > Hey Jens,
> >> >
> >> > I'm not sure I understand why increasing the session timeout is not an
> >> > option. Is the issue that there's too much uncertainty about processing
> >> > time to set an upper bound for each round of the poll loop?
> >> >
> >>
> >> Yes, that's the issue.
> >>
> >>
> >> > One general wor

Consumer lag JMX in 0.9

2015-12-16 Thread Marko Bonaći
Hi,
there was a problem with JMX consumer lag in 0.8:
http://search-hadoop.com/m/uyzND14v72215XZpK&subj=Re+Consumer+lag+lies+orphaned+offsets+

Has anything changed now with 0.9?

Thanks
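
For what it's worth, the new (0.9) consumer also exposes lag through its own
metrics, available over JMX or programmatically via consumer.metrics(). A
minimal sketch, assuming the fetch-manager metric is named "records-lag-max":

import java.util.Map;
import org.apache.kafka.common.Metric;
import org.apache.kafka.common.MetricName;

// Print the consumer's own view of its maximum partition lag.
for (Map.Entry<MetricName, ? extends Metric> entry : consumer.metrics().entrySet()) {
  if (entry.getKey().name().equals("records-lag-max"))
    System.out.println("records-lag-max: " + entry.getValue().value());
}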


kafka-connect-jdbc: ids, timestamps, and transactions

2015-12-16 Thread Mark Drago
I had asked this in a github issue but I'm reposting here to try and get an
answer from a wider audience.

Has any thought gone into how kafka-connect-jdbc will be impacted by SQL
transactions committing IDs and timestamps out-of-order?  Let me give an
example with two connections.

1: begin transaction
1: insert (get id 1)
2: begin transaction
2: insert (get id 2)
2: commit (recording id 2)
kafka-connect-jdbc runs and thinks it has handled everything through id 2
1: commit (recording id 1)

This would result in kafka-connect-jdbc missing id 1. The same thing could
happen with timestamps. I've read through some of the kafka-connect-jdbc
code and I think it may be susceptible to this problem, but I haven't run
it or verified that it would be an issue. Has this come up before? Are
there plans to deal with this situation?
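
For concreteness, the interleaving above can be reproduced with plain JDBC
along these lines (table, column, and connection URL are made up for
illustration; assume an auto-increment id column and an enclosing method that
declares throws SQLException):

import java.sql.Connection;
import java.sql.DriverManager;

String url = "jdbc:postgresql://localhost:5432/mydb"; // placeholder
try (Connection c1 = DriverManager.getConnection(url);
     Connection c2 = DriverManager.getConnection(url)) {
  c1.setAutoCommit(false);
  c2.setAutoCommit(false);
  c1.createStatement().executeUpdate("INSERT INTO events(payload) VALUES ('a')"); // assigned id 1
  c2.createStatement().executeUpdate("INSERT INTO events(payload) VALUES ('b')"); // assigned id 2
  c2.commit(); // id 2 becomes visible first
  // An incremental query running here ("WHERE id > last_seen") sees only id 2
  // and advances last_seen to 2.
  c1.commit(); // id 1 becomes visible only after the offset has already passed it
}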

Obviously something like bottled-water for postgresql would handle this
nicely as it would get the changes once they're committed.


Thanks for any insight,

Mark.


Original github issue:
https://github.com/confluentinc/kafka-connect-jdbc/issues/27


reassign __consumer_offsets partitions

2015-12-16 Thread Damian Guy
Hi,


We have had some temporary nodes in our Kafka cluster and I now need to
move the assigned partitions off of those nodes onto the permanent members.
I'm familiar with the kafka-reassign-partitions script, but how do I get it
to work with the __consumer_offsets topic? It currently seems to ignore it.

Thanks,
Damian
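
One workaround sometimes suggested (untested here) is to skip the --generate
step and hand-write the reassignment JSON so that __consumer_offsets is
listed explicitly, then feed it to --execute. A sketch, with placeholder
broker ids, covering only the first two of the topic's partitions:

{"version": 1,
 "partitions": [
   {"topic": "__consumer_offsets", "partition": 0, "replicas": [1, 2]},
   {"topic": "__consumer_offsets", "partition": 1, "replicas": [2, 1]}
 ]}

bin/kafka-reassign-partitions.sh --zookeeper localhost:2181 \
  --reassignment-json-file reassign.json --execute

Note that __consumer_offsets has 50 partitions by default, so a real file
would need an entry per partition.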


Re: Low-latency, high message size variance

2015-12-16 Thread Jens Rantil
Hi again Jason,

Sorry for a bit of a late response - I'm travelling and check my e-mail
sporadically.

I have a specific question regarding the pause() solution quoted below:

On Tuesday, December 15, 2015, Jason Gustafson  wrote:
>
> while (running) {
>   ConsumerRecords records = consumer.poll(1000);
>   if (queue.offer(records))
> continue;
>
>   TopicPartition[] assignment = toArray(consumer.assignment());
>   consumer.pause(assignment);
>   while (!queue.offer(records, heartbeatIntervalMs, TimeUnit.MILLISECONDS))
> consumer.poll(0);
>   consumer.resume(assignment);
> }
>

As far as I've understood, the `KafkaConsumer` has a background thread that
fetches records, right? If so, isn't there a race condition between the
`consumer.poll(1000);` call and `consumer.pause(assignment);` where the
fetcher might fetch, and commit, messages that I then collect on my first
`consumer.poll(0);` call? Since `consumer.poll(0);` then would return a
non-empty list, I would essentially be ignoring messages? Or is the pause()
call both 1) making sure consumer#poll never returns anything _and_ 2)
pauses the background fetcher?

Cheers,
Jens


-- 
Jens Rantil
Backend engineer
Tink AB

Email: jens.ran...@tink.se
Phone: +46 708 84 18 32
Web: www.tink.se



Re: Low-latency, high message size variance

2015-12-16 Thread Jens Rantil
Hi again Jason,

Once again thanks for your answer.

Yes, the more I think/read about this, the more viable the "max records"
approach sounds. Without knowing the code, I guess it would make more sense
to create a "max.partition.fetch.messages" property. That way a consumer
could optimize for quick fetches on startup instead of per-poll() call. And
if a consumer really would like to change the number of messages at runtime,
they could simply close the consumer and restart it.

I spent 45 minutes trying to set up a development environment to have a
look at the Kafka code and maybe submit a pull request for this. Do you
think this would be hard to implement? Would introducing this need a larger
consensus/discussion in KAFKA-2986?

Last, but not least, I'm happy to hear that this is a case that Kafka should
handle. I've reviewed many queueing solutions, and Kafka really seems like
the absolute best solution to our problem as long as we can overcome this
issue.

Thanks,
Jens

On Tuesday, December 15, 2015, Jason Gustafson wrote:

> I was talking with Jay this afternoon about this use case. The tricky thing
> about adding a ping() or heartbeat() API is that you have to deal with the
> potential for rebalancing. This means either allowing it to block while a
> rebalance completes or having it raise an exception indicating that a
> rebalance is needed. In code, the latter might look like this:
>
> while (running) {
>   ConsumerRecords records = consumer.poll(1000);
>   try {
> for (ConsumerRecord record : records) {
>   process(record);
>   consumer.heartbeat();
> }
>   } catch (RebalanceException e) {
> continue;
>   }
> }
>
> Unfortunately, this wouldn't work with auto-commit since it would tend to
> break message processing early which would let the committed position get
> ahead of the last offset processed. The alternative blocking approach
> wouldn't be any better in this regard. Overall, it seems like this might
> introduce a bigger problem than it solves.
>
> Perhaps the simpler solution is to provide a way to set the maximum number
> of messages returned. This could either be a new configuration option or a
> second argument in poll, but it would let you handle messages one-by-one if
> you needed to. You'd then be able to set the session timeout according to
> the expected time to handle a single message. It'd be a bit more work to
> implement this, but if the use case is common enough, it might be
> worthwhile.
>
> -Jason
>
> On Tue, Dec 15, 2015 at 10:31 AM, Jason Gustafson wrote:
>
> > Hey Jens,
> >
> > The purpose of pause() is to stop fetches for a set of partitions. This
> > lets you continue calling poll() to send heartbeats. Also note that poll()
> > generally only blocks for rebalances. In code, something like this is what
> > I was thinking:
> >
> > while (running) {
> >   ConsumerRecords records = consumer.poll(1000);
> >   if (queue.offer(records))
> > continue;
> >
> >   TopicPartition[] assignment = toArray(consumer.assignment());
> >   consumer.pause(assignment);
> >   while (!queue.offer(records, heartbeatIntervalMs, TimeUnit.MILLISECONDS))
> > consumer.poll(0);
> >   consumer.resume(assignment);
> > }
> >
> > The tricky thing is handling rebalances since they might occur in either
> > call to poll(). In a rebalance, you have to 1) drain the queue, 2) commit
> > current offsets, and 3) maybe break from the inner poll loop. If the
> > processing thread is busy when the rebalance is triggered, then you may
> > have to discard the results when it's finished. It's also a little
> > difficult communicating completion to the poll loop, which is where the
> > offset commit needs to take place. I suppose another queue would work,
> > sigh.
> >
> > Well, I think you can make that work, but I tend to agree that it's pretty
> > complicated. Perhaps instead of a queue, you should just submit the
> > processor to an executor service for each record set returned and await its
> > completion directly. For example:
> >
> > while (running) {
> >   ConsumerRecords records = consumer.poll(1000);
> >   Future future = executor.submit(new Processor(records));
> >
> >   TopicPartition[] assignment = toArray(consumer.assignment());
> >   consumer.pause(assignment);
> >   while (!complete(future, heartbeatIntervalMs, TimeUnit.MILLISECONDS))
> > consumer.poll(0);
> >   consumer.resume(assignment);
> >   consumer.commitSync();
> > }
> >
> > This seems closer to the spirit of the poll loop, and it makes handling
> > commits a lot easier. You still have to deal with the rebalance problem,
> > but at least you don't have to deal with the queue. It's still a little
> > complex though. Maybe the consumer needs a ping() API which does the same
> > thing as poll() but doesn't send or return any fetches. That would simplify
> > things a little more:
> >
> > while (running) {
> >   ConsumerRecords records = consumer.poll(1000);
> >   Future future = executor.submit(new Processor(

Re: kafka-connect-jdbc: ids, timestamps, and transactions

2015-12-16 Thread Ewen Cheslack-Postava
Mark,

There are definitely limitations to using JDBC for change data capture.
A database-specific implementation, especially one that can read directly
off the database's log, will be able to handle more situations like this.
Cases like the one you describe are difficult to address
efficiently working only with simple queries.

The JDBC connector offers a few different modes for handling incremental
queries. One of them uses both a timestamp and a unique ID, which will be
more robust to issues like these. However, even with both, you can still
come up with variants that can cause issues like the one you describe. You
also have the option of using a custom query which might help if you can do
something smarter by making assumptions about your table, but for now
that's pretty limited for constructing incremental queries since the
connector doesn't provide a way to track offset columns with custom
queries. I'd like to improve the support for this in the future, but at
some point it starts making sense to look at database-specific connectors.
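
For reference, a minimal sketch of the timestamp-plus-ID mode mentioned
above, using kafka-connect-jdbc's documented property names (all values are
placeholders):

name=jdbc-source-example
connector.class=io.confluent.connect.jdbc.JdbcSourceConnector
connection.url=jdbc:postgresql://localhost:5432/mydb
mode=timestamp+incrementing
timestamp.column.name=updated_at
incrementing.column.name=id
topic.prefix=jdbc-

The id acts as a tiebreaker for rows sharing the same timestamp, which makes
the incremental query more robust, though as noted above it still doesn't
fully solve the commit-ordering problem.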

(By the way, this gets even messier once you start thinking about the
variety of different isolation levels people may be using...)

-Ewen

P.S. Where to ask these questions is a bit confusing since Connect is part
of Kafka. In general, for specific connectors I'd suggest asking on the
corresponding mailing list for the project, which in the case of the JDBC
connector would be the Confluent Platform mailing list here:
https://groups.google.com/forum/#!forum/confluent-platform

On Wed, Dec 16, 2015 at 5:27 AM, Mark Drago  wrote:

> I had asked this in a github issue but I'm reposting here to try and get an
> answer from a wider audience.
>
> Has any thought gone into how kafka-connect-jdbc will be impacted by SQL
> transactions committing IDs and timestamps out-of-order?  Let me give an
> example with two connections.
>
> 1: begin transaction
> 1: insert (get id 1)
> 2: begin transaction
> 2: insert (get id 2)
> 2: commit (recording id 2)
> kafka-connect-jdbc runs and thinks it has handled everything through id 2
> 1: commit (recording id 1)
>
> This would result in kafka-connect-jdbc missing id 1. The same thing could
> happen with timestamps. I've read through some of the kafka-connect-jdbc
> code and I think it may be susceptible to this problem, but I haven't run
> it or verified that it would be an issue. Has this come up before? Are
> there plans to deal with this situation?
>
> Obviously something like bottled-water for postgresql would handle this
> nicely as it would get the changes once they're committed.
>
>
> Thanks for any insight,
>
> Mark.
>
>
> Original github issue:
> https://github.com/confluentinc/kafka-connect-jdbc/issues/27
>



-- 
Thanks,
Ewen


failed with LeaderNotAvailableError -

2015-12-16 Thread David Montgomery
Hi,

I am very concerned about using Kafka in production given the below
errors:

Now I have issues with my ZooKeeper. Other services use ZK; only Kafka fails.
I have 2 Kafka servers running 0.8.x. How do I resolve this? I tried
restarting the Kafka services. Below is my Kafka server.properties file:

Traceback (most recent call last):
  File "/usr/local/lib/python2.7/dist-packages/gevent-1.1b6-py2.7-linux-x86_64.egg/gevent/greenlet.py", line 523, in run
    result = self._run(*self.args, **self.kwargs)
  File "/var/feed-server/ad-server/pixel-server.py", line 145, in send_kafka_message
    res = producer.send_messages(topic, message)
  File "build/bdist.linux-x86_64/egg/kafka/producer/simple.py", line 52, in send_messages
    partition = self._next_partition(topic)
  File "build/bdist.linux-x86_64/egg/kafka/producer/simple.py", line 36, in _next_partition
    self.client.load_metadata_for_topics(topic)
  File "build/bdist.linux-x86_64/egg/kafka/client.py", line 383, in load_metadata_for_topics
    kafka.common.check_error(topic_metadata)
  File "build/bdist.linux-x86_64/egg/kafka/common.py", line 233, in check_error
    raise error_class(response)
LeaderNotAvailableError: TopicMetadata(topic='topic-test-production', error=5, partitions=[])
failed with LeaderNotAvailableError
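
Error code 5 is LEADER_NOT_AVAILABLE: the broker returned metadata for the
topic but no partition currently has a live leader (here the partition list
is empty, which often happens when the topic has only just been auto-created).
A common first step is to inspect the topic directly, e.g.:

bin/kafka-topics.sh --zookeeper localhost:2181 --describe --topic topic-test-production

If the topic shows no partitions, or a leader of -1, check that the brokers
are registered in ZooKeeper and re-create the topic if necessary.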

# limitations under the License.
# see kafka.server.KafkaConfig for additional details and defaults

# Server Basics #

# The id of the broker. This must be set to a unique integer for each broker.
broker.id=<%=@broker_id%>
advertised.host.name=<%=@ipaddress%>
advertised.port=9092
# Socket Server Settings #

# The port the socket server listens on
port=9092

# Hostname the broker will bind to and advertise to producers and consumers.
# If not set, the server will bind to all interfaces and advertise the value
# returned from java.net.InetAddress.getCanonicalHostName().
host.name=<%=@ipaddress%>

# The number of threads handling network requests
num.network.threads=2

# The number of threads doing disk I/O
num.io.threads=2

# The send buffer (SO_SNDBUF) used by the socket server
socket.send.buffer.bytes=1048576

# The receive buffer (SO_RCVBUF) used by the socket server
socket.receive.buffer.bytes=1048576

# The maximum size of a request that the socket server will accept (protection against OOM)
socket.request.max.bytes=104857600


# Log Basics #

# A comma separated list of directories under which to store log files
log.dirs=/tmp/kafka-logs

# The number of logical partitions per topic per server. More partitions
# allow greater parallelism for consumption, but also mean more files.
num.partitions=2

# Log Flush Policy #

# The following configurations control the flush of data to disk. This is
# among the most important performance knobs in Kafka.
# There are a few important trade-offs here:
#    1. Durability: Unflushed data may be lost if you are not using replication.
#    2. Latency: Very large flush intervals may lead to latency spikes when
#       the flush does occur, as there will be a lot of data to flush.
#    3. Throughput: The flush is generally the most expensive operation, and a
#       small flush interval may lead to excessive seeks.
# The settings below allow one to configure the flush policy to flush data
# after a period of time or every N messages (or both). This can be done
# globally and overridden on a per-topic basis.

# The number of messages to accept before forcing a flush of data to disk
log.flush.interval.messages=1

# The maximum amount of time a message can sit in a log before we force a flush
log.flush.interval.ms=1000

# Per-topic overrides for log.flush.interval.ms
#log.flush.intervals.ms.per.topic=topic1:1000, topic2:3000

# Log Retention Policy #

# The following configurations control the disposal of log segments. The
# policy can be set to delete segments after a period of time, or after a
# given size has accumulated. A segment will be deleted whenever *either* of
# these criteria are met. Deletion always happens from the end of the log.

# The minimum age of a log file to be eligible for deletion
log.retention.hours=168

# A size-based retention policy for logs. Segments are pruned from the log
# as long as the remaining segments don't drop below log.retention.bytes.
#log.retention.bytes=1073741824

# The maximum size of a log segment file. When this size is reached, a new
# log segment will be created.
log.segment.bytes=536870912

# The interval at which log segments are checked to see if they can be
# deleted according to the retention policies.
log.cleanup.interval.mins=1

# Zookeeper #

# Zookeeper connection string (see zookeeper docs for details).
# This is a comma separat