Hi Eno,

At the moment this is hard-coded, but overridable with command-line parameters:
config.put(StreamsConfig.APPLICATION_ID_CONFIG, appId + "-" + topic);
config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, brokers);
config.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, zookeepers);
config.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.Bytes().getClass().getName());
config.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.Bytes().getClass().getName());
config.put(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, maxMessageBytes);
config.put(ProducerConfig.MAX_REQUEST_SIZE_CONFIG, maxMessageBytes);
config.put(StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG, WallclockTimestampExtractor.class.getName());
config.put(StreamsConfig.STATE_DIR_CONFIG, tmpDir);
config.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
config.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 200);
config.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 20000);
config.put(ProducerConfig.RETRIES_CONFIG, 2);
if (ssl) config.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SSL");

Variables:
appId - "my-streamer-app"
topic - "20170502_instancea_1234"
brokers - "localhost:9092,localhost:9093,localhost:9094"
zookeepers - "localhost:2181,localhost:2182,localhost:2183"
maxMessageBytes - 30000000
ssl - true

(For SSL, only security.protocol is set above; a sketch of the truststore/keystore
settings that usually go with it is at the end of this message.)

Thanks,
Henry

--
Henry Thacker

On 2 May 2017 at 10:16:25, Eno Thereska (eno.there...@gmail.com) wrote:
> Hi Henry,
>
> Could you share the streams configuration for your apps? I.e., the part
> where you assign the application id and all the rest of the configs (just
> configs, not code).
>
> Thanks
> Eno
>
> On May 2, 2017, at 8:53 AM, Henry Thacker <he...@henrythacker.com> wrote:
>
> Thanks all for your replies - I have checked out the docs, which were very
> helpful.
>
> I have now moved the separate topic streams to different processes, each
> with their own app.id, and I'm getting the following pattern, with no data
> consumed:
>
> "Starting stream thread [StreamThread-1]
> Discovered coordinator .... for group ..
> Marking the coordinator .... dead for group ..
> Discovered coordinator .... for group ..
> Marking the coordinator .... dead for group .."
>
> The discovered and dead states repeat every few minutes.
>
> During this time, the broker logs look happy.
>
> One other, hopefully unrelated, point is that this cluster is all SSL
> encrypted.
>
> Thanks,
> Henry
>
> --
> Henry Thacker
>
> On 29 April 2017 at 05:31:30, Matthias J. Sax (matth...@confluent.io) wrote:
>
> Henry,
>
> you might want to check out the docs, which give an overview of the
> architecture:
> http://docs.confluent.io/current/streams/architecture.html#example
>
> Also, I am wondering why your application did not crash: I would expect
> an exception like
>
> java.lang.IllegalArgumentException: Assigned partition foo-2 for
> non-subscribed topic regex pattern; subscription pattern is bar
>
> Maybe you just don't hit it, because both topics have a single partition
> and not multiple.
>
> > Out of interest though, had I subscribed for both topics in one subscriber
> > - I would have expected records for both topics interleaved
>
> Yes. That should happen.
>
> > why when
> > running this in two separate processes do I not observe the same?
>
> Not sure what you mean by this?
>
> > If I fix this by changing the application ID for each streaming process -
> > does this mean I lose the ability to share state stores between the
> > applications?
>
> Yes.
>
> If both your topics are single partitioned, and you want to share state,
> you will not be able to run with more than one thread in your Streams app.
>
> The only way to work around this would be to copy the data into another
> topic with more partitions before you process them -- of course, this
> would mean data duplication.
>
> -Matthias
>
> On 4/28/17 12:45 PM, Henry Thacker wrote:
> > Thanks Michael and Eno for your help - I always thought the unit of
> > parallelism was a combination of topic & partition rather than just
> > partition.
> >
> > Out of interest though, had I subscribed for both topics in one subscriber
> > - I would have expected records for both topics interleaved, why when
> > running this in two separate processes do I not observe the same? Just
> > wanting to try and form a mental model of how this is all working - I will
> > try and look through some code over the weekend.
> >
> > If I fix this by changing the application ID for each streaming process -
> > does this mean I lose the ability to share state stores between the
> > applications?
> >
> > Unfortunately the data on the input topics is provided by a third-party
> > component which sends these keyless messages on a single partition per
> > topic, so I have little ability to fix this at source :-(
> >
> > Thanks,
> > Henry
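
P.S. Two sketches, for what it's worth.

First, since the cluster is SSL encrypted and the config above only sets
security.protocol, this is roughly what the remaining SSL client settings
tend to look like -- the paths and passwords are placeholders, not values
from this thread, and the keystore part only matters if the brokers require
client authentication:

import java.util.Properties;

import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.common.config.SslConfigs;

public class SslClientSettingsSketch {

    // Minimal sketch of the SSL settings that usually sit alongside
    // security.protocol=SSL. All locations and passwords are placeholders.
    static Properties sslSettings() {
        Properties config = new Properties();
        config.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SSL");
        // Truststore so the client can verify the broker certificates.
        config.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, "/path/to/client.truststore.jks");
        config.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, "truststore-password");
        // Keystore only if the brokers enforce ssl.client.auth=required.
        config.put(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG, "/path/to/client.keystore.jks");
        config.put(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG, "keystore-password");
        config.put(SslConfigs.SSL_KEY_PASSWORD_CONFIG, "key-password");
        return config;
    }
}

Second, a rough take on Matthias's workaround of copying the single-partition
input into a topic with more partitions before processing it, against the
0.10.x Streams API. The topic names are made up, and "repartitioned-topic"
would have to be created up front with the desired partition count:

import java.util.Properties;

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KStreamBuilder;

public class RepartitionSketch {

    public static void main(String[] args) {
        KStreamBuilder builder = new KStreamBuilder();

        // Read the keyless, single-partition feed.
        KStream<Bytes, Bytes> input =
                builder.stream(Serdes.Bytes(), Serdes.Bytes(), "input-topic");

        // Copy it into a pre-created topic with more partitions and continue
        // from that copy -- this is the data duplication Matthias mentions.
        KStream<Bytes, Bytes> repartitioned =
                input.through(Serdes.Bytes(), Serdes.Bytes(), "repartitioned-topic");

        repartitioned.foreach((key, value) -> {
            // Downstream processing would go here; parallelism now follows
            // the partition count of "repartitioned-topic".
        });

        Properties config = new Properties();
        config.put(StreamsConfig.APPLICATION_ID_CONFIG, "repartition-sketch"); // placeholder
        config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");  // placeholder

        new KafkaStreams(builder, config).start();
    }
}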