Something like the below (my bad for the last quick email):
topology.newStream("topictestspout", kafkaSpout).parallelismHint(4)
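
In case it helps, here is a rough, standalone sketch of why the hint matters. It is plain Java, not storm-kafka code, and it assumes the topic's partitions are dealt out to the spout tasks round-robin by task index, which is my understanding of how the Trident partitioned spouts behave. The class name, the method name, and the 9-partition count are made up for illustration; please check the real partition count of "topictest".

```java
import java.util.ArrayList;
import java.util.List;

public class PartitionAssignmentSketch {

    // Returns the partition ids one spout task would read, ASSUMING partitions
    // are assigned round-robin by task index (an assumption, not verified
    // against the storm-kafka source).
    static List<Integer> partitionsForTask(int taskIndex, int numTasks, int numPartitions) {
        List<Integer> assigned = new ArrayList<>();
        for (int p = taskIndex; p < numPartitions; p += numTasks) {
            assigned.add(p);
        }
        return assigned;
    }

    public static void main(String[] args) {
        int numPartitions = 9; // hypothetical partition count for the topic
        int numTasks = 4;      // matches parallelismHint(4) above
        for (int t = 0; t < numTasks; t++) {
            System.out.println("task " + t + " -> partitions "
                    + partitionsForTask(t, numTasks, numPartitions));
        }
    }
}
```

If that assumption holds, a hint larger than the number of partitions just leaves the extra tasks with nothing to read.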
Regards
Sai
On Tue, Oct 28, 2014 at 3:48 PM, saiprasad mishra <[email protected]> wrote:
>
> You can increase the parallelismHint while creating the stream.
>
> You can also take a look at this for some more explanation:
>
> https://gist.github.com/mrflip/5958028
>
>
> Regards
> Sai
>
> On Tue, Oct 28, 2014 at 3:35 PM, Sa Li <[email protected]> wrote:
>
>> Thanks Sai. I set spoutConf.fetchSizeBytes = 500*1024*1024; but it didn't
>> help, so I assume some other issue is still unresolved.
>>
>> Thanks
>>
>>
>> Alec
>>
>> On Tue, Oct 28, 2014 at 3:18 PM, saiprasad mishra <[email protected]> wrote:
>>
>>> You can configure how much to fetch from Kafka, in case you have not
>>> already set a config like the one below:
>>>
>>> // fetch this many bytes; the default is 1024*1024, I guess
>>> spoutConf.fetchSizeBytes = 10*1024*1024;
>>>
>>>
>>> Regards
>>>
>>> Sai
>>>
>>> On Tue, Oct 28, 2014 at 2:59 PM, Sa Li <[email protected]> wrote:
>>>
>>>> Hi, all
>>>>
>>>> I have a question about TridentKafkaSpout. I am using TridentKafkaSpout
>>>> to consume data from a Kafka cluster, and I found the latency of
>>>> kafkaSpout is too high to be acceptable. This is the code:
>>>>
>>>> BrokerHosts zk = new ZkHosts("10.100.70.128:2181");
>>>> TridentKafkaConfig spoutConf = new TridentKafkaConfig(zk, "topictest");
>>>> spoutConf.scheme = new SchemeAsMultiScheme(new StringScheme());
>>>> OpaqueTridentKafkaSpout kafkaSpout = new OpaqueTridentKafkaSpout(spoutConf);
>>>> TridentTopology topology = new TridentTopology();
>>>>
>>>> I create the stream in two ways: one uses the KafkaSpout to read data from
>>>> Kafka, the other uses RandomTupleSpout to generate tuples randomly.
>>>>
>>>> topology.newStream("topictestspout", kafkaSpout)
>>>> //topology.newStream("test", new RandomTupleSpout())
>>>>
>>>> .persistentAggregate(PostgresqlState.newFactory(config), new
>>>> Fields("userid","event"), new EventUpdater(), new Fields( "eventword"))
>>>>
>>>>
>>>> I expect to process the batch data in the State, but I found that the list
>>>> of messages holds at most 4 entries when using kafkaSpout, no matter how
>>>> large I set the batch size. With RandomTupleSpout(), the actual batch size
>>>> in memory can reach a few hundred (not very high). So I conclude that the
>>>> KafkaSpout consumes data too slowly. Is that because something is wrong with
>>>> my code, or is there something I need to change in the Kafka/Storm cluster
>>>> configuration? Currently I am using a 3-node (9 brokers) Kafka cluster, a
>>>> single-node Storm cluster, and a 3-node ZooKeeper ensemble. The code above
>>>> runs in local mode.
>>>>
>>>> thanks
>>>>
>>>> Alec
>>>>
>>>
>>>
>>
>