Re: Custom state store

2017-11-13 Thread Matthias J. Sax
) >> >> >> -Matthias >> >> On 11/13/17 10:26 AM, Boris Lublinsky wrote: >>> >>>> On Nov 13, 2017, at 12:24 PM, Boris Lublinsky >>>> wrote: >>>> >>>> It looks like for the custom state store implementation the only

Re: Custom state store

2017-11-13 Thread Boris Lublinsky
>>> On Nov 13, 2017, at 12:24 PM, Boris Lublinsky >>> wrote: >>> >>> It looks like for the custom state store implementation the only option is >>> to use Topology APIs. >>> The problem is that in the case of DSL, Kafka streams does n

Re: Custom state store

2017-11-13 Thread Matthias J. Sax
You can plug in a custom store via the `Materialized` parameter, which lets you specify a custom `KeyValueBytesStoreSupplier` (and others). -Matthias On 11/13/17 10:26 AM, Boris Lublinsky wrote: > >> On Nov 13, 2017, at 12:24 PM, Boris Lublinsky >> wrote: >> >> It look

Re: Custom state store

2017-11-13 Thread Boris Lublinsky
> On Nov 13, 2017, at 12:24 PM, Boris Lublinsky > wrote: > > It looks like for the custom state store implementation the only option is to > use Topology APIs. > The problem is that in the case of DSL, Kafka streams does not provide any > option to create Store Build

Custom state store

2017-11-13 Thread Boris Lublinsky
It looks like, for a custom state store implementation, the only option is to use the Topology APIs. The problem is that in the case of the DSL, Kafka Streams does not provide any option to create a Store Builder for a custom store. Am I missing something? Boris Lublinsky FDP Architect boris.lublin

Re: IllegalStateException with custom state store ..

2017-07-20 Thread Matthias J. Sax
has a null RecordContext. Gave the > following debug statement .. > > println(context.asInstanceOf[ProcessorContextImpl].recordContext) > > and got null. > > regards. > > On Sat, Jul 1, 2017 at 9:41 PM, Debasish Ghosh > wrote: > >> Hi - >> >> I h

Re: IllegalStateException with custom state store ..

2017-07-03 Thread Debasish Ghosh
if (loggingEnabled) { >>>> changeLogger.logChange(changelogKey, bf >>>> } >>>> } >>>> >>>> which in turn calls logChange that gives the error. >>>> >>>> Am I missing something ? >>>> >>>

Re: IllegalStateException with custom state store ..

2017-07-03 Thread Damian Guy
>> >>> regards. >>> >>> On Mon, Jul 3, 2017 at 2:27 PM, Damian Guy wrote: >>> >>>> Hi, >>>> >>>> It is because you are calling `context.timestamp` during `commit`. At >>>> this point there is no `RecordContext` ass

Re: IllegalStateException with custom state store ..

2017-07-03 Thread Debasish Ghosh
y want to log the change >>> when you write to the store. >>> >>> Thanks, >>> Damian >>> >>> On Sat, 1 Jul 2017 at 19:14 Debasish Ghosh >>> wrote: >>> >>>> Just to give some more information, the ProcessorContext tha

Re: IllegalStateException with custom state store ..

2017-07-03 Thread Damian Guy
> to the init method of the custom store has a null RecordContext. Gave the >>> following debug statement .. >>> >>> println(context.asInstanceOf[ProcessorContextImpl].recordContext) >>> >>> and got null. >>> >>> regards. >>>

Re: IllegalStateException with custom state store ..

2017-07-03 Thread Debasish Ghosh
rintln(context.asInstanceOf[ProcessorContextImpl].recordContext) >> >> and got null. >> >> regards. >> >> On Sat, Jul 1, 2017 at 9:41 PM, Debasish Ghosh >> wrote: >> >> > Hi - >> > >> > I

Re: IllegalStateException with custom state store ..

2017-07-03 Thread Damian Guy
xt.asInstanceOf[ProcessorContextImpl].recordContext) > > and got null. > > regards. > > On Sat, Jul 1, 2017 at 9:41 PM, Debasish Ghosh > wrote: > > > Hi - > > > > I have implemented a custom state store named BFStore with a change > > logger as follows

Re: IllegalStateException with custom state store ..

2017-07-01 Thread Debasish Ghosh
PM, Debasish Ghosh wrote: > Hi - > > I have implemented a custom state store named BFStore with a change > logger as follows: > > class BFStoreChangeLogger[K, V](val storeName: String, > val context: ProcessorContext, >

IllegalStateException with custom state store ..

2017-07-01 Thread Debasish Ghosh
Hi - I have implemented a custom state store named BFStore with a change logger as follows: class BFStoreChangeLogger[K, V](val storeName: String, val context: ProcessorContext, val partition: Int