Hi Eno,

I think I've finally cracked it - I was hit by two problems. First, my listen
IP was 0.0.0.0, and that was the address being handed out to clients, who then
tried to connect to it directly, which obviously was never going to work.
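For reference, the broker-side change that fixed it was along these lines in
server.properties (hostname and port here are illustrative - the point is that
listeners can bind 0.0.0.0, but advertised.listeners has to be an address
clients can actually reach):

listeners=SSL://0.0.0.0:9093
advertised.listeners=SSL://broker1.example.com:9093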
The second problem was that I had stupidly left some code in from when I was
working with the KStreamBuilder and hadn't removed it when moving to the
TopologyBuilder, so I was trying to instantiate a KafkaStreams instance with
no input source. Everything works now - thank you very much for your help.

Now I'm creating multiple application.ids that are essentially throwaway - all
of their state can be removed after 5 days - so I just need to work out how to
tidy this all up in a semi-automated fashion.
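In case anyone else trips over the same thing, this is roughly the shape that
works for me now - just a sketch with placeholder topic/node names, and note
that cleanUp() only removes the local state directory for the application.id
(internal topics still rely on the reset tool or broker retention):

import java.util.Properties;

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.processor.TopologyBuilder;

public class ThrowawayStreamer {
    public static void main(String[] args) {
        Properties config = new Properties();
        // One disposable application.id per input topic, as discussed above.
        config.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-streamer-app-20170502_instancea_1234");
        config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        config.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.Bytes().getClass().getName());
        config.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.Bytes().getClass().getName());

        // The bit I had missed: the topology needs at least one source node,
        // otherwise KafkaStreams has nothing to consume from.
        TopologyBuilder builder = new TopologyBuilder();
        builder.addSource("input", "20170502_instancea_1234");
        builder.addSink("output", "processed-output", "input");

        KafkaStreams streams = new KafkaStreams(builder, config);

        // Deletes the local state directory for this application.id; legal to
        // call while the instance is not running, handy for throwaway apps.
        streams.cleanUp();
        streams.start();

        Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
    }
}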
Thanks,
Henry

--
Henry Thacker

On 2 May 2017 at 16:21:33, Eno Thereska (eno.there...@gmail.com) wrote:

> Could you make sure you don't have a firewall, and that the Kafka brokers
> are set up correctly and can be accessed? Is the SSL port the same as the
> PLAINTEXT port in your server.config file? E.g., see this:
> https://stackoverflow.com/questions/43534220/marking-the-coordinator-dead-for-groupkafka/43537521
>
> Eno
>
> On May 2, 2017, at 10:59 AM, Henry Thacker <he...@henrythacker.com> wrote:
>
> Hi Eno,
>
> At the moment this is hard-coded, but overridable with command line
> parameters:
>
> config.put(StreamsConfig.APPLICATION_ID_CONFIG, appId + "-" + topic);
> config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, brokers);
> config.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, zookeepers);
> config.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.Bytes().getClass().getName());
> config.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.Bytes().getClass().getName());
> config.put(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, maxMessageBytes);
> config.put(ProducerConfig.MAX_REQUEST_SIZE_CONFIG, maxMessageBytes);
> config.put(StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG, WallclockTimestampExtractor.class.getName());
> config.put(StreamsConfig.STATE_DIR_CONFIG, tmpDir);
> config.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
> config.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 200);
> config.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 20000);
> config.put(ProducerConfig.RETRIES_CONFIG, 2);
>
> if (ssl)
>     config.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SSL");
>
> Variables:
> appId - "my-streamer-app"
> topic - "20170502_instancea_1234"
> brokers - "localhost:9092,localhost:9093,localhost:9094"
> zookeepers - "localhost:2181,localhost:2182,localhost:2183"
> maxMessageBytes - 30000000
> ssl - true
>
> Thanks,
> Henry
> --
> Henry Thacker
>
> On 2 May 2017 at 10:16:25, Eno Thereska (eno.there...@gmail.com) wrote:
>
> Hi Henry,
>
> Could you share the streams configuration for your apps? I.e., the part
> where you assign the application id and all the rest of the configs (just
> configs, not code).
>
> Thanks
> Eno
>
> On May 2, 2017, at 8:53 AM, Henry Thacker <he...@henrythacker.com> wrote:
>
> Thanks all for your replies - I have checked out the docs, which were very
> helpful.
>
> I have now moved the separate topic streams to different processes, each
> with its own app.id, and I'm getting the following pattern, with no data
> consumed:
>
> "Starting stream thread [StreamThread-1]
> Discovered coordinator .... for group ..
> Marking the coordinator .... dead for group ..
> Discovered coordinator .... for group ..
> Marking the coordinator .... dead for group .."
>
> The discover and dead states repeat every few minutes.
>
> During this time, the broker logs look happy.
>
> One other, hopefully unrelated, point: this cluster is all SSL encrypted.
>
> Thanks,
> Henry
>
> --
> Henry Thacker
>
> On 29 April 2017 at 05:31:30, Matthias J. Sax (matth...@confluent.io) wrote:
>
> Henry,
>
> you might want to check out the docs, which give an overview of the
> architecture:
> http://docs.confluent.io/current/streams/architecture.html#example
>
> Also, I am wondering why your application did not crash: I would expect
> an exception like
>
> java.lang.IllegalArgumentException: Assigned partition foo-2 for
> non-subscribed topic regex pattern; subscription pattern is bar
>
> Maybe you just don't hit it because both topics have a single partition
> and not multiple.
>
> > Out of interest though, had I subscribed for both topics in one
> > subscriber - I would have expected records for both topics interleaved
>
> Yes. That should happen.
>
> > why when running this in two separate processes do I not observe the same?
>
> Not sure what you mean by this?
>
> > If I fix this by changing the application ID for each streaming process -
> > does this mean I lose the ability to share state stores between the
> > applications?
>
> Yes.
>
> If both your topics are single-partitioned and you want to share state,
> you will not be able to run with more than one thread in your Streams app.
>
> The only way to work around this would be to copy the data into another
> topic with more partitions before you process it - of course, this
> would mean data duplication.
>
> -Matthias
>
> On 4/28/17 12:45 PM, Henry Thacker wrote:
>
> Thanks Michael and Eno for your help - I always thought the unit of
> parallelism was a combination of topic & partition rather than just
> partition.
>
> Out of interest though, had I subscribed to both topics in one subscriber,
> I would have expected records for both topics interleaved - why, when
> running this in two separate processes, do I not observe the same? I'm
> just trying to form a mental model of how this is all working; I will try
> to look through some code over the weekend.
>
> If I fix this by changing the application ID for each streaming process,
> does this mean I lose the ability to share state stores between the
> applications?
>
> Unfortunately, the data on the input topics is provided by a third-party
> component which sends these keyless messages on a single partition per
> topic, so I have little ability to fix this at source :-(
>
> Thanks,
> Henry
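For completeness, a rough sketch of the workaround Matthias describes above -
copying the single-partition input through a pre-created topic with more
partitions before doing the real processing. Topic names are illustrative, and
the wider topic must be created up front with the desired partition count:

import java.util.Properties;

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KStreamBuilder;

public class RepartitionCopy {
    public static void main(String[] args) {
        Properties config = new Properties();
        config.put(StreamsConfig.APPLICATION_ID_CONFIG, "repartition-copy");
        config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        config.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.Bytes().getClass().getName());
        config.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.Bytes().getClass().getName());

        KStreamBuilder builder = new KStreamBuilder();

        // The single-partition, keyless third-party input.
        KStream<Bytes, Bytes> input = builder.stream("third-party-topic");

        // through() writes to the topic and reads it back. With null keys the
        // producer spreads records across partitions, so downstream work on
        // the wider topic can run with as many tasks as it has partitions.
        input.through("third-party-topic-wide")
             .foreach((key, value) -> {
                 // real per-record processing goes here
             });

        new KafkaStreams(builder, config).start();
    }
}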