[DISCUSS] 0.8.2 release branch, "unofficial" release candidate(s), 0.8.1.2 release

2014-09-03 Thread Jonathan Weeks

+1 on a 0.8.1.2 release as described.

I manually applied patches to cobble together a working Gradle build of Kafka 
for Scala 2.11, but would really appreciate an official release such as 0.8.1.2, 
since we also depend on other libraries (e.g. akka-kafka) that would be much 
easier to migrate and support if the build were public and official.

There were at least several others on the “users” list who expressed interest 
in Scala 2.11 support; who knows how many more “lurkers” are out there.

Best Regards,

-Jonathan

> Hey, I wanted to take a quick pulse to see if we are getting closer to a
> branch for 0.8.2.
> 
> 1) There still seem to be a lot of open issues
> https://issues.apache.org/jira/browse/KAFKA/fixforversion/12326167/?selectedTab=com.atlassian.jira.jira-projects-plugin:version-issues-panel
> and our 30-day summary shows 51 issues created and *34* resolved. I'm not sure
> how much of that we could really just push off to 0.8.3 or 0.9.0 vs. working
> on 0.8.2 as a stable release. There is already so much goodness on trunk. I
> appreciate the double-commit pain, especially as trunk and the branch drift
> (ugh).
> 
> 2) Also, I wanted to float the idea that, after making the 0.8.2 branch, I
> would do some unofficial release candidates for folks to test prior to the
> release and vote. What I was thinking is that I would build, upload, and stage
> the artifacts as if I were preparing them for a vote, but let the community
> know to go in and "have at it" well before the vote. We don't get a lot of
> community votes during a release, but we do get issues afterward (which is
> natural because of how things are done). I have seen four Apache projects do
> this very successfully: not only did they have fewer iterations of RC votes
> (I'm sensitive to that myself), but the community kicked back issues they
> found because they had some "pre-release" time to run the release through
> their own test and staging environments as it came together.
> 
> 3) Checking again on whether we should have a 0.8.1.2 release, in case folks
> in the community need important features (this might be best asked on the
> user list, not sure) that they can't or don't want to wait for and that
> wouldn't be too painful/dangerous to back port. Two things that spring to the
> top of my head are Scala 2.11 support and fixing the source jars. Both of
> these are easy to patch; personally I don't mind doing it, but I want to
> gauge more from the community on this too. I have heard gripes ad hoc from
> folks in direct communication but no real complaints in the public forum, and
> wanted to open the floor if folks have a need.
> 
> 4) I feel 0.9 work is being held up some (or at least resourcing it is, from
> my perspective). We decided to hold off on including SSL (even though we have
> a path for it). Jay recently made a nice update to the Security wiki, which I
> think we should move forward with. I have some more to add/change/update and
> want to start getting down to more details and getting specific people
> working on specific tasks, but without knowing what we are doing when, it is
> hard to manage.
> 
> 5) I just updated https://issues.apache.org/jira/browse/KAFKA-1555. I think
> it is a really important feature; it doesn't have to be in 0.8.2, but we need
> consensus (no pun intended). It fundamentally allows a requirement that data
> land on a minimum of two racks (replicas) before a save is considered
> successful, which A LOT of data requires.
> 
> /***
>  Joe Stein
>  Founder, Principal Consultant
>  Big Data Open Source Security LLC
>  http://www.stealth.ly
>  Twitter: @allthingshadoop 
> /
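
The "min two rack" requirement in item 5 above corresponds to a write only being
acknowledged once it has reached at least two replicas. A minimal sketch of the
producer side of that guarantee, assuming KAFKA-1555 lands as a
min.insync.replicas topic/broker setting used together with acks=-1 (setting
names are illustrative of the proposal, not a confirmed final API):

    import java.util.Properties

    // Producer side: wait for acknowledgement from all in-sync replicas.
    val props = new Properties()
    props.put("metadata.broker.list", "broker1:9092,broker2:9092")
    props.put("request.required.acks", "-1")
    // Broker/topic side (illustrative): min.insync.replicas=2 would then fail the
    // write unless at least two replicas have it, even if one rack is lost.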


Re: [DISCUSS] 0.8.1.2 Release

2014-09-30 Thread Jonathan Weeks
I was one of those asking for 0.8.1.2 a few weeks back, when 0.8.2 was at least 
6-8 weeks out.

If we truly believe that 0.8.2 will go “golden” and stable in 2-3 weeks, I, for 
one, don’t need a 0.8.1.2, but it depends on the confidence in shipping 0.8.2 
soonish.

YMMV,

-Jonathan


On Sep 30, 2014, at 12:37 PM, Neha Narkhede  wrote:

> Can we discuss the need for 0.8.1.2? I'm wondering if it's related to the
> timeline of 0.8.2 in any way? For instance, if we can get 0.8.2 out in the
> next 2-3 weeks, do we still need to get 0.8.1.2 out or can people just
> upgrade to 0.8.2?
> 
> On Tue, Sep 30, 2014 at 9:53 AM, Joe Stein  wrote:
> 
>> Hi, I wanted to kick off a specific discussion on a 0.8.1.2 release.
>> 
>> Here are the JIRAs for which I would like to propose back-porting a patch (if
>> not already done) and applying it to the 0.8.1 branch for a 0.8.1.2 release:
>> 
>> https://issues.apache.org/jira/browse/KAFKA-1502 (source jar is empty)
>> https://issues.apache.org/jira/browse/KAFKA-1419 (cross build for scala
>> 2.11)
>> https://issues.apache.org/jira/browse/KAFKA-1382 (Update zkVersion on
>> partition state update failures)
>> https://issues.apache.org/jira/browse/KAFKA-1490 (remove gradlew initial
>> setup output from source distribution)
>> https://issues.apache.org/jira/browse/KAFKA-1645 (some more jars in our src release)
>> 
>> If the community and committers can comment on the proposed patches, that
>> would be great. If I missed any, bring them up; and if you think any I have
>> proposed shouldn't be in the release, please bring that up too.
>> 
>> Once we have consensus on this thread, my thought was that I would apply and
>> commit the agreed-upon tickets to the 0.8.1 branch. If any ticket doesn't
>> apply cleanly, a back-port patch will of course have to go through our
>> standard process (I'm not worried about that; we have some engineering cycles
>> to contribute to making that happen). Once that is all done, I will build the
>> 0.8.1.2 release artifacts and call a VOTE for RC1.
>> 
>> /***
>> Joe Stein
>> Founder, Principal Consultant
>> Big Data Open Source Security LLC
>> http://www.stealth.ly
>> Twitter: @allthingshadoop 
>> /
>> 



Re: Compile failure going from kafka 0.8.1.1 to 0.8.2

2014-10-30 Thread Jonathan Weeks
+1 on the two methods suggestion

-JW

On Oct 30, 2014, at 9:20 AM, Jay Kreps  wrote:

> But Jun,
> 
> That change breaks code for people who weren't calling it that way. I don't
> think we should be making breaking changes in a point release like this.
> 
> I think we should treat this as a bug for 0.8.2 final; we should be able to
> add two commitOffsets methods, with and without the param, which should fix
> the problem, right?
> 
> -Jay
> 
> On Thu, Oct 30, 2014 at 8:59 AM, Jun Rao  wrote:
> 
>> Jack,
>> 
>> The commit offset api is changed slightly from
>> 
>> def commitOffsets() in 0.8.1.x
>> 
>> to
>> 
>> def commitOffsets(retryOnFailure: Boolean = true) in 0.8.2.x.
>> 
>> If you have been calling the method with parentheses like commitOffsets(),
>> then the code will compile in both 0.8.1.x and 0.8.2.x. In general, the
>> Scala rule (http://docs.scala-lang.org/style/method-invocation.html) for
>> omitting parentheses when calling arity-0 methods is that the methods in
>> question have no side effects. In this case, commitOffsets() clearly has
>> side effects and should have been called with parentheses.
>> 
>> Thanks,
>> 
>> Jun
>> 
>> 
>> 
>> 
>> On Wed, Oct 29, 2014 at 12:40 PM, Jack Foy  wrote:
>> 
>>> My Scala project built against kafka 0.8.1.1 commits consumer offsets as
>>> follows:
>>> 
>>>connector.commitOffsets
>>> 
>>> This compiles without warnings. When I bumped the library dependency to
>>> 0.8.2-beta, the compiler started emitting this error:
>>> 
>>>[error]
>>> src/main/scala/com/whitepages/kafka/consumer/Connector.scala:21: missing
>>> arguments for method commitOffsets in trait ConsumerConnector;
>>>[error] follow this method with `_' if you want to treat it as a
>>> partially applied function
>>>[error] connector.commitOffsets
>>>[error]   ^
>>>[error] one error found
>>>[error] (compile:compile) Compilation failed
>>> 
>>> The following change resolved the error:
>>> 
>>>-connector.commitOffsets
>>>+connector.commitOffsets()
>>> 
>>> Should we expect compilation-breaking changes moving from 0.8.1.1 to
>>> 0.8.2-beta?
>>> 
>>> --
>>> Jack Foy 
>>> 
>>> 
>>> 
>>> 
>> 



Kafka 0.8.2 profile / hotspot analysis blog post

2014-11-23 Thread Jonathan Weeks
http://www.autoletics.com/posts/quick-performance-hotspot-analysis-apache-kafka 


I didn’t do this; it’s just a pointer to some interesting info. YMMV.

-Jonathan

Re: [DISCUSSION] adding the serializer api back to the new java producer

2014-11-25 Thread Jonathan Weeks
+1 on this change — APIs are forever. As much as we’d love to see 0.8.2 released 
ASAP, it is important to get this right.

-JW

> On Nov 24, 2014, at 5:58 PM, Jun Rao  wrote:
> 
> Hi, Everyone,
> 
> I'd like to start a discussion on whether it makes sense to add the
> serializer api back to the new java producer. Currently, the new java
> producer takes a byte array for both the key and the value. While this api
> is simple, it pushes the serialization logic into the application. This
> makes it hard to reason about what type of data is being sent to Kafka and
> also makes it hard to share an implementation of the serializer. For
> example, to support Avro, the serialization logic could be quite involved
> since it might need to register the Avro schema in some remote registry and
> maintain a schema cache locally, etc. Without a serialization api, it's
> impossible to share such an implementation so that people can easily reuse it.
> We sort of overlooked this implication during the initial discussion of the
> producer api.
> 
> So, I'd like to propose an api change to the new producer by adding back
> the serializer api similar to what we had in the old producer. Specifically,
> the proposed api changes are the following.
> 
> First, we change KafkaProducer to take generic types K and V for the key
> and the value, respectively.
> 
> public class KafkaProducer<K, V> implements Producer<K, V> {
> 
>     public Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback callback);
> 
>     public Future<RecordMetadata> send(ProducerRecord<K, V> record);
> }
> 
> Second, we add two new configs, one for the key serializer and another for
> the value serializer. Both serializers will default to the byte array
> implementation.
> 
> public class ProducerConfig extends AbstractConfig {
> 
>     .define(KEY_SERIALIZER_CLASS_CONFIG, Type.CLASS,
>             "org.apache.kafka.clients.producer.ByteArraySerializer",
>             Importance.HIGH, KEY_SERIALIZER_CLASS_DOC)
>     .define(VALUE_SERIALIZER_CLASS_CONFIG, Type.CLASS,
>             "org.apache.kafka.clients.producer.ByteArraySerializer",
>             Importance.HIGH, VALUE_SERIALIZER_CLASS_DOC);
> }
> 
> Both serializers will implement the following interface.
> 
> public interface Serializer<T> extends Configurable {
> 
>     public byte[] serialize(String topic, T data, boolean isKey);
> 
>     public void close();
> }
> 
> This is more or less the same as what's in the old producer. The slight
> differences are (1) the serializer now only requires a parameter-less
> constructor; (2) the serializer has a configure() and a close() method for
> initialization and cleanup, respectively; (3) the serialize() method
> additionally takes the topic and an isKey indicator, both of which are
> useful for things like schema registration.
> 
> The detailed changes are included in KAFKA-1797. For completeness, I also
> made the corresponding changes for the new java consumer api as well.
> 
> Note that the proposed api changes are incompatible with what's in the
> 0.8.2 branch. However, if those api changes are beneficial, it's probably
> better to include them now in the 0.8.2 release, rather than later.
> 
> I'd like to discuss mainly two things in this thread.
> 1. Do people feel that the proposed api changes are reasonable?
> 2. Are there any concerns of including the api changes in the 0.8.2 final
> release?
> 
> Thanks,
> 
> Jun
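
As an illustration of the kind of shared, reusable serializer Jun describes, a
rough Scala sketch of a string serializer written against the proposed
interface (hypothetical code: Serializer and Configurable are the types from
the proposal above, and the exact configure() signature is an assumption since
the API was still under discussion):

    // Hypothetical string serializer for the proposed API.
    class StringSerializer extends Serializer[String] {
      // Nothing to configure for plain strings.
      override def configure(configs: java.util.Map[String, _]): Unit = ()

      // Encode the value as UTF-8 bytes; topic and isKey are available for
      // implementations that need them (e.g. schema registration).
      override def serialize(topic: String, data: String, isKey: Boolean): Array[Byte] =
        if (data == null) null else data.getBytes("UTF-8")

      // Nothing to clean up.
      override def close(): Unit = ()
    }

Such a class could be shared across applications and plugged in via the
key/value serializer class configs proposed above, rather than every
application hand-rolling its own byte-array conversion.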



Re: kafka 0.8.1: Producer.send() can block forever when a broker is down

2014-09-17 Thread Jonathan Weeks Gmail

The issue is that even with one broker down, the rest of the cluster is up but 
effectively unreachable from the producer client in this case, which defeats the 
high-availability characteristics of clustering.

For any producer trying to use the service, it is "Russian roulette" whether you 
will get metadata back when asking for topic/partition data.

The ClientUtils code rightly iterates through the broker list looking for the 
metadata in random order, but if the first broker in the list is down, the 
others are never tried in a timely manner.

An example stacktrace shows the problem:

default-dispatcher-3" prio=5 tid=0x7fef131c6000 nid=0x5f03 runnable 
[0x0001146d2000]
  java.lang.Thread.State: RUNNABLE
at sun.nio.ch.Net.connect0(Native Method)
at sun.nio.ch.Net.connect(Net.java:465)
at sun.nio.ch.Net.connect(Net.java:457)
at sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:670)
- locked <0x0007ad4c1b50> (a java.lang.Object)
- locked <0x0007ad4c1b70> (a java.lang.Object)
- locked <0x0007ad4c1b60> (a java.lang.Object)
at kafka.network.BlockingChannel.connect(BlockingChannel.scala:57)
- locked <0x0007ad3f3408> (a java.lang.Object)
at kafka.producer.SyncProducer.connect(SyncProducer.scala:141)
at kafka.producer.SyncProducer.getOrMakeConnection(SyncProducer.scala:156)
at kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:68)
- locked <0x0007ad3de648> (a java.lang.Object)
at kafka.producer.SyncProducer.send(SyncProducer.scala:112)
at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:53)
at kafka.producer.BrokerPartitionInfo.updateInfo(BrokerPartitionInfo.scala:82)
at kafka.producer.async.DefaultEventHandler$$anonfun$handle$1.apply$mcV$sp(DefaultEventHandler.scala:67)
at kafka.utils.Utils$.swallow(Utils.scala:167)
at kafka.utils.Logging$class.swallowError(Logging.scala:106)
at kafka.utils.Utils$.swallowError(Utils.scala:46)
at kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:67)
at kafka.producer.Producer.send(Producer.scala:76)

An eight-minute timeout is a non-starter for a clustered (HA) service. One would 
expect the system to respect the request.timeout.ms config setting, which it 
does, unless a broker host is down and happens to be first in the shuffled list 
of brokers used to fetch the metadata.

I believe this bug is also exacerbated by the fact that the metadata is 
(rightly) refreshed via the topic.metadata.refresh.interval.ms config setting, 
which defaults to every 10 minutes. AFAIK, this means that if a single broker is 
down, every new producer as well as every existing producer has roughly a 
1-in-clusterSize chance of either not starting or hanging for a minimum of 8 
minutes (assuming the TCP connection code eventually times out), every 10 
minutes (or whatever topic.metadata.refresh.interval.ms is set to), if I 
understand correctly.

Initializing the SocketChannel in code that doesn't respect the 
request.timeout.ms setting logically defeats the spirit of the timeout setting, 
and also makes the iteration code in ClientUtils far less useful:

(from fetchTopicMetadata:)

val shuffledBrokers = Random.shuffle(brokers)
while(i < shuffledBrokers.size && !fetchMetaDataSucceeded) {
  val producer: SyncProducer = ProducerPool.createSyncProducer(producerConfig, shuffledBrokers(i))
  info("Fetching metadata from broker %s with correlation id %d for %d topic(s) %s"
    .format(shuffledBrokers(i), correlationId, topics.size, topics))
  try {
    topicMetadataResponse = producer.send(topicMetadataRequest)
Opening the connection with a timeout as Jack suggests seems far preferable to 
the current situation.

Best Regards,

-Jonathan
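
A rough sketch of what a timeout-aware connect could look like, going through
the channel's underlying socket so that an explicit connect timeout applies
(illustrative only, not the actual patch; the host, port, and connectTimeoutMs
values are placeholders, and ideally the timeout would be tied to
request.timeout.ms):

    import java.net.InetSocketAddress
    import java.nio.channels.SocketChannel

    val host = "broker1"          // placeholder broker host
    val port = 9092               // placeholder broker port
    val connectTimeoutMs = 5000   // placeholder; should come from config

    val channel = SocketChannel.open()
    channel.configureBlocking(true)
    // Socket.connect(SocketAddress, timeout) fails with a SocketTimeoutException
    // after connectTimeoutMs, instead of waiting for the OS-level TCP timeout as
    // SocketChannel.connect() does.
    channel.socket().connect(new InetSocketAddress(host, port), connectTimeoutMs)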


On Sep 16, 2014, at 10:08 PM, Jun Rao  wrote:
> Jack,
> 
> If the broker is down, channel.connect() should throw an IOException,
> instead of blocking forever. In your case, is the broker host down? In that
> case, the connect call will likely wait for the default tcp connection
> timeout, which is 8+ mins.
> 
> Thanks,
> 
> Jun
> 
> On Tue, Sep 16, 2014 at 5:43 PM, Jack Foy  wrote:
> 
>> We observe that when a broker is down, Producer.send() can get into a
>> state where it will block forever, even when using the async producer.
>> 
>> When a Producer first sends data, it fetches topic metadata from the
>> broker cluster. To do this, it shuffles the list of hosts in the cluster,
>> then iterates through the list querying each broker.
>> 
>> For each broker in the shuffled list, the Producer creates a SyncProducer
>> and invokes SyncProducer.send().
>> SyncProducer.send() creates a BlockingChannel and invokes
>> BlockingChannel.connect().
>> BlockingChannel.connect() retrieves a java.nio.channels.SocketChannel,
>> sets it to blocking mode, and invokes SocketChannel.connect(), passing the
>> current broker hostname.
>> 
>> If the first broker in the list is nonresponsive, SocketChannel.connect()
>> will wait forever.
>> 
>> I think the correct change is as follows:
>> 
>> diff --git