GitHub user guozhangwang opened a pull request: https://github.com/apache/kafka/pull/130
KIP-28: First patch

Some open questions collected so far on the first patch. Thanks @gwenshap @jkreps.

1. Can we hide the Chooser interface from users? In other words, if users can specify the "time" on each message fetched from Kafka, would a hard-coded MinTimestampMessageChooser be sufficient, so that we can move TimestampTracker / RecordQueue / Chooser / RecordCollector / etc. all to the internal folders?

2. Shall we split o.a.k.clients into two folders, with o.a.k.clients.processor in stream? Or should we just remove o.a.k.clients.processor and put everything under o.a.k.stream? In addition, there is currently a cyclic dependency between the two, which it would be better to break in the end state.

3. Topology API: requiring users to instantiate their own Topology class with an overridden build() function is a little awkward. Instead it would be great to let users explicitly build the topology in Main and pass it in as a class:

```
Topology myTopology = new TopologyBuilder(defaultDeser)
    .addProcessor("my-processor", MyProcessor.class, new Source("my-source"))
    .addProcessor("my-other-processor", MyOtherProcessor.class, "my-processor");
KafkaStreaming streaming = new KafkaStreaming(config, myTopology);
streaming.run();
```

So the implementation of KStream.filter would instead look like this:

```
public KStream<K, V> filter(Predicate<K, V> predicate) {
    KStreamFilter<K, V> filter = new KStreamFilter<>();
    topology.addProcessor(KStreamFilter.class, new Configs("predicate", predicate));
    return this;
}
```

The advantage is that the user code can now get rid of the whole Topology class with the builder. I think the order of execution for that API is quite unintuitive.

4. We can probably move the forward() function from Processor to ProcessorContext, and split ProcessorContext into two classes: one with the action calls such as commit / send / schedule / forward, and another with the metadata calls such as topic / partition / offset / timestamp.

5. Merge ProcessorConfigs with ProcessorProperties.

6. Consider moving external dependencies such as RocksDB into a separate jar? For example, we could ship a kafka-stream-rocksdb.jar that contains only the RocksDBKeyValueStore, and later, when we deprecate / remove such implementations, we can simply remove the jar itself.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/confluentinc/kafka streaming

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/kafka/pull/130.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

    This closes #130

----
commit 1527f277dc33a18ce348a357d7883349af72fc49
Author: Yasuhiro Matsuda <yasuhiro.mats...@gmail.com>
Date:   2015-06-24T16:40:46Z

    First commit

commit abc220a0f2217a69103466fc3e9bdcf92502a15a
Author: Yasuhiro Matsuda <yasuhiro.mats...@gmail.com>
Date:   2015-06-25T17:56:58Z

    stream synchronization

commit dc200a460f31331bbb5b2bcc3f0567e5fb80904e
Author: Yasuhiro Matsuda <yasuhiro.mats...@gmail.com>
Date:   2015-06-29T16:27:06Z

    ported many stuff from Jay's streming prototype

commit 34d02b21e46902df77e844b1b8b30043fa98cff3
Author: Yasuhiro Matsuda <yasuhiro.mats...@gmail.com>
Date:   2015-06-29T20:52:49Z

    removed punctuate method, added punctuationqueue per streamsynchronizer

commit d7c068990513c4ded6f0efbd04d9995c1a69db85
Author: Yasuhiro Matsuda <yasuhiro.mats...@gmail.com>
Date:   2015-06-29T21:39:13Z

    fixed compile warnings

commit f81d463df655349af25889e2a3319403fa017d6d
Author: Yasuhiro Matsuda <yasuhiro.mats...@gmail.com>
Date:   2015-06-29T23:19:49Z

    added sync to PunctuationSchedulerImpl

commit 480ee6d41e26499a3e035f8471a82431b3733014
Author: Yasuhiro Matsuda <yasuhiro.mats...@gmail.com>
Date:   2015-06-30T00:09:23Z

    pass stream time to puctuate()

commit b20bbd2d596f6e5e53a0eaab0f8f88275c5e8e8b
Author: Yasuhiro Matsuda <yasuhiro.mats...@gmail.com>
Date:   2015-06-30T16:46:16Z

    removed flush method from KStream and Processor

commit 583348bad6e7dd2ea62078231d757de72a7ea0ec
Author: Yasuhiro Matsuda <yasuhiro.mats...@gmail.com>
Date:   2015-06-30T17:09:35Z

    simplified recordqueue interface

commit 8cce08db01c0d3df62ba52316fade3dfd58cba24
Author: Yasuhiro Matsuda <yasuhiro.mats...@gmail.com>
Date:   2015-06-30T18:03:40Z

    separated timestamp stacking from queue impl

commit 31576183c1011e68fe54774d4160c81ab17c59ab
Author: Yasuhiro Matsuda <yasuhiro.mats...@gmail.com>
Date:   2015-06-30T19:03:18Z

    comments

commit 20b52d40514313616f49980f737597ff9a4c961c
Author: Yasuhiro Matsuda <yasuhiro.mats...@gmail.com>
Date:   2015-06-30T19:33:59Z

    comments

commit 2c4f3335c88a397557dede3d5745a7b69ee068ea
Author: Yasuhiro Matsuda <yasuhiro.mats...@gmail.com>
Date:   2015-06-30T21:24:40Z

    added Ingestor interface, renamed RegulatedConsumer to IngestorImpl

commit 9f561b18268dc7bc6d90099cdcee2e602d027373
Author: Yasuhiro Matsuda <yasuhiro.mats...@gmail.com>
Date:   2015-06-30T21:54:18Z

    added KStreamBranchTest

commit ee3e923bd2fbc92d3b24449a13bd64ead5bc1f20
Author: Yasuhiro Matsuda <yasuhiro.mats...@gmail.com>
Date:   2015-06-30T23:10:06Z

    more tests

commit 0c790442a6d82796140d63939952806f88426913
Author: Yasuhiro Matsuda <yasuhiro.mats...@gmail.com>
Date:   2015-06-30T23:29:01Z

    more tests

commit 566d45778e00ccd2bc027811675c92a86f143025
Author: Yasuhiro Matsuda <yasuhiro.mats...@gmail.com>
Date:   2015-07-01T16:04:03Z

    use Ingestor interface in StreamSycnhronizer and KStreamContextImpl

commit b388a3ede7784263c9236b3e12ffd0de3f6ded06
Author: Yasuhiro Matsuda <yasuhiro.mats...@gmail.com>
Date:   2015-07-01T16:51:42Z

    more join tests

commit 02af7188154bcb64ba3aa60afc3f1fa6c1c61241
Author: Yasuhiro Matsuda <yasuhiro.mats...@gmail.com>
Date:   2015-07-01T17:16:28Z

    test FilteredIterator

commit 50dbcb8998e825e510495458f0daa32064ae1be3
Author: Yasuhiro Matsuda <yasuhiro.mats...@gmail.com>
Date:   2015-07-01T17:48:34Z

    test MinTimestampTracker

commit 1d9d63d5bc880c2f8e69b2b5044ca5dcfec4f67e
Author: Yasuhiro Matsuda <yasuhiro.mats...@gmail.com>
Date:   2015-07-01T20:47:38Z

    more test, and removed StreamSynchronizerFactory

commit 56ce42495dba119e908fb8720bfe31e5e5b87da7
Author: Yasuhiro Matsuda <yasuhiro.mats...@gmail.com>
Date:   2015-07-01T21:07:49Z

    removed KStreamContext.steamingConfig() method

commit 3ab1730edfc358f8a4a4fc54fb53da6bc994d6ea
Author: Yasuhiro Matsuda <yasuhiro.mats...@gmail.com>
Date:   2015-07-01T21:17:21Z

    parameter name

commit 36bcdd6f74bb336d92a619666ee9a649dc1d7b9a
Author: Yasuhiro Matsuda <yasuhiro.mats...@gmail.com>
Date:   2015-07-01T21:33:38Z

    added Ingestor.numPartition(topic) method

commit e3d4c598f69e7d15b3ac21f3246173fc55fa207f
Author: Yasuhiro Matsuda <yasuhiro.mats...@gmail.com>
Date:   2015-07-01T22:09:08Z

    removed unused member variables

commit ff6df57dbee920637b3566db5073d9b4312b2c95
Author: Yasuhiro Matsuda <yasuhiro.mats...@gmail.com>
Date:   2015-07-01T23:02:48Z

    more test

commit b021f43f45c9e228829707d31b4e8458b183d702
Author: Yasuhiro Matsuda <yasuhiro.mats...@gmail.com>
Date:   2015-07-01T23:15:50Z

    moved StreamSynchronizer to internal

commit c27397fb8d111a9f70e248b9c1e741701324490b
Author: Yasuhiro Matsuda <yasuhiro.mats...@gmail.com>
Date:   2015-07-01T23:32:31Z

    moved Chooser and RecordQueue to internal

commit 72fd3abf2ac8568e296ed8a5a79213a04dbe2419
Author: Yasuhiro Matsuda <yasuhiro.mats...@gmail.com>
Date:   2015-07-02T18:05:12Z

    removed nestedLoop, added joinPrior

commit a32fa7315d8736ec7ce7be00841121929fb3ab44
Author: Yasuhiro Matsuda <yasuhiro.mats...@gmail.com>
Date:   2015-07-02T18:29:44Z

    test joinPior

----