Possible bug in Kafka producer partitioning logic
Hi all, We had some problems with custom partitioning for the 0.8 Kafka producer, and now that I have checked the code it seems there might be a problem with the logic. The producer determines the number of partitions in the open method and passes that value to the custom partitioner when producing records. However, this only works if the defaultTopicId (topic) has the same number of partitions as all other topics in the Kafka cluster when producing to multiple topics. In our case the default topic had 16 partitions and the new ones have 3 by default, so we get an out-of-range partition error. Is my understanding correct, or am I overlooking something? Thank you! Gyula
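The arithmetic behind the out-of-range error can be sketched in isolation. The partitioner below is a hypothetical stand-in for any hash-based custom partitioner (not Flink's actual code); the partition counts, 16 for the default topic and 3 for the new topics, are the ones from this thread:

```java
// Sketch of the failure mode: the partitioner is opened with the partition
// count of the *default* topic (16), but records may later be routed to a
// topic that only has 3 partitions.
public class PartitionMismatchSketch {

    // Hypothetical hash-based partitioner; numPartitionsFromOpen is the
    // value the producer handed to open() -- always the default topic's count.
    static int partition(Object key, int numPartitionsFromOpen) {
        return Math.abs(key.hashCode() % numPartitionsFromOpen);
    }

    public static void main(String[] args) {
        int defaultTopicPartitions = 16; // partition count seen in open()
        int targetTopicPartitions = 3;   // actual partitions of the target topic

        int chosen = partition("some-key", defaultTopicPartitions);
        // Any result in [3, 15] is out of range for the 3-partition topic,
        // which matches the "out of range partition" error reported above.
        System.out.println("chosen=" + chosen
                + " valid=" + (chosen < targetTopicPartitions));
    }
}
```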
[jira] [Created] (FLINK-6286) hbase command not found error
Jinjiang Ling created FLINK-6286: Summary: hbase command not found error Key: FLINK-6286 URL: https://issues.apache.org/jira/browse/FLINK-6286 Project: Flink Issue Type: Bug Reporter: Jinjiang Ling Priority: Minor I'm using Flink with the HBASE_CONF_DIR env variable set but without HBase installed, and I get the error message below. {quote} *bin/config.sh: line 303: hbase: command not found* {quote} -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Created] (FLINK-6287) Flakey JobManagerRegistrationTest
Nico Kruber created FLINK-6287: -- Summary: Flakey JobManagerRegistrationTest Key: FLINK-6287 URL: https://issues.apache.org/jira/browse/FLINK-6287 Project: Flink Issue Type: Bug Components: JobManager Affects Versions: 1.3.0 Environment: unit tests Reporter: Nico Kruber

There seems to be a race condition in the "{{JobManagerRegistrationTest.The JobManager should handle repeated registration calls}}" (scala) unit test. Every so often, especially when my system is under load, this test fails with a timeout after seeing the following messages in the log4j INFO outputs:

{code}
14:18:42,257 INFO  org.apache.flink.runtime.testutils.TestingResourceManager - Trying to associate with JobManager leader akka://flink/user/$f#-1062324203
14:18:42,253 INFO  org.apache.flink.runtime.jobmanager.JobManager - Starting JobManager at akka://flink/user/$f.
14:18:42,258 WARN  org.apache.flink.runtime.jobmanager.JobManager - Discard message LeaderSessionMessage(----,RegisterTaskManager(8b6ad3eccdbfcb199df630b794b6ec0c,8b6ad3eccdbfcb199df630b794b6ec0c @ nico-work.fritz.box (dataPort=1),cores=4, physMem=16686931968, heap=1556938752, managed=10,1)) because there is currently no valid leader id known.
14:18:42,259 WARN  org.apache.flink.runtime.jobmanager.JobManager - Discard message LeaderSessionMessage(----,RegisterTaskManager(8b6ad3eccdbfcb199df630b794b6ec0c,8b6ad3eccdbfcb199df630b794b6ec0c @ nico-work.fritz.box (dataPort=1),cores=4, physMem=16686931968, heap=1556938752, managed=10,1)) because there is currently no valid leader id known.
14:18:42,259 WARN  org.apache.flink.runtime.jobmanager.JobManager - Discard message LeaderSessionMessage(----,RegisterTaskManager(8b6ad3eccdbfcb199df630b794b6ec0c,8b6ad3eccdbfcb199df630b794b6ec0c @ nico-work.fritz.box (dataPort=1),cores=4, physMem=16686931968, heap=1556938752, managed=10,1)) because there is currently no valid leader id known.
14:18:42,259 WARN  org.apache.flink.runtime.jobmanager.JobManager - Discard message LeaderSessionMessage(----,RegisterResourceManager akka://flink/user/resourcemanager-61e00f37-9e99-4355-9099-53b992e8232e) because there is currently no valid leader id known.
14:18:42,259 INFO  org.apache.flink.runtime.jobmanager.JobManager - JobManager akka://flink/user/$f was granted leadership with leader session ID Some(----).
{code}

Full log:

{code}
14:18:42,230 INFO  org.apache.flink.runtime.blob.BlobServer - Created BLOB server storage directory /tmp/blobStore-d09b13ee-26bb-4ec8-950a-956f4a6c16cf
14:18:42,231 WARN  org.apache.flink.runtime.net.SSLUtils - Not a SSL socket, will skip setting tls version and cipher suites.
14:18:42,236 INFO  org.apache.flink.runtime.blob.BlobServer - Started BLOB server at 0.0.0.0:39695 - max concurrent requests: 50 - max backlog: 1000
14:18:42,247 INFO  org.apache.flink.runtime.metrics.MetricRegistry - No metrics reporter configured, no metrics will be exposed/reported.
14:18:42,249 WARN  org.apache.flink.runtime.metrics.MetricRegistry - Could not start MetricDumpActor. No metrics will be submitted to the WebInterface.
akka.actor.InvalidActorNameException: actor name [MetricQueryService] is not unique!
	at akka.actor.dungeon.ChildrenContainer$NormalChildrenContainer.reserve(ChildrenContainer.scala:130)
	at akka.actor.dungeon.Children$class.reserveChild(Children.scala:76)
	at akka.actor.ActorCell.reserveChild(ActorCell.scala:369)
	at akka.actor.dungeon.Children$class.makeChild(Children.scala:201)
	at akka.actor.dungeon.Children$class.attachChild(Children.scala:41)
	at akka.actor.ActorCell.attachChild(ActorCell.scala:369)
	at akka.actor.ActorSystemImpl.actorOf(ActorSystem.scala:553)
	at org.apache.flink.runtime.metrics.dump.MetricQueryService.startMetricQueryService(MetricQueryService.java:170)
	at org.apache.flink.runtime.metrics.MetricRegistry.startQueryService(MetricRegistry.java:166)
	at org.apache.flink.runtime.jobmanager.JobManager$.startJobManagerActors(JobManager.scala:2720)
	at org.apache.flink.runtime.jobmanager.JobManagerRegistrationTest.org$apache$flink$runtime$jobmanager$JobManagerRegistrationTest$$startTestingJobManager(JobManagerRegistrationTest.scala:182)
	at org.apache.flink.runtime.jobmanager.JobManagerRegistrationTest$$anonfun$1$$anonfun$apply$mcV$sp$4.apply$mcV$sp(JobManagerRegistrationTest.scala:123)
	at org.apache.flink.runtime.jobmanager.JobManagerRegistrationTest$$anonfun$1$$anonfun$apply$mcV$sp$4.apply(JobManagerRegistrationTest.scala:121)
	at org.apache.flink.runtime.jobmanager.JobManagerRegistrationTest$$anonfun$1$$anonfun$apply$mcV$sp$4.apply(JobManagerRegistrationTest.scala:121)
	at org.scalatest.Transformer$$anonfun$apply$1.apply$mcV$sp(Transformer.scala:22)
	at org.scalatest.OutcomeOf$class.outcomeO
{code}
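For what it's worth, the `actor name [MetricQueryService] is not unique!` part of the failure indicates two services being started under the same fixed actor name in one ActorSystem. One possible direction, sketched here with a made-up helper name and not to be read as the actual Flink fix, is to derive a per-instance name:

```java
import java.util.UUID;

public class UniqueNameSketch {

    // Hypothetical helper: suffix a fixed base name with a UUID so that two
    // services started in the same ActorSystem cannot collide on the name.
    static String uniqueActorName(String base) {
        return base + "-" + UUID.randomUUID();
    }

    public static void main(String[] args) {
        String a = uniqueActorName("MetricQueryService");
        String b = uniqueActorName("MetricQueryService");
        // Distinct names -> no InvalidActorNameException on the second start.
        System.out.println(a.equals(b));
    }
}
```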
[jira] [Created] (FLINK-6288) FlinkKafkaProducer's custom Partitioner is always invoked with number of partitions of default topic
Tzu-Li (Gordon) Tai created FLINK-6288: -- Summary: FlinkKafkaProducer's custom Partitioner is always invoked with number of partitions of default topic Key: FLINK-6288 URL: https://issues.apache.org/jira/browse/FLINK-6288 Project: Flink Issue Type: Improvement Components: Kafka Connector Reporter: Tzu-Li (Gordon) Tai The {{FlinkKafkaProducerBase}} supports routing records to topics besides the default topic, but the custom {{Partitioner}} interface does not follow this semantic. The partitioner's {{partition}} method is always invoked with the number of partitions in the default topic, and not the number of partitions of the current {{targetTopic}}.
Re: Possible bug in Kafka producer partitioning logic
Hi Gyula,

Yes, I think the semantics of the Partitioner interface is a bit off. The `numPartitions` value ideally should be the number of partitions of the `targetTopic`.

Here’s a JIRA I just filed to track the issue: https://issues.apache.org/jira/browse/FLINK-6288.

Cheers,
Gordon
Re: Possible bug in Kafka producer partitioning logic
Thanks for checking this out.

I would say this is definitely a blocking issue for the bugfix release, what do you think?

Gyula
Re: Possible bug in Kafka producer partitioning logic
I would prefer to make this a blocker for a future bugfix release actually, and not 1.2.1.

The reason is that to fix this properly we might need to look again into (and possibly change) how partitioners are provided. The main problem is that the `open` method can only be called once, with the partitions of one topic. So, we might need the user to provide multiple partitioners, one for each of all the possible topics that will be written to.

One way or another, my gut feeling is that this would need a slight change to the Kafka producer APIs, and I’m not keen on rushing API changes into releases.
[jira] [Created] (FLINK-6289) ExecutionEnvironment.readTextFile() can read gzip files & directories but not both
Arnaud Linz created FLINK-6289: -- Summary: ExecutionEnvironment.readTextFile() can read gzip files & directories but not both Key: FLINK-6289 URL: https://issues.apache.org/jira/browse/FLINK-6289 Project: Flink Issue Type: Bug Affects Versions: 1.1.3 Reporter: Arnaud Linz Priority: Minor When calling `ExecutionEnvironment.readTextFile()` with a ".gz" file as input, it decompresses the file correctly. If I pass a directory as input, it reads the files contained in the directory. But if I pass a directory containing ".gz" files as input, it does not decompress the files and treats them as plain ASCII files.
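A sketch of the expected behavior: the decision to decompress should be made per file (e.g. by extension) while enumerating a directory, rather than once for the top-level input path. The helper below is hypothetical and only illustrates that per-file dispatch with stdlib Java, not Flink's actual input-format code:

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.zip.GZIPInputStream;

public class GzipDirSketch {

    // Hypothetical reader: chooses the decompressor per file, so a directory
    // containing both plain and ".gz" files is handled consistently.
    static String readFile(Path p) throws IOException {
        InputStream in = Files.newInputStream(p);
        if (p.toString().endsWith(".gz")) {
            in = new GZIPInputStream(in); // decompress .gz entries individually
        }
        try (BufferedReader r = new BufferedReader(new InputStreamReader(in))) {
            StringBuilder sb = new StringBuilder();
            String line;
            while ((line = r.readLine()) != null) {
                sb.append(line).append('\n');
            }
            return sb.toString();
        }
    }
}
```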
Re: Possible bug in Kafka producer partitioning logic
I understand the reasoning, on the other hand this creates a problem that is very hard to work around. :/

Do you have any suggestions how to get around this?

Gyula
Re: Possible bug in Kafka producer partitioning logic
In the worst case scenario we will have a custom build that will just cache the different partition numbers in a map (but still call partitioner.open only once). I think this simple intermediate fix would actually be good enough for most people who get blocked by this in the short run.

Gyula
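A rough sketch of the workaround Gyula describes: lazily look up and cache the partition count per target topic, while the partitioner itself is still opened only once. All names here are made up for illustration; this is not the FlinkKafkaProducer API:

```java
import java.util.HashMap;
import java.util.Map;

public class TopicPartitionCacheSketch {

    // Stand-in for a metadata lookup against the Kafka cluster.
    interface PartitionLookup {
        int partitionsFor(String topic);
    }

    // Cache of partition counts keyed by topic, filled lazily on the
    // first write to each topic (mirroring the suggested workaround).
    private final Map<String, Integer> partitionsByTopic = new HashMap<>();
    private final PartitionLookup lookup;

    TopicPartitionCacheSketch(PartitionLookup lookup) {
        this.lookup = lookup;
    }

    int partitionFor(Object key, String targetTopic) {
        int numPartitions =
            partitionsByTopic.computeIfAbsent(targetTopic, lookup::partitionsFor);
        // The partitioner itself would still be opened only once; only the
        // partition count now varies with the target topic.
        return Math.abs(key.hashCode() % numPartitions);
    }
}
```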
[jira] [Created] (FLINK-6290) SharedBuffer is improperly released when multiple edges between entries
Dawid Wysakowicz created FLINK-6290: --- Summary: SharedBuffer is improperly released when multiple edges between entries Key: FLINK-6290 URL: https://issues.apache.org/jira/browse/FLINK-6290 Project: Flink Issue Type: Bug Components: CEP Affects Versions: 1.3.0 Reporter: Dawid Wysakowicz Priority: Critical Fix For: 1.3.0

The test below currently fails:

{code}
@Test
public void testClearingSharedBufferWithMultipleEdgesBetweenEntries() {
	SharedBuffer<String, Event> sharedBuffer = new SharedBuffer<>(Event.createTypeSerializer());
	int numberEvents = 8;
	Event[] events = new Event[numberEvents];
	final long timestamp = 1L;

	for (int i = 0; i < numberEvents; i++) {
		events[i] = new Event(i + 1, "e" + (i + 1), i);
	}

	sharedBuffer.put("start", events[1], timestamp, DeweyNumber.fromString("1"));
	sharedBuffer.put("branching", events[2], timestamp, "start", events[1], timestamp, DeweyNumber.fromString("1.0"));
	sharedBuffer.put("branching", events[3], timestamp, "start", events[1], timestamp, DeweyNumber.fromString("1.1"));
	sharedBuffer.put("branching", events[3], timestamp, "branching", events[2], timestamp, DeweyNumber.fromString("1.0.0"));
	sharedBuffer.put("branching", events[4], timestamp, "branching", events[3], timestamp, DeweyNumber.fromString("1.0.0.0"));
	sharedBuffer.put("branching", events[4], timestamp, "branching", events[3], timestamp, DeweyNumber.fromString("1.1.0"));

	// simulate IGNORE (next event can point to events[2])
	sharedBuffer.lock("branching", events[2], timestamp);

	sharedBuffer.release("branching", events[4], timestamp);

	// There should still be events[1] and events[2] in the buffer
	assertFalse(sharedBuffer.isEmpty());
}
{code}

The problem is with the {{SharedBuffer#internalRemove}} method:

{code}
private void internalRemove(final SharedBufferEntry<K, V> entry) {
	Stack<SharedBufferEntry<K, V>> entriesToRemove = new Stack<>();
	entriesToRemove.add(entry);

	while (!entriesToRemove.isEmpty()) {
		SharedBufferEntry<K, V> currentEntry = entriesToRemove.pop();
		if (currentEntry.getReferenceCounter() == 0) {
			currentEntry.remove();
			for (SharedBufferEdge<K, V> edge: currentEntry.getEdges()) {
				if (edge.getTarget() != null) {
					edge.getTarget().decreaseReferenceCounter();
					entriesToRemove.push(edge.getTarget());
				}
			}
		}
	}
}
{code}

When currentEntry has multiple edges to the same entry, that entry will be added twice to entriesToRemove and its edges will be removed twice.
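One way to avoid the double traversal, sketched against a toy entry/edge model rather than the actual Flink CEP classes, is to track which entries have already been processed, so an entry reachable via multiple edges has its outgoing edges walked at most once:

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Deque;
import java.util.IdentityHashMap;
import java.util.List;
import java.util.Set;

public class RemoveSketch {

    // Toy stand-in for SharedBufferEntry: a reference counter plus edge
    // targets, where the target list may contain duplicates.
    static class Entry {
        int refCount;
        List<Entry> edgeTargets = new ArrayList<>();
        Entry(int refCount) { this.refCount = refCount; }
    }

    // Sketch of a fixed internalRemove: the "processed" set ensures each
    // entry's edges are traversed at most once, even when the entry is
    // reachable via multiple edges from the same source.
    static void internalRemove(Entry start) {
        Deque<Entry> stack = new ArrayDeque<>();
        Set<Entry> processed = Collections.newSetFromMap(new IdentityHashMap<>());
        stack.push(start);
        while (!stack.isEmpty()) {
            Entry current = stack.pop();
            if (current.refCount == 0 && processed.add(current)) {
                for (Entry target : current.edgeTargets) {
                    target.refCount--;
                    stack.push(target);
                }
            }
        }
    }
}
```

With the guard removed, an entry pushed twice would have its edges walked twice, double-decrementing the counters of its own targets, which is the corruption the test above exposes.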
Re: [VOTE] Release Apache Flink 1.2.1 (RC1)
I've now started building the next release candidate.

On Sun, Apr 9, 2017 at 12:37 PM, Robert Metzger wrote:
> Hi Gyula,
>
> I'm trying to push Stefan R. to get the RocksDB fixes in asap.
>
> On Sat, Apr 8, 2017 at 5:17 PM, Gyula Fóra wrote:
>> Hi All,
>>
>> Any updates on this?
>>
>> It would be nice to get this out soon, the Kafka bug is hurting our prod
>> jobs big time.
>>
>> Thanks,
>> Gyula
>>
>> On Wed, Apr 5, 2017, 15:27 Ufuk Celebi wrote:
>>> @Stefan: What's the state with the RocksDB fixes? I would be +1 to do this.
>>>
>>> On Tue, Apr 4, 2017 at 6:05 PM, Chesnay Schepler wrote:
>>>> Yes, aljoscha already opened one against master:
>>>> https://github.com/apache/flink/pull/3670
>>>>
>>>> On 04.04.2017 17:57, Ted Yu wrote:
>>>>> Should the commits be reverted from master branch as well?
>>>>>
>>>>> On Tue, Apr 4, 2017 at 4:59 AM, Aljoscha Krettek <aljos...@apache.org> wrote:
>>>>>> The commits around FLINK-5808 have been reverted on release-1.2.
>>>>>>
>>>>>> On 4. Apr 2017, at 12:16, Stefan Richter <s.rich...@data-artisans.com> wrote:
>>>>>>> I have created a custom build of RocksDB 4.11.2 that fixes a significant
>>>>>>> performance problem with append operations. I think this should definitely
>>>>>>> be part of the 1.2.1 release because this is already blocking some users.
>>>>>>> What is missing is uploading the jar to maven central and a testing run,
>>>>>>> e.g. with some misbehaved job that has large state.
>>>>>>>
>>>>>>> Am 04.04.2017 um 11:57 schrieb Robert Metzger <rmetz...@apache.org>:
>>>>>>>> Thank you for opening a PR for this.
>>>>>>>>
>>>>>>>> Chesnay, do you need more reviews for the metrics changes / backports?
>>>>>>>>
>>>>>>>> Are there any other release blockers for 1.2.1, or are we good to go?
>>>>>>>>
>>>>>>>> On Mon, Apr 3, 2017 at 6:48 PM, Aljoscha Krettek <aljos...@apache.org> wrote:
>>>>>>>>> I created a PR for the revert: https://github.com/apache/flink/pull/3664
>>>>>>>>>
>>>>>>>>> On 3. Apr 2017, at 18:32, Stephan Ewen wrote:
>>>>>>>>>> +1 for option (1), but also invest the time to fix it properly for 1.2.2
>>>>>>>>>>
>>>>>>>>>> On Mon, Apr 3, 2017 at 9:10 AM, Kostas Kloudas <k.klou...@data-artisans.com> wrote:
>>>>>>>>>>> +1 for 1
>>>>>>>>>>>
>>>>>>>>>>> On Apr 3, 2017, at 5:52 PM, Till Rohrmann <trohrm...@apache.org> wrote:
>>>>>>>>>>>> +1 for option 1)
>>>>>>>>>>>>
>>>>>>>>>>>> On Mon, Apr 3, 2017 at 5:48 PM, Fabian Hueske <fhue...@gmail.com> wrote:
>>>>>>>>>>>>> +1 to option 1)
>>>>>>>>>>>>>
>>>>>>>>>>>>> 2017-04-03 16:57 GMT+02:00 Ted Yu:
>>>>>>>>>>>>>> Looks like #1 is better - 1.2.1 would be at least as stable as 1.2.0
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Cheers
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> On Mon, Apr 3, 2017 at 7:39 AM, Aljoscha Krettek <aljos...@apache.org> wrote:
>>>>>>>>>>>>>>> Just so we’re all on the same page. ;-)
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> There was https://issues.apache.org/jira/browse/FLINK-5808 which was
>>>>>>>>>>>>>>> a bug that we initially discovered in Flink 1.2 which was/is about
>>>>>>>>>>>>>>> missing verification for the correctness of the combination of
>>>>>>>>>>>>>>> parallelism and max-parallelism. Due to lacking test coverage this
>>>>>>>>>>>>>>> introduced two more bugs:
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> - https://issues.apache.org/jira/browse/FLINK-6188: Some
>>>>>>>>>>>>>>>   setParallelism() methods can't cope with default parallelism
>>>>>>>>>>>>>>> - https://issues.apache.org/jira/browse/FLINK-6209:
>>>>>>>>>>>>>>>   StreamPlanEnvironment always has a parallelism of 1
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> IMHO, the options are:
>>>>>>>>>>>>>>> 1) revert the changes made for FLINK-5808 on the release-1.2 branch
>>>>>>>>>>>>>>>    and live with the bug still being present
>>>>>>>>>>>>>>> 2) put in more work to fix FLINK-5808 which requires fixing some
>>>>>>>>>>>>>>>    problems that have existed for a long time with how the
>>>>>>>>>>>>>>>    parallelism is set in streaming programs
Question about the process order in stream aggregate
Hi all, I ran some tests for stream aggregation on rows. The data stream is registered as

val orderA: DataStream[Order] = env.fromCollection(Seq(
  Order(1L, "beer", 1),
  Order(2L, "diaper", 2),
  Order(3L, "diaper", 3),
  Order(4L, "rubber", 4)))
tEnv.registerDataStream("OrderA", orderA, 'user, 'product, 'amount)

and the SQL is defined as

select product, sum(amount) over (partition by product order by procTime() rows between unbounded preceding and current row) from OrderA

My expected output is

2> Result(beer,1)
2> Result(diaper,2)
1> Result(rubber,4)
2> Result(diaper,5)

However, sometimes I get the following output:

2> Result(beer,1)
2> Result(diaper,3)
1> Result(rubber,4)
2> Result(diaper,5)

It seems that the rows "Order(2L, "diaper", 2)" and "Order(3L, "diaper", 3)" are processed out of order. Is that normal? BTW, when I run `orderA.keyBy(2).map{x => x.amount + 1}.print()`, the order is always preserved.

Thanks,
Xingcan
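Independent of why the rows get reordered, the arithmetic shows which arrival order produced each output: an unbounded ROWS aggregate is just a per-key running sum, so `Result(diaper,3)` can only appear if the amount-3 row was processed first. A tiny sketch (plain Java, not Flink code):

```java
import java.util.ArrayList;
import java.util.List;

public class RunningSumSketch {

    // Per-key running sum, which is what the
    // OVER (... ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) query emits.
    static List<Integer> runningSum(int... amounts) {
        List<Integer> out = new ArrayList<>();
        int sum = 0;
        for (int a : amounts) {
            out.add(sum += a);
        }
        return out;
    }

    public static void main(String[] args) {
        // "diaper" amounts arriving in source order vs. swapped:
        System.out.println(runningSum(2, 3)); // [2, 5] -> the expected output
        System.out.println(runningSum(3, 2)); // [3, 5] -> the output Xingcan saw
    }
}
```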