Hi guys,

In my Samza setup I need to create a custom SystemConsumer. Basically, I need to source data from a database table, but due to Oracle constraints I can't use DataBus.
So I created a consumer that polls the database and pushes new changes into the stream by extending BlockingEnvelopeMap and calling its put() method:

    public class MockTradeSource extends BlockingEnvelopeMap {
        //…poll here, remember last read row number
        msg = nextMsg(lastRowNumer);
        MockTradeSource.this.put(ssp, new IncomingMessageEnvelope(ssp, null, null, msg));
    }

Obviously this source now has state: it remembers the last row it published. The question is: how can I make it stateful for Samza? I would prefer to leverage the existing checkpointing or a similar framework-level mechanism and have this source managed by YARN.

Of course, I could also take this code out of Samza and just push messages to Kafka, running the publisher as an external process, but this is not what I am looking for.