Hi guys

In my Samza setup I need to create a custom SystemConsumer. Basically, I need to
source data from a database table, but due to Oracle constraints I can't use
Databus.

So I created a consumer that polls the database and pushes new changes into the
stream. It extends BlockingEnvelopeMap and calls its put() method:

public class MockTradeSource extends BlockingEnvelopeMap {
  //…poll here, remember last read row number
  msg = nextMsg(lastRowNumber);
  MockTradeSource.this.put(ssp, new IncomingMessageEnvelope(ssp,
      null, null, msg));
}
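For illustration, here is a self-contained sketch of the same producer/consumer split that BlockingEnvelopeMap provides. A background thread polls for rows after the last row number it has seen and puts each one into a blocking queue. Envelope, InMemoryTradeTable and TablePoller are hypothetical stand-ins, not Samza classes, and the in-memory list only simulates the Oracle query. One deliberate difference from the snippet above is that the row number travels as the envelope's offset string instead of null.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in for IncomingMessageEnvelope: offset + message only.
final class Envelope {
  final String offset;
  final String message;

  Envelope(String offset, String message) {
    this.offset = offset;
    this.message = message;
  }
}

// Hypothetical in-memory stand-in for the Oracle table; row N is rows.get(N - 1).
final class InMemoryTradeTable {
  private final List<String> rows = new CopyOnWriteArrayList<>();

  void insert(String trade) {
    rows.add(trade);
  }

  // Simulates "SELECT ... WHERE row_no > lastRow ORDER BY row_no".
  List<String> rowsAfter(long lastRow) {
    List<String> snapshot = new ArrayList<>(rows);
    return snapshot.subList((int) Math.min(lastRow, snapshot.size()), snapshot.size());
  }
}

// Polls the table on its own thread and pushes new rows into a queue,
// the role BlockingEnvelopeMap.put() plays in a real consumer.
final class TablePoller implements Runnable {
  private final InMemoryTradeTable table;
  private final BlockingQueue<Envelope> queue = new LinkedBlockingQueue<>();
  private final long pollIntervalMs;
  private volatile boolean running = true;
  private long lastRowNumber; // only touched by the polling thread

  TablePoller(InMemoryTradeTable table, long startAfterRow, long pollIntervalMs) {
    this.table = table;
    this.lastRowNumber = startAfterRow;
    this.pollIntervalMs = pollIntervalMs;
  }

  BlockingQueue<Envelope> queue() {
    return queue;
  }

  void stop() {
    running = false;
  }

  @Override
  public void run() {
    try {
      while (running) {
        for (String trade : table.rowsAfter(lastRowNumber)) {
          long row = lastRowNumber + 1;
          // The row number becomes the offset, so it could be checkpointed.
          queue.put(new Envelope(Long.toString(row), trade));
          lastRowNumber = row; // advance only after the row was handed over
        }
        TimeUnit.MILLISECONDS.sleep(pollIntervalMs);
      }
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt(); // exit quietly when interrupted
    }
  }
}
```

In the real consumer the queue would be BlockingEnvelopeMap's own put(ssp, envelope), with the polling thread started in start() and signalled to exit in stop().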

Obviously, this source now has state: it remembers the last row it published.
The question is: how can I make that state durable through Samza? I would prefer
to leverage the existing checkpointing, or a similar framework-level mechanism,
and have this source managed by YARN.
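One possible route through Samza's own checkpointing, sketched under assumptions. If each IncomingMessageEnvelope carries the row number as its offset string rather than null, Samza can checkpoint it like any other offset. The sketch assumes that on restart the container asks the system's SystemAdmin for the offset after the last checkpointed one (getOffsetsAfter) and passes the result to the consumer's register(ssp, offset) as the starting point. RowNumberOffsets is a hypothetical helper, and plain String keys stand in for SystemStreamPartition. It only shows the row-number arithmetic that a custom SystemAdmin and consumer would need.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical helpers: database row numbers used as Samza offset strings.
// Assumption: register(ssp, offset) receives the offset of the NEXT row to read,
// i.e. whatever getOffsetsAfter returned for the last checkpointed offset.
final class RowNumberOffsets {
  private RowNumberOffsets() {}

  // Offset string to put into the envelope for a given row.
  static String offsetOf(long rowNumber) {
    return Long.toString(rowNumber);
  }

  // Logic a custom SystemAdmin.getOffsetsAfter could apply per partition:
  // the offset after row N is row N + 1. Keys stand in for SystemStreamPartition.
  static Map<String, String> offsetsAfter(Map<String, String> lastProcessed) {
    Map<String, String> next = new HashMap<>();
    for (Map.Entry<String, String> e : lastProcessed.entrySet()) {
      next.put(e.getKey(), offsetOf(Long.parseLong(e.getValue()) + 1));
    }
    return next;
  }

  // Logic the consumer's register(ssp, offset) could use to restore its state:
  // every row before the starting offset has already been processed.
  static long lastRowNumberFor(String startingOffset) {
    return Long.parseLong(startingOffset) - 1;
  }
}
```

This only works if the "row number" is a stable, increasing key (for example a sequence-backed column) rather than Oracle's per-query ROWNUM.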

Of course, I could also take this code out of Samza and just push messages to
Kafka by running the publisher as an external process, but this is not what I am
looking for.
