Hi, all

I've just set up a 3-node kafka cluster (9 brokers, 3 per node) and the
performance test is OK. Now I am using TridentKafkaSpout and I am able to get
data from the producer, see:

BrokerHosts zk = new ZkHosts("10.100.70.128:2181");
TridentKafkaConfig spoutConf = new TridentKafkaConfig(zk, "topictest");
spoutConf.scheme = new SchemeAsMultiScheme(new StringScheme());
OpaqueTridentKafkaSpout kafkaSpout = new OpaqueTridentKafkaSpout(spoutConf);
// could also be:
// TransactionalTridentKafkaSpout kafkaSpout = new TransactionalTridentKafkaSpout(spoutConf);

TridentTopology topology = new TridentTopology();
Stream stream = topology.newStream("topictestspout", kafkaSpout)
        .shuffle()
        // PrintStream is my own Trident function that just prints each tuple
        .each(new Fields("str"), new PrintStream(), new Fields("event_object"))
        .parallelismHint(16);


With the above code I can print out the json objects published to the brokers.
Instead of printing the messages, I would like to simply write them into a
postgresql DB. I downloaded the code from
https://github.com/geoforce/storm-postgresql
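In case it helps to see what I have in mind, below is a rough sketch of the
fallback I am considering if storm-postgresql doesn't work out: a small custom
Trident State that does one JDBC batch insert per Trident batch. The class
names (JsonPostgresState with its nested Factory and Updater), the jdbc URL
and the events(id, events) table from problem 2 below are placeholders I made
up, not anything from storm-postgresql.

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.util.List;
import java.util.Map;

import backtype.storm.task.IMetricsContext;
import storm.trident.operation.TridentCollector;
import storm.trident.state.BaseStateUpdater;
import storm.trident.state.State;
import storm.trident.state.StateFactory;
import storm.trident.tuple.TridentTuple;

// Writes every tuple of a Trident batch into postgresql with one JDBC batch insert.
public class JsonPostgresState implements State {
    // e.g. "jdbc:postgresql://host:5432/mydb?user=...&password=..." (placeholder)
    private final String jdbcUrl;

    public JsonPostgresState(String jdbcUrl) {
        this.jdbcUrl = jdbcUrl;
    }

    // Trident calls these around every batch. A stricter implementation would
    // record the txid here to skip replayed batches (see note below).
    @Override public void beginCommit(Long txid) {}
    @Override public void commit(Long txid) {}

    public void write(List<TridentTuple> tuples) {
        try (Connection con = DriverManager.getConnection(jdbcUrl);
             PreparedStatement ps = con.prepareStatement(
                     "INSERT INTO events (events) VALUES (?::json)")) {
            for (TridentTuple t : tuples) {
                ps.setString(1, t.getStringByField("str")); // the raw json message
                ps.addBatch();
            }
            ps.executeBatch();
        } catch (SQLException e) {
            // Fail the batch so Trident replays it instead of silently dropping it.
            throw new RuntimeException(e);
        }
    }

    public static class Factory implements StateFactory {
        private final String jdbcUrl;
        public Factory(String jdbcUrl) { this.jdbcUrl = jdbcUrl; }
        @Override
        public State makeState(Map conf, IMetricsContext metrics,
                               int partitionIndex, int numPartitions) {
            return new JsonPostgresState(jdbcUrl);
        }
    }

    public static class Updater extends BaseStateUpdater<JsonPostgresState> {
        @Override
        public void updateState(JsonPostgresState state, List<TridentTuple> tuples,
                                TridentCollector collector) {
            state.write(tuples);
        }
    }
}

As written this is only at-least-once: if a batch is replayed after a failure,
the same rows get inserted again. Real exactly-once with the opaque spout would
mean also storing the last applied txid in the DB and skipping batches that
were already written, which is what Trident's built-in opaque state
implementations do.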

Here are the problems I have:
1. When I run the storm-postgresql code with messages generated from
RandomTupleSpout(), I can only get 100 rows written into the postgresql DB,
regardless of how I change the PostgresqlStateConfig.

2. Now I want to write the json messages into the postgresql DB. It seems like
it should be simple: just 2 columns in the DB table, id and events, where
events stores the json messages. Forgive my dullness, but I couldn't get it to
work with storm-postgresql.
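For problem 2, what I am ultimately after is something like the wiring below,
reusing kafkaSpout from the first snippet and replacing the PrintStream step
with a partitionPersist into the state sketched above (the class names and the
jdbc URL are still just my placeholders):

String jdbcUrl = "jdbc:postgresql://10.100.70.128:5432/mydb?user=storm&password=storm";
TridentTopology topology = new TridentTopology();
topology.newStream("topictestspout", kafkaSpout)
        .shuffle()
        .partitionPersist(new JsonPostgresState.Factory(jdbcUrl),
                          new Fields("str"),
                          new JsonPostgresState.Updater())
        .parallelismHint(16);

Each Trident batch then turns into one JDBC batch insert, with the id column
filled by the database (e.g. a serial/sequence) and the events column taking
the raw json string.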

I wonder if anyone has done a similar job: getting data from TridentKafkaSpout
and writing it as-is into a postgresql DB. In addition, once the writer is
running, if it stops and restarts for some reason, I would like it to resume
consuming from the point where it stopped instead of from the very beginning.
How should I manage the offsets and restart writing into the DB?
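As far as I understand, OpaqueTridentKafkaSpout already checkpoints the
consumed offsets in Zookeeper as part of each Trident transaction, so as long
as the topology keeps the same stream id ("topictestspout") it should resume
from the last committed batch after a restart; please correct me if that's
wrong. If I instead go the plain-consumer route from my earlier mail quoted
below, my current sketch is to disable auto-commit and only commit offsets
after the DB insert succeeds, roughly like this (Kafka 0.8 high-level
consumer; the group id, batch size and insertBatch() helper are placeholders I
made up):

import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;

import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;

public class PgWriterConsumer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("zookeeper.connect", "10.100.70.128:2181");
        props.put("group.id", "pg-writer");        // placeholder group id
        props.put("auto.commit.enable", "false");  // commit manually after the DB write
        props.put("auto.offset.reset", "smallest"); // only used when the group has no offset yet
        ConsumerConnector consumer =
                Consumer.createJavaConsumerConnector(new ConsumerConfig(props));

        Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
        topicCountMap.put("topictest", 1);          // one stream (thread) for the topic
        KafkaStream<byte[], byte[]> stream =
                consumer.createMessageStreams(topicCountMap).get("topictest").get(0);

        List<String> batch = new ArrayList<String>();
        ConsumerIterator<byte[], byte[]> it = stream.iterator();
        while (it.hasNext()) {
            batch.add(new String(it.next().message(), StandardCharsets.UTF_8));
            if (batch.size() >= 500) {              // placeholder batch size
                insertBatch(batch);                 // JDBC batch insert into the events table
                consumer.commitOffsets();           // only after the insert succeeded
                batch.clear();
            }
        }
    }

    private static void insertBatch(List<String> jsonMessages) {
        // placeholder: same JDBC batch insert as in JsonPostgresState.write() above
    }
}

Because the offset is only committed after the rows are in the DB, a crash
between the insert and commitOffsets() can replay that one batch, so this is
at-least-once; duplicates would have to be handled on the DB side if that
matters.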


thanks

Alec 

> Hi, All
> 
> I set up a kafka cluster and plan to publish messages from the Web to kafka. 
> The messages are in json form, and I want to implement a consumer that writes 
> the messages it consumes to a postgresql DB, with no aggregation at all. I was 
> thinking of using KafkaSpout in storm to make this happen, but now I want to 
> simplify the step and just use a kafka consumer to populate the messages into 
> postgresql. This consumer should be able to consume data and write it into the 
> postgresql DB in batches; if the server goes down, the consumer should be able 
> to recover the data it has stored on disk without duplication and resume 
> consuming from where it stopped once the server is back up.
> 
> Is there any sample code for this?
> 
> 
> thanks a lot
> 
> 
> Alec
