Possible bug in Kafka producer partitioning logic

2017-04-10 Thread Gyula Fóra
Hi all,

We had some problems with custom partitioning for the 0.8 Kafka producer
and now that I checked the code it seems there might be a problem with the
logic.

The producer determines the number of partitions in the open method and
seems to pass that value to the custom partitioner when producing the
records.
This will, however, only work if the defaultTopicId (topic) has the same
number of partitions as all other topics in the Kafka cluster when
producing to multiple topics.

In our case the default topic had 16 partitions and the new ones have 3 by
default, so we get an out-of-range partition error.
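To make the failure mode concrete, here is a minimal, self-contained sketch
(plain Java, not the actual Flink code) of a modulo-style partitioner that is
always handed the default topic's partition count, even for a topic with fewer
partitions:

{code}
import java.util.Arrays;
import java.util.List;

// Minimal, self-contained illustration (not Flink code): the partitioner is
// always given the DEFAULT topic's partition count for every target topic.
public class PartitionOutOfRangeDemo {

    // typical custom partitioner logic: hash the key into [0, numPartitions)
    static int partition(String key, int numPartitions) {
        return Math.abs(key.hashCode() % numPartitions);
    }

    public static void main(String[] args) {
        int defaultTopicPartitions = 16; // partitions of the producer's default topic
        int otherTopicPartitions = 3;    // partitions of a newly created target topic

        List<String> keys = Arrays.asList("order-1", "order-2", "order-3", "order-4", "order-5");
        for (String key : keys) {
            // the producer passes 16 for every record, even when writing to the 3-partition topic
            int chosen = partition(key, defaultTopicPartitions);
            boolean valid = chosen < otherTopicPartitions;
            System.out.printf("key=%s -> partition %d (%s for a 3-partition topic)%n",
                    key, chosen, valid ? "valid" : "OUT OF RANGE");
        }
    }
}
{code}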

Is my understanding correct or am I overlooking something?

Thank you!
Gyula


[jira] [Created] (FLINK-6286) hbase command not found error

2017-04-10 Thread Jinjiang Ling (JIRA)
Jinjiang Ling created FLINK-6286:


 Summary: hbase command not found error
 Key: FLINK-6286
 URL: https://issues.apache.org/jira/browse/FLINK-6286
 Project: Flink
  Issue Type: Bug
Reporter: Jinjiang Ling
Priority: Minor


I'm running Flink with the HBASE_CONF_DIR environment variable set, but without 
HBase installed. I then get the error message below.
{quote}
*bin/config.sh: line 303: hbase: command not found*
{quote}





[jira] [Created] (FLINK-6287) Flakey JobManagerRegistrationTest

2017-04-10 Thread Nico Kruber (JIRA)
Nico Kruber created FLINK-6287:
--

 Summary: Flakey JobManagerRegistrationTest
 Key: FLINK-6287
 URL: https://issues.apache.org/jira/browse/FLINK-6287
 Project: Flink
  Issue Type: Bug
  Components: JobManager
Affects Versions: 1.3.0
 Environment: unit tests
Reporter: Nico Kruber


There seems to be a race condition in the "{{JobManagerRegistrationTest.The 
JobManager should handle repeated registration calls}}" (Scala) unit test.
Every so often, especially when my system is under load, this test fails with a 
timeout after the following messages appear in the log4j INFO output:

{code}
14:18:42,257 INFO org.apache.flink.runtime.testutils.TestingResourceManager - 
Trying to associate with JobManager leader akka://flink/user/$f#-1062324203
14:18:42,253 INFO org.apache.flink.runtime.jobmanager.JobManager - Starting 
JobManager at akka://flink/user/$f.
14:18:42,258 WARN org.apache.flink.runtime.jobmanager.JobManager - Discard 
message 
LeaderSessionMessage(----,RegisterTaskManager(8b6ad3eccdbfcb199df630b794b6ec0c,8b6ad3eccdbfcb199df630b794b6ec0c
 @ nico-work.fritz.box (dataPort=1),cores=4, physMem=16686931968, 
heap=1556938752, managed=10,1)) because there is currently no valid leader id 
known.
14:18:42,259 WARN org.apache.flink.runtime.jobmanager.JobManager - Discard 
message 
LeaderSessionMessage(----,RegisterTaskManager(8b6ad3eccdbfcb199df630b794b6ec0c,8b6ad3eccdbfcb199df630b794b6ec0c
 @ nico-work.fritz.box (dataPort=1),cores=4, physMem=16686931968, 
heap=1556938752, managed=10,1)) because there is currently no valid leader id 
known.
14:18:42,259 WARN org.apache.flink.runtime.jobmanager.JobManager - Discard 
message 
LeaderSessionMessage(----,RegisterTaskManager(8b6ad3eccdbfcb199df630b794b6ec0c,8b6ad3eccdbfcb199df630b794b6ec0c
 @ nico-work.fritz.box (dataPort=1),cores=4, physMem=16686931968, 
heap=1556938752, managed=10,1)) because there is currently no valid leader id 
known.
14:18:42,259 WARN org.apache.flink.runtime.jobmanager.JobManager - Discard 
message 
LeaderSessionMessage(----,RegisterResourceManager
 akka://flink/user/resourcemanager-61e00f37-9e99-4355-9099-53b992e8232e) 
because there is currently no valid leader id known.
14:18:42,259 INFO org.apache.flink.runtime.jobmanager.JobManager - JobManager 
akka://flink/user/$f was granted leadership with leader session ID 
Some(----).
{code}

Full log:
{code}
 14:18:42,230 INFO org.apache.flink.runtime.blob.BlobServer - Created BLOB 
server storage directory /tmp/blobStore-d09b13ee-26bb-4ec8-950a-956f4a6c16cf
14:18:42,231 WARN org.apache.flink.runtime.net.SSLUtils - Not a SSL socket, 
will skip setting tls version and cipher suites.
14:18:42,236 INFO org.apache.flink.runtime.blob.BlobServer - Started BLOB 
server at 0.0.0.0:39695 - max concurrent requests: 50 - max backlog: 1000
14:18:42,247 INFO org.apache.flink.runtime.metrics.MetricRegistry - No metrics 
reporter configured, no metrics will be exposed/reported.
14:18:42,249 WARN org.apache.flink.runtime.metrics.MetricRegistry - Could not 
start MetricDumpActor. No metrics will be submitted to the WebInterface.
akka.actor.InvalidActorNameException: actor name [MetricQueryService] is not 
unique!
at 
akka.actor.dungeon.ChildrenContainer$NormalChildrenContainer.reserve(ChildrenContainer.scala:130)
at akka.actor.dungeon.Children$class.reserveChild(Children.scala:76)
at akka.actor.ActorCell.reserveChild(ActorCell.scala:369)
at akka.actor.dungeon.Children$class.makeChild(Children.scala:201)
at akka.actor.dungeon.Children$class.attachChild(Children.scala:41)
at akka.actor.ActorCell.attachChild(ActorCell.scala:369)
at akka.actor.ActorSystemImpl.actorOf(ActorSystem.scala:553)
at 
org.apache.flink.runtime.metrics.dump.MetricQueryService.startMetricQueryService(MetricQueryService.java:170)
at 
org.apache.flink.runtime.metrics.MetricRegistry.startQueryService(MetricRegistry.java:166)
at 
org.apache.flink.runtime.jobmanager.JobManager$.startJobManagerActors(JobManager.scala:2720)
at 
org.apache.flink.runtime.jobmanager.JobManagerRegistrationTest.org$apache$flink$runtime$jobmanager$JobManagerRegistrationTest$$startTestingJobManager(JobManagerRegistrationTest.scala:182)
at 
org.apache.flink.runtime.jobmanager.JobManagerRegistrationTest$$anonfun$1$$anonfun$apply$mcV$sp$4.apply$mcV$sp(JobManagerRegistrationTest.scala:123)
at 
org.apache.flink.runtime.jobmanager.JobManagerRegistrationTest$$anonfun$1$$anonfun$apply$mcV$sp$4.apply(JobManagerRegistrationTest.scala:121)
at 
org.apache.flink.runtime.jobmanager.JobManagerRegistrationTest$$anonfun$1$$anonfun$apply$mcV$sp$4.apply(JobManagerRegistrationTest.scala:121)
at org.scalatest.Transformer$$anonfun$apply$1.apply$mcV$sp(Transformer.scala:22)
at org.scalatest.OutcomeOf$class.outcomeO

[jira] [Created] (FLINK-6288) FlinkKafkaProducer's custom Partitioner is always invoked with number of partitions of default topic

2017-04-10 Thread Tzu-Li (Gordon) Tai (JIRA)
Tzu-Li (Gordon) Tai created FLINK-6288:
--

 Summary: FlinkKafkaProducer's custom Partitioner is always invoked 
with number of partitions of default topic
 Key: FLINK-6288
 URL: https://issues.apache.org/jira/browse/FLINK-6288
 Project: Flink
  Issue Type: Improvement
  Components: Kafka Connector
Reporter: Tzu-Li (Gordon) Tai


The {{FlinkKafkaProducerBase}} supports routing records to topics other than the 
default topic, but the custom {{Partitioner}} interface does not follow this 
semantic.

The partitioner's {{partition}} method is always invoked with the number of 
partitions of the default topic, not with the number of partitions of the 
current {{targetTopic}}.
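For reference, the user-facing partitioner contract looks roughly like the
following paraphrased sketch (hypothetical class name, not the exact Flink
source); the issue is that {{numPartitions}} always refers to the default topic:

{code}
// Paraphrased sketch of the user-facing partitioner contract (hypothetical name,
// not the exact Flink source). Today the producer calls open() once with the
// default topic's partitions and reuses that count for every partition() call.
public abstract class CustomKafkaPartitionerSketch<T> implements java.io.Serializable {

    /** Called once; 'partitions' are the partition ids of the *default* topic only. */
    public void open(int parallelInstanceId, int parallelInstances, int[] partitions) {
    }

    /**
     * Called per record. A fix along the lines of this issue would pass the partition
     * count (or ids) of the record's actual target topic here instead.
     */
    public abstract int partition(T record, byte[] serializedKey, byte[] serializedValue, int numPartitions);
}
{code}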





Re: Possible bug in Kafka producer partitioning logic

2017-04-10 Thread Tzu-Li (Gordon) Tai
Hi Gyula,

Yes, I think the semantics of the Partitioner interface is a bit off.
The `numPartitions` value ideally should be the number of partitions of the 
`targetTopic`.

Here’s a JIRA I just filed to track the issue: 
https://issues.apache.org/jira/browse/FLINK-6288.

Cheers,
Gordon

On April 10, 2017 at 1:16:18 AM, Gyula Fóra (gyula.f...@gmail.com) wrote:

Hi all,  

We had some problems with custom partitioning for the 0.8 Kafka producer  
and now that I checked the code it seems there might be a problem with the  
logic.  

The producer determines the number of partitions in the open method and  
seems to be using that as a value passed to the custom partitioner for  
producing the records.  
This will however only work if the defaultTopicId (topic) has the same  
number of partitions as all other topics in the kafka cluster when  
producing to multiple topics.  

In our case the default topic had 16 and new ones have 3 as default so it  
gives an out of range partition error.  

Is my understanding correct or am I overlooking something?  

Thank you!  
Gyula  


Re: Possible bug in Kafka producer partitioning logic

2017-04-10 Thread Gyula Fóra
Thanks for checking this out.

I would say this is definitely a blocking issue for the bugfix release,
what do you think?

Gyula

Tzu-Li (Gordon) Tai wrote (on Apr 10, 2017, 15:39):

Hi Gyula,

Yes, I think the semantics of the Partitioner interface is a bit off.
The `numPartitions` value ideally should be the number of partitions of the
`targetTopic`.

Here’s a JIRA I just filed to track the issue:
https://issues.apache.org/jira/browse/FLINK-6288.

Cheers,
Gordon

On April 10, 2017 at 1:16:18 AM, Gyula Fóra (gyula.f...@gmail.com) wrote:

Hi all,

We had some problems with custom partitioning for the 0.8 Kafka producer
and now that I checked the code it seems there might be a problem with the
logic.

The producer determines the number of partitions in the open method and
seems to be using that as a value passed to the custom partitioner for
producing the records.
This will however only work if the defaultTopicId (topic) has the same
number of partitions as all other topics in the kafka cluster when
producing to multiple topics.

In our case the default topic had 16 and new ones have 3 as default so it
gives an out of range partition error.

Is my understanding correct or am I overlooking something?

Thank you!
Gyula


Re: Possible bug in Kafka producer partitioning logic

2017-04-10 Thread Tzu-Li (Gordon) Tai
Actually, I would prefer to make this a blocker for a future bugfix release, not 
1.2.1.

The reason is that to fix this properly we might need to look again into (and 
possibly change) how partitioners are provided.
The main problem is that the `open` method can only possibly be called once 
with the partitions of one topic.
So, we might need the user to provide multiple partitioners, one for each of 
the possible topics that will be written to.

One way or another, my gut feeling is that this would need a slight change to 
the Kafka producer APIs.
And I'm not so sure about rushing API changes into releases.
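For illustration only, a hedged sketch of what "one partitioner per target
topic" could look like from the user's side (hypothetical names and shape, not
a committed Flink API):

{code}
import java.io.Serializable;
import java.util.HashMap;
import java.util.Map;

// Purely illustrative sketch (hypothetical names, not a committed Flink API):
// one partitioner per target topic, each opened with that topic's own partitions.
interface TopicPartitioner<T> extends Serializable {
    void open(int[] partitionsOfTopic);
    int partition(T record, int numPartitionsOfTopic);
}

public class PerTopicPartitionerRegistry<T> implements Serializable {

    private final Map<String, TopicPartitioner<T>> byTopic = new HashMap<>();

    /** Register a partitioner and open it with the partitions of its own topic. */
    public void register(String topic, TopicPartitioner<T> partitioner, int[] partitionsOfTopic) {
        partitioner.open(partitionsOfTopic);
        byTopic.put(topic, partitioner);
    }

    /** Route a record using the partitioner registered for its target topic. */
    public int partitionFor(String topic, T record, int numPartitionsOfTopic) {
        return byTopic.get(topic).partition(record, numPartitionsOfTopic);
    }
}
{code}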


On April 10, 2017 at 6:46:29 AM, Gyula Fóra (gyula.f...@gmail.com) wrote:

Thanks for checking this out.  

I would say this is definitely a blocking issue for the bugfix release,  
what do you think?  

Gyula  

Tzu-Li (Gordon) Tai wrote (on Apr 10, 2017, 15:39):

Hi Gyula,  

Yes, I think the semantics of the Partitioner interface is a bit off.  
The `numPartitions` value ideally should be the number of partitions of the  
`targetTopic`.  

Here’s a JIRA I just filed to track the issue:  
https://issues.apache.org/jira/browse/FLINK-6288.  

Cheers,  
Gordon  

On April 10, 2017 at 1:16:18 AM, Gyula Fóra (gyula.f...@gmail.com) wrote:  

Hi all,  

We had some problems with custom partitioning for the 0.8 Kafka producer  
and now that I checked the code it seems there might be a problem with the  
logic.  

The producer determines the number of partitions in the open method and  
seems to be using that as a value passed to the custom partitioner for  
producing the records.  
This will however only work if the defaultTopicId (topic) has the same  
number of partitions as all other topics in the kafka cluster when  
producing to multiple topics.  

In our case the default topic had 16 and new ones have 3 as default so it  
gives an out of range partition error.  

Is my understanding correct or am I overlooking something?  

Thank you!  
Gyula  


[jira] [Created] (FLINK-6289) ExecutionEnvironment.readTextFile() can read gzip files & directories but not both

2017-04-10 Thread Arnaud Linz (JIRA)
Arnaud Linz created FLINK-6289:
--

 Summary: ExecutionEnvironment.readTextFile() can read gzip files & 
directories but not both
 Key: FLINK-6289
 URL: https://issues.apache.org/jira/browse/FLINK-6289
 Project: Flink
  Issue Type: Bug
Affects Versions: 1.1.3
Reporter: Arnaud Linz
Priority: Minor


When calling `ExecutionEnvironment.readTextFile()` with a ".gz" file as input, 
it decompresses the file correctly. If I pass a directory as input, it reads 
the files contained in the directory. But if I pass a directory containing 
".gz" files as input, it does not decompress the files and treats them as plain 
text files.
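A minimal sketch of the three cases described above; the input paths are
hypothetical placeholders:

{code}
import org.apache.flink.api.java.ExecutionEnvironment;

// Minimal sketch of the three cases from the report; the input paths are hypothetical.
public class ReadTextFileGzipRepro {

    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        // 1) single .gz file: decompressed as expected
        env.readTextFile("/data/input.gz").first(5).print();

        // 2) directory of plain text files: read as expected
        env.readTextFile("/data/plain-dir").first(5).print();

        // 3) directory of .gz files: reported to be read as raw, still-compressed text
        env.readTextFile("/data/gz-dir").first(5).print();
    }
}
{code}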





Re: Possible bug in Kafka producer partitioning logic

2017-04-10 Thread Gyula Fóra
I understand the reasoning; on the other hand, this creates a problem that
is very hard to work around. :/

Do you have any suggestions how to get around this?

Gyula

Tzu-Li (Gordon) Tai wrote (on Apr 10, 2017, 15:57):

> I would prefer to make this a blocker for a future bugfix actually, and
> not 1.2.1.
>
> The reason is that to fix this properly we might need to look again into
> (and possibly change) how partitioners are provided.
> The main problem is that the `open` method can only possibly be called
> once with the partitions of one topic.
> So, we might need the user to provide multiple partitioners, one for each
> of all the possible topics that will be written to.
>
> One way or another, my gut feeling is that this would need somewhat slight
> change to the Kafka producer APIs.
> And I’m not so sure of rushing API changes into releases.
>
>
> On April 10, 2017 at 6:46:29 AM, Gyula Fóra (gyula.f...@gmail.com) wrote:
>
> Thanks for checking this out.
>
> I would say this is definitely a blocking issue for the bugfix release,
> what do you think?
>
> Gyula
>
> Tzu-Li (Gordon) Tai wrote (on Apr 10, 2017, 15:39):
>
> Hi Gyula,
>
> Yes, I think the semantics of the Partitioner interface is a bit off.
> The `numPartitions` value ideally should be the number of partitions of the
> `targetTopic`.
>
> Here’s a JIRA I just filed to track the issue:
> https://issues.apache.org/jira/browse/FLINK-6288.
>
> Cheers,
> Gordon
>
> On April 10, 2017 at 1:16:18 AM, Gyula Fóra (gyula.f...@gmail.com) wrote:
>
> Hi all,
>
> We had some problems with custom partitioning for the 0.8 Kafka producer
> and now that I checked the code it seems there might be a problem with the
> logic.
>
> The producer determines the number of partitions in the open method and
> seems to be using that as a value passed to the custom partitioner for
> producing the records.
> This will however only work if the defaultTopicId (topic) has the same
> number of partitions as all other topics in the kafka cluster when
> producing to multiple topics.
>
> In our case the default topic had 16 and new ones have 3 as default so it
> gives an out of range partition error.
>
> Is my understanding correct or am I overlooking something?
>
> Thank you!
> Gyula
>


Re: Possible bug in Kafka producer partitioning logic

2017-04-10 Thread Gyula Fóra
In the worst-case scenario we will have a custom build that just caches the
different partition counts in a map (but still calls partitioner.open only
once).
I think this simple intermediate fix would actually be good enough for most
people who are blocked by this in the short run.
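A rough sketch of the kind of intermediate workaround described here
(illustrative only, with hypothetical names; not the actual patched producer):

{code}
import java.io.Serializable;
import java.util.HashMap;
import java.util.Map;

// Illustrative only (hypothetical names): cache the partition count per target topic
// so records are routed with the correct upper bound, while the user's partitioner
// is still opened exactly once.
public class CachedPartitionCountRouter implements Serializable {

    private final Map<String, Integer> partitionCountByTopic = new HashMap<>();

    /** Look up (and cache) the partition count of a topic. */
    private int partitionCount(String topic) {
        return partitionCountByTopic.computeIfAbsent(topic, t -> fetchPartitionCountFromKafka(t));
    }

    /** Stand-in for a metadata request to the Kafka cluster. */
    private int fetchPartitionCountFromKafka(String topic) {
        return "defaultTopic".equals(topic) ? 16 : 3; // hard-coded for illustration
    }

    /** Route a record key to a partition of its actual target topic. */
    public int partitionFor(String targetTopic, String key) {
        int numPartitions = partitionCount(targetTopic);
        return Math.abs(key.hashCode() % numPartitions);
    }
}
{code}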

Gyula

Gyula Fóra wrote (on Apr 10, 2017, 16:01):

> I understand the reasoning, on the other hand this creates a problem that
> is very hard to work around. :/
>
> Do you have any suggestions how to get around this?
>
> Gyula
>
> Tzu-Li (Gordon) Tai wrote (on Apr 10, 2017, 15:57):
>
> I would prefer to make this a blocker for a future bugfix actually, and
> not 1.2.1.
>
> The reason is that to fix this properly we might need to look again into
> (and possibly change) how partitioners are provided.
> The main problem is that the `open` method can only possibly be called
> once with the partitions of one topic.
> So, we might need the user to provide multiple partitioners, one for each
> of all the possible topics that will be written to.
>
> One way or another, my gut feeling is that this would need somewhat slight
> change to the Kafka producer APIs.
> And I’m not so sure of rushing API changes into releases.
>
>
> On April 10, 2017 at 6:46:29 AM, Gyula Fóra (gyula.f...@gmail.com) wrote:
>
> Thanks for checking this out.
>
> I would say this is definitely a blocking issue for the bugfix release,
> what do you think?
>
> Gyula
>
> Tzu-Li (Gordon) Tai wrote (on Apr 10, 2017, 15:39):
>
> Hi Gyula,
>
> Yes, I think the semantics of the Partitioner interface is a bit off.
> The `numPartitions` value ideally should be the number of partitions of the
> `targetTopic`.
>
> Here’s a JIRA I just filed to track the issue:
> https://issues.apache.org/jira/browse/FLINK-6288.
>
> Cheers,
> Gordon
>
> On April 10, 2017 at 1:16:18 AM, Gyula Fóra (gyula.f...@gmail.com) wrote:
>
> Hi all,
>
> We had some problems with custom partitioning for the 0.8 Kafka producer
> and now that I checked the code it seems there might be a problem with the
> logic.
>
> The producer determines the number of partitions in the open method and
> seems to be using that as a value passed to the custom partitioner for
> producing the records.
> This will however only work if the defaultTopicId (topic) has the same
> number of partitions as all other topics in the kafka cluster when
> producing to multiple topics.
>
> In our case the default topic had 16 and new ones have 3 as default so it
> gives an out of range partition error.
>
> Is my understanding correct or am I overlooking something?
>
> Thank you!
> Gyula
>
>


[jira] [Created] (FLINK-6290) SharedBuffer is improperly released when multiple edges between entries

2017-04-10 Thread Dawid Wysakowicz (JIRA)
Dawid Wysakowicz created FLINK-6290:
---

 Summary: SharedBuffer is improperly released when multiple edges 
between entries
 Key: FLINK-6290
 URL: https://issues.apache.org/jira/browse/FLINK-6290
 Project: Flink
  Issue Type: Bug
  Components: CEP
Affects Versions: 1.3.0
Reporter: Dawid Wysakowicz
Priority: Critical
 Fix For: 1.3.0


The test below currently fails:
{code}
@Test
public void testClearingSharedBufferWithMultipleEdgesBetweenEntries() {
    SharedBuffer<String, Event> sharedBuffer = new SharedBuffer<>(Event.createTypeSerializer());
    int numberEvents = 8;
    Event[] events = new Event[numberEvents];
    final long timestamp = 1L;

    for (int i = 0; i < numberEvents; i++) {
        events[i] = new Event(i + 1, "e" + (i + 1), i);
    }

    sharedBuffer.put("start", events[1], timestamp, DeweyNumber.fromString("1"));
    sharedBuffer.put("branching", events[2], timestamp, "start", events[1], timestamp, DeweyNumber.fromString("1.0"));
    sharedBuffer.put("branching", events[3], timestamp, "start", events[1], timestamp, DeweyNumber.fromString("1.1"));
    sharedBuffer.put("branching", events[3], timestamp, "branching", events[2], timestamp, DeweyNumber.fromString("1.0.0"));
    sharedBuffer.put("branching", events[4], timestamp, "branching", events[3], timestamp, DeweyNumber.fromString("1.0.0.0"));
    sharedBuffer.put("branching", events[4], timestamp, "branching", events[3], timestamp, DeweyNumber.fromString("1.1.0"));

    // simulate IGNORE (next event can point to events[2])
    sharedBuffer.lock("branching", events[2], timestamp);

    sharedBuffer.release("branching", events[4], timestamp);

    // events[1] and events[2] should still be in the buffer
    assertFalse(sharedBuffer.isEmpty());
}
{code}

The problem is with the {{SharedBuffer#internalRemove}} method:

{code}
private void internalRemove(final SharedBufferEntry<K, V> entry) {
    Stack<SharedBufferEntry<K, V>> entriesToRemove = new Stack<>();
    entriesToRemove.add(entry);

    while (!entriesToRemove.isEmpty()) {
        SharedBufferEntry<K, V> currentEntry = entriesToRemove.pop();

        if (currentEntry.getReferenceCounter() == 0) {
            currentEntry.remove();

            for (SharedBufferEdge<K, V> edge: currentEntry.getEdges()) {
                if (edge.getTarget() != null) {
                    edge.getTarget().decreaseReferenceCounter();
                    entriesToRemove.push(edge.getTarget());
                }
            }
        }
    }
}
{code}

When currentEntry has multiple edges to the same entry, that entry will be added 
twice to entriesToRemove and its edges will be removed twice.
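One possible direction for a fix, shown as an illustrative sketch only (not
necessarily the change that will be applied): track which entries have already
been processed so that an entry reachable through multiple edges is removed,
and its edges walked, only once.

{code}
// Illustrative sketch only (not necessarily the actual fix); additionally
// requires java.util.HashSet / java.util.Set.
private void internalRemove(final SharedBufferEntry<K, V> entry) {
    Stack<SharedBufferEntry<K, V>> entriesToRemove = new Stack<>();
    Set<SharedBufferEntry<K, V>> alreadyProcessed = new HashSet<>();
    entriesToRemove.add(entry);

    while (!entriesToRemove.isEmpty()) {
        SharedBufferEntry<K, V> currentEntry = entriesToRemove.pop();

        // skip entries whose edges were already walked in a previous iteration
        if (currentEntry.getReferenceCounter() == 0 && alreadyProcessed.add(currentEntry)) {
            currentEntry.remove();

            for (SharedBufferEdge<K, V> edge : currentEntry.getEdges()) {
                if (edge.getTarget() != null) {
                    edge.getTarget().decreaseReferenceCounter();
                    entriesToRemove.push(edge.getTarget());
                }
            }
        }
    }
}
{code}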






Re: [VOTE] Release Apache Flink 1.2.1 (RC1)

2017-04-10 Thread Robert Metzger
I've now started building the next release candidate.

On Sun, Apr 9, 2017 at 12:37 PM, Robert Metzger  wrote:

> Hi Gyula,
>
> I'm trying to push Stefan R. to get the RocksDB fixes in asap.
>
> On Sat, Apr 8, 2017 at 5:17 PM, Gyula Fóra  wrote:
>
>> Hi All,
>>
>> Any updates on this?
>>
>> It would be nice to get this out soon, the Kafka bug is hurting our prod
>> jobs big time.
>>
>> Thanks,
>> Gyula
>>
>> On Wed, Apr 5, 2017, 15:27 Ufuk Celebi  wrote:
>>
>> > @Stefan: What's the state with the RocksDB fixes? I would be +1 to do
>> this.
>> >
>> > On Tue, Apr 4, 2017 at 6:05 PM, Chesnay Schepler 
>> > wrote:
>> > > Yes, aljoscha already opened one against master:
>> > > https://github.com/apache/flink/pull/3670
>> > >
>> > > On 04.04.2017 17:57, Ted Yu wrote:
>> > >>
>> > >> Should the commits be reverted from master branch as well ?
>> > >>
>> > >> On Tue, Apr 4, 2017 at 4:59 AM, Aljoscha Krettek <
>> aljos...@apache.org>
>> > >> wrote:
>> > >>
>> > >>> The commits around FLINK-5808 have been reverted on release-1.2.
>> > >>>
>> >  On 4. Apr 2017, at 12:16, Stefan Richter <
>> s.rich...@data-artisans.com
>> > >
>> > >>>
>> > >>> wrote:
>> > 
>> >  I have created a custom build of RocksDB 4.11.2 that fixes a
>> > significant
>> > >>>
>> > >>> performance problem with append operations. I think this should
>> > >>> definitely
>> > >>> be part of the 1.2.1 release because this is already blocking some
>> > users.
>> > >>> What is missing is uploading the jar to maven central and a testing
>> > run,
>> > >>> e.g. with some misbehaved job that has large state.
>> > 
>> > 
>> > > Am 04.04.2017 um 11:57 schrieb Robert Metzger <
>> rmetz...@apache.org>:
>> > >
>> > > Thank you for opening a PR for this.
>> > >
>> > > Chesnay, do you need more reviews for the metrics changes /
>> > backports?
>> > >
>> > > Are there any other release blockers for 1.2.1, or are we good to
>> go?
>> > >
>> > > On Mon, Apr 3, 2017 at 6:48 PM, Aljoscha Krettek <
>> > aljos...@apache.org>
>> > > wrote:
>> > >
>> > >> I created a PR for the revert: https://github.com/apache/
>> > >>>
>> > >>> flink/pull/3664
>> > >>>
>> > >>> On 3. Apr 2017, at 18:32, Stephan Ewen 
>> wrote:
>> > >>>
>> > >>> +1 for options (1), but also invest the time to fix it properly
>> for
>> > >>>
>> > >>> 1.2.2
>> > >>>
>> > >>>
>> > >>> On Mon, Apr 3, 2017 at 9:10 AM, Kostas Kloudas <
>> > >>
>> > >> k.klou...@data-artisans.com>
>> > >>>
>> > >>> wrote:
>> > >>>
>> >  +1 for 1
>> > 
>> > > On Apr 3, 2017, at 5:52 PM, Till Rohrmann <
>> trohrm...@apache.org>
>> > >>
>> > >> wrote:
>> > >
>> > > +1 for option 1)
>> > >
>> > > On Mon, Apr 3, 2017 at 5:48 PM, Fabian Hueske <
>> fhue...@gmail.com
>> > >
>> > >>
>> > >> wrote:
>> > >>
>> > >> +1 to option 1)
>> > >>
>> > >> 2017-04-03 16:57 GMT+02:00 Ted Yu :
>> > >>
>> > >>> Looks like #1 is better - 1.2.1 would be at least as stable
>> as
>> > >>>
>> > >>> 1.2.0
>> > >>>
>> > >>> Cheers
>> > >>>
>> > >>> On Mon, Apr 3, 2017 at 7:39 AM, Aljoscha Krettek <
>> > >>
>> > >> aljos...@apache.org>
>> > >>>
>> > >>> wrote:
>> > >>>
>> >  Just so we’re all on the same page. ;-)
>> > 
>> >  There was https://issues.apache.org/jira/browse/FLINK-5808
>> > which
>> > >>
>> > >> was
>> > 
>> >  a
>> > 
>> >  bug that we initially discovered in Flink 1.2 which was/is
>> > about
>> > >>
>> > >> missing
>> > 
>> >  verification for the correctness of the combination of
>> > >>>
>> > >>> parallelism
>> > >>
>> > >> and
>> > 
>> >  max-parallelism. Due to lacking test coverage this
>> introduced
>> >  two
>> > >>
>> > >> more
>> > >>>
>> > >>> bugs:
>> > 
>> >  - https://issues.apache.org/jira/browse/FLINK-6188: Some
>> >  setParallelism() methods can't cope with default
>> parallelism
>> >  - https://issues.apache.org/jira/browse/FLINK-6209:
>> >  StreamPlanEnvironment always has a parallelism of 1
>> > 
>> >  IMHO, the options are:
>> >  1) revert the changes made for FLINK-5808 on the
>> release-1.2
>> > >>>
>> > >>> branch
>> > >>
>> > >> and
>> > 
>> >  live with the bug still being present
>> >  2) put in more work to fix FLINK-5808 which requires fixing
>> > some
>> > >>>
>> > >>> problems
>> > 
>> >  that have existed for a long time with how the parallelism
>> is
>> > >>>
>> > >>> set in
>> > 
>> >  streaming programs
>> > >

Question about the process order in stream aggregate

2017-04-10 Thread Xingcan Cui
Hi all,

I ran some tests for stream aggregation on rows. The data stream is simply
registered as

val orderA: DataStream[Order] = env.fromCollection(Seq(
  Order(1L, "beer", 1),
  Order(2L, "diaper", 2),
  Order(3L, "diaper", 3),
  Order(4L, "rubber", 4)))
tEnv.registerDataStream("OrderA", orderA, 'user, 'product, 'amount),

and the SQL is defined as

select product, sum(amount) over (partition by product order by procTime()
rows between unbounded preceding and current row) from OrderA.

My expected output should be

2> Result(beer,1)
2> Result(diaper,2)
1> Result(rubber,4)
2> Result(diaper,5).

However, sometimes I get the following output

2> Result(beer,1)
2> Result(diaper,3)
1> Result(rubber,4)
2> Result(diaper,5).

It seems that the rows "Order(2L, "diaper", 2)" and "Order(3L, "diaper", 3)"
are processed out of order. Is that normal?

BTW, when I run `orderA.keyBy(2).map{x => x.amount + 1}.print()`, their order
is always preserved.

Thanks,
Xingcan