Hi Eno,

At the moment this is hard-coded, but overridable with command-line
parameters:

config.put(StreamsConfig.APPLICATION_ID_CONFIG, appId + "-" + topic);
config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, brokers);
config.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, zookeepers);

// Raw-bytes serdes for both keys and values
config.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG,
    Serdes.Bytes().getClass().getName());
config.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG,
    Serdes.Bytes().getClass().getName());

// Large-message limits on both the consumer and producer sides
config.put(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG,
    maxMessageBytes);
config.put(ProducerConfig.MAX_REQUEST_SIZE_CONFIG, maxMessageBytes);

config.put(StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG,
    WallclockTimestampExtractor.class.getName());
config.put(StreamsConfig.STATE_DIR_CONFIG, tmpDir);

// Consumer/producer tuning
config.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
config.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 200);
config.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 20000);
config.put(ProducerConfig.RETRIES_CONFIG, 2);

if (ssl)
    config.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SSL");
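
When ssl is true, the usual truststore/keystore settings also need to be
in place; roughly along these lines (the paths and passwords below are
placeholders, not our real values):

config.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG,
    "/path/to/client.truststore.jks");  // placeholder path
config.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, "changeit");
// Only needed if the brokers require client authentication:
config.put(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG,
    "/path/to/client.keystore.jks");    // placeholder path
config.put(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG, "changeit");
config.put(SslConfigs.SSL_KEY_PASSWORD_CONFIG, "changeit");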

Variables:
appId - "my-streamer-app"
topic - "20170502_instancea_1234"
brokers - "localhost:9092,localhost:9093,localhost:9094"
zookeepers - "localhost:2181,localhost:2182,localhost:2183"
maxMessageBytes - 30000000
ssl - true
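
For reference, the command-line overrides are just simple flag parsing
over those defaults - something like this sketch (the flag names below
are illustrative, not the exact ones we use):

// Sketch only - illustrative flag names; assumes the variables above
// are already initialised to their defaults
for (int i = 0; i + 1 < args.length; i += 2) {
    switch (args[i]) {
        case "--app-id":            appId = args[i + 1]; break;
        case "--topic":             topic = args[i + 1]; break;
        case "--brokers":           brokers = args[i + 1]; break;
        case "--zookeepers":        zookeepers = args[i + 1]; break;
        case "--max-message-bytes": maxMessageBytes = Integer.parseInt(args[i + 1]); break;
        case "--ssl":               ssl = Boolean.parseBoolean(args[i + 1]); break;
    }
}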

Thanks,
Henry
-- 
Henry Thacker

On 2 May 2017 at 10:16:25, Eno Thereska (eno.there...@gmail.com) wrote:

> Hi Henry,
>
> Could you share the streams configuration for your apps? I.e., the part
> where you assign application id and all the rest of the configs (just
> configs, not code).
>
> Thanks
> Eno
>
> On May 2, 2017, at 8:53 AM, Henry Thacker <he...@henrythacker.com> wrote:
>
> Thanks all for your replies - I have checked out the docs, which were
> very helpful.
>
> I have now moved the separate topic streams to different processes, each
> with its own app.id, and I'm getting the following pattern, with no data
> consumed:
>
> "Starting stream thread [StreamThread-1]
> Discovered coordinator .... for group ..
> Marking the coordinator .... dead for group ..
> Discovered coordinator .... for group ..
> Marking the coordinator .... dead for group .."
>
> The discover/mark-dead cycle repeats every few minutes.
>
> During this time, the broker logs look happy.
>
> One other, hopefully unrelated point, is this cluster is all SSL
> encrypted.
>
> Thanks,
> Henry
>
> --
> Henry Thacker
>
> On 29 April 2017 at 05:31:30, Matthias J. Sax (matth...@confluent.io)
> wrote:
>
> Henry,
>
> you might want to check out the docs, that give an overview of the
> architecture:
> http://docs.confluent.io/current/streams/architecture.html#example
>
> Also, I am wondering why your application did not crash: I would expect
> an exception like
>
> java.lang.IllegalArgumentException: Assigned partition foo-2 for
> non-subscribed topic regex pattern; subscription pattern is bar
>
> Maybe you just don't hit it because both topics have a single partition
> rather than multiple ones.
>
> Out of interest though, had I subscribed for both topics in one subscriber
> - I would have expected records for both topics interleaved
>
>
> Yes. That should happen.
>
> why, when running this in two separate processes, do I not observe the
> same?
>
>
> Not sure what you mean by this?
>
> If I fix this by changing the application ID for each streaming process -
> does this mean I lose the ability to share state stores between the
> applications?
>
>
> Yes.
>
>
> If both your topics are single-partitioned and you want to share state,
> you will not be able to run with more than one thread in your Streams app.
>
> The only way to work around this would be to copy the data into another
> topic with more partitions before you process it -- of course, this
> would mean data duplication.
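>
> Just as a sketch (made-up intermediate topic name, and the topic must
> be pre-created with more partitions), assuming the 0.10.x API, that
> copy step can be done with through():
>
> KStreamBuilder builder = new KStreamBuilder();
> KStream<Bytes, Bytes> input = builder.stream("input-topic");
> // through() writes to the intermediate topic and consumes it back,
> // so downstream processing parallelizes over its partition count
> KStream<Bytes, Bytes> repartitioned = input.through("input-repartitioned");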
>
>
> -Matthias
>
>
> On 4/28/17 12:45 PM, Henry Thacker wrote:
>
> Thanks Michael and Eno for your help - I always thought the unit of
> parallelism was a combination of topic & partition rather than just
> partition.
>
> Out of interest though, had I subscribed for both topics in one subscriber
> - I would have expected records for both topics interleaved; why, when
> running this in two separate processes, do I not observe the same? Just
> trying to form a mental model of how this all works - I will look through
> some code over the weekend.
>
> If I fix this by changing the application ID for each streaming process -
> does this mean I lose the ability to share state stores between the
> applications?
>
> Unfortunately the data on the input topics are provided by a third party
> component which sends these keyless messages on a single partition per
> topic, so I have little ability to fix this at source :-(
>
> Thanks,
> Henry