Hi all,

I've just set up a 3-node Kafka cluster (9 brokers, 3 per node), and the performance test looks OK. Now I am using the Trident Kafka spout (OpaqueTridentKafkaSpout) and am able to get data from the producer:
BrokerHosts zk = new ZkHosts("10.100.70.128:2181");
TridentKafkaConfig spoutConf = new TridentKafkaConfig(zk, "topictest");
spoutConf.scheme = new SchemeAsMultiScheme(new StringScheme());
OpaqueTridentKafkaSpout kafkaSpout = new OpaqueTridentKafkaSpout(spoutConf);
// TransactionalTridentKafkaSpout kafkaSpout = new TransactionalTridentKafkaSpout(spoutConf);

TridentTopology topology = new TridentTopology();
Stream stream = topology.newStream("topictestspout", kafkaSpout).shuffle()
    .each(new Fields("str"), new PrintStream(), new Fields("event_object"))
    .parallelismHint(16);

With the code above I can print out the JSON objects published to the brokers. Instead of printing the messages, I would like to simply write them into a PostgreSQL DB. I downloaded the code from https://github.com/geoforce/storm-postgresql

Here are the problems I have:

1. When I run the storm-postgresql code with messages generated by a RandomTupleSpout(), I can only ever write 100 rows into the PostgreSQL DB, regardless of how I change the PostgresqlStateConfig.

2. I now want to write the JSON messages into the PostgreSQL DB. It seems simple enough: the table has just two columns, id and events, where events stores the JSON message. Forgive my dullness, but I couldn't get it to work with storm-postgresql.

I wonder if anyone has done a similar job: getting data from the Trident Kafka spout and writing it straight into a PostgreSQL DB. In addition, once the writer is running, if it stops and restarts for some reason I would like it to resume consuming from where it stopped rather than from the very beginning. How should I manage the offsets so that the writer can restart and continue writing into the DB? (To make this more concrete, I've put two rough sketches of what I have in mind at the bottom of this mail, below my earlier message.)

thanks

Alec

> Hi, All
>
> I set up a Kafka cluster and plan to publish messages from the Web to Kafka.
> The messages are in the form of JSON, and I want to implement a consumer that
> writes the messages it consumes into a PostgreSQL DB, with no aggregation at
> all. I was thinking of using KafkaSpout in Storm to make this happen, but now
> I want to simplify the setup and just use a Kafka consumer to write the
> messages into PostgreSQL. This consumer should consume the data and write it
> into the PostgreSQL DB in batches; if the server goes down, the consumer
> should be able to recover the data it has stored on disk without redundancy,
> and resume consuming from where it stopped once the server is back up.
>
> Is there any sample code for this?
>
> thanks a lot
>
> Alec
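P.S. To make the first question more concrete, below is the rough shape of what I'm trying to end up with: a Trident state that batch-inserts the "str" field of each tuple into the two-column table (id, events). This is only an untested sketch written against the plain storm.trident API plus JDBC rather than against storm-postgresql, and the class names, table/column names, and connection settings are just my assumptions:

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.util.List;
import java.util.Map;

import backtype.storm.task.IMetricsContext;
import storm.trident.operation.BaseStateUpdater;
import storm.trident.operation.TridentCollector;
import storm.trident.state.State;
import storm.trident.state.StateFactory;
import storm.trident.tuple.TridentTuple;

// Minimal Trident state that writes one Trident batch at a time into PostgreSQL via JDBC.
public class PostgresJsonState implements State {

    private Connection conn;

    public PostgresJsonState(String jdbcUrl, String user, String password) {
        try {
            conn = DriverManager.getConnection(jdbcUrl, user, password);
            conn.setAutoCommit(false); // one DB commit per Trident batch
        } catch (SQLException e) {
            throw new RuntimeException("Could not open PostgreSQL connection", e);
        }
    }

    @Override
    public void beginCommit(Long txid) {
        // the txid could be recorded in the DB here for exactly-once bookkeeping
    }

    @Override
    public void commit(Long txid) {
        try {
            conn.commit();
        } catch (SQLException e) {
            throw new RuntimeException(e);
        }
    }

    // Insert the "str" field of every tuple in the batch into events(events).
    public void write(List<TridentTuple> tuples) {
        try (PreparedStatement ps =
                 conn.prepareStatement("INSERT INTO events (events) VALUES (?)")) {
            for (TridentTuple tuple : tuples) {
                ps.setString(1, tuple.getStringByField("str"));
                ps.addBatch();
            }
            ps.executeBatch();
        } catch (SQLException e) {
            throw new RuntimeException("Batch insert failed", e);
        }
    }

    public static class Factory implements StateFactory {
        @Override
        public State makeState(Map conf, IMetricsContext metrics, int partitionIndex, int numPartitions) {
            // connection settings are placeholders
            return new PostgresJsonState("jdbc:postgresql://localhost:5432/eventsdb", "storm", "storm");
        }
    }

    public static class Updater extends BaseStateUpdater<PostgresJsonState> {
        @Override
        public void updateState(PostgresJsonState state, List<TridentTuple> tuples, TridentCollector collector) {
            state.write(tuples);
        }
    }
}

The idea would then be to replace the .each(new Fields("str"), new PrintStream(), ...) step with something like:

topology.newStream("topictestspout", kafkaSpout).shuffle()
    .partitionPersist(new PostgresJsonState.Factory(), new Fields("str"),
                      new PostgresJsonState.Updater());

Is this roughly the right direction, or should I be building on storm-postgresql's state instead?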
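P.P.S. For the simpler route in my earlier mail below (a plain consumer instead of Storm), this is roughly the pattern I had in mind: the Kafka 0.8 high-level consumer with auto-commit disabled, committing the offset to ZooKeeper only after a batch has been written to PostgreSQL, so that a restart resumes from the last committed offset. Again an untested sketch; the group id, batch size, and writeBatchToPostgres() are placeholders, and I realise a crash between the DB write and commitOffsets() could still replay the last batch (at-least-once rather than exactly-once).

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;

import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;

public class PostgresBatchConsumer {

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("zookeeper.connect", "10.100.70.128:2181");
        props.put("group.id", "pg-writer");       // offsets are tracked per group in ZooKeeper
        props.put("auto.commit.enable", "false"); // commit only after the DB write succeeds

        ConsumerConnector consumer =
            Consumer.createJavaConsumerConnector(new ConsumerConfig(props));

        Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
        topicCountMap.put("topictest", 1);
        KafkaStream<byte[], byte[]> stream =
            consumer.createMessageStreams(topicCountMap).get("topictest").get(0);

        List<String> batch = new ArrayList<String>();
        ConsumerIterator<byte[], byte[]> it = stream.iterator();
        while (it.hasNext()) {
            batch.add(new String(it.next().message()));
            if (batch.size() >= 500) {           // arbitrary batch size
                writeBatchToPostgres(batch);     // placeholder: JDBC batch insert into events(events)
                consumer.commitOffsets();        // after a restart we resume from here
                batch.clear();
            }
        }
    }

    private static void writeBatchToPostgres(List<String> jsonMessages) {
        // JDBC batch insert would go here (same INSERT as in the Trident sketch above).
    }
}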