[ https://issues.apache.org/jira/browse/KAFKA-1898?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14541122#comment-14541122 ]
Geoffrey Anderson commented on KAFKA-1898:
------------------------------------------

Just making sure you're aware of work we're doing at Confluent on system tests. I'll be posting a KIP for this soon, but here's some info:

The original plan is sketched here: https://cwiki.apache.org/confluence/display/KAFKA/System+Test+Improvements

This is the core library/test framework (WIP) which aids in writing and running the tests: https://github.com/confluentinc/ducktape/

This has the system tests we've written to date for the Confluent Platform: https://github.com/confluentinc/muckrake

> compatibility testing framework
> --------------------------------
>
> Key: KAFKA-1898
> URL: https://issues.apache.org/jira/browse/KAFKA-1898
> Project: Kafka
> Issue Type: Bug
> Reporter: Joe Stein
> Fix For: 0.8.3
>
> Attachments: cctk.png
>
>
> There are a few different scenarios where you want/need to know the status/state of a client library that works with Kafka. Client library development is not just about supporting the wire protocol but also the implementations around specific interactions of the API. The API has blossomed into a robust set of producer, consumer, broker and administrative calls, all of which have layers of logic above them. A client library may choose to deviate from the path the project sets out, and that is OK. The goal of this ticket is to have a system for Kafka that can help to explain what the library is or isn't doing (regardless of what it claims).
> The idea behind this stems from being able to quickly/easily/succinctly analyze the topic message data. Once you can analyze the topic(s) messages, you can gather lots of information about what the client library is doing, is not doing and such. There are a few components to this.
> 1) dataset-generator
> Test Kafka dataset generation tool. Generates a random text file with the given params:
> --filename, -f - output file name.
> --filesize, -s - desired size of the output file. The actual size will always be a bit larger (with a maximum size of $filesize + $max.length - 1).
> --min.length, -l - minimum generated entry length.
> --max.length, -h - maximum generated entry length.
> Usage:
> ./gradlew build
> java -jar dataset-generator/build/libs/dataset-generator-*.jar -s 100000 -l 2 -h 20
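> For illustration, a rough sketch of the generator loop described above might look like the following. This is only a sketch, not the actual dataset-generator source; the object name and I/O details are hypothetical.
> {code}
> // Hypothetical sketch of the dataset-generator loop; not the tool's actual code.
> import java.io.{BufferedWriter, FileWriter}
> import scala.util.Random
>
> object DatasetGeneratorSketch extends App {
>   // These would come from --filename, --filesize, --min.length and --max.length.
>   val filename = "dataset"
>   val filesize = 100000L
>   val minLength = 2
>   val maxLength = 20
>
>   val random = new Random()
>   val writer = new BufferedWriter(new FileWriter(filename))
>   var written = 0L
>   // Append random entries of length between min.length and max.length until the
>   // requested size is reached; the last entry may overshoot, which is why the
>   // actual file is always a bit larger than --filesize.
>   while (written < filesize) {
>     val length = minLength + random.nextInt(maxLength - minLength + 1)
>     val entry = random.alphanumeric.take(length).mkString
>     writer.write(entry)
>     writer.newLine()
>     written += entry.length + 1
>   }
>   writer.close()
> }
> {code}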
> 2) dataset-producer
> Test Kafka dataset producer tool. Able to produce the given dataset to Kafka or a Syslog server. The idea here is you already have lots of data sets that you want to test different things for. You might have different sized messages, formats, etc. and want a repeatable benchmark to run and re-run the testing on. You could just have a day's worth of data and choose to replay it. The CCTK idea is that you are always starting from CONSUME in the state of your library. If your library is only producing, then you will fail a bunch of tests, and that might be OK for people.
> Accepts the following params:
> {code}
> --filename, -f - input file name.
> --kafka, -k - Kafka broker address in host:port format. If this parameter is set, --producer.config and --topic must be set too (otherwise they're ignored).
> --producer.config, -p - Kafka producer properties file location.
> --topic, -t - Kafka topic to produce to.
> --syslog, -s - Syslog server address. Format: protocol://host:port (tcp://0.0.0.0:5140 or udp://0.0.0.0:5141 for example)
> --loop, -l - flag to loop through the file until shut off manually. False by default.
> Usage:
> ./gradlew build
> java -jar dataset-producer/build/libs/dataset-producer-*.jar --filename dataset --syslog tcp://0.0.0.0:5140 --loop true
> {code}
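> As a rough illustration of the --kafka path described above, a minimal produce-from-file loop might look like the sketch below. The object name and the broker/topic values are placeholders, and the real tool additionally handles --producer.config and the Syslog path.
> {code}
> // Hypothetical sketch of the --kafka code path; not the tool's actual code.
> import java.util.Properties
> import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
> import scala.io.Source
>
> object DatasetProducerSketch extends App {
>   // These would come from --filename, --kafka, --topic and --loop.
>   val filename = "dataset"
>   val broker = "localhost:9092"
>   val topic = "source"
>   val loop = false
>
>   val props = new Properties()
>   props.put("bootstrap.servers", broker)
>   props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
>   props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")
>   val producer = new KafkaProducer[String, String](props)
>
>   // Replay the dataset line by line; with --loop the replay repeats until shut off manually.
>   do {
>     for (line <- Source.fromFile(filename).getLines()) {
>       producer.send(new ProducerRecord[String, String](topic, line))
>     }
>   } while (loop)
>
>   producer.close()
> }
> {code}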
> 3) extract
> This step is good so you can save data and compare tests. It could also be removed if folks are just looking for a real live test (and we could support that too). Here we are taking data out of Kafka and putting it into Cassandra (but other data stores can be used too, and we should come up with a way to abstract this out completely so folks could implement whatever they wanted; a rough sketch of such an abstraction follows the code below).
> {code}
> package ly.stealth.shaihulud.reader
>
> import java.util.UUID
>
> import com.datastax.spark.connector._
> import com.datastax.spark.connector.cql.CassandraConnector
> import consumer.kafka.MessageAndMetadata
> import consumer.kafka.client.KafkaReceiver
> import org.apache.spark._
> import org.apache.spark.storage.StorageLevel
> import org.apache.spark.streaming._
> import org.apache.spark.streaming.dstream.DStream
>
> object Main extends App with Logging {
>   // Command-line options for the reader.
>   val parser = new scopt.OptionParser[ReaderConfiguration]("spark-reader") {
>     head("Spark Reader for Kafka client applications", "1.0")
>     opt[String]("testId") unbounded() optional() action { (x, c) =>
>       c.copy(testId = x)
>     } text ("Test ID")
>     opt[String]("source") unbounded() required() action { (x, c) =>
>       c.copy(sourceTopic = x)
>     } text ("Source topic with initial set of data")
>     opt[String]("destination") unbounded() required() action { (x, c) =>
>       c.copy(destinationTopic = x)
>     } text ("Destination topic with processed set of data")
>     opt[Int]("partitions") unbounded() optional() action { (x, c) =>
>       c.copy(partitions = x)
>     } text ("Partitions in topic")
>     opt[String]("zookeeper") unbounded() required() action { (x, c) =>
>       c.copy(zookeeper = x)
>     } text ("Zookeeper connection host:port")
>     opt[Int]("kafka.fetch.size") unbounded() optional() action { (x, c) =>
>       c.copy(kafkaFetchSize = x)
>     } text ("Maximum KBs to fetch from Kafka")
>     checkConfig { c =>
>       if (c.testId.isEmpty || c.sourceTopic.isEmpty || c.destinationTopic.isEmpty || c.zookeeper.isEmpty) {
>         failure("You haven't provided all required parameters")
>       } else {
>         success
>       }
>     }
>   }
>
>   val config = parser.parse(args, ReaderConfiguration()) match {
>     case Some(c) => c
>     case None => sys.exit(1)
>   }
>
>   val sparkConfig = new SparkConf().setAppName("kafka_client_validator")
>                                    .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
>   val ssc = new StreamingContext(sparkConfig, Seconds(10))
>   ssc.checkpoint("reader")
>
>   // Create the keyspace and tables that hold the extracted data.
>   CassandraConnector(sparkConfig).withSessionDo(session => {
>     session.execute("CREATE KEYSPACE IF NOT EXISTS kafka_client_validation WITH REPLICATION = {'class': 'SimpleStrategy', 'replication_factor': 1}")
>     session.execute("CREATE TABLE IF NOT EXISTS kafka_client_validation.tests(test_id text PRIMARY KEY, source_topic text, destination_topic text)")
>     session.execute("CREATE TABLE IF NOT EXISTS kafka_client_validation.counters(test_id text, topic text, total counter, PRIMARY KEY(test_id, topic))")
>     session.execute("CREATE TABLE IF NOT EXISTS kafka_client_validation.messages(test_id text, topic text, partition int, offset int, payload text, PRIMARY KEY(test_id, topic, partition, offset))")
>   })
>
>   val test = Test(config.testId, config.sourceTopic, config.destinationTopic)
>   ssc.sparkContext.parallelize(Seq(test)).saveToCassandra("kafka_client_validation", "tests")
>
>   // One stream per topic: message counts and raw messages are saved for later validation.
>   startStreamForTopic(test.test_id, config.sourceTopic, config)
>   startStreamForTopic(test.test_id, config.destinationTopic, config)
>
>   ssc.start()
>   ssc.awaitTermination()
>
>   def startStreamForTopic(testId: String, topic: String, config: ReaderConfiguration) {
>     val stream = createKafkaStream(config.zookeeper, topic, config.partitions).repartition(config.partitions).persist(StorageLevel.MEMORY_AND_DISK_SER)
>     stream.map(message => {
>       Counter(testId, message.getTopic, 1L)
>     }).reduce((prev, curr) => {
>       Counter(testId, prev.topic, prev.total + curr.total)
>     }).foreachRDD(rdd => {
>       rdd.saveToCassandra("kafka_client_validation", "counters")
>     })
>     stream.map(message => {
>       Message(testId, message.getTopic, message.getPartition.partition, message.getOffset, new String(message.getPayload))
>     }).foreachRDD(rdd => {
>       rdd.saveToCassandra("kafka_client_validation", "messages")
>     })
>   }
>
>   private def createKafkaStream(zkConnect: String, topic: String, partitions: Int): DStream[MessageAndMetadata] = {
>     val zkhosts = zkConnect.split(":")(0)
>     val zkports = zkConnect.split(":")(1)
>     val kafkaParams = Map("zookeeper.hosts" -> zkhosts,
>                           "zookeeper.port" -> zkports,
>                           "zookeeper.consumer.connection" -> zkConnect,
>                           "zookeeper.broker.path" -> "/brokers",
>                           "zookeeper.consumer.path" -> "/consumers",
>                           "fetch.size.bytes" -> (config.kafkaFetchSize * 1024).toString,
>                           "kafka.topic" -> topic,
>                           "kafka.consumer.id" -> "%s-%s".format(topic, UUID.randomUUID().toString))
>     val props = new java.util.Properties()
>     kafkaParams foreach { case (key, value) => props.put(key, value) }
>     val streams = (0 to partitions - 1).map { partitionId => ssc.receiverStream(new KafkaReceiver(StorageLevel.MEMORY_AND_DISK_SER, props, partitionId)) }
>     ssc.union(streams)
>   }
> }
>
> case class Test(test_id: String = "", source_topic: String = "", destination_topic: String = "")
> case class Counter(test_id: String = "", topic: String = "", total: Long = 0L)
> case class Message(test_id: String = "", topic: String = "", partition: Int = 0, offset: Long = 0, payload: String = "")
> case class ReaderConfiguration(testId: String = UUID.randomUUID().toString, sourceTopic: String = "", destinationTopic: String = "",
>                                partitions: Int = 1, zookeeper: String = "", kafkaFetchSize: Int = 8)
> {code}
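> As noted in the extract step, the Cassandra sink above could be hidden behind a small abstraction so folks can plug in whatever store they want. A rough sketch of what that might look like (the trait and method names are hypothetical and not part of the reader code above):
> {code}
> // Hypothetical sketch of a pluggable storage abstraction for the extract step.
> // It assumes the Test/Counter/Message case classes defined in the reader above are in scope.
> trait ValidationStore {
>   def saveTest(test: Test): Unit
>   def saveCounters(counters: Seq[Counter]): Unit
>   def saveMessages(messages: Seq[Message]): Unit
> }
>
> // The saveToCassandra() calls in startStreamForTopic() would move behind this trait,
> // so an HBase-backed store, or a Samza-style "write the results back to a topic"
> // implementation, could be swapped in without touching the streaming logic.
> {code}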
> 4) validator
> This is pluggable both for how to read the topics and for how to process the results once done. Right now we have been checking out using Spark and Cassandra for this; we are also looking at Spark with HBase, and at Samza with the Mesos support. The nice thing about using Samza is we really don't have to use another data store, it is just so easy to put the results back into a topic. Here is roughly what the Spark/Cassandra version looks like for checking whether or not a consumer/producer a) gives an at-least-once processing guarantee, b) preserves ordering, c) etc, etc, etc. While this test is running, as much negative testing as you want can be done to the cluster. It is made to run in an environment where you want to pump through as much data as you can, as fast as you can, and then once done, analyze it.
> {code}
> package ly.stealth.shaihulud.validator
>
> import java.security.MessageDigest
> import java.util.Iterator
>
> import com.datastax.driver.core.{Cluster, Row, SocketOptions}
>
> object Main extends App {
>   val parser = new scopt.OptionParser[ValidatorConfiguration]("spark-validator") {
>     head("Spark Validator for Kafka client applications", "1.0")
>     opt[String]("test.id") unbounded() required() action { (x, c) =>
>       c.copy(testId = x)
>     } text ("Test ID")
>     opt[String]("cassandra.connect") unbounded() required() action { (x, c) =>
>       c.copy(cassandraConnect = x)
>     } text ("Cassandra host")
>     opt[String]("cassandra.user") unbounded() required() action { (x, c) =>
>       c.copy(cassandraUser = x)
>     } text ("Cassandra user")
>     opt[String]("cassandra.password") unbounded() required() action { (x, c) =>
>       c.copy(cassandraPassword = x)
>     } text ("Cassandra password")
>     checkConfig { c =>
>       if (c.testId.isEmpty || c.cassandraConnect.isEmpty || c.cassandraUser.isEmpty || c.cassandraPassword.isEmpty) {
>         failure("You haven't provided all required parameters")
>       } else {
>         success
>       }
>     }
>   }
>
>   val config = parser.parse(args, ValidatorConfiguration()) match {
>     case Some(c) => c
>     case None => sys.exit(1)
>   }
>
>   val cluster = new Cluster.Builder()
>     .addContactPoints(config.cassandraConnect)
>     .withSocketOptions(new SocketOptions().setTcpNoDelay(true))
>     .build()
>   val session = cluster.connect("kafka_client_validation")
>
>   val tests = session.execute("SELECT * FROM kafka_client_validation.tests WHERE test_id='%s'".format(config.testId))
>   val test = tests.one()
>   if (test != null) {
>     val testId = test.getString("test_id")
>     val sourceTopic = test.getString("source_topic")
>     val destinationTopic = test.getString("destination_topic")
>
>     // At-least-once check: compare the message counts of the source and destination topics.
>     val countersQuery = "SELECT * FROM kafka_client_validation.counters WHERE test_id='%s' AND topic='%s'"
>     val sourceCounter = session.execute(countersQuery.format(testId, sourceTopic))
>     val destinationCounter = session.execute(countersQuery.format(testId, destinationTopic))
>
>     println("***** TEST RESULTS *****")
>     var sameAmount = false
>     val totalInSource = sourceCounter.one().getLong("total")
>     val totalInDestination = destinationCounter.one().getLong("total")
>     if (totalInSource == totalInDestination) {
>       sameAmount = true
>     }
>     println(" - Destination topic contains the same amount of messages as Source topic (%d out of %d): %B".format(totalInSource, totalInDestination, sameAmount))
>
>     // Ordering check: hash fixed-size slices of both topics and compare them slice by slice.
>     val messagesQuery = "SELECT * FROM kafka_client_validation.messages WHERE test_id='%s' AND topic='%s'"
>     val sourceMessages = session.execute(messagesQuery.format(testId, sourceTopic))
>     val destinationMessages = session.execute(messagesQuery.format(testId, destinationTopic))
>     val si = sourceMessages.iterator()
>     val di = destinationMessages.iterator()
>
>     val portionSize = 1000
>     var isOrderPreserved = true
>     while ((si.hasNext || di.hasNext) && isOrderPreserved) {
>       val sourceHash = this.calculateMD5ForSlice(si, portionSize)
>       val destinationHash = this.calculateMD5ForSlice(di, portionSize)
>       if (sourceHash != destinationHash) {
>         isOrderPreserved = false
>       }
>     }
>     println(" - Destination topic preserves ordering of Source topic: %B".format(isOrderPreserved))
>   } else {
>     System.err.println("There is no such test '%s'".format(config.testId))
>   }
>   cluster.close()
>
>   def calculateMD5ForSlice(it: Iterator[Row], portionSize: Int): String = {
>     val sb = new StringBuilder
>     var left = portionSize
>     while (it.hasNext && left > 0) {
>       sb.append(it.next.getString("payload"))
>       left = left - 1
>     }
>
>     new String(MessageDigest.getInstance("MD5").digest(sb.toString().getBytes("UTF-8")))
>   }
> }
>
> case class ValidatorConfiguration(testId: String = "", cassandraConnect: String = "", cassandraUser: String = "", cassandraPassword: String = "")
> {code}
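> For reference, an invocation would look something like the following; the jar path is hypothetical (assuming the validator is packaged by the same Gradle build as the other tools), while the flags are the ones defined by the parser above:
> {code}
> ./gradlew build
> java -jar validator/build/libs/validator-*.jar --test.id <test-id> --cassandra.connect <cassandra-host> --cassandra.user <user> --cassandra.password <password>
> {code}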
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)