Here is the entire source code of the TraceKeyOuterClass.TraceKey: https://gist.github.com/pdeva/26bd29fb5b9b842dd84d9d9d26018cc0
Its an autogenerated file by protoc. The hashCode actually is indeed stable. Here is the hashCode() method of TraceKey (also in the gist above): @java.lang.Override public int hashCode() { if (memoizedHashCode != 0) { return memoizedHashCode; } int hash = 41; hash = (19 * hash) + getDescriptor().hashCode(); hash = (37 * hash) + ACCOUNTID_FIELD_NUMBER; hash = (53 * hash) + getAccountId().hashCode(); hash = (37 * hash) + TRACEID_FIELD_NUMBER; hash = (53 * hash) + com.google.protobuf.Internal.hashLong( getTraceID()); hash = (37 * hash) + ENV_FIELD_NUMBER; hash = (53 * hash) + getEnv().hashCode(); hash = (29 * hash) + unknownFields.hashCode(); memoizedHashCode = hash; return hash; } On Sun, Mar 20, 2022 at 11:22 PM Guowei Ma <guowei....@gmail.com> wrote: > It seems that the key's hashcode is not stable. > So would you like to show the details of the `TraceKeyOuterClass.TraceKey`. > > Best, > Guowei > > > On Sun, Mar 20, 2022 at 3:21 PM Prashant Deva <prash...@astradot.com> > wrote: > >> here is the key code (in kotlin) >> >> val ks = object: KeySelector<Tuple2<TraceKeyOuterClass.TraceKey, >> TraceFragmentOuterClass.TraceFragment>, TraceKeyOuterClass.TraceKey> { >> override fun getKey(it:Tuple2<TraceKeyOuterClass.TraceKey, >> TraceFragmentOuterClass.TraceFragment>): TraceKeyOuterClass.TraceKey { >> return it.f0 >> } >> } >> >> and here is the code that uses it: >> >> env.addSource(kafkaConsumer, name_source) >> >> .name(name_source).uid(name_source).setMaxParallelism(Config.MAX_PARALLELISM) >> .keyBy (ks) >> >> .window(EventTimeSessionWindows.withGap(org.apache.flink.streaming.api.windowing.time.Time.seconds(60))) >> .process(MyProcessor()) >> >> .name(name_processor).uid(name_processor).setMaxParallelism(Config.MAX_PARALLELISM) >> .addSink(kafkaProducer) >> .uid(name_sink).name(name_sink) >> >> >> i am using protobufserializer from chill-protobuf library for serde. its >> configured as follows: >> >> >> env.config.registerTypeWithKryoSerializer(TraceFragmentOuterClass.TraceFragment::class.java, >> ProtobufSerializer::class.java) >> >> env.config.registerTypeWithKryoSerializer(TraceKeyOuterClass.TraceKey::class.java, >> ProtobufSerializer::class.java) >> >> env.config.registerTypeWithKryoSerializer(FullTraceOuterClass.FullTrace::class.java, >> ProtobufSerializer::class.java) >> >> env.config.registerTypeWithKryoSerializer(SpanOuterClass.Span::class.java, >> ProtobufSerializer::class.java) >> >> >> On Sun, Mar 20, 2022 at 12:15 AM caoyu <javaca...@163.com> wrote: >> >>> Would you like copy the key code here to help debugging. >>> >>> ---- Replied Message ---- >>> From Prashant Deva<prash...@astradot.com> <prash...@astradot.com> >>> Date 03/20/2022 12:24 >>> To user<user@flink.apache.org> <user@flink.apache.org> >>> Subject exception when parallelizing application >>> using flink 1.13.2. When i increase the parallelization of my >>> application from 1 to 2, i see the following exceptions. what do they mean? >>> how can i possibly fix this? >>> >>> java.lang.IllegalArgumentException: key group from 128 to 256 does not >>> contain 89 >>> at >>> org.apache.flink.util.Preconditions.checkArgument(Preconditions.java:160) >>> at >>> org.apache.flink.runtime.state.heap.KeyGroupPartitionedPriorityQueue.globalKeyGroupToLocalIndex(KeyGroupPartitionedPriorityQueue.java:191) >>> at >>> org.apache.flink.runtime.state.heap.KeyGroupPartitionedPriorityQueue.computeKeyGroupIndex(KeyGroupPartitionedPriorityQueue.java:186) >>> at >>> org.apache.flink.runtime.state.heap.KeyGroupPartitionedPriorityQueue.getKeyGroupSubHeapForElement(KeyGroupPartitionedPriorityQueue.java:179) >>> at >>> org.apache.flink.runtime.state.heap.KeyGroupPartitionedPriorityQueue.add(KeyGroupPartitionedPriorityQueue.java:114) >>> at >>> org.apache.flink.streaming.api.operators.InternalTimerServiceImpl.registerEventTimeTimer(InternalTimerServiceImpl.java:233) >>> at >>> org.apache.flink.streaming.runtime.operators.windowing.WindowOperator$Context.registerEventTimeTimer(WindowOperator.java:922) >>> at >>> org.apache.flink.streaming.api.windowing.triggers.EventTimeTrigger.onElement(EventTimeTrigger.java:44) >>> at >>> org.apache.flink.streaming.api.windowing.triggers.EventTimeTrigger.onElement(EventTimeTrigger.java:30) >>> at >>> org.apache.flink.streaming.runtime.operators.windowing.WindowOperator$Context.onElement(WindowOperator.java:936) >>> at >>> org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.processElement(WindowOperator.java:394) >>> at >>> org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:205) >>> at >>> org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.processElement(AbstractStreamTaskNetworkInput.java:134) >>> at >>> org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:105) >>> at >>> org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:66) >>> at >>> org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:423) >>> at >>> org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:204) >>> at >>> org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:681) >>> at >>> org.apache.flink.streaming.runtime.tasks.StreamTask.executeInvoke(StreamTask.java:636) >>> at >>> org.apache.flink.streaming.runtime.tasks.StreamTask.runWithCleanUpOnFail(StreamTask.java:647) >>> at >>> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:620) >>> at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:784) >>> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:571) >>> at java.base/java.lang.Thread.run(Thread.java:829) >>> >>>