Here is the key code (in Kotlin):

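    import org.apache.flink.api.java.functions.KeySelector
    import org.apache.flink.api.java.tuple.Tuple2

    // Extracts the TraceKey (field f0) from each (TraceKey, TraceFragment) tuple.
    val ks = object : KeySelector<Tuple2<TraceKeyOuterClass.TraceKey, TraceFragmentOuterClass.TraceFragment>,
            TraceKeyOuterClass.TraceKey> {
        override fun getKey(
            it: Tuple2<TraceKeyOuterClass.TraceKey, TraceFragmentOuterClass.TraceFragment>
        ): TraceKeyOuterClass.TraceKey {
            return it.f0
        }
    }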

And here is the code that uses it:

    env.addSource(kafkaConsumer, name_source)
        .name(name_source).uid(name_source).setMaxParallelism(Config.MAX_PARALLELISM)
        .keyBy(ks)
        .window(EventTimeSessionWindows.withGap(org.apache.flink.streaming.api.windowing.time.Time.seconds(60)))
        .process(MyProcessor())
        .name(name_processor).uid(name_processor).setMaxParallelism(Config.MAX_PARALLELISM)
        .addSink(kafkaProducer)
        .uid(name_sink).name(name_sink)


I am using ProtobufSerializer from the chill-protobuf library for serde. It is
configured as follows:


    env.config.registerTypeWithKryoSerializer(
        TraceFragmentOuterClass.TraceFragment::class.java, ProtobufSerializer::class.java)
    env.config.registerTypeWithKryoSerializer(
        TraceKeyOuterClass.TraceKey::class.java, ProtobufSerializer::class.java)
    env.config.registerTypeWithKryoSerializer(
        FullTraceOuterClass.FullTrace::class.java, ProtobufSerializer::class.java)
    env.config.registerTypeWithKryoSerializer(
        SpanOuterClass.Span::class.java, ProtobufSerializer::class.java)
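
A note on the numbers in the exception quoted below, offered as a rough sketch rather than a confirmed diagnosis. As far as I understand Flink's KeyGroupRangeAssignment, each key is hashed into a key group, and each parallel subtask owns a contiguous range of key groups. The snippet reproduces only that range arithmetic in plain Kotlin, with no Flink dependency. The function names are hypothetical, and the max parallelism of 256 is an assumption inferred from the "128 to 256" in the message, not the actual value of Config.MAX_PARALLELISM.

```kotlin
// Sketch of the key-group range arithmetic, assuming it matches
// Flink's KeyGroupRangeAssignment. Function names are hypothetical.

// Index of the subtask that owns a given key group.
fun operatorIndexForKeyGroup(maxParallelism: Int, parallelism: Int, keyGroup: Int): Int =
    keyGroup * parallelism / maxParallelism

// Inclusive range of key groups owned by a given subtask.
fun keyGroupRangeForOperatorIndex(maxParallelism: Int, parallelism: Int, operatorIndex: Int): IntRange {
    val start = (operatorIndex * maxParallelism + parallelism - 1) / parallelism
    val end = ((operatorIndex + 1) * maxParallelism - 1) / parallelism
    return start..end
}

fun main() {
    val maxParallelism = 256 // assumption, inferred from the exception message
    val parallelism = 2

    // Key groups owned by each of the two subtasks.
    for (i in 0 until parallelism) {
        println("subtask $i owns ${keyGroupRangeForOperatorIndex(maxParallelism, parallelism, i)}")
    }

    // Key group 89 from the exception falls in subtask 0's range.
    println("key group 89 -> subtask ${operatorIndexForKeyGroup(maxParallelism, parallelism, 89)}")
}
```

Under these assumptions, key group 89 belongs to subtask 0, while the failing queue covers the range starting at 128, which would be subtask 1. One possible reading is that the key group computed on the receiving subtask differs from the one used to route the record, which could happen if the key's hashCode() is not stable across JVMs. I have not verified that this is the case for TraceKey.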


On Sun, Mar 20, 2022 at 12:15 AM caoyu <javaca...@163.com> wrote:

> Would you please copy the key code here to help with debugging?
>
> ---- Replied Message ----
> From Prashant Deva <prash...@astradot.com>
> Date 03/20/2022 12:24
> To user <user@flink.apache.org>
> Subject exception when parallelizing application
> I am using Flink 1.13.2. When I increase the parallelism of my application
> from 1 to 2, I see the following exceptions. What do they mean? How can I
> fix this?
>
> java.lang.IllegalArgumentException: key group from 128 to 256 does not contain 89
>       at org.apache.flink.util.Preconditions.checkArgument(Preconditions.java:160)
>       at org.apache.flink.runtime.state.heap.KeyGroupPartitionedPriorityQueue.globalKeyGroupToLocalIndex(KeyGroupPartitionedPriorityQueue.java:191)
>       at org.apache.flink.runtime.state.heap.KeyGroupPartitionedPriorityQueue.computeKeyGroupIndex(KeyGroupPartitionedPriorityQueue.java:186)
>       at org.apache.flink.runtime.state.heap.KeyGroupPartitionedPriorityQueue.getKeyGroupSubHeapForElement(KeyGroupPartitionedPriorityQueue.java:179)
>       at org.apache.flink.runtime.state.heap.KeyGroupPartitionedPriorityQueue.add(KeyGroupPartitionedPriorityQueue.java:114)
>       at org.apache.flink.streaming.api.operators.InternalTimerServiceImpl.registerEventTimeTimer(InternalTimerServiceImpl.java:233)
>       at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator$Context.registerEventTimeTimer(WindowOperator.java:922)
>       at org.apache.flink.streaming.api.windowing.triggers.EventTimeTrigger.onElement(EventTimeTrigger.java:44)
>       at org.apache.flink.streaming.api.windowing.triggers.EventTimeTrigger.onElement(EventTimeTrigger.java:30)
>       at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator$Context.onElement(WindowOperator.java:936)
>       at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.processElement(WindowOperator.java:394)
>       at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:205)
>       at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.processElement(AbstractStreamTaskNetworkInput.java:134)
>       at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:105)
>       at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:66)
>       at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:423)
>       at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:204)
>       at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:681)
>       at org.apache.flink.streaming.runtime.tasks.StreamTask.executeInvoke(StreamTask.java:636)
>       at org.apache.flink.streaming.runtime.tasks.StreamTask.runWithCleanUpOnFail(StreamTask.java:647)
>       at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:620)
>       at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:784)
>       at org.apache.flink.runtime.taskmanager.Task.run(Task.java:571)
>       at java.base/java.lang.Thread.run(Thread.java:829)
>
>
