[jira] [Created] (FLINK-4156) Job with -m yarn-cluster registers TaskManagers to another running Yarn session

2016-07-06 Thread Stefan Richter (JIRA)
Stefan Richter created FLINK-4156:
-

 Summary: Job with -m yarn-cluster registers TaskManagers to 
another running Yarn session
 Key: FLINK-4156
 URL: https://issues.apache.org/jira/browse/FLINK-4156
 Project: Flink
  Issue Type: Bug
  Components: Distributed Coordination
Reporter: Stefan Richter


When a job is started using cluster mode (-m yarn-cluster) and a Yarn session 
is running on the same cluster, the job accidentally registers its worker 
tasks with the ongoing Yarn session. This happens because the same Zookeeper 
namespace is used. 

We should consider isolating Flink applications from one another by using UUIDs, 
e.g. based on their application ids, in their Zookeeper paths.
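
A minimal sketch of the isolation idea, assuming the Yarn application id is used as the per-application path component. The class `ZkNamespace`, the root `/flink`, and the sample ids are hypothetical and only illustrate the path layout; they are not Flink's actual API or configuration.

```java
// Hypothetical helper: derives a per-application ZooKeeper root path.
final class ZkNamespace {
    // Joins a shared root (e.g. "/flink") with an application-specific id.
    static String pathFor(String root, String applicationId) {
        if (applicationId == null || applicationId.isEmpty()) {
            throw new IllegalArgumentException("applicationId must not be empty");
        }
        // Drop a trailing slash so the result never contains "//".
        String base = root.endsWith("/") ? root.substring(0, root.length() - 1) : root;
        return base + "/" + applicationId;
    }

    public static void main(String[] args) {
        // Two applications on the same cluster end up under different paths.
        System.out.println(pathFor("/flink", "application_1467800000000_0001"));
        System.out.println(pathFor("/flink", "application_1467800000000_0002"));
    }
}
```

With distinct paths, a per-job cluster and a long-running session would no longer look up each other's entries, which is the effect the proposal is after.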



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Re: [Discussion] Query Regarding Operator chaining

2016-07-06 Thread Aljoscha Krettek
Hi,
unfortunately the reading of one Kafka partition cannot be split among
several parallel instances of the Kafka source. So if you have only 2
partitions your reading parallelism is limited to that. You are right that
this can lead to bad performance and underutilization. The only solution I
see right now is to have more partitions in Kafka so that more readers can
read in parallel.
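
To make the limit concrete, here is a small sketch that hands out partitions to source subtasks. It assumes a simple round-robin assignment by partition index, which is an assumption for illustration and not necessarily how the Kafka source assigns partitions; the class and method names are made up.

```java
import java.util.ArrayList;
import java.util.List;

// Illustration only: assumes partitions are handed out round-robin by index.
final class PartitionAssignment {
    // Returns, for each source subtask, the list of partition ids it reads.
    static List<List<Integer>> assign(int numPartitions, int parallelism) {
        List<List<Integer>> bySubtask = new ArrayList<>();
        for (int i = 0; i < parallelism; i++) {
            bySubtask.add(new ArrayList<>());
        }
        for (int p = 0; p < numPartitions; p++) {
            bySubtask.get(p % parallelism).add(p);
        }
        return bySubtask;
    }

    // Counts the subtasks that received at least one partition.
    static long busySubtasks(int numPartitions, int parallelism) {
        return assign(numPartitions, parallelism).stream()
                .filter(partitions -> !partitions.isEmpty())
                .count();
    }

    public static void main(String[] args) {
        System.out.println(busySubtasks(2, 56));  // 2 partitions, 56 source subtasks
        System.out.println(busySubtasks(56, 56)); // 56 partitions, 56 source subtasks
    }
}
```

Since a partition is never split, the number of busy source subtasks cannot exceed the number of partitions, whatever the assignment scheme is.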

+Robert Adding Robert directly because he might have something more to say
about this.

Cheers,
Aljoscha

On Tue, 5 Jul 2016 at 15:48 Vinay Patil  wrote:

> Hi,
>
> The re-balance actually distributes it to all the task managers, and now
> all TM's are getting utilized, You were right , I am seeing two
> boxes(Tasks) now.
>
> I have one question regarding the task slots :
>
> For the source the parallelism is set to 56, now when we see on the UI and
> click on source sub-task , I see 56 entries , out of which only two are
> getting the data from Kafka (this may be because I have two kafka
> partitions)
>
> The 56 entries that I am seeing for a sub-task on UI are the total task
> slots of all TM's, right ?
>
> If yes, only two slots are getting utilized, how do I ensure enough task
> slots are getting utilized at the source ? I have 7 task managers (8 cores
> per TM), so if only 1 core each of two task manager is performing the
> consume operation, wouldn't it hamper the performance.
>
> Even if two Task managers are utilized , all 16 slots should have been used
> , right ?
>
> For the other sub-task, for all 56 entries I am seeing bytes received.
> (this may be because of applying rebalance after the source)
>
> P.S: I am reading over million records from Kafka , so need to utilize
> enough resources [Performance is the key here].
>
>
> Regards,
> Vinay Patil
>
> On Mon, Jul 4, 2016 at 8:55 PM, Vinay Patil 
> wrote:
>
> > Thanks a lot guys, this helps to understand better
> >
> > Regards,
> > Vinay Patil
> >
> > On Mon, Jul 4, 2016 at 8:43 PM, Stephan Ewen  wrote:
> >
> >> Just to be sure: Each *subtask* has one thread - so for each task, there
> >> are as many parallel threads (distributed across nodes) as your
> >> parallelism
> >> indicates.
> >>
> >> For most cases, having long chains and then a higher parallelism is a
> good
> >> choice.
> >> Cases where individual functions (MapFunction, etc) do something very
> CPU
> >> intensive are cases where you may want to not chain them, so they get a
> >> separate thread.
> >>
> >> If you see all tasks in one box in the UI, it probably means you have
> only
> >> "Filter" and "Map" as a function? In that case it is fine to have just
> one
> >> box (=Task) in the UI. The box still has parallelism via subtasks.
> >>
> >> If you insert a "rebalance()" between the Kafka Source and the
> >> Map/Filter/etc it makes sure that the data distribution in the
> >> Map/Filter/etc operators has best utilization independent of how the
> data
> >> was partitioned in Kafka.
> >> You should then also see two boxes in the UI - one for the Kafka Source,
> >> one for the actual processing.
> >>
> >>
> >>
> >>
> >>
> >>
> >> On Mon, Jul 4, 2016 at 5:00 PM, Aljoscha Krettek 
> >> wrote:
> >>
> >> > Hi,
> >> > chaining is useful to minimize communication overhead. But in your
> case
> >> you
> >> > might benefit more from having good cluster utilization. There seems
> to
> >> be
> >> > a tradeoff. Maybe you can run some easy tests to see how it behaves
> for
> >> > you.
> >> >
> >> > Cheers,
> >> > Aljoscha
> >> >
> >> > On Mon, 4 Jul 2016 at 16:28 Vinay Patil 
> >> wrote:
> >> >
> >> > > Thanks,
> >> > >
> >> > > so is operator chaining useful in terms of utilizing the resources
> or
> >> we
> >> > > should keep the chaining to minimal use, say 3-4 operators and
> disable
> >> > > chaining ?
> >> > > I am worried because I am seeing all the operators in one box on
> flink
> >> > UI.
> >> > >
> >> > >
> >> > > Regards,
> >> > > Vinay Patil
> >> > >
> >> > > On Mon, Jul 4, 2016 at 7:13 PM, Aljoscha Krettek <
> aljos...@apache.org
> >> >
> >> > > wrote:
> >> > >
> >> > > > Hi,
> >> > > > this is true, yes. If the number of Kafka partitions is less than
> >> the
> >> > > > parallelism then some of the sources might not be utilized. If you
> >> > > insert a
> >> > > > rebalance after the sources you should be able to utilize all the
> >> > > > downstream operations equally.
> >> > > >
> >> > > > Cheers,
> >> > > > Aljoscha
> >> > > >
> >> > > > On Mon, 4 Jul 2016 at 11:13 Vinay Patil 
> >> > wrote:
> >> > > >
> >> > > > > Just an update, the task will be executed by multiple threads ,
> my
> >> > bad
> >> > > I
> >> > > > > asked the wrong way.
> >> > > > > Can you please clarify other things.
> >> > > > >
> >> > > > > Out of 8 node only 3 of them are getting utilized, reading the
> >> data
> >> > > from
> >> > > > > Kafka , does it mean that the Kafka partitions are set to less
> >> > number ?
> >> > > > >
> >> > > > > What if we use rescale or rebalance since it evenly distributes
> ,
> >> > would
> >> >

[jira] [Created] (FLINK-4157) FlinkKafkaMetrics cause TaskManager shutdown during cancellation

2016-07-06 Thread Robert Metzger (JIRA)
Robert Metzger created FLINK-4157:
-

 Summary: FlinkKafkaMetrics cause TaskManager shutdown during 
cancellation
 Key: FLINK-4157
 URL: https://issues.apache.org/jira/browse/FLINK-4157
 Project: Flink
  Issue Type: Bug
  Components: Kafka Connector
Affects Versions: 1.0.3
Reporter: Robert Metzger
Assignee: Robert Metzger
Priority: Critical
 Fix For: 1.1.0


The following issue was reported by a user:
{code}
2016-07-05 01:32:25,113 INFO  org.apache.flink.runtime.taskmanager.Task 
- Freeing task resources for Sink: KafkaOutput (59/72)
2016-07-05 01:32:25,113 INFO  org.apache.flink.runtime.taskmanager.Task 
- Sink: KafkaOutput (53/72) switched to CANCELED
2016-07-05 01:32:25,113 INFO  org.apache.flink.runtime.taskmanager.Task 
- Freeing task resources for Sink: KafkaOutput (53/72)
2016-07-05 01:32:25,144 ERROR akka.actor.OneForOneStrategy  
- 
java.util.ConcurrentModificationException
at java.util.HashMap$HashIterator.nextNode(HashMap.java:1429)
at java.util.HashMap$ValueIterator.next(HashMap.java:1458)
at 
org.apache.kafka.clients.InFlightRequests.inFlightRequestCount(InFlightRequests.java:106)
at 
org.apache.kafka.clients.NetworkClient.inFlightRequestCount(NetworkClient.java:211)
at 
org.apache.kafka.clients.producer.internals.Sender$SenderMetrics$1.measure(Sender.java:383)
at 
org.apache.kafka.common.metrics.KafkaMetric.value(KafkaMetric.java:57)
at 
org.apache.kafka.common.metrics.KafkaMetric.value(KafkaMetric.java:52)
at 
org.apache.flink.streaming.connectors.kafka.internals.metrics.DefaultKafkaMetricAccumulator.writeObject(DefaultKafkaMetricAccumulator.java:152)
at sun.reflect.GeneratedMethodAccessor20859.invoke(Unknown Source)
at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:497)
at 
java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:988)
at 
java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1496)
at 
java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
at java.util.HashMap.internalWriteEntries(HashMap.java:1777)
at java.util.HashMap.writeObject(HashMap.java:1354)
at sun.reflect.GeneratedMethodAccessor42.invoke(Unknown Source)
at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:497)
at 
java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:988)
at 
java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1496)
at 
java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
at 
java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
at 
java.io.ObjectOutputStream.defaultWriteObject(ObjectOutputStream.java:441)
at 
java.util.Collections$SynchronizedMap.writeObject(Collections.java:2691)
at sun.reflect.GeneratedMethodAccessor226.invoke(Unknown Source)
at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:497)
at 
java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:988)
at 
java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1496)
at 
java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
at 
org.apache.flink.util.InstantiationUtil.serializeObject(InstantiationUtil.java:300)
at org.apache.flink.util.SerializedValue.<init>(SerializedValue.java:48)
at 
org.apache.flink.runtime.accumulators.AccumulatorSnapshot.<init>(AccumulatorSnapshot.java:58)
at 
org.apache.flink.runtime.accumulators.AccumulatorRegistry.getSnapshot(AccumulatorRegistry.java:78)
at 
org.apache.flink.runtime.taskmanager.TaskManager.unregisterTaskAndNotifyFinalState(TaskManager.scala:1150)
at 
org.apache.flink.runtime.taskmanager.TaskManager.org$apache$flink$runtime$taskmanager$TaskManager$$handleTaskMessage(TaskManager.scala:407)
at 
org.apache.flink.runtime.taskmanager.TaskManager$$anonfun$handleMessage$1.applyOrElse(TaskManager.scala:265)
at 
scala.runtime.AbstractPartialFunction$mcVL$sp.

[jira] [Created] (FLINK-4158) Scala QuickStart StreamingJob fails to compile

2016-07-06 Thread Chesnay Schepler (JIRA)
Chesnay Schepler created FLINK-4158:
---

 Summary: Scala QuickStart StreamingJob fails to compile
 Key: FLINK-4158
 URL: https://issues.apache.org/jira/browse/FLINK-4158
 Project: Flink
  Issue Type: Bug
  Components: Quickstarts
Affects Versions: 1.1.0
Reporter: Chesnay Schepler
Assignee: Chesnay Schepler
 Fix For: 1.1.0


The StreamingJob does not compile since it uses
{code}
import org.apache.flink.api.scala._
{code}

instead of
{code}
import org.apache.flink.streaming.api.scala._
{code}





Re: Issues while interacting with DynamoDB

2016-07-06 Thread Aljoscha Krettek
Hi,
are you running this on Yarn? If yes, the EMR Yarn installation might
already have some of the AWS jars in the classpath and that might interact
badly with the Jars that you manually put into the flink/lib folder.
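
One way to check which jar actually provides a class is to print its code source. The sketch below uses only standard JDK calls; the class name `ClassOrigin` is made up for illustration. The result is only meaningful if the code runs where the failure occurs (for example inside the operator), since the classpath there may differ from a standalone run.

```java
import java.security.CodeSource;

// Diagnostic sketch: reports where a class was loaded from.
final class ClassOrigin {
    static String locate(Class<?> clazz) {
        CodeSource source = clazz.getProtectionDomain().getCodeSource();
        if (source == null || source.getLocation() == null) {
            // Typical for JDK classes loaded by the bootstrap class loader.
            return "unknown (no code source)";
        }
        return source.getLocation().toString();
    }

    public static void main(String[] args) throws ClassNotFoundException {
        // Pass a class name such as the one named in the NoSuchMethodError,
        // e.g. com.amazonaws.services.dynamodbv2.model.PutItemRequest.
        String name = args.length > 0 ? args[0] : "java.lang.String";
        System.out.println(name + " -> " + locate(Class.forName(name)));
    }
}
```

If the reported location is an older AWS SDK jar rather than the fat jar, that would be consistent with the NoSuchMethodError quoted below.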

Cheers,
Aljoscha

P.S. In the future, please use the user mailing list for requests like
this. The dev mailing list is meant for discussions regarding the
development of Flink and adjacent projects.

On Tue, 5 Jul 2016 at 02:46 Deepak Jha  wrote:

> Hi All,
>
> We've flink (1.0.2) HA setup on AWS cloud and are using IAM roles to
> interact with S3 (S3a as suggested in flink best practices) and DynamoDB.
> While trying to interact with DynamoDB to perform key-value pair lookup
> from one of the operator we are running into the following issue.
>
> def putItem() = {
>   val id = 126639L
>   val item = new Item().withPrimaryKey("Id",
> "sfsaf12344").withLong("uniqueIdentifier", id)
>   table.putItem(item)
>
> }
>
> 2016-07-04 17:15:18,379 PDT [INFO]  ip-10-6-10-182
> [flink-akka.actor.default-dispatcher-29]
> o.a.f.runtime.jobmanager.JobManager - Status of job
> 3ec7e145208453b5dbcf6224f373018f (Topology) changed to FAILING.
> org.apache.flink.runtime.util.SerializedThrowable:
>
> com.amazonaws.services.dynamodbv2.model.PutItemRequest.withExpressionAttributeNames(Ljava/util/Map;)Lcom/amazonaws/services/dynamodbv2/model/PutItemRequest;
> at
>
> com.amazonaws.services.dynamodbv2.document.internal.PutItemImpl.doPutItem(PutItemImpl.java:82)
> at
>
> com.amazonaws.services.dynamodbv2.document.internal.PutItemImpl.putItem(PutItemImpl.java:41)
> at com.amazonaws.services.dynamodbv2.document.Table.putItem(Table.java:144)
> at
>
> com.mix.ingestion.url.dupedetection.DynamoDBIO$.putItem(DynamoDBHandler.scala:38)
> at
>
> com.mix.ingestion.url.dupedetection.DynamoDBDupeDetectionBaseImpl.setKey(ABC.scala:143)
> at
>
> com.mix.ingestion.url.dupedetection.DynamoDBDupeDetectionBaseImpl.setKeyAndUpdateDupeFlag(ABC.scala:135)
> at
>
> com.mix.ingestion.url.dupedetection.DynamoDBDupeDetection$class.updateDupeFlagAndTable(ABC.scala:96)
> at
>
> com.mix.ingestion.url.dupedetection.DynamoDBDupeDetection$class.detectDupe(ABC.scala:111)
> at
>
> com.mix.ingestion.url.dupedetection.DynamoDBDupeDetection$.detectDupe(ABC.scala:158)
> at
>
> com.mix.ingestion.topology.Operators$$anonfun$15$$anonfun$apply$3.apply(Operators.scala:70)
> at
>
> com.mix.ingestion.topology.Operators$$anonfun$15$$anonfun$apply$3.apply(Operators.scala:70)
> at
>
> org.apache.flink.streaming.api.scala.DataStream$$anon$4.map(DataStream.scala:485)
> at
>
> org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:39)
> at
> org.apache.flink.streaming.runtime.io
> .StreamInputProcessor.processInput(StreamInputProcessor.java:168)
> at
>
> org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:65)
> at
>
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:225)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
> at java.lang.Thread.run(Thread.java:745)
> Caused by: java.lang.Exception: Serialized representation of
> java.lang.NoSuchMethodError:
>
> com.amazonaws.services.dynamodbv2.model.PutItemRequest.withExpressionAttributeNames(Ljava/util/Map;)Lcom/amazonaws/services/dynamodbv2/model/PutItemRequest;
> ... 18 common frames omitted
>
> It works if I just run it in standalone fashion using "*java -cp
> fatjar.jar:/opt/flink/lib/*  a.b.c.d.DynamoDBHandler"* on the same ec2
> instance but I'm running into error when it tries to interact with DynamoDB
> from inside an operator.
> It fails even if I call the same *putItem* from inside the operator.
>
> We've aws-java-sdk-1.7.4.jar , hadoop-aws-2.7.2.jar in flink/lib folder.
> We're using fatjar to deploy the topology and it contains aws-java-sdk-s3
> and aws-java-sdk-dynamodb both 1.11.3 version. I also experimented with
> using aws-java-sdk in fatjar as well but it did not work. I looked into
> aws-java-sdk-1.7.4.jar and see that com/amazonaws/services/dynamodbv2
> exists.
>
>
>
> Please let me know what I am doing wrong. Any help will be appreciated.
>
> --
> Thanks,
> Deepak Jha
>


[jira] [Created] (FLINK-4159) Quickstart poms exclude unused dependencies

2016-07-06 Thread Chesnay Schepler (JIRA)
Chesnay Schepler created FLINK-4159:
---

 Summary: Quickstart poms exclude unused dependencies
 Key: FLINK-4159
 URL: https://issues.apache.org/jira/browse/FLINK-4159
 Project: Flink
  Issue Type: Bug
  Components: Quickstarts
Affects Versions: 1.1.0
Reporter: Chesnay Schepler
Assignee: Chesnay Schepler
Priority: Minor
 Fix For: 1.1.0


The Quickstart poms exclude several dependencies from being packaged into the 
fat-jar, even though they aren't used by Flink according to `mvn 
dependency:tree`.

com.amazonaws:aws-java-sdk
com.twitter:chill-avro_*
com.twitter:chill-bijection_*
com.twitter:bijection-core_*
com.twitter:bijection-avro_*
de.javakaffee:kryo-serializers
org.apache.sling:org.apache.sling.commons.json





[jira] [Created] (FLINK-4160) YARN session doesn't show input validation errors

2016-07-06 Thread Robert Metzger (JIRA)
Robert Metzger created FLINK-4160:
-

 Summary: YARN session doesn't show input validation errors
 Key: FLINK-4160
 URL: https://issues.apache.org/jira/browse/FLINK-4160
 Project: Flink
  Issue Type: Bug
  Components: YARN Client
Affects Versions: 1.1.0
Reporter: Robert Metzger
Priority: Critical


Setting a jobmanager size below 768 mb causes this error:
{code}
~/flink/build-target$ ./bin/yarn-session.sh -n 5 -s 4 -jm 512
Error while starting the YARN Client. Please check log output!
{code}

The problem is that the logs don't contain any information.
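
A sketch of what a more helpful validation could look like. The 768 MB minimum is taken from the report above; the class `JobManagerMemoryCheck` and the message wording are hypothetical and do not reflect the actual YARN client code.

```java
// Hypothetical validation sketch; not the actual YARN client code.
final class JobManagerMemoryCheck {
    // Minimum taken from the report above (768 MB).
    static final int MIN_JOBMANAGER_MB = 768;

    // Returns an error message for the user, or null if the value is acceptable.
    static String validate(int jobManagerMb) {
        if (jobManagerMb < MIN_JOBMANAGER_MB) {
            return "JobManager memory of " + jobManagerMb + " MB is below the minimum of "
                    + MIN_JOBMANAGER_MB + " MB";
        }
        return null;
    }

    public static void main(String[] args) {
        String error = validate(512);
        if (error != null) {
            // Surface the concrete reason instead of only pointing to the logs.
            System.err.println("Error while starting the YARN Client: " + error);
        }
    }
}
```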





[jira] [Created] (FLINK-4161) Quickstarts can exclude more flink-dist dependencies

2016-07-06 Thread Chesnay Schepler (JIRA)
Chesnay Schepler created FLINK-4161:
---

 Summary: Quickstarts can exclude more flink-dist dependencies
 Key: FLINK-4161
 URL: https://issues.apache.org/jira/browse/FLINK-4161
 Project: Flink
  Issue Type: Improvement
  Components: Quickstarts
Affects Versions: 1.1.0
Reporter: Chesnay Schepler
Priority: Trivial


The Quickstart poms exclude several dependencies that flink-dist contains from 
being packaged into the fat-jar.

However, the following flink-dist dependencies are not excluded:

{code}
org.apache.flink:flink-streaming-scala_2.10
org.apache.flink:flink-scala-shell_2.10
{code}





Re: [DISCUSS] Allowed Lateness in Flink

2016-07-06 Thread Aljoscha Krettek
Hi,
I cleaned up the document a bit and added sections to address comments on
the doc:
https://docs.google.com/document/d/1Xp-YBf87vLTduYSivgqWVEMjYUmkA-hyb4muX3KRl08/edit?usp=sharing
(I
also marked proposed features that are already implemented as [done].)

The main thing that remains to be figured out is how we deal with purging,
i.e. whether the trigger can decide to purge a window or whether the
WindowOperator should do this and also what happens when window state is
garbage collected. The original proposal was to reduce the current set of
trigger results from (CONTINUE, FIRE, PURGE, FIRE_AND_PURGE) to (CONTINUE,
FIRE) and have a global flag in the WindowOperator that says whether firing
windows should be purged (DISCARDING) or kept for a bit, until the allowed
lateness expires (ACCUMULATING). Based on comments by Elias I added a
section that sketches an alternative where the triggers are in charge of
purging and also decide what should happen in case of window cleanup.
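
The original proposal can be sketched roughly as follows: triggers only answer CONTINUE or FIRE, and a global mode decides whether a fired window is purged. All names below (`WindowSketch`, `FiringMode`, `onTrigger`) are illustrative and not the WindowOperator API.

```java
import java.util.ArrayList;
import java.util.List;

// Sketch of the original proposal; all names here are illustrative.
final class WindowSketch {
    enum TriggerResult { CONTINUE, FIRE }
    enum FiringMode { DISCARDING, ACCUMULATING }

    private final FiringMode mode;
    private final List<Integer> contents = new ArrayList<>();

    WindowSketch(FiringMode mode) {
        this.mode = mode;
    }

    void add(int element) {
        contents.add(element);
    }

    // Returns the emitted elements (empty on CONTINUE); purges only in DISCARDING mode.
    List<Integer> onTrigger(TriggerResult result) {
        if (result == TriggerResult.CONTINUE) {
            return new ArrayList<>();
        }
        List<Integer> emitted = new ArrayList<>(contents);
        if (mode == FiringMode.DISCARDING) {
            contents.clear();
        }
        return emitted;
    }

    public static void main(String[] args) {
        WindowSketch window = new WindowSketch(FiringMode.ACCUMULATING);
        window.add(1);
        window.add(2);
        System.out.println(window.onTrigger(TriggerResult.FIRE));
        window.add(3); // a late element that arrives within the allowed lateness
        System.out.println(window.onTrigger(TriggerResult.FIRE));
    }
}
```

In ACCUMULATING mode the second firing includes the earlier elements again; in DISCARDING mode it would contain only the late element. The alternative sketched in the doc would move that decision into the trigger instead of a global flag.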

One thing we should also keep in mind is how we can make the windowing API
easy to use for people that don't need all the bells and whistles of custom
triggers, allowed lateness and so on. This is partially covered by the
proposal to add composite triggers but I feel we can go further there.

In the future, it might be good to do discussions directly on the ML and
then change the document accordingly. This way everyone can follow the
discussion on the ML. I also feel that Google Doc comments often don't give
enough space for expressing more complex opinions.

Cheers,
Aljoscha


On Mon, 30 May 2016 at 11:23 Aljoscha Krettek  wrote:

> Thanks for the feedback! :-) I already read the comments on the file.
>
> On Mon, 30 May 2016 at 11:10 Gyula Fóra  wrote:
>
>> Thanks Aljoscha :) I added some comments that might seem relevant from the
>> users point of view.
>>
>> Gyula
>>
>> Aljoscha Krettek  ezt írta (időpont: 2016. máj. 30.,
>> H, 10:33):
>>
>> > Hi,
>> > I created a new doc specifically about the interplay of lateness and
>> > window state garbage collection:
>> >
>> https://docs.google.com/document/d/1vgukdDiUco0KX4f7tlDJgHWaRVIU-KorItWgnBapq_8/edit?usp=sharing
>> >
>> > There is still some stuff that needs to be figured out, both in the new
>> > doc and the existing doc. For example, we need to decide whether to make
>> > accumulating/discarding behavior global for a window operation or
>> > controllable by triggers. Initially, I suggested to make
>> > accumulating/discarding a global setting for the window operation
>> because
>> > we can get away with keeping less state if we know that we always
>> discard
>> > when firing. Please take a look at the new doc to see what I'm talking
>> > about there.
>> >
>> > Feedback very welcome!
>> >
>> > Cheers,
>> > Aljoscha
>> >
>> > On Tue, 26 Apr 2016 at 16:45 Aljoscha Krettek 
>> wrote:
>> >
>> >> Hi Max,
>> >> thanks for the Feedback and suggestions! I'll try and address each
>> >> paragraph separately.
>> >>
>> >> I'm afraid deciding based on the "StreamTimeCharacteristic" is not
>> >> possible since a user can use processing-time windows in their job even
>> >> though they set the characteristic to event-time. Enabling event time
>> does
>> >> not disable processing time, it just enables an additional feature.
>> (IMHO,
>> >> the handling of the StreamTimeCharacteristic is still somewhat
>> problematic.)
>> >>
>> >> Making the decision based purely on the class of the WindowAssigner is
>> >> also not possible since we don't know in advance which WindowAssigners
>> the
>> >> users will write and what time characteristic they will use.
>> >>
>> >> Regarding the third proposition. Removing 'System.currentTimeMillis()'
>> is
>> >> very desirable and part of my proposal. However, it is still meant as
>> being
>> >> separate from "event-time" since a Trigger/WindowAssigner might need
>> both.
>> >> For example, a Trigger might want to do early triggering a few
>> >> (processing-time) seconds after the first elements arrive and proper
>> >> triggering once the watermark for the end of the window arrives.
>> >>
>> >> These are good ideas but I'm afraid we still don't have a good
>> solution.
>> >> This whole processing time/event time business is just very tricky.
>> >>
>> >> Cheers,
>> >> Aljoscha
>> >>
>> >> On Tue, 26 Apr 2016 at 16:26 Maximilian Michels 
>> wrote:
>> >>
>> >>> Hi Aljoscha,
>> >>>
>> >>> Thank you for the detailed design document.
>> >>>
>> >>> Wouldn't it be ok to allow these new concepts regardless of the time
>> >>> semantics? For Event Time and Ingestion Time "Lateness" and
>> >>> "Accumulating/Discarding" make sense. If the user chooses Processing
>> >>> time then these can be ignored during translation of the StreamGraph
>> >>> (possibly with a warning).
>> >>>
>> >>> Detecting when these concepts make sense should be possible by
>> >>> checking the "Stream Characteristics" of the ExecutionEnvironment or
>> >>> the involved classes (e.g. SlidingProcessingTimeWindows) in t

Re: [DISCUSS] Allowed Lateness in Flink

2016-07-06 Thread Ufuk Celebi
On Wed, Jul 6, 2016 at 3:19 PM, Aljoscha Krettek  wrote:
> In the future, it might be good to do discussions directly on the ML and
> then change the document accordingly. This way everyone can follow the
> discussion on the ML. I also feel that Google Doc comments often don't give
> enough space for expressing more complex opinions.

I agree! Would you mind raising this point as a separate discussion on dev@?


Re: [DISCUSS] Allowed Lateness in Flink

2016-07-06 Thread Aljoscha Krettek
I did:
https://mail-archives.apache.org/mod_mbox/flink-dev/201606.mbox/%3ccanmxww0abttjjg9ewdxrugxkjm7jscbenmvrzohpt2qo3pq...@mail.gmail.com%3e
 ;-)

On Wed, 6 Jul 2016 at 15:31 Ufuk Celebi  wrote:

> On Wed, Jul 6, 2016 at 3:19 PM, Aljoscha Krettek 
> wrote:
> > In the future, it might be good to do discussions directly on the ML and
> > then change the document accordingly. This way everyone can follow the
> > discussion on the ML. I also feel that Google Doc comments often don't
> give
> > enough space for expressing more complex opinions.
>
> I agree! Would you mind raising this point as a separate discussion on dev@
> ?
>


[jira] [Created] (FLINK-4162) Event-Time CEP Job Fails after Restart

2016-07-06 Thread Aljoscha Krettek (JIRA)
Aljoscha Krettek created FLINK-4162:
---

 Summary: Event-Time CEP Job Fails after Restart
 Key: FLINK-4162
 URL: https://issues.apache.org/jira/browse/FLINK-4162
 Project: Flink
  Issue Type: Bug
  Components: CEP
Affects Versions: 1.0.3, 1.1.0
Reporter: Aljoscha Krettek
Priority: Blocker


With the fix for FLINK-4149 a restarting event-time CEP job fails with this 
exception:
{code}
java.lang.IllegalStateException: Detected an underflow in the pruning 
timestamp. This indicates that either the window length is too long (10) or 
that the timestamp has not been set correctly (e.g. Long.MIN_VALUE).
at org.apache.flink.cep.nfa.NFA.process(NFA.java:185)
at 
org.apache.flink.cep.operator.KeyedCEPPatternOperator.processEvent(KeyedCEPPatternOperator.java:48)
at 
org.apache.flink.cep.operator.AbstractKeyedCEPPatternOperator.processWatermark(AbstractKeyedCEPPatternOperator.java:163)
at 
org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:165)
at 
org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:66)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:265)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:588)
at java.lang.Thread.run(Thread.java:745)
{code}
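
The underflow named in the exception message can be reproduced with plain arithmetic. The sketch below assumes the pruning timestamp is computed as timestamp minus window length, which is an assumption for illustration; `PruningTimestamp` is not the actual NFA code.

```java
// Illustration of the arithmetic only; not the actual NFA code.
final class PruningTimestamp {
    // Returns timestamp - windowLength, or throws if the subtraction underflows.
    static long pruningTimestamp(long timestamp, long windowLength) {
        try {
            return Math.subtractExact(timestamp, windowLength);
        } catch (ArithmeticException e) {
            throw new IllegalStateException(
                    "Underflow in pruning timestamp for timestamp " + timestamp
                            + " and window length " + windowLength, e);
        }
    }

    public static void main(String[] args) {
        // Plain subtraction silently wraps around to a large positive value.
        System.out.println(Long.MIN_VALUE - 10 == Long.MAX_VALUE - 9);
        // A regular timestamp is unaffected.
        System.out.println(pruningTimestamp(1_000L, 10L));
    }
}
```

A timestamp that is still Long.MIN_VALUE when the first watermark is processed after a restart would trip exactly this kind of check.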





Re: [DISCUSS] FLIP 1 - Flink Improvement Proposal

2016-07-06 Thread Ufuk Celebi
Hey Aljoscha,

thanks for this proposal. I've somehow missed it last week. I like the
idea very much and agree with your assessment about the problems with
the Google Doc approach.

Regarding the process: I'm also in favour of adopting it from Kafka. I
would not expect any problems with this, but we can post a quick note
to their ML.

@Matthias: The name works for me. ;-)

– Ufuk

On Tue, Jun 28, 2016 at 10:19 PM, Matthias J. Sax  wrote:
> FLIP ?? Really? :D
>
> http://www.maya.tv/en/character/flip
>
> -Matthias
>
>
> On 06/28/2016 06:26 PM, Aljoscha Krettek wrote:
>> I'm proposing to add a formal process for how we deal with (major)
>> improvements to Flink and design docs. This has been mentioned several
>> times recently but we never took any decisive action to actually implement
>> such a process so here we go.
>>
>> Right now, we have Jira issues and we sometimes have design docs that we
>> keep in Google Docs. Jamie recently added links to those that he could find
>> on the mailing list to the Flink wiki:
>> https://cwiki.apache.org/confluence/display/FLINK/Apache+Flink+Home. The
>> problem with these is that a) the comments on the Google Docs are not
>> reflected in Jira and the mailing list. There has been some very active
>> discussion on some of the docs that most people would never notice. The
>> community therefore might seem less active than it actually is. b) the
>> documents are not very discoverable, if we had a clearly defined place
>> where we put them and also prominently link to this on the Flink homepage
>> this would greatly help people that try to find out about current
>> developments.
>>
>> Kafka has a process like this:
>> https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Improvement+Proposals.
>> They call it KIP, for Kafka Improvement Proposal. We could either adapt
>> this for Flink or come up with our own process. Doing the former would save
>> us a lot of time and I don't think the Kafka community would mind us
>> copying their process. The subject also hints at this, our process could be
>> called FLIP, for Flink Improvement Proposal.
>>
>> What do you think? Feedback is highly welcome. :-)
>>
>> Cheers,
>> Aljoscha
>>
>


Connecting Flink and Hive

2016-07-06 Thread Alan Gates
I’d like to work on creating a Flink Sink for Hive’s streaming ingest[1].   But 
I recall recently seeing a message on the dev list about moving some of the 
third party connectors out of Flink as devs were having problems maintaining 
them.  So, is this the sort of thing I should contribute to Flink or is the 
plan to house these types of things elsewhere?

Alan.

1. https://cwiki.apache.org/confluence/display/Hive/Streaming+Data+Ingest

Re: [DISCUSS] Allowed Lateness in Flink

2016-07-06 Thread Vishnu Viswanath
Hi,

I was going through the suggested improvements to windows, and I have a
few questions and suggestions regarding the Evictor.

1) I have a use case where I need a custom Evictor that evicts elements
from the window based on their value (e.g., if the elements are of
case class Item(id: Int, type: String), then evict elements that have
type="a"). I believe this is not currently possible.
2) This is somewhat related to 1): there should be an option to evict
elements from anywhere in the window, not only from the beginning of the
window (e.g., apply the delta function to all elements and remove all
those that don't pass). I checked the code: the evict method just returns
the number of elements to be removed, and processTriggerResult just skips
that many elements from the beginning.
3) Add an option that enables the user to decide whether the eviction should
happen before or after the apply function. Currently it
is before the apply function, but I have a use case where I need to first
apply the function and evict afterward.

I am doing this for a POC, so I think I can modify the Flink code base to
make these changes and build it, but I would appreciate any suggestions on
whether these are viable changes or whether there would be any performance
issues if they are made. Also, any pointers on where to start would help
(e.g., do I create a new class similar to EvictingWindowOperator that
extends WindowOperator?)
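
The difference between the two eviction styles from points 1) and 2) can be sketched on a plain list. `Item`, `evictFromStart` and `evictMatching` are hypothetical names for illustration only and are not part of the Flink Evictor interface.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Predicate;

// Sketch only: Item and both evict methods are hypothetical, not Flink API.
final class EvictionSketch {
    static final class Item {
        final int id;
        final String type;

        Item(int id, String type) {
            this.id = id;
            this.type = type;
        }
    }

    // Count-based style: drops the first `count` elements of the window.
    static List<Item> evictFromStart(List<Item> window, int count) {
        int from = Math.min(count, window.size());
        return new ArrayList<>(window.subList(from, window.size()));
    }

    // Value-based style: drops matching elements wherever they sit in the window.
    static List<Item> evictMatching(List<Item> window, Predicate<Item> shouldEvict) {
        List<Item> kept = new ArrayList<>(window);
        kept.removeIf(shouldEvict);
        return kept;
    }

    public static void main(String[] args) {
        List<Item> window = Arrays.asList(
                new Item(1, "b"), new Item(2, "a"), new Item(3, "b"), new Item(4, "a"));
        // Evict every element with type "a", regardless of its position.
        for (Item item : evictMatching(window, item -> "a".equals(item.type))) {
            System.out.println(item.id + " " + item.type);
        }
    }
}
```

A count returned by the evictor can only express the first style, which is why value-based eviction would need a different contract.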

Thanks and Regards,
Vishnu Viswanath,

On Wed, Jul 6, 2016 at 9:39 AM, Aljoscha Krettek 
wrote:

> I did:
>
> https://mail-archives.apache.org/mod_mbox/flink-dev/201606.mbox/%3ccanmxww0abttjjg9ewdxrugxkjm7jscbenmvrzohpt2qo3pq...@mail.gmail.com%3e
>  ;-)
>
> On Wed, 6 Jul 2016 at 15:31 Ufuk Celebi  wrote:
>
> > On Wed, Jul 6, 2016 at 3:19 PM, Aljoscha Krettek 
> > wrote:
> > > In the future, it might be good to do discussions directly on the ML
> and
> > > then change the document accordingly. This way everyone can follow the
> > > discussion on the ML. I also feel that Google Doc comments often don't
> > give
> > > enough space for expressing more complex opinions.
> >
> > I agree! Would you mind raising this point as a separate discussion on
> dev@
> > ?
> >
>


Re: [DISCUSS] Allowed Lateness in Flink

2016-07-06 Thread Aljoscha Krettek
@Vishnu Funny you should ask that because I have a design doc lying around.
I'll open a new mail thread to not hijack this one.

On Wed, 6 Jul 2016 at 17:17 Vishnu Viswanath 
wrote:

> Hi,
>
> I was going through the suggested improvements to windows, and I have a
> few questions/suggestions regarding the Evictor.
>
> 1) I have a use case where I need to create a custom Evictor that
> evicts elements from the window based on their value (e.g., if my elements
> are of case class Item(id: Int, type:String), then evict the elements that have
> type="a"). I believe this is not currently possible.
> 2) This is somewhat related to 1): there should be an option to evict
> elements from anywhere in the window, not only from the beginning of the
> window (e.g., apply the delta function to all elements and remove all
> those that don't pass). I checked the code, and the evict method just returns
> the number of elements to be removed and processTriggerResult just skips that
> many elements from the beginning.
> 3) Add an option that enables the user to decide whether the eviction should
> happen before the apply function or after the apply function. Currently it
> is before the apply function, but I have a use case where I need to first
> apply the function and evict afterward.
>
> I am doing this for a POC, so I think I can modify the Flink code base to
> make these changes and build, but I would appreciate any suggestions on
> whether these are viable changes or whether there will be any performance
> issues if they are made. Also, any pointers on where to start would help
> (e.g., do I create a new class similar to EvictingWindowOperator that
> extends WindowOperator?)
>
> Thanks and Regards,
> Vishnu Viswanath,
>
> On Wed, Jul 6, 2016 at 9:39 AM, Aljoscha Krettek 
> wrote:
>
> > I did:
> >
> >
> https://mail-archives.apache.org/mod_mbox/flink-dev/201606.mbox/%3ccanmxww0abttjjg9ewdxrugxkjm7jscbenmvrzohpt2qo3pq...@mail.gmail.com%3e
> >  ;-)
> >
> > On Wed, 6 Jul 2016 at 15:31 Ufuk Celebi  wrote:
> >
> > > On Wed, Jul 6, 2016 at 3:19 PM, Aljoscha Krettek 
> > > wrote:
> > > > In the future, it might be good to have discussions directly on the ML
> > and
> > > > then change the document accordingly. This way everyone can follow
> the
> > > > discussion on the ML. I also feel that Google Doc comments often
> don't
> > > give
> > > > enough space for expressing more complex opinions.
> > >
> > > I agree! Would you mind raising this point as a separate discussion on
> > dev@
> > > ?
> > >
> >
>


[DISCUSS] Enhance Window Evictor in Flink

2016-07-06 Thread Aljoscha Krettek
Hi,
as mentioned in the thread on improving the Windowing API I also have a
design doc just for improving WindowEvictors. I had this in my head for a
while but was hesitant to publish but since people are asking about this
now might be a good time to post it. Here's the doc:
https://docs.google.com/document/d/1Rr7xzlItYqvFXLyyy-Yv0vvw8f29QYAjm5i9E4A_JlU/edit?usp=sharing

Feedback/Suggestions are very welcome! Please let me know what you think.

@Vishnu: Are you interested in contributing a solution for this to the
Flink code base? I'd be very happy to work with you on this.

Cheers,
Aljoscha

P.S. I think it would be best to keep discussions to the ML because
comments on the doc will not be visible here for everyone.


Re: [DISCUSS] Enhance Window Evictor in Flink

2016-07-06 Thread Vishnu Viswanath
Hi Aljoscha,

Thanks. Yes, the new interface seems to address points 1 and 2 of:

*1) I have a use case where I need to create a custom Evictor that
evicts elements from the window based on their value (e.g., if my
elements are of case class Item(id: Int, type:String), then evict the elements
that have type="a"). I believe this is not currently possible.*
*2) This is somewhat related to 1): there should be an option to evict
elements from anywhere in the window, not only from the beginning of the
window (e.g., apply the delta function to all elements and remove all
those that don't pass). I checked the code, and the evict method just returns
the number of elements to be removed and processTriggerResult just skips that
many elements from the beginning.*
*3) Add an option that enables the user to decide whether the eviction should
happen before the apply function or after the apply function. Currently it
is before the apply function, but I have a use case where I need to first
apply the function and evict afterward.*
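
To picture the limitation described in point 2, here is a minimal Java sketch. It is not Flink's operator code; the class and method names (CountEvictionSketch, evict, applyEviction) are hypothetical and only mimic the behaviour described above: the evictor reports a count, and the caller drops that many elements from the front of the pane.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

final class CountEvictionSketch {

    // Hypothetical count-style evictor: it can only say HOW MANY elements to drop.
    static int evict(int size, int maxCount) {
        return Math.max(0, size - maxCount);
    }

    // Hypothetical caller: skips that many elements from the beginning of the pane.
    static <T> List<T> applyEviction(List<T> pane, int toEvict) {
        return new ArrayList<>(pane.subList(toEvict, pane.size()));
    }

    // Keeps at most 3 elements of a 5-element pane, so the first 2 are dropped.
    static List<String> demo() {
        List<String> pane = Arrays.asList("a", "b", "a", "c", "a");
        return applyEviction(pane, evict(pane.size(), 3));
    }
}
```

With a contract like this, elements of type "a" that sit in the middle or at the end of the pane cannot be singled out, because the only information passed back is a count.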

I would be interested in contributing to the code base. Please let me know
the steps.

Thanks and Regards,
Vishnu Viswanath

On Wed, Jul 6, 2016 at 11:49 AM, Aljoscha Krettek 
wrote:

> Hi,
> as mentioned in the thread on improving the Windowing API I also have a
> design doc just for improving WindowEvictors. I had this in my head for a
> while but was hesitant to publish but since people are asking about this
> now might be a good time to post it. Here's the doc:
> https://docs.google.com/document/d/1Rr7xzlItYqvFXLyyy-Yv0vvw8f29QYAjm5i9E4A_JlU/edit?usp=sharing
>
> Feedback/Suggestions are very welcome! Please let me know what you think.
>
> @Vishnu: Are you interested in contributing a solution for this to the
> Flink code base? I'd be very happy to work with you on this.
>
> Cheers,
> Aljoscha
>
> P.S. I think it would be best to keep discussions to the ML because
> comments on the doc will not be visible here for everyone.
>


Re: [DISCUSS] Enhance Window Evictor in Flink

2016-07-06 Thread Maxim
The new API forces iteration through every element of the buffer even if only a
single value is to be evicted. What about implementing the Iterator.remove()
method for elements? The API would look like:

public interface Evictor extends Serializable {

   /**
    * Optionally evicts elements. Called before the windowing function.
    *
    * @param elements The elements currently in the pane. Use
    *                 Iterator.remove to evict.
    * @param size The current number of elements in the pane.
    * @param ctx The {@link EvictorContext}
    */
   void evictBefore(Iterable elements, int size, EvictorContext ctx);

   /**
    * Optionally evicts elements. Called after the windowing function.
    *
    * @param elements The elements currently in the pane. Use
    *                 Iterator.remove to evict.
    * @param size The current number of elements in the pane.
    * @param ctx The {@link EvictorContext}
    */
   void evictAfter(Iterable elements, int size, EvictorContext ctx);
}

Such an API allows aborting the iteration at any point and evicting elements in
any order.
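
As a rough illustration of how an implementation could use Iterator.remove() to evict by value, here is a self-contained Java sketch. Item, SimpleEvictor and TypeEvictor are hypothetical names; SimpleEvictor is a cut-down stand-in for the interface above (generic, without EvictorContext or Serializable), so this shows the idea rather than the final API.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

// Hypothetical element type, modelled on the Item(id, type) example from this thread.
final class Item {
    final int id;
    final String type;

    Item(int id, String type) {
        this.id = id;
        this.type = type;
    }
}

// Hypothetical, simplified stand-in for the proposed interface.
interface SimpleEvictor<T> {
    void evictBefore(Iterable<T> elements, int size);
}

// Evicts every element whose type equals the configured value, wherever it sits in the pane.
final class TypeEvictor implements SimpleEvictor<Item> {
    private final String typeToEvict;

    TypeEvictor(String typeToEvict) {
        this.typeToEvict = typeToEvict;
    }

    @Override
    public void evictBefore(Iterable<Item> elements, int size) {
        for (Iterator<Item> it = elements.iterator(); it.hasNext(); ) {
            if (typeToEvict.equals(it.next().type)) {
                it.remove(); // eviction is expressed through the iterator
            }
        }
    }
}

final class EvictorSketch {
    // Runs the evictor over a mutable copy of the pane and returns the ids that survive.
    static List<Integer> survivingIds(List<Item> pane, SimpleEvictor<Item> evictor) {
        List<Item> buffer = new ArrayList<>(pane);
        evictor.evictBefore(buffer, buffer.size());
        List<Integer> ids = new ArrayList<>();
        for (Item item : buffer) {
            ids.add(item.id);
        }
        return ids;
    }

    static List<Integer> demo() {
        List<Item> pane = Arrays.asList(
                new Item(1, "a"), new Item(2, "b"), new Item(3, "a"), new Item(4, "c"));
        return survivingIds(pane, new TypeEvictor("a"));
    }
}
```

The sketch relies on the underlying buffer's iterator supporting remove(), which java.util.ArrayList does; whatever buffer backs the pane would need to offer the same.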

Thanks,

Maxim.

On Wed, Jul 6, 2016 at 9:04 AM, Vishnu Viswanath <
vishnu.viswanat...@gmail.com> wrote:
>
> Hi Aljoscha,
>
> Thanks. Yes the new interface seems to address points 1 and 2. of
>
> *1) I am having a use case where I have to create a custom Evictor that
> will evict elements from the window based on the value (e.g., if I have
> elements are of case class Item(id: Int, type:String) then evict elements
> that has type="a"). I believe this is not currently possible.*
> *2) this is somewhat related to 1) where there should be an option to
evict
> elements from anywhere in the window. not only from the beginning of the
> window. (e.g., apply the delta function to all elements and remove all
> those don't pass. I checked the code and evict method just returns the
> number of elements to be removed and processTriggerResult just skips those
> many elements from the beginning.  *
> *3) Add an option to enables the user to decide if the eviction should
> happen before the apply function or after the apply function. Currently it
> is before the apply function, but I have a use case where I need to first
> apply the function and evict afterward.*
>
> I would be interested in contributing to the code base. Please let me know
> the steps.
>
> Thanks and Regards,
> Vishnu Viswanath
>
> On Wed, Jul 6, 2016 at 11:49 AM, Aljoscha Krettek 
> wrote:
>
> > Hi,
> > as mentioned in the thread on improving the Windowing API I also have a
> > design doc just for improving WindowEvictors. I had this in my head for
a
> > while but was hesitant to publish but since people are asking about this
> > now might be a good time to post it. Here's the doc:
> >
https://docs.google.com/document/d/1Rr7xzlItYqvFXLyyy-Yv0vvw8f29QYAjm5i9E4A_JlU/edit?usp=sharing
> >
> > Feedback/Suggestions are very welcome! Please let me know what you
think.
> >
> > @Vishnu: Are you interested in contributing a solution for this to the
> > Flink code base? I'd be very happy to work with you on this.
> >
> > Cheers,
> > Aljoscha
> >
> > P.S. I think it would be best to keep discussions to the ML because
> > comments on the doc will not be visible here for everyone.
> >


Re: [DISCUSS] Enhance Window Evictor in Flink

2016-07-06 Thread Aljoscha Krettek
@Maxim: That's perfect, I didn't think about using Iterator.remove() for
that. I'll update the doc. What do you think, Vishnu? This should also cover
your before/after case nicely.

@Vishnu: The steps would be these:
 - Converge on a design in this discussion
 - Add a Jira issue here: https://issues.apache.org/jira/browse/FLINK
 - Work on the code and create a pull request on GitHub

The steps are also outlined here
http://flink.apache.org/how-to-contribute.html and here
http://flink.apache.org/contribute-code.html.

-
Aljoscha

On Wed, 6 Jul 2016 at 19:45 Maxim  wrote:

> The new API forces iteration through every element of the buffer even if a
> single value to be evicted. What about implementing Iterator.remove()
> method for elements? The API would look like:
>
> public interface Evictor extends Serializable {
>
>/**
> *  Optionally evicts elements. Called before windowing function.
> *
> * @param elements The elements currently in the pane. Use
> Iterator.remove to evict.
> * @param size The current number of elements in the pane.
> * @param window The {@link Window}
> */
>void evictBefore(Iterable elements, int size, EvictorContext ctx);
>
>/**
> *  Optionally evicts elements. Called after windowing function.
> *
> * @param elements The elements currently in the pane. Use
> Iterator.remove to evict.
> * @param size The current number of elements in the pane.
> * @param window The {@link Window}
> */
>void evictAfter(Iterable elements, int size, EvictorContext ctx);
> }
>
> Such API allows to abort iteration at any point and evict elements in any
> order.
>
> Thanks,
>
> Maxim.
>
> On Wed, Jul 6, 2016 at 9:04 AM, Vishnu Viswanath <
> vishnu.viswanat...@gmail.com> wrote:
> >
> > Hi Aljoscha,
> >
> > Thanks. Yes the new interface seems to address points 1 and 2. of
> >
> > *1) I am having a use case where I have to create a custom Evictor that
> > will evict elements from the window based on the value (e.g., if I have
> > elements are of case class Item(id: Int, type:String) then evict elements
> > that has type="a"). I believe this is not currently possible.*
> > *2) this is somewhat related to 1) where there should be an option to
> evict
> > elements from anywhere in the window. not only from the beginning of the
> > window. (e.g., apply the delta function to all elements and remove all
> > those don't pass. I checked the code and evict method just returns the
> > number of elements to be removed and processTriggerResult just skips
> those
> > many elements from the beginning.  *
> > *3) Add an option to enables the user to decide if the eviction should
> > happen before the apply function or after the apply function. Currently
> it
> > is before the apply function, but I have a use case where I need to first
> > apply the function and evict afterward.*
> >
> > I would be interested in contributing to the code base. Please let me
> know
> > the steps.
> >
> > Thanks and Regards,
> > Vishnu Viswanath
> >
> > On Wed, Jul 6, 2016 at 11:49 AM, Aljoscha Krettek 
> > wrote:
> >
> > > Hi,
> > > as mentioned in the thread on improving the Windowing API I also have a
> > > design doc just for improving WindowEvictors. I had this in my head for
> a
> > > while but was hesitant to publish but since people are asking about
> this
> > > now might be a good time to post it. Here's the doc:
> > >
>
> https://docs.google.com/document/d/1Rr7xzlItYqvFXLyyy-Yv0vvw8f29QYAjm5i9E4A_JlU/edit?usp=sharing
> > >
> > > Feedback/Suggestions are very welcome! Please let me know what you
> think.
> > >
> > > @Vishnu: Are you interested in contributing a solution for this to the
> > > Flink code base? I'd be very happy to work with you on this.
> > >
> > > Cheers,
> > > Aljoscha
> > >
> > > P.S. I think it would be best to keep discussions to the ML because
> > > comments on the doc will not be visible here for everyone.
> > >
>


Re: [DISCUSS] FLIP 1 - Flink Improvement Proposal

2016-07-06 Thread Stephan Ewen
Yes, big +1

I had actually talked about the same thing with some people as well.

I am currently sketching a few FLIPs, for example for improvements to the
Yarn/Mesos/Kubernetes integration.


One thing we should do here is to actually structure the wiki a bit to make
it easier to find information and proposals.




On Wed, Jul 6, 2016 at 4:24 PM, Ufuk Celebi  wrote:

> Hey Aljoscha,
>
> thanks for this proposal. I've somehow missed it last week. I like the
> idea very much and agree with your assessment about the problems with
> the Google Doc approach.
>
> Regarding the process: I'm also in favour of adopting it from Kafka. I
> would not expect any problems with this, but we can post a quick note
> to their ML.
>
> @Matthias: The name works for me. ;-)
>
> – Ufuk
>
> On Tue, Jun 28, 2016 at 10:19 PM, Matthias J. Sax 
> wrote:
> > FLIP ?? Really? :D
> >
> > http://www.maya.tv/en/character/flip
> >
> > -Matthias
> >
> >
> > On 06/28/2016 06:26 PM, Aljoscha Krettek wrote:
> >> I'm proposing to add a formal process for how we deal with (major)
> >> improvements to Flink and design docs. This has been mentioned several
> >> times recently but we never took any decisive action to actually
> implement
> >> such a process so here we go.
> >>
> >> Right now, we have Jira issues and we sometimes have design docs
> that we
> >> keep in Google Docs. Jamie recently added links to those that he could
> find
> >> on the mailing list to the Flink wiki:
> >> https://cwiki.apache.org/confluence/display/FLINK/Apache+Flink+Home.
> The
> >> problem with these is that a) the comments on the Google Docs are not
> >> reflected in Jira and the mailing list. There has been some very active
> >> discussion on some of the docs that most people would never notice. The
> >> community therefore might seem less active than it actually is. b) the
> >> documents are not very discoverable, if we had a clearly defined place
> >> where we put them and also prominently link to this on the Flink
> homepage
> >> this would greatly help people that try to find out about current
> >> developments.
> >>
> >> Kafka has a process like this:
> >>
> https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Improvement+Proposals
> .
> >> They call it KIP, for Kafka Improvement Proposal. We could either adapt
> >> this for Flink or come up with our own process. Doing the former would
> save
> >> us a lot of time and I don't think the Kafka community would mind us
> >> copying their process. The subject also hints at this, our process
> could be
> >> called FLIP, for Flink Improvement Proposal.
> >>
> >> What do you think? Feedback is highly welcome. :-)
> >>
> >> Cheers,
> >> Aljoscha
> >>
> >
>


Re: [DISCUSS] Enhance Window Evictor in Flink

2016-07-06 Thread Maxim
Actually, for such an evictor to be useful, the window should be sorted by some
field, usually event time. What do you think about adding a sorted window
abstraction?
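
To make the benefit concrete, here is a small Java sketch of a time-based eviction that stops early. It assumes the pane is already sorted by timestamp in ascending order, which is exactly the property a sorted window would have to guarantee. SortedEvictionSketch and evictOlderThan are hypothetical names, and plain Long timestamps stand in for the window elements.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

final class SortedEvictionSketch {

    // Removes all timestamps older than the cutoff and returns how many elements were inspected.
    // Assumption: the input is sorted in ascending order, so the loop can stop at the
    // first element that is not older than the cutoff.
    static int evictOlderThan(Iterable<Long> sortedTimestamps, long cutoff) {
        int inspected = 0;
        Iterator<Long> it = sortedTimestamps.iterator();
        while (it.hasNext()) {
            long ts = it.next();
            inspected++;
            if (ts >= cutoff) {
                break; // everything after this element is at least as new
            }
            it.remove();
        }
        return inspected;
    }

    static List<Long> demo() {
        List<Long> pane = new ArrayList<>(Arrays.asList(1L, 2L, 3L, 10L, 11L, 12L));
        evictOlderThan(pane, 5L);
        return pane;
    }
}
```

On an unsorted pane the same loop would stop too early and leave old elements behind, which is why the early abort only pays off together with an ordering guarantee.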

On Wed, Jul 6, 2016 at 11:36 AM, Aljoscha Krettek 
wrote:

> @Maxim: That's perfect I didn't think about using Iterator.remove() for
> that. I'll update the doc. What do you think Vishnu? This should also cover
> your before/after case nicely.
>
> @Vishnu: The steps would be these:
>  - Converge on a design in this discussion
>  - Add a Jira issue here: https://issues.apache.org/jira/browse/FLINK
>  - Work on the code an create a pull request on github
>
> The steps are also outlined here
> http://flink.apache.org/how-to-contribute.html and here
> http://flink.apache.org/contribute-code.html.
>
> -
> Aljoscha
>
> On Wed, 6 Jul 2016 at 19:45 Maxim  wrote:
>
> > The new API forces iteration through every element of the buffer even if
> a
> > single value to be evicted. What about implementing Iterator.remove()
> > method for elements? The API would look like:
> >
> > public interface Evictor extends Serializable {
> >
> >/**
> > *  Optionally evicts elements. Called before windowing function.
> > *
> > * @param elements The elements currently in the pane. Use
> > Iterator.remove to evict.
> > * @param size The current number of elements in the pane.
> > * @param window The {@link Window}
> > */
> >void evictBefore(Iterable elements, int size, EvictorContext ctx);
> >
> >/**
> > *  Optionally evicts elements. Called after windowing function.
> > *
> > * @param elements The elements currently in the pane. Use
> > Iterator.remove to evict.
> > * @param size The current number of elements in the pane.
> > * @param window The {@link Window}
> > */
> >void evictAfter(Iterable elements, int size, EvictorContext ctx);
> > }
> >
> > Such API allows to abort iteration at any point and evict elements in any
> > order.
> >
> > Thanks,
> >
> > Maxim.
> >
> > On Wed, Jul 6, 2016 at 9:04 AM, Vishnu Viswanath <
> > vishnu.viswanat...@gmail.com> wrote:
> > >
> > > Hi Aljoscha,
> > >
> > > Thanks. Yes the new interface seems to address points 1 and 2. of
> > >
> > > *1) I am having a use case where I have to create a custom Evictor that
> > > will evict elements from the window based on the value (e.g., if I have
> > > elements are of case class Item(id: Int, type:String) then evict
> elements
> > > that has type="a"). I believe this is not currently possible.*
> > > *2) this is somewhat related to 1) where there should be an option to
> > evict
> > > elements from anywhere in the window. not only from the beginning of
> the
> > > window. (e.g., apply the delta function to all elements and remove all
> > > those don't pass. I checked the code and evict method just returns the
> > > number of elements to be removed and processTriggerResult just skips
> > those
> > > many elements from the beginning.  *
> > > *3) Add an option to enables the user to decide if the eviction should
> > > happen before the apply function or after the apply function. Currently
> > it
> > > is before the apply function, but I have a use case where I need to
> first
> > > apply the function and evict afterward.*
> > >
> > > I would be interested in contributing to the code base. Please let me
> > know
> > > the steps.
> > >
> > > Thanks and Regards,
> > > Vishnu Viswanath
> > >
> > > On Wed, Jul 6, 2016 at 11:49 AM, Aljoscha Krettek  >
> > > wrote:
> > >
> > > > Hi,
> > > > as mentioned in the thread on improving the Windowing API I also
> have a
> > > > design doc just for improving WindowEvictors. I had this in my head
> for
> > a
> > > > while but was hesitant to publish but since people are asking about
> > this
> > > > now might be a good time to post it. Here's the doc:
> > > >
> >
> >
> https://docs.google.com/document/d/1Rr7xzlItYqvFXLyyy-Yv0vvw8f29QYAjm5i9E4A_JlU/edit?usp=sharing
> > > >
> > > > Feedback/Suggestions are very welcome! Please let me know what you
> > think.
> > > >
> > > > @Vishnu: Are you interested in contributing a solution for this to
> the
> > > > Flink code base? I'd be very happy to work with you on this.
> > > >
> > > > Cheers,
> > > > Aljoscha
> > > >
> > > > P.S. I think it would be best to keep discussions to the ML because
> > > > comments on the doc will not be visible here for everyone.
> > > >
> >
>


Re: [DISCUSS] FLIP 1 - Flink Improvement Proposal

2016-07-06 Thread Aljoscha Krettek
Yep, that's why I referenced the Kafka process, which is also in their wiki.

On Wed, 6 Jul 2016 at 21:01 Stephan Ewen  wrote:

> Yes, big +1
>
> I had actually talked about the same thing with some people as well.
>
> I am currently sketching a few FLIPs for things, like improvements to the
> Yarn/Mesos/Kubernetes integration
>
>
> One thing we should do here is to actually structure the wiki a bit to make
> it easier to find information and proposals.
>
>
>
>
> On Wed, Jul 6, 2016 at 4:24 PM, Ufuk Celebi  wrote:
>
> > Hey Aljoscha,
> >
> > thanks for this proposal. I've somehow missed it last week. I like the
> > idea very much and agree with your assessment about the problems with
> > the Google Doc approach.
> >
> > Regarding the process: I'm also in favour of adopting it from Kafka. I
> > would not expect any problems with this, but we can post a quick note
> > to their ML.
> >
> > @Matthias: The name works for me. ;-)
> >
> > – Ufuk
> >
> > On Tue, Jun 28, 2016 at 10:19 PM, Matthias J. Sax 
> > wrote:
> > > FLIP ?? Really? :D
> > >
> > > http://www.maya.tv/en/character/flip
> > >
> > > -Matthias
> > >
> > >
> > > On 06/28/2016 06:26 PM, Aljoscha Krettek wrote:
> > >> I'm proposing to add a formal process for how we deal with (major)
> > >> improvements to Flink and design docs. This has been mentioned several
> > >> times recently but we never took any decisive action to actually
> > implement
> > >> such a process so here we go.
> > >>
> > >> Right now, we have Jira issues and we sometimes we have design docs
> > that we
> > >> keep in Google Docs. Jamie recently added links to those that he could
> > find
> > >> on the mailing list to the Flink wiki:
> > >> https://cwiki.apache.org/confluence/display/FLINK/Apache+Flink+Home.
> > The
> > >> problem with these is that a) the comments on the Google Docs are not
> > >> reflected in Jira and the mailing list. There has been some very
> active
> > >> discussion on some of the docs that most people would never notice.
> The
> > >> community therefore might seem less active than it actually is. b) the
> > >> documents are not very discoverable, if we had a clearly defined place
> > >> where we put them and also prominently link to this on the Flink
> > homepage
> > >> this would greatly help people that try to find out about current
> > >> developments.
> > >>
> > >> Kafka has a process like this:
> > >>
> >
> https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Improvement+Proposals
> > .
> > >> They call it KIP, for Kafka Improvement Proposal. We could either
> adapt
> > >> this for Flink or come up with our own process. Doing the former would
> > save
> > >> us a lot of time and I don't think the Kafka community would mind us
> > >> copying their process. The subject also hints at this, our process
> > could be
> > >> called FLIP, for Flink Improvement Proposal.
> > >>
> > >> What do you think? Feedback is highly welcome. :-)
> > >>
> > >> Cheers,
> > >> Aljoscha
> > >>
> > >
> >
>


Re: [DISCUSS] Enhance Window Evictor in Flink

2016-07-06 Thread Vishnu Viswanath
Thank you Maxim and Aljoscha.

Yes, evictBefore and evictAfter should be able to address point 3.

I have one more use case in mind (which I might have to handle in the later
stages of the POC).
What if `evictAfter` should behave differently based on the result of the
window function?

For example:
I have a window that got triggered, and my evict function is being called
after the apply function. In such cases I should be able to decide what
to evict based on the window function.
E.g.,
let the window have elements of type `case class Item(id: String, type:
String)` and let the types be `type1` and `type2`.
If the window function is able to find the sequence `type1 type2 type1`, then
evict all elements of type `type2`;
or if the window function is able to find the sequence `type2 type2 type1`,
then evict all elements of type `type1`;
else don't evict any elements.

Is this possible? Or could we at least let the window function choose between
two Evictor functions (one for the success case and one for the failure case)?
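
One way to picture this use case is the Java sketch below. It is only an illustration under stated assumptions: the window function is reduced to a method that returns a verdict (which type to evict, or null for none), "sequence" is read as a contiguous run, and the first rule wins if both sequences occur. ConditionalEvictionSketch, typeToEvict and run are hypothetical names, and plain type strings stand in for Item elements.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;

final class ConditionalEvictionSketch {

    // Stand-in for the window function's verdict: the type to evict, or null for none.
    // Assumption: a "sequence" is a contiguous run, and the first rule takes precedence.
    static String typeToEvict(List<String> types) {
        if (Collections.indexOfSubList(types, Arrays.asList("type1", "type2", "type1")) >= 0) {
            return "type2";
        }
        if (Collections.indexOfSubList(types, Arrays.asList("type2", "type2", "type1")) >= 0) {
            return "type1";
        }
        return null;
    }

    // Hypothetical evictAfter: evicts according to the verdict, using Iterator.remove().
    static void evictAfter(Iterable<String> types, String verdict) {
        if (verdict == null) {
            return; // no matching sequence, keep everything
        }
        for (Iterator<String> it = types.iterator(); it.hasNext(); ) {
            if (verdict.equals(it.next())) {
                it.remove();
            }
        }
    }

    // Applies the "window function" first and the eviction afterwards, on a mutable copy.
    static List<String> run(List<String> input) {
        List<String> pane = new ArrayList<>(input);
        evictAfter(pane, typeToEvict(pane));
        return pane;
    }
}
```

How the verdict would travel from the window function to the evictor in Flink itself is left open here; the sketch simply passes it as an argument.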

@Maxim:
Regarding the sorted window, I actually wanted my elements to be sorted, not
for the eviction but while applying the window function (so I thought
this could be done easily). But it would be good to have the window sorted
by event time.


Thanks and Regards,
Vishnu Viswanath,




On Wed, Jul 6, 2016 at 3:55 PM, Maxim  wrote:

> Actually for such evictor to be useful the window should be sorted by some
> field, usually event time. What do you think about adding sorted window
> abstraction?
>
> On Wed, Jul 6, 2016 at 11:36 AM, Aljoscha Krettek 
> wrote:
>
> > @Maxim: That's perfect I didn't think about using Iterator.remove() for
> > that. I'll update the doc. What do you think Vishnu? This should also
> cover
> > your before/after case nicely.
> >
> > @Vishnu: The steps would be these:
> >  - Converge on a design in this discussion
> >  - Add a Jira issue here: https://issues.apache.org/jira/browse/FLINK
> >  - Work on the code an create a pull request on github
> >
> > The steps are also outlined here
> > http://flink.apache.org/how-to-contribute.html and here
> > http://flink.apache.org/contribute-code.html.
> >
> > -
> > Aljoscha
> >
> > On Wed, 6 Jul 2016 at 19:45 Maxim  wrote:
> >
> > > The new API forces iteration through every element of the buffer even
> if
> > a
> > > single value to be evicted. What about implementing Iterator.remove()
> > > method for elements? The API would look like:
> > >
> > > public interface Evictor extends Serializable {
> > >
> > >/**
> > > *  Optionally evicts elements. Called before windowing function.
> > > *
> > > * @param elements The elements currently in the pane. Use
> > > Iterator.remove to evict.
> > > * @param size The current number of elements in the pane.
> > > * @param window The {@link Window}
> > > */
> > >void evictBefore(Iterable elements, int size, EvictorContext
> ctx);
> > >
> > >/**
> > > *  Optionally evicts elements. Called after windowing function.
> > > *
> > > * @param elements The elements currently in the pane. Use
> > > Iterator.remove to evict.
> > > * @param size The current number of elements in the pane.
> > > * @param window The {@link Window}
> > > */
> > >void evictAfter(Iterable elements, int size, EvictorContext ctx);
> > > }
> > >
> > > Such API allows to abort iteration at any point and evict elements in
> any
> > > order.
> > >
> > > Thanks,
> > >
> > > Maxim.
> > >
> > > On Wed, Jul 6, 2016 at 9:04 AM, Vishnu Viswanath <
> > > vishnu.viswanat...@gmail.com> wrote:
> > > >
> > > > Hi Aljoscha,
> > > >
> > > > Thanks. Yes the new interface seems to address points 1 and 2. of
> > > >
> > > > *1) I am having a use case where I have to create a custom Evictor
> that
> > > > will evict elements from the window based on the value (e.g., if I
> have
> > > > elements are of case class Item(id: Int, type:String) then evict
> > elements
> > > > that has type="a"). I believe this is not currently possible.*
> > > > *2) this is somewhat related to 1) where there should be an option to
> > > evict
> > > > elements from anywhere in the window. not only from the beginning of
> > the
> > > > window. (e.g., apply the delta function to all elements and remove
> all
> > > > those don't pass. I checked the code and evict method just returns
> the
> > > > number of elements to be removed and processTriggerResult just skips
> > > those
> > > > many elements from the beginning.  *
> > > > *3) Add an option to enables the user to decide if the eviction
> should
> > > > happen before the apply function or after the apply function.
> Currently
> > > it
> > > > is before the apply function, but I have a use case where I need to
> > first
> > > > apply the function and evict afterward.*
> > > >
> > > > I would be interested in contributing to the code base. Please let me
> > > know
> > > > the steps.
> > > >
> > > > Thanks and Regards,
> > > > Vishnu Viswanath
> > > >
> > > > On Wed, Jul 6, 2016 at 11:4