)
>>
>>
>> -Matthias
>>
>> On 11/13/17 10:26 AM, Boris Lublinsky wrote:
>>>
>>>> On Nov 13, 2017, at 12:24 PM, Boris Lublinsky
>>>> wrote:
>>>>
>>>> It looks like for the custom state store implementation the only
>>> On Nov 13, 2017, at 12:24 PM, Boris Lublinsky
>>> wrote:
>>>
>>> It looks like for the custom state store implementation the only option is
>>> to use Topology APIs.
>>> The problem is that in the case of DSL, Kafka streams does n
You can plug in a custom store via `Materialized` parameter that allows
to specify a custom `KeyValueBytesStoreSupplier` (and others)
-Matthias
On 11/13/17 10:26 AM, Boris Lublinsky wrote:
>
>> On Nov 13, 2017, at 12:24 PM, Boris Lublinsky
>> wrote:
>>
>> It look
> On Nov 13, 2017, at 12:24 PM, Boris Lublinsky
> wrote:
>
> It looks like for the custom state store implementation the only option is to
> use Topology APIs.
> The problem is that in the case of DSL, Kafka streams does not provide any
> option to create Store Build
It looks like for the custom state store implementation the only option is to
use Topology APIs.
The problem is that in the case of DSL, Kafka streams does not provide any
option to create Store Builder for a custom store.
Am I missing something?
Boris Lublinsky
FDP Architect
boris.lublin
has a null RecordContext. Gave the
> following debug statement ..
>
> println(context.asInstanceOf[ProcessorContextImpl].recordContext)
>
> and got null.
>
> regards.
>
> On Sat, Jul 1, 2017 at 9:41 PM, Debasish Ghosh
> wrote:
>
>> Hi -
>>
>> I h
if (loggingEnabled) {
>>>> changeLogger.logChange(changelogKey, bf
>>>> }
>>>> }
>>>>
>>>> which in turn calls logChange that gives the error.
>>>>
>>>> Am I missing something ?
>>>>
>>>&
>>
>>> regards.
>>>
>>> On Mon, Jul 3, 2017 at 2:27 PM, Damian Guy wrote:
>>>
>>>> Hi,
>>>>
>>>> It is because you are calling `context.timestamp` during `commit`. At
>>>> this point there is no `RecordContext` ass
y want to log the change
>>> when you write to the store.
>>>
>>> Thanks,
>>> Damian
>>>
>>> On Sat, 1 Jul 2017 at 19:14 Debasish Ghosh
>>> wrote:
>>>
>>>> Just to give some more information, the ProcessorContext tha
gt; to the init method of the custom store has a null RecordContext. Gave the
>>> following debug statement ..
>>>
>>> println(context.asInstanceOf[ProcessorContextImpl].recordContext)
>>>
>>> and got null.
>>>
>>> regards.
>>>
rintln(context.asInstanceOf[ProcessorContextImpl].recordContext)
>>
>> and got null.
>>
>> regards.
>>
>> On Sat, Jul 1, 2017 at 9:41 PM, Debasish Ghosh
>> wrote:
>>
>> > Hi -
>> >
>> > I
xt.asInstanceOf[ProcessorContextImpl].recordContext)
>
> and got null.
>
> regards.
>
> On Sat, Jul 1, 2017 at 9:41 PM, Debasish Ghosh
> wrote:
>
> > Hi -
> >
> > I have implemented a custom state store named BFStore with a change
> > logger as follows
PM, Debasish Ghosh
wrote:
> Hi -
>
> I have implemented a custom state store named BFStore with a change
> logger as follows:
>
> class BFStoreChangeLogger[K, V](val storeName: String,
> val context: ProcessorContext,
>
Hi -
I have implemented a custom state store named BFStore with a change logger
as follows:
class BFStoreChangeLogger[K, V](val storeName: String,
val context: ProcessorContext,
val partition: Int
14 matches
Mail list logo