There are ways you can manually force the coder here. However, I would first
try splitting the KV creation into two operations: have ProcessEvents
just create a PCollection<SharedCoreEvent>, and then add a following operation
that creates the KV. Something like this:
input.apply(ParDo.of(new ProcessEvents()))
.apply(WithKeys.of((SerializableFunction<SharedCoreEvent, Long>)
ExtractKeyFunction).withKeyType(TypeDescriptors.longs()));
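Here ExtractKeyFunction stands for whatever you use to derive the Long key; it
isn't shown in this thread, so purely as a hypothetical sketch (using the
getSubscription() field from your class as a stand-in) it could look like:

// Hypothetical key extractor; replace with however you actually derive the key.
// Giving it an explicit SerializableFunction type also makes the cast above unnecessary.
SerializableFunction<SharedCoreEvent, Long> ExtractKeyFunction =
    event -> (long) event.getSubscription().hashCode();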
I suspect that this will allow the mechanism to better infer the final
Coder. If that doesn't work, you could always brute force it like this:
PCollection<SharedCoreEvent> coreEvents = input.apply(ParDo.of(new
ProcessEvents()));
coreEvents.apply(WithKeys.of((SerializableFunction<SharedCoreEvent, Long>)
ExtractKeyFunction).withKeyType(TypeDescriptors.longs()))
.setCoder(KvCoder.of(VarLongCoder.of(), coreEvents.getCoder()))
.apply(Reshuffle.of())
... etc.
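Another option for the value coder, rather than reusing coreEvents.getCoder(),
is to ask the pipeline's schema registry for it directly. Just a sketch,
assuming schema inference works for SharedCoreEvent on its own:

// Sketch: fetch the schema-based coder from the registry and build the KV coder from it.
// getSchemaCoder throws the checked NoSuchSchemaException, so handle or declare it.
SchemaCoder<SharedCoreEvent> eventCoder =
    input.getPipeline().getSchemaRegistry().getSchemaCoder(SharedCoreEvent.class);

coreEvents
    .apply(WithKeys.of((SerializableFunction<SharedCoreEvent, Long>) ExtractKeyFunction)
        .withKeyType(TypeDescriptors.longs()))
    .setCoder(KvCoder.of(VarLongCoder.of(), eventCoder))
    .apply(Reshuffle.of());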
On Thu, Apr 4, 2024 at 8:19 PM Ruben Vargas <[email protected]> wrote:
> ProcessEvents receives a Session object as input and creates a KV<Long,
> SharedCoreEvent> as output
>
> On Thu, Apr 4, 2024 at 8:52 PM Reuven Lax via user
> <[email protected]> wrote:
>
>> There are some sharp edges unfortunately around auto-inference of KV
>> coders and schemas. Is there a previous PCollection of type
>> SharedCoreEvent, or is the SharedCoreEvent created in ProcessEvents?
>>
>> On Thu, Apr 4, 2024 at 2:12 PM Ruben Vargas <[email protected]>
>> wrote:
>>
>>> Hello guys
>>>
>>> I have a question: is it possible to use KV along with AutoValueSchema
>>> objects? I'm having trouble when I try to use them together.
>>>
>>> I have an object like the following
>>>
>>> @AutoValue
>>> @DefaultSchema(AutoValueSchema.class)
>>> public abstract class SharedCoreEvent {
>>>
>>> @JsonProperty("subscriptionId")
>>> public abstract String getSubscription();
>>>
>>> <other properties>
>>> }
>>>
>>> Then I have a pipeline like the following:
>>>
>>> input.apply(ParDo.of(new ProcessEvents()))
>>> .apply(Reshuffle.of()).apply(Values.create()).apply(output);
>>>
>>> My input is a single object and my ProcessEvents will produce tons of
>>> events in a fan-out fashion; that is why I used Reshuffle here.
>>>
>>> But when I run this pipeline it throws the following error:
>>>
>>> lang.IllegalStateException: Unable to return a default Coder for
>>> MCEvents/ParDo(ProcessEvents)/ParMultiDo(ProcessEvents).output
>>> [PCollection@2131266396]. Correct one of the following root causes:
>>> No Coder has been manually specified; you may do so using .setCoder().
>>>
>>> Inferring a Coder from the CoderRegistry failed: Cannot provide
>>> coder for parameterized type
>>> org.apache.beam.sdk.values.KV<java.lang.Long, events.SharedCoreEvent>:
>>> Unable to provide a Coder for events.SharedCoreEvent
>>> Building a Coder using a registered CoderProvider failed.
>>>
>>>
>>> Something similar happens with my source when I use KafkaIO and the
>>> source produces a KV<String,Session> PCollection.
>>>
>>> Is there any reason for this? Maybe I'm misusing the schemas?
>>>
>>> Really appreciate your help
>>>
>>> Thanks
>>> Ruben
>>>
>>