[jira] [Created] (FLINK-4156) Job with -m yarn-cluster registers TaskManagers to another running Yarn session
Stefan Richter created FLINK-4156:
-------------------------------------

Summary: Job with -m yarn-cluster registers TaskManagers to another running Yarn session
Key: FLINK-4156
URL: https://issues.apache.org/jira/browse/FLINK-4156
Project: Flink
Issue Type: Bug
Components: Distributed Coordination
Reporter: Stefan Richter

When a job is started in cluster mode (-m yarn-cluster) while a Yarn session is running on the same cluster, the job accidentally registers its worker tasks with the running Yarn session. This happens because both use the same ZooKeeper namespace. We should consider isolating Flink applications from one another by using UUIDs in their ZooKeeper paths, e.g. based on their application IDs.

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
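A minimal sketch of the isolation idea, assuming the YARN application ID is known to both the client and the cluster processes: each application gets its own sub-tree under a common root, so a per-job cluster and a long-running session never see each other's registration paths. `ZkNamespaceSketch`, `namespacedPath` and the `/flink` root are hypothetical names for illustration, not Flink's actual configuration or API.

```java
import java.util.Objects;

// Hypothetical helper (not Flink's API): derive a per-application ZooKeeper
// path so that two Flink clusters on the same YARN cluster never share
// leader-election or registration nodes.
final class ZkNamespaceSketch {

    private ZkNamespaceSketch() {}

    /** Builds "<root>/<applicationId>/<suffix>", normalizing slashes at the joins. */
    static String namespacedPath(String root, String applicationId, String suffix) {
        Objects.requireNonNull(root, "root");
        Objects.requireNonNull(applicationId, "applicationId");
        Objects.requireNonNull(suffix, "suffix");
        // An ID containing '/' could escape into another application's sub-tree.
        if (applicationId.isEmpty() || applicationId.contains("/")) {
            throw new IllegalArgumentException("invalid application id: " + applicationId);
        }
        String base = root.endsWith("/") ? root.substring(0, root.length() - 1) : root;
        String tail = suffix.startsWith("/") ? suffix : "/" + suffix;
        return base + "/" + applicationId + tail;
    }

    public static void main(String[] args) {
        // Two applications on the same cluster (IDs are made up for illustration).
        String session = namespacedPath("/flink", "application_1467000000000_0001", "leader");
        String perJob = namespacedPath("/flink", "application_1467000000000_0002", "leader");
        System.out.println(session);
        System.out.println(perJob);
    }
}
```

Rejecting IDs that contain `/` keeps one application from writing into another's sub-tree by accident.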
Re: [Discussion] Query Regarding Operator chaining
Hi,
unfortunately, the reading of one Kafka partition cannot be split among several parallel instances of the Kafka source. So if you have only 2 partitions, your reading parallelism is limited to 2. You are right that this can lead to bad performance and underutilization. The only solution I see right now is to have more partitions in Kafka so that more readers can read in parallel.

+Robert: Adding Robert directly because he might have something more to say about this.

Cheers,
Aljoscha

On Tue, 5 Jul 2016 at 15:48 Vinay Patil wrote:
> Hi,
>
> The rebalance actually distributes the data to all the task managers, and now all TMs are getting utilized. You were right, I am seeing two boxes (tasks) now.
>
> I have one question regarding the task slots:
>
> For the source, the parallelism is set to 56. When I look at the UI and click on the source sub-task, I see 56 entries, out of which only two are getting data from Kafka (this may be because I have two Kafka partitions).
>
> The 56 entries that I am seeing for a sub-task in the UI are the total task slots of all TMs, right?
>
> If yes, only two slots are getting utilized. How do I ensure that enough task slots are utilized at the source? I have 7 task managers (8 cores per TM), so if only one core on each of two task managers is performing the consume operation, wouldn't that hamper performance?
>
> Even if only two task managers are utilized, all 16 of their slots should have been used, right?
>
> For the other sub-task, I am seeing bytes received for all 56 entries (this may be because of applying rebalance after the source).
>
> P.S.: I am reading over a million records from Kafka, so I need to utilize enough resources [performance is the key here].
>
> Regards,
> Vinay Patil
>
> On Mon, Jul 4, 2016 at 8:55 PM, Vinay Patil wrote:
>> Thanks a lot guys, this helps me understand better.
>>
>> Regards,
>> Vinay Patil
>>
>> On Mon, Jul 4, 2016 at 8:43 PM, Stephan Ewen wrote:
>>> Just to be sure: each *subtask* has one thread, so for each task there are as many parallel threads (distributed across nodes) as your parallelism indicates.
>>>
>>> For most cases, having long chains and then a higher parallelism is a good choice. Cases where individual functions (MapFunction, etc.) do something very CPU-intensive are cases where you may want to not chain them, so they get a separate thread.
>>>
>>> If you see all tasks in one box in the UI, it probably means you have only "Filter" and "Map" as functions? In that case it is fine to have just one box (= task) in the UI. The box still has parallelism via subtasks.
>>>
>>> If you insert a "rebalance()" between the Kafka source and the Map/Filter/etc., it makes sure that the data distribution in the Map/Filter/etc. operators has the best utilization, independent of how the data was partitioned in Kafka. You should then also see two boxes in the UI: one for the Kafka source, one for the actual processing.
>>>
>>> On Mon, Jul 4, 2016 at 5:00 PM, Aljoscha Krettek wrote:
>>>> Hi,
>>>> chaining is useful to minimize communication overhead. But in your case you might benefit more from having good cluster utilization. There seems to be a tradeoff. Maybe you can run some easy tests to see how it behaves for you.
>>>>
>>>> Cheers,
>>>> Aljoscha
>>>>
>>>> On Mon, 4 Jul 2016 at 16:28 Vinay Patil wrote:
>>>>> Thanks,
>>>>>
>>>>> so is operator chaining useful in terms of utilizing the resources, or should we keep chaining to minimal use, say 3-4 operators, and disable chaining? I am worried because I am seeing all the operators in one box on the Flink UI.
>>>>>
>>>>> Regards,
>>>>> Vinay Patil
>>>>>
>>>>> On Mon, Jul 4, 2016 at 7:13 PM, Aljoscha Krettek <aljos...@apache.org> wrote:
>>>>>> Hi,
>>>>>> this is true, yes. If the number of Kafka partitions is less than the parallelism, then some of the sources might not be utilized. If you insert a rebalance after the sources, you should be able to utilize all the downstream operations equally.
>>>>>>
>>>>>> Cheers,
>>>>>> Aljoscha
>>>>>>
>>>>>> On Mon, 4 Jul 2016 at 11:13 Vinay Patil wrote:
>>>>>>> Just an update: the task will be executed by multiple threads; my bad, I asked the wrong way. Can you please clarify the other things?
>>>>>>>
>>>>>>> Out of 8 nodes, only 3 are getting utilized reading the data from Kafka. Does it mean that the Kafka partitions are set to a smaller number?
>>>>>>>
>>>>>>> What if we use rescale or rebalance, since it evenly distributes, would
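The partition limit and the effect of `rebalance()` discussed in this thread can be illustrated with a small simulation. It assumes a simplified model in which partition `p` is read by source subtask `p % parallelism` and `rebalance()` forwards records round-robin, with each sender starting at channel 0; the real connector's assignment and the real partitioner may differ in detail. `KafkaParallelismSketch` is a hypothetical name, the 7 × 8 = 56 slots match the setup Vinay describes, and the record counts are made up.

```java
import java.util.Arrays;

// Simplified model (an assumption, not the connector's actual code):
// each Kafka partition is read by exactly one source subtask, chosen here
// as partition % parallelism; rebalance() then forwards records round-robin.
final class KafkaParallelismSketch {

    private KafkaParallelismSketch() {}

    /** Records read by each source subtask when every partition holds recordsPerPartition records. */
    static long[] sourceLoad(int partitions, int parallelism, long recordsPerPartition) {
        long[] load = new long[parallelism];
        for (int p = 0; p < partitions; p++) {
            load[p % parallelism] += recordsPerPartition;
        }
        return load;
    }

    /** Records received by each downstream subtask after a round-robin rebalance. */
    static long[] afterRebalance(long[] sourceLoad, int downstreamParallelism) {
        long[] load = new long[downstreamParallelism];
        for (long records : sourceLoad) {
            // Round-robin from channel 0: every channel gets records / n,
            // and the first (records % n) channels get one extra record.
            for (int i = 0; i < downstreamParallelism; i++) {
                load[i] += records / downstreamParallelism
                        + (i < records % downstreamParallelism ? 1 : 0);
            }
        }
        return load;
    }

    /** Number of subtasks that received at least one record. */
    static long activeCount(long[] load) {
        return Arrays.stream(load).filter(x -> x > 0).count();
    }

    public static void main(String[] args) {
        int taskManagers = 7;
        int slotsPerTaskManager = 8;
        int parallelism = taskManagers * slotsPerTaskManager; // 56 slots in total
        long[] source = sourceLoad(2, parallelism, 500_000L);
        long[] downstream = afterRebalance(source, parallelism);
        System.out.println("busy source subtasks: " + activeCount(source));
        System.out.println("busy downstream subtasks: " + activeCount(downstream));
    }
}
```

With 2 partitions only 2 of the 56 source subtasks receive data, while all 56 downstream subtasks do after the rebalance; raising the partition count to 56 makes every source subtask busy, which is the remedy Aljoscha suggests.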
[jira] [Created] (FLINK-4157) FlinkKafkaMetrics cause TaskManager shutdown during cancellation
Robert Metzger created FLINK-4157:
-------------------------------------

Summary: FlinkKafkaMetrics cause TaskManager shutdown during cancellation
Key: FLINK-4157
URL: https://issues.apache.org/jira/browse/FLINK-4157
Project: Flink
Issue Type: Bug
Components: Kafka Connector
Affects Versions: 1.0.3
Reporter: Robert Metzger
Assignee: Robert Metzger
Priority: Critical
Fix For: 1.1.0

The following issue was reported by a user:

{code}
2016-07-05 01:32:25,113 INFO org.apache.flink.runtime.taskmanager.Task - Freeing task resources for Sink: KafkaOutput (59/72)
2016-07-05 01:32:25,113 INFO org.apache.flink.runtime.taskmanager.Task - Sink: KafkaOutput (53/72) switched to CANCELED
2016-07-05 01:32:25,113 INFO org.apache.flink.runtime.taskmanager.Task - Freeing task resources for Sink: KafkaOutput (53/72)
2016-07-05 01:32:25,144 ERROR akka.actor.OneForOneStrategy -
java.util.ConcurrentModificationException
    at java.util.HashMap$HashIterator.nextNode(HashMap.java:1429)
    at java.util.HashMap$ValueIterator.next(HashMap.java:1458)
    at org.apache.kafka.clients.InFlightRequests.inFlightRequestCount(InFlightRequests.java:106)
    at org.apache.kafka.clients.NetworkClient.inFlightRequestCount(NetworkClient.java:211)
    at org.apache.kafka.clients.producer.internals.Sender$SenderMetrics$1.measure(Sender.java:383)
    at org.apache.kafka.common.metrics.KafkaMetric.value(KafkaMetric.java:57)
    at org.apache.kafka.common.metrics.KafkaMetric.value(KafkaMetric.java:52)
    at org.apache.flink.streaming.connectors.kafka.internals.metrics.DefaultKafkaMetricAccumulator.writeObject(DefaultKafkaMetricAccumulator.java:152)
    at sun.reflect.GeneratedMethodAccessor20859.invoke(Unknown Source)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:497)
    at java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:988)
    at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1496)
    at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
    at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
    at java.util.HashMap.internalWriteEntries(HashMap.java:1777)
    at java.util.HashMap.writeObject(HashMap.java:1354)
    at sun.reflect.GeneratedMethodAccessor42.invoke(Unknown Source)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:497)
    at java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:988)
    at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1496)
    at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
    at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
    at java.io.ObjectOutputStream.defaultWriteObject(ObjectOutputStream.java:441)
    at java.util.Collections$SynchronizedMap.writeObject(Collections.java:2691)
    at sun.reflect.GeneratedMethodAccessor226.invoke(Unknown Source)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:497)
    at java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:988)
    at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1496)
    at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
    at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
    at org.apache.flink.util.InstantiationUtil.serializeObject(InstantiationUtil.java:300)
    at org.apache.flink.util.SerializedValue.<init>(SerializedValue.java:48)
    at org.apache.flink.runtime.accumulators.AccumulatorSnapshot.<init>(AccumulatorSnapshot.java:58)
    at org.apache.flink.runtime.accumulators.AccumulatorRegistry.getSnapshot(AccumulatorRegistry.java:78)
    at org.apache.flink.runtime.taskmanager.TaskManager.unregisterTaskAndNotifyFinalState(TaskManager.scala:1150)
    at org.apache.flink.runtime.taskmanager.TaskManager.org$apache$flink$runtime$taskmanager$TaskManager$$handleTaskMessage(TaskManager.scala:407)
    at org.apache.flink.runtime.taskmanager.TaskManager$$anonfun$handleMessage$1.applyOrElse(TaskManager.scala:265)
    at scala.runtime.AbstractPartialFunction$mcVL$sp.
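The trace shows `DefaultKafkaMetricAccumulator.writeObject` reading a Kafka metric whose value is computed by iterating a `HashMap` inside the producer, presumably while the producer's network thread is still modifying that map. The sketch below reproduces this failure mode in a single thread, since `HashMap` iterators are fail-fast, and contrasts it with the weakly consistent iterators of `ConcurrentHashMap`. It only illustrates the mechanism: Kafka's internal map is not something Flink can swap out, and this is not necessarily how FLINK-4157 was resolved. `FailFastIterationSketch` is a hypothetical name.

```java
import java.util.ConcurrentModificationException;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Single-threaded reproduction of the failure class in the trace: structurally
// modifying a HashMap during iteration makes the iterator's next call to next()
// throw ConcurrentModificationException.
final class FailFastIterationSketch {

    private FailFastIterationSketch() {}

    /** Iterates the values, inserts a new key after the first one, and reports whether CME was thrown. */
    static boolean modifyWhileIterating(Map<String, Integer> map) {
        try {
            boolean inserted = false;
            Iterator<Integer> it = map.values().iterator();
            while (it.hasNext()) {
                it.next();
                if (!inserted) {
                    map.put("late-entry", 0); // structural modification mid-iteration
                    inserted = true;
                }
            }
            return false;
        } catch (ConcurrentModificationException e) {
            return true;
        }
    }

    /** Fills the given map with three entries and returns it. */
    static Map<String, Integer> threeEntries(Map<String, Integer> target) {
        target.put("a", 1);
        target.put("b", 2);
        target.put("c", 3);
        return target;
    }

    public static void main(String[] args) {
        System.out.println("HashMap threw CME: "
                + modifyWhileIterating(threeEntries(new HashMap<>())));
        System.out.println("ConcurrentHashMap threw CME: "
                + modifyWhileIterating(threeEntries(new ConcurrentHashMap<>())));
    }
}
```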
[jira] [Created] (FLINK-4158) Scala QuickStart StreamingJob fails to compile
Chesnay Schepler created FLINK-4158:
---------------------------------------

Summary: Scala QuickStart StreamingJob fails to compile
Key: FLINK-4158
URL: https://issues.apache.org/jira/browse/FLINK-4158
Project: Flink
Issue Type: Bug
Components: Quickstarts
Affects Versions: 1.1.0
Reporter: Chesnay Schepler
Assignee: Chesnay Schepler
Fix For: 1.1.0

The StreamingJob does not compile since it uses

{code}
import org.apache.flink.api.scala._
{code}

instead of

{code}
import org.apache.flink.streaming.api.scala._
{code}
Re: Issues while interacting with DynamoDB
Hi,
are you running this on YARN? If yes, the EMR YARN installation might already have some of the AWS jars on the classpath, and those might interact badly with the jars that you manually put into the flink/lib folder.

Cheers,
Aljoscha

P.S. In the future, please use the user mailing list for requests like this. The dev mailing list is meant for discussions regarding the development of Flink and adjacent projects.

On Tue, 5 Jul 2016 at 02:46 Deepak Jha wrote:
> Hi All,
>
> We have a Flink (1.0.2) HA setup on AWS and are using IAM roles to interact with S3 (S3a, as suggested in the Flink best practices) and DynamoDB. While trying to interact with DynamoDB to perform a key-value lookup from one of the operators, we are running into the following issue.
>
> def putItem() = {
>   val id = 126639L
>   val item = new Item().withPrimaryKey("Id", "sfsaf12344").withLong("uniqueIdentifier", id)
>   table.putItem(item)
> }
>
> 2016-07-04 17:15:18,379 PDT [INFO] ip-10-6-10-182 [flink-akka.actor.default-dispatcher-29] o.a.f.runtime.jobmanager.JobManager - Status of job 3ec7e145208453b5dbcf6224f373018f (Topology) changed to FAILING.
> org.apache.flink.runtime.util.SerializedThrowable: com.amazonaws.services.dynamodbv2.model.PutItemRequest.withExpressionAttributeNames(Ljava/util/Map;)Lcom/amazonaws/services/dynamodbv2/model/PutItemRequest;
>     at com.amazonaws.services.dynamodbv2.document.internal.PutItemImpl.doPutItem(PutItemImpl.java:82)
>     at com.amazonaws.services.dynamodbv2.document.internal.PutItemImpl.putItem(PutItemImpl.java:41)
>     at com.amazonaws.services.dynamodbv2.document.Table.putItem(Table.java:144)
>     at com.mix.ingestion.url.dupedetection.DynamoDBIO$.putItem(DynamoDBHandler.scala:38)
>     at com.mix.ingestion.url.dupedetection.DynamoDBDupeDetectionBaseImpl.setKey(ABC.scala:143)
>     at com.mix.ingestion.url.dupedetection.DynamoDBDupeDetectionBaseImpl.setKeyAndUpdateDupeFlag(ABC.scala:135)
>     at com.mix.ingestion.url.dupedetection.DynamoDBDupeDetection$class.updateDupeFlagAndTable(ABC.scala:96)
>     at com.mix.ingestion.url.dupedetection.DynamoDBDupeDetection$class.detectDupe(ABC.scala:111)
>     at com.mix.ingestion.url.dupedetection.DynamoDBDupeDetection$.detectDupe(ABC.scala:158)
>     at com.mix.ingestion.topology.Operators$$anonfun$15$$anonfun$apply$3.apply(Operators.scala:70)
>     at com.mix.ingestion.topology.Operators$$anonfun$15$$anonfun$apply$3.apply(Operators.scala:70)
>     at org.apache.flink.streaming.api.scala.DataStream$$anon$4.map(DataStream.scala:485)
>     at org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:39)
>     at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:168)
>     at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:65)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:225)
>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
>     at java.lang.Thread.run(Thread.java:745)
> Caused by: java.lang.Exception: Serialized representation of java.lang.NoSuchMethodError: com.amazonaws.services.dynamodbv2.model.PutItemRequest.withExpressionAttributeNames(Ljava/util/Map;)Lcom/amazonaws/services/dynamodbv2/model/PutItemRequest;
>     ... 18 common frames omitted
>
> It works if I just run it standalone using "java -cp fatjar.jar:/opt/flink/lib/* a.b.c.d.DynamoDBHandler" on the same EC2 instance, but I run into this error when it tries to interact with DynamoDB from inside an operator. It fails even if I call the same putItem from inside the operator.
>
> We have aws-java-sdk-1.7.4.jar and hadoop-aws-2.7.2.jar in the flink/lib folder. We use a fat jar to deploy the topology, and it contains aws-java-sdk-s3 and aws-java-sdk-dynamodb, both version 1.11.3. I also experimented with including aws-java-sdk in the fat jar as well, but it did not work. I looked into aws-java-sdk-1.7.4.jar and see that com/amazonaws/services/dynamodbv2 exists.
>
> Please let me know what I am doing wrong. Any help will be appreciated.
>
> --
> Thanks,
> Deepak Jha
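A `NoSuchMethodError` means the class was found at runtime but the loaded version lacks a method the calling code was compiled against. One plausible, unconfirmed explanation here is that the classes from `aws-java-sdk-1.7.4.jar` in `flink/lib` are loaded instead of the 1.11.3 classes in the fat jar. The diagnostic sketch below, run from inside the failing operator, reports whether the loaded `PutItemRequest` has the missing method and which jar it came from. `ClasspathProbe` is a hypothetical helper built only on standard reflection calls (`Class.forName`, `Class.getMethod`, `ProtectionDomain.getCodeSource`); it diagnoses the conflict but does not fix it.

```java
import java.net.URL;
import java.security.CodeSource;

// Diagnostic sketch for NoSuchMethodError: check whether the class that is
// actually loaded has the expected method, and report where it was loaded from.
// Class.forName(name) resolves through the class loader that loaded ClasspathProbe,
// so run it from the job's own code to see what the operator sees.
final class ClasspathProbe {

    private ClasspathProbe() {}

    /** True if the named class is loadable and has a public method with these parameter types. */
    static boolean hasPublicMethod(String className, String methodName, Class<?>... parameterTypes) {
        try {
            Class.forName(className).getMethod(methodName, parameterTypes);
            return true;
        } catch (ClassNotFoundException | NoSuchMethodException e) {
            return false;
        }
    }

    /** Jar or directory a class was loaded from; JDK bootstrap classes report none. */
    static String locationOf(Class<?> clazz) {
        CodeSource source = clazz.getProtectionDomain().getCodeSource();
        if (source == null || source.getLocation() == null) {
            return "<bootstrap/unknown>";
        }
        URL url = source.getLocation();
        return url.toString();
    }

    public static void main(String[] args) {
        // Class and method names taken from the stack trace above.
        String cls = "com.amazonaws.services.dynamodbv2.model.PutItemRequest";
        boolean present = hasPublicMethod(cls, "withExpressionAttributeNames", java.util.Map.class);
        System.out.println(cls + " has withExpressionAttributeNames(Map): " + present);
        try {
            System.out.println("loaded from: " + locationOf(Class.forName(cls)));
        } catch (ClassNotFoundException e) {
            System.out.println(cls + " is not on the classpath");
        }
    }
}
```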
[jira] [Created] (FLINK-4159) Quickstart poms exclude unused dependencies
Chesnay Schepler created FLINK-4159:
---------------------------------------

Summary: Quickstart poms exclude unused dependencies
Key: FLINK-4159
URL: https://issues.apache.org/jira/browse/FLINK-4159
Project: Flink
Issue Type: Bug
Components: Quickstarts
Affects Versions: 1.1.0
Reporter: Chesnay Schepler
Assignee: Chesnay Schepler
Priority: Minor
Fix For: 1.1.0

The Quickstart poms exclude several dependencies from being packaged into the fat jar, even though, according to `mvn dependency:tree`, they aren't used by Flink:

com.amazonaws:aws-java-sdk
com.twitter:chill-avro_*
com.twitter:chill-bijection_*
com.twitter:bijection-core_*
com.twitter:bijection-avro_*
de.javakaffee:kryo-serializers
org.apache.sling:org.apache.sling.commons.json
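One way to re-check such an exclusion list mechanically is to compare each pattern against the `groupId:artifactId` pairs printed by `mvn dependency:tree`. The sketch assumes the usual `groupId:artifactId:type:version[:scope]` coordinate format in the tree output and treats `*` as a wildcard, as in the `_*` Scala-suffix patterns above; `UnusedExclusionsSketch` and the sample tree lines are made up for illustration, and real output may vary by Maven version.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical helper: report exclusion patterns that match nothing in the tree.
final class UnusedExclusionsSketch {

    // groupId:artifactId:type: (version and scope follow and are ignored).
    private static final Pattern COORDINATE =
            Pattern.compile("([\\w.\\-]+):([\\w.\\-]+):[\\w\\-]+:");

    private UnusedExclusionsSketch() {}

    /** Collects "groupId:artifactId" from each dependency:tree line that contains a coordinate. */
    static Set<String> artifactsInTree(List<String> treeLines) {
        Set<String> artifacts = new LinkedHashSet<>();
        for (String line : treeLines) {
            Matcher m = COORDINATE.matcher(line);
            if (m.find()) {
                artifacts.add(m.group(1) + ":" + m.group(2));
            }
        }
        return artifacts;
    }

    /** Matches a pattern where '*' stands for any sequence of characters. */
    static boolean globMatches(String glob, String artifact) {
        StringBuilder regex = new StringBuilder();
        String[] parts = glob.split("\\*", -1); // -1 keeps a trailing empty part
        for (int i = 0; i < parts.length; i++) {
            if (i > 0) {
                regex.append(".*");
            }
            regex.append(Pattern.quote(parts[i]));
        }
        return artifact.matches(regex.toString());
    }

    /** Exclusions that match no artifact in the tree, in their original order. */
    static List<String> unusedExclusions(List<String> exclusions, List<String> treeLines) {
        Set<String> artifacts = artifactsInTree(treeLines);
        List<String> unused = new ArrayList<>();
        for (String exclusion : exclusions) {
            boolean used = artifacts.stream().anyMatch(a -> globMatches(exclusion, a));
            if (!used) {
                unused.add(exclusion);
            }
        }
        return unused;
    }

    public static void main(String[] args) {
        // Made-up excerpt of dependency:tree output, for illustration only.
        List<String> tree = Arrays.asList(
                "[INFO] org.example:my-job:jar:1.0",
                "[INFO] +- org.apache.flink:flink-java:jar:1.0:compile",
                "[INFO] \\- org.apache.flink:flink-streaming-scala_2.10:jar:1.0:compile");
        List<String> exclusions = Arrays.asList(
                "com.amazonaws:aws-java-sdk",
                "com.twitter:chill-avro_*",
                "org.apache.flink:flink-streaming-scala_*");
        System.out.println("unused exclusions: " + unusedExclusions(exclusions, tree));
    }
}
```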
[jira] [Created] (FLINK-4160) YARN session doesn't show input validation errors
Robert Metzger created FLINK-4160:
-------------------------------------

Summary: YARN session doesn't show input validation errors
Key: FLINK-4160
URL: https://issues.apache.org/jira/browse/FLINK-4160
Project: Flink
Issue Type: Bug
Components: YARN Client
Affects Versions: 1.1.0
Reporter: Robert Metzger
Priority: Critical

Setting a JobManager memory size below 768 MB causes this error:

{code}
~/flink/build-target$ ./bin/yarn-session.sh -n 5 -s 4 -jm 512
Error while starting the YARN Client. Please check log output!
{code}

The problem is that the logs don't contain any information.
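A sketch of the kind of up-front validation the issue asks for, assuming the 768 MB minimum stated above: reject the value before contacting YARN and put the reason into the message printed on the console. `YarnSessionArgsSketch` and `validateJobManagerMemory` are hypothetical names; this is not the actual YARN client code.

```java
// Hypothetical sketch: validate CLI input up front and fail with a message that
// names the offending option, instead of a generic "check log output".
final class YarnSessionArgsSketch {

    /** Minimum JobManager memory in MB, as stated in FLINK-4160. */
    static final int MIN_JOBMANAGER_MEMORY_MB = 768;

    private YarnSessionArgsSketch() {}

    /** Throws IllegalArgumentException with a user-facing reason if the value is too small. */
    static void validateJobManagerMemory(int memoryMb) {
        if (memoryMb < MIN_JOBMANAGER_MEMORY_MB) {
            throw new IllegalArgumentException(
                    "The JobManager memory (-jm) is " + memoryMb + " MB, but at least "
                            + MIN_JOBMANAGER_MEMORY_MB + " MB are required.");
        }
    }

    public static void main(String[] args) {
        try {
            validateJobManagerMemory(512); // corresponds to "-jm 512" in the report
        } catch (IllegalArgumentException e) {
            // Print the reason on the console, not only in the log files.
            System.err.println("Error while starting the YARN Client: " + e.getMessage());
        }
    }
}
```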
[jira] [Created] (FLINK-4161) Quickstarts can exclude more flink-dist dependencies
Chesnay Schepler created FLINK-4161:
---------------------------------------

Summary: Quickstarts can exclude more flink-dist dependencies
Key: FLINK-4161
URL: https://issues.apache.org/jira/browse/FLINK-4161
Project: Flink
Issue Type: Improvement
Components: Quickstarts
Affects Versions: 1.1.0
Reporter: Chesnay Schepler
Priority: Trivial

The Quickstart poms exclude several dependencies that flink-dist contains from being packaged into the fat jar. However, the following flink-dist dependencies are not excluded:

{code}
org.apache.flink:flink-streaming-scala_2.10
org.apache.flink:flink-scala-shell_2.10
{code}
Re: [DISCUSS] Allowed Lateness in Flink
Hi,
I cleaned up the document a bit and added sections to address comments on the doc:
https://docs.google.com/document/d/1Xp-YBf87vLTduYSivgqWVEMjYUmkA-hyb4muX3KRl08/edit?usp=sharing
(I also marked proposed features that are already implemented as [done].)

The main thing that remains to be figured out is how we deal with purging, i.e. whether the trigger can decide to purge a window or whether the WindowOperator should do this, and also what happens when window state is garbage collected. The original proposal was to reduce the current set of trigger results from (CONTINUE, FIRE, PURGE, FIRE_AND_PURGE) to (CONTINUE, FIRE) and to have a global flag in the WindowOperator that says whether fired windows should be purged (DISCARDING) or kept for a bit, until the allowed lateness expires (ACCUMULATING). Based on comments by Elias, I added a section that sketches an alternative where the triggers are in charge of purging and also decide what should happen in case of window cleanup.

One thing we should also keep in mind is how we can make the windowing API easy to use for people who don't need all the bells and whistles of custom triggers, allowed lateness and so on. This is partially covered by the proposal to add composite triggers, but I feel we can go further there.

In the future, it might be good to have discussions directly on the ML and then change the document accordingly. This way everyone can follow the discussion on the ML. I also feel that Google Doc comments often don't give enough space for expressing more complex opinions.

Cheers,
Aljoscha

On Mon, 30 May 2016 at 11:23 Aljoscha Krettek wrote:
> Thanks for the feedback! :-) I already read the comments on the file.
>
> On Mon, 30 May 2016 at 11:10 Gyula Fóra wrote:
>> Thanks Aljoscha :) I added some comments that might seem relevant from the users' point of view.
>>
>> Gyula
>>
>> On Mon, 30 May 2016 at 10:33 Aljoscha Krettek wrote:
>>> Hi,
>>> I created a new doc specifically about the interplay of lateness and window state garbage collection:
>>> https://docs.google.com/document/d/1vgukdDiUco0KX4f7tlDJgHWaRVIU-KorItWgnBapq_8/edit?usp=sharing
>>>
>>> There is still some stuff that needs to be figured out, both in the new doc and the existing doc. For example, we need to decide whether to make accumulating/discarding behavior global for a window operation or controllable by triggers. Initially, I suggested making accumulating/discarding a global setting for the window operation because we can get away with keeping less state if we know that we always discard when firing. Please take a look at the new doc to see what I'm talking about there.
>>>
>>> Feedback very welcome!
>>>
>>> Cheers,
>>> Aljoscha
>>>
>>> On Tue, 26 Apr 2016 at 16:45 Aljoscha Krettek wrote:
>>>> Hi Max,
>>>> thanks for the feedback and suggestions! I'll try to address each paragraph separately.
>>>>
>>>> I'm afraid deciding based on the StreamTimeCharacteristic is not possible, since a user can use processing-time windows in their job even though they set the characteristic to event-time. Enabling event time does not disable processing time; it just enables an additional feature. (IMHO, the handling of the StreamTimeCharacteristic is still somewhat problematic.)
>>>>
>>>> Making the decision based purely on the class of the WindowAssigner is also not possible, since we don't know in advance which WindowAssigners the users will write and what time characteristic they will use.
>>>>
>>>> Regarding the third proposition: removing 'System.currentTimeMillis()' is very desirable and part of my proposal. However, it is still meant as being separate from "event-time", since a Trigger/WindowAssigner might need both. For example, a Trigger might want to do early triggering a few (processing-time) seconds after the first elements arrive and proper triggering once the watermark for the end of the window arrives.
>>>>
>>>> These are good ideas, but I'm afraid we still don't have a good solution. This whole processing time/event time business is just very tricky.
>>>>
>>>> Cheers,
>>>> Aljoscha
>>>>
>>>> On Tue, 26 Apr 2016 at 16:26 Maximilian Michels wrote:
>>>>> Hi Aljoscha,
>>>>>
>>>>> Thank you for the detailed design document.
>>>>>
>>>>> Wouldn't it be ok to allow these new concepts regardless of the time semantics? For Event Time and Ingestion Time, "Lateness" and "Accumulating/Discarding" make sense. If the user chooses Processing Time, then these can be ignored during translation of the StreamGraph (possibly with a warning).
>>>>>
>>>>> Detecting when these concepts make sense should be possible by checking the "Stream Characteristics" of the ExecutionEnvironment or the involved classes (e.g. SlidingProcessingTimeWindows) in t
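The original proposal above (trigger results reduced to CONTINUE and FIRE, plus a window-level DISCARDING/ACCUMULATING flag and allowed lateness) can be modeled for a single event-time window as below. This is a toy model under stated assumptions, not Flink's `WindowOperator` or its `TriggerResult` class: the window's state is assumed to be dropped once the watermark passes the window's max timestamp plus the allowed lateness, and elements arriving after that are discarded. All names are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model of one event-time window under the proposal: triggers return only
// CONTINUE or FIRE, and a window-level mode decides whether firing purges the
// contents (DISCARDING) or keeps them until the allowed lateness has passed
// (ACCUMULATING). Not Flink's actual classes.
final class LatenessSketch {

    enum TriggerResult { CONTINUE, FIRE }

    enum Mode { DISCARDING, ACCUMULATING }

    static final class Window {
        final long maxTimestamp;     // last event timestamp that belongs to the window
        final long allowedLateness;  // how long after maxTimestamp late data is still accepted
        final Mode mode;
        private final List<Integer> contents = new ArrayList<>();
        private boolean cleanedUp = false;

        Window(long maxTimestamp, long allowedLateness, Mode mode) {
            this.maxTimestamp = maxTimestamp;
            this.allowedLateness = allowedLateness;
            this.mode = mode;
        }

        /** Adds an element; returns false if the window state was already cleaned up. */
        boolean add(int element) {
            if (cleanedUp) {
                return false; // state is gone: the element is dropped as too late
            }
            contents.add(element);
            return true;
        }

        /** Returns the elements emitted by this trigger result (empty unless FIRE). */
        List<Integer> onTrigger(TriggerResult result) {
            List<Integer> emitted = new ArrayList<>();
            if (result == TriggerResult.FIRE && !cleanedUp) {
                emitted.addAll(contents);
                if (mode == Mode.DISCARDING) {
                    contents.clear(); // purge on every firing, so less state is kept
                }
            }
            return emitted;
        }

        /** Drops the window state once the watermark passes maxTimestamp + allowedLateness. */
        void onWatermark(long watermark) {
            if (!cleanedUp && watermark > maxTimestamp + allowedLateness) {
                contents.clear();
                cleanedUp = true;
            }
        }
    }

    private LatenessSketch() {}

    public static void main(String[] args) {
        for (Mode mode : Mode.values()) {
            Window w = new Window(999, 500, mode);
            w.add(1);
            w.add(2);
            List<Integer> onTime = w.onTrigger(TriggerResult.FIRE); // on-time firing
            w.add(3);                                                // late, but within allowed lateness
            List<Integer> late = w.onTrigger(TriggerResult.FIRE);   // late firing
            w.onWatermark(1_500);                                    // 1500 > 999 + 500: state dropped
            boolean accepted = w.add(4);
            System.out.println(mode + ": " + onTime + " " + late + " accepted=" + accepted);
        }
    }
}
```

In DISCARDING mode the late firing emits only the late element `[3]`, while ACCUMULATING re-emits `[1, 2, 3]`; this is the reason given in the 30 May message for why a global DISCARDING setting lets the operator keep less state.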
Re: [DISCUSS] Allowed Lateness in Flink
On Wed, Jul 6, 2016 at 3:19 PM, Aljoscha Krettek wrote:
> In the future, it might be good to have discussions directly on the ML and then change the document accordingly. This way everyone can follow the discussion on the ML. I also feel that Google Doc comments often don't give enough space for expressing more complex opinions.

I agree! Would you mind raising this point as a separate discussion on dev@?
Re: [DISCUSS] Allowed Lateness in Flink
I did:
https://mail-archives.apache.org/mod_mbox/flink-dev/201606.mbox/%3ccanmxww0abttjjg9ewdxrugxkjm7jscbenmvrzohpt2qo3pq...@mail.gmail.com%3e
;-)

On Wed, 6 Jul 2016 at 15:31 Ufuk Celebi wrote:
> On Wed, Jul 6, 2016 at 3:19 PM, Aljoscha Krettek wrote:
>> In the future, it might be good to have discussions directly on the ML and then change the document accordingly. This way everyone can follow the discussion on the ML. I also feel that Google Doc comments often don't give enough space for expressing more complex opinions.
>
> I agree! Would you mind raising this point as a separate discussion on dev@?
[jira] [Created] (FLINK-4162) Event-Time CEP Job Fails after Restart
Aljoscha Krettek created FLINK-4162:
---------------------------------------

Summary: Event-Time CEP Job Fails after Restart
Key: FLINK-4162
URL: https://issues.apache.org/jira/browse/FLINK-4162
Project: Flink
Issue Type: Bug
Components: CEP
Affects Versions: 1.0.3, 1.1.0
Reporter: Aljoscha Krettek
Priority: Blocker

With the fix for FLINK-4149, a restarting event-time CEP job fails with this exception:

{code}
java.lang.IllegalStateException: Detected an underflow in the pruning timestamp. This indicates that either the window length is too long (10) or that the timestamp has not been set correctly (e.g. Long.MIN_VALUE).
    at org.apache.flink.cep.nfa.NFA.process(NFA.java:185)
    at org.apache.flink.cep.operator.KeyedCEPPatternOperator.processEvent(KeyedCEPPatternOperator.java:48)
    at org.apache.flink.cep.operator.AbstractKeyedCEPPatternOperator.processWatermark(AbstractKeyedCEPPatternOperator.java:163)
    at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:165)
    at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:66)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:265)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:588)
    at java.lang.Thread.run(Thread.java:745)
{code}
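Since a window length of 10 cannot be "too long", the exception text points to the second cause it names: after the restart, `NFA.process` (reached via `processWatermark` in the trace) presumably sees a timestamp at or near `Long.MIN_VALUE`, and subtracting the window length from it wraps around. The sketch below shows that arithmetic with an overflow-checked subtraction (`Math.subtractExact`). `PruningSketch` is a hypothetical name and this is not the actual NFA code.

```java
// Sketch of an underflow-checked pruning computation: elements older than
// (timestamp - windowLength) may be pruned. Math.subtractExact throws
// ArithmeticException on long overflow, e.g. for timestamp = Long.MIN_VALUE.
final class PruningSketch {

    private PruningSketch() {}

    static long pruningTimestamp(long timestamp, long windowLength) {
        try {
            return Math.subtractExact(timestamp, windowLength);
        } catch (ArithmeticException e) {
            throw new IllegalStateException(
                    "Detected an underflow in the pruning timestamp (timestamp=" + timestamp
                            + ", window length=" + windowLength + ")", e);
        }
    }

    public static void main(String[] args) {
        System.out.println(pruningTimestamp(100L, 10L));
        try {
            pruningTimestamp(Long.MIN_VALUE, 10L);
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

Without the check, `Long.MIN_VALUE - 10` silently becomes a very large positive value, which would make every buffered element look prunable.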
Re: [DISCUSS] FLIP 1 - Flink Improvement Proposal
Hey Aljoscha,

thanks for this proposal. I somehow missed it last week. I like the idea very much and agree with your assessment of the problems with the Google Doc approach.

Regarding the process: I'm also in favour of adopting it from Kafka. I would not expect any problems with this, but we can post a quick note to their ML.

@Matthias: The name works for me. ;-)

– Ufuk

On Tue, Jun 28, 2016 at 10:19 PM, Matthias J. Sax wrote:
> FLIP ?? Really? :D
>
> http://www.maya.tv/en/character/flip
>
> -Matthias
>
> On 06/28/2016 06:26 PM, Aljoscha Krettek wrote:
>> I'm proposing to add a formal process for how we deal with (major) improvements to Flink and design docs. This has been mentioned several times recently, but we never took any decisive action to actually implement such a process, so here we go.
>>
>> Right now, we have Jira issues and sometimes we have design docs that we keep in Google Docs. Jamie recently added links to those that he could find on the mailing list to the Flink wiki: https://cwiki.apache.org/confluence/display/FLINK/Apache+Flink+Home. The problems with these are: a) the comments on the Google Docs are not reflected in Jira and the mailing list. There has been some very active discussion on some of the docs that most people would never notice, so the community might seem less active than it actually is. b) the documents are not very discoverable. If we had a clearly defined place where we put them and also prominently linked to it on the Flink homepage, this would greatly help people who try to find out about current developments.
>>
>> Kafka has a process like this: https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Improvement+Proposals. They call it KIP, for Kafka Improvement Proposal. We could either adapt this for Flink or come up with our own process. Doing the former would save us a lot of time, and I don't think the Kafka community would mind us copying their process. The subject also hints at this: our process could be called FLIP, for Flink Improvement Proposal.
>>
>> What do you think? Feedback is highly welcome. :-)
>>
>> Cheers,
>> Aljoscha
Connecting Flink and Hive
I’d like to work on creating a Flink sink for Hive’s streaming ingest [1]. But I recall recently seeing a message on the dev list about moving some of the third-party connectors out of Flink, as devs were having problems maintaining them. So, is this the sort of thing I should contribute to Flink, or is the plan to house these types of things elsewhere?

Alan.

1. https://cwiki.apache.org/confluence/display/Hive/Streaming+Data+Ingest
Re: [DISCUSS] Allowed Lateness in Flink
Hi,

I was going through the suggested improvements to windowing, and I have a few questions/suggestions for improving the Evictor.

1) I have a use case where I have to create a custom Evictor that evicts elements from the window based on their value (e.g., if my elements are of case class Item(id: Int, type: String), then evict the elements that have type="a"). I believe this is not currently possible.
2) This is somewhat related to 1): there should be an option to evict elements from anywhere in the window, not only from the beginning of the window (e.g., apply the delta function to all elements and remove all those that don't pass). I checked the code: the evict method just returns the number of elements to be removed, and processTriggerResult just skips that many elements from the beginning.
3) Add an option that lets the user decide whether eviction should happen before or after the apply function. Currently it happens before the apply function, but I have a use case where I need to apply the function first and evict afterwards.

I am doing this for a POC, so I think I can modify the Flink code base to make these changes and build it, but I would appreciate any suggestions on whether these are viable changes or whether they would cause any performance issues. Also, any pointers on where to start (e.g., do I create a new class similar to EvictingWindowOperator that extends WindowOperator?)

Thanks and Regards,
Vishnu Viswanath

On Wed, Jul 6, 2016 at 9:39 AM, Aljoscha Krettek wrote:
> I did:
> https://mail-archives.apache.org/mod_mbox/flink-dev/201606.mbox/%3ccanmxww0abttjjg9ewdxrugxkjm7jscbenmvrzohpt2qo3pq...@mail.gmail.com%3e
> ;-)
>
> On Wed, 6 Jul 2016 at 15:31 Ufuk Celebi wrote:
>> On Wed, Jul 6, 2016 at 3:19 PM, Aljoscha Krettek wrote:
>>> In the future, it might be good to have discussions directly on the ML and then change the document accordingly. This way everyone can follow the discussion on the ML. I also feel that Google Doc comments often don't give enough space for expressing more complex opinions.
>>
>> I agree! Would you mind raising this point as a separate discussion on dev@?
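The three requested capabilities (evict by value, evict from anywhere in the window, and evict before or after the window function) can be sketched with a predicate and `List.removeIf`. The names below (`ValueEvictorSketch`, `EvictionPoint`, `fire`) are hypothetical and correspond neither to Flink's `Evictor` interface nor to any design doc; `Item` mirrors the Scala case class from the email.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;
import java.util.function.Predicate;

// Hypothetical sketch of the requested evictor semantics (not Flink's Evictor API):
// remove elements anywhere in the window by value, before or after the window function.
final class ValueEvictorSketch {

    enum EvictionPoint { BEFORE_APPLY, AFTER_APPLY }

    // Java counterpart of the Scala case class Item(id: Int, type: String) from the email.
    static final class Item {
        final int id;
        final String type;

        Item(int id, String type) {
            this.id = id;
            this.type = type;
        }
    }

    private ValueEvictorSketch() {}

    /**
     * Runs windowFunction on a copy of the window contents and evicts every element
     * matching shouldEvict at the chosen point. The window list is modified in place,
     * so eviction also affects what later firings of the same window see.
     */
    static <R> R fire(List<Item> window, Predicate<Item> shouldEvict,
                      EvictionPoint point, Function<List<Item>, R> windowFunction) {
        if (point == EvictionPoint.BEFORE_APPLY) {
            window.removeIf(shouldEvict); // evicts from any position, not just the front
        }
        R result = windowFunction.apply(new ArrayList<>(window));
        if (point == EvictionPoint.AFTER_APPLY) {
            window.removeIf(shouldEvict);
        }
        return result;
    }

    public static void main(String[] args) {
        Predicate<Item> typeA = item -> "a".equals(item.type);
        for (EvictionPoint point : EvictionPoint.values()) {
            List<Item> window = new ArrayList<>();
            window.add(new Item(1, "a"));
            window.add(new Item(2, "b"));
            window.add(new Item(3, "a"));
            Integer seen = fire(window, typeA, point, items -> items.size());
            System.out.println(point + ": function saw " + seen + " items, " + window.size() + " remain");
        }
    }
}
```

With BEFORE_APPLY the window function sees only the type-"b" item; with AFTER_APPLY it sees all three, and in both cases the type-"a" items are gone from the window afterwards.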
Re: [DISCUSS] Allowed Lateness in Flink
@Vishnu Funny you should ask that, because I have a design doc lying around. I'll open a new mail thread so as not to hijack this one.

On Wed, 6 Jul 2016 at 17:17 Vishnu Viswanath wrote:
> Hi,
>
> I was going through the suggested improvements to windowing, and I have a few questions/suggestions for improving the Evictor.
>
> 1) I have a use case where I have to create a custom Evictor that evicts elements from the window based on their value (e.g., if my elements are of case class Item(id: Int, type: String), then evict the elements that have type="a"). I believe this is not currently possible.
> 2) This is somewhat related to 1): there should be an option to evict elements from anywhere in the window, not only from the beginning of the window (e.g., apply the delta function to all elements and remove all those that don't pass). I checked the code: the evict method just returns the number of elements to be removed, and processTriggerResult just skips that many elements from the beginning.
> 3) Add an option that lets the user decide whether eviction should happen before or after the apply function. Currently it happens before the apply function, but I have a use case where I need to apply the function first and evict afterwards.
>
> I am doing this for a POC, so I think I can modify the Flink code base to make these changes and build it, but I would appreciate any suggestions on whether these are viable changes or whether they would cause any performance issues. Also, any pointers on where to start (e.g., do I create a new class similar to EvictingWindowOperator that extends WindowOperator?)
>
> Thanks and Regards,
> Vishnu Viswanath
>
> On Wed, Jul 6, 2016 at 9:39 AM, Aljoscha Krettek wrote:
>> I did:
>> https://mail-archives.apache.org/mod_mbox/flink-dev/201606.mbox/%3ccanmxww0abttjjg9ewdxrugxkjm7jscbenmvrzohpt2qo3pq...@mail.gmail.com%3e
>> ;-)
>>
>> On Wed, 6 Jul 2016 at 15:31 Ufuk Celebi wrote:
>>> On Wed, Jul 6, 2016 at 3:19 PM, Aljoscha Krettek wrote:
>>>> In the future, it might be good to have discussions directly on the ML and then change the document accordingly. This way everyone can follow the discussion on the ML. I also feel that Google Doc comments often don't give enough space for expressing more complex opinions.
>>>
>>> I agree! Would you mind raising this point as a separate discussion on dev@?
[DISCUSS] Enhance Window Evictor in Flink
Hi, as mentioned in the thread on improving the Windowing API, I also have a design doc just for improving WindowEvictors. I had this in my head for a while but was hesitant to publish it; since people are asking about this now, it might be a good time to post it. Here's the doc: https://docs.google.com/document/d/1Rr7xzlItYqvFXLyyy-Yv0vvw8f29QYAjm5i9E4A_JlU/edit?usp=sharing Feedback/Suggestions are very welcome! Please let me know what you think. @Vishnu: Are you interested in contributing a solution for this to the Flink code base? I'd be very happy to work with you on this. Cheers, Aljoscha P.S. I think it would be best to keep discussions on the ML because comments on the doc will not be visible here to everyone.
Re: [DISCUSS] Enhance Window Evictor in Flink
Hi Aljoscha, Thanks. Yes, the new interface seems to address points 1 and 2 of: *1) I have a use case where I have to create a custom Evictor that will evict elements from the window based on their value (e.g., if my elements are of case class Item(id: Int, type: String), then evict elements that have type="a"). I believe this is not currently possible.* *2) This is somewhat related to 1): there should be an option to evict elements from anywhere in the window, not only from the beginning of the window (e.g., apply the delta function to all elements and remove all those that don't pass). I checked the code, and the evict method just returns the number of elements to be removed, and processTriggerResult just skips that many elements from the beginning.* *3) Add an option that enables the user to decide if the eviction should happen before or after the apply function. Currently it is before the apply function, but I have a use case where I need to first apply the function and evict afterward.* I would be interested in contributing to the code base. Please let me know the steps. Thanks and Regards, Vishnu Viswanath On Wed, Jul 6, 2016 at 11:49 AM, Aljoscha Krettek wrote: > Hi, > as mentioned in the thread on improving the Windowing API I also have a > design doc just for improving WindowEvictors. I had this in my head for a > while but was hesitant to publish but since people are asking about this > now might be a good time to post it. Here's the doc: > https://docs.google.com/document/d/1Rr7xzlItYqvFXLyyy-Yv0vvw8f29QYAjm5i9E4A_JlU/edit?usp=sharing > > Feedback/Suggestions are very welcome! Please let me know what you think. > > @Vishnu: Are you interested in contributing a solution for this to the > Flink code base? I'd be very happy to work with you on this. > > Cheers, > Aljoscha > > P.S. I think it would be best to keep discussions to the ML because > comments on the doc will not be visible here for everyone.
Re: [DISCUSS] Enhance Window Evictor in Flink
The new API forces iteration through every element of the buffer even if only a single value is to be evicted. What about implementing the Iterator.remove() method for elements? The API would look like:

public interface Evictor extends Serializable {

    /**
     * Optionally evicts elements. Called before the windowing function.
     *
     * @param elements The elements currently in the pane. Use Iterator.remove to evict.
     * @param size The current number of elements in the pane.
     * @param ctx The evictor context.
     */
    void evictBefore(Iterable elements, int size, EvictorContext ctx);

    /**
     * Optionally evicts elements. Called after the windowing function.
     *
     * @param elements The elements currently in the pane. Use Iterator.remove to evict.
     * @param size The current number of elements in the pane.
     * @param ctx The evictor context.
     */
    void evictAfter(Iterable elements, int size, EvictorContext ctx);
}

Such an API allows aborting the iteration at any point and evicting elements in any order.

Thanks, Maxim. On Wed, Jul 6, 2016 at 9:04 AM, Vishnu Viswanath < vishnu.viswanat...@gmail.com> wrote: > > Hi Aljoscha, > > Thanks. Yes the new interface seems to address points 1 and 2. of > > *1) I am having a use case where I have to create a custom Evictor that > will evict elements from the window based on the value (e.g., if I have > elements are of case class Item(id: Int, type:String) then evict elements > that has type="a"). I believe this is not currently possible.* > *2) this is somewhat related to 1) where there should be an option to evict > elements from anywhere in the window. not only from the beginning of the > window. (e.g., apply the delta function to all elements and remove all > those don't pass. I checked the code and evict method just returns the > number of elements to be removed and processTriggerResult just skips those > many elements from the beginning. * > *3) Add an option to enables the user to decide if the eviction should > happen before the apply function or after the apply function. Currently it > is before the apply function, but I have a use case where I need to first > apply the function and evict afterward.* > > I would be interested in contributing to the code base. Please let me know > the steps. > > Thanks and Regards, > Vishnu Viswanath
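Maxim's point is that `Iterator.remove()` lets an evictor stop as soon as it is done instead of visiting the whole buffer. Here is a minimal sketch under stated assumptions: `SketchEvictor` is a simplified, hypothetical stand-in for the proposed interface (generics added, `EvictorContext` dropped so it compiles without Flink), and `BoundedPredicateEvictor` is an illustrative implementation that evicts at most `maxEvictions` matching elements before the window function and then aborts the iteration.

```java
import java.util.Iterator;
import java.util.function.Predicate;

// Hypothetical, simplified form of the proposed interface: generics added and the
// EvictorContext parameter dropped so the sketch compiles without Flink.
interface SketchEvictor<T> {
    void evictBefore(Iterable<T> elements, int size);

    void evictAfter(Iterable<T> elements, int size);
}

// Evicts at most maxEvictions matching elements before the window function,
// then stops iterating instead of visiting the rest of the buffer.
final class BoundedPredicateEvictor<T> implements SketchEvictor<T> {
    private final Predicate<? super T> shouldEvict;
    private final int maxEvictions;
    int lastVisited; // elements inspected by the last evictBefore call, for illustration

    BoundedPredicateEvictor(Predicate<? super T> shouldEvict, int maxEvictions) {
        this.shouldEvict = shouldEvict;
        this.maxEvictions = maxEvictions;
    }

    @Override
    public void evictBefore(Iterable<T> elements, int size) {
        int evicted = 0;
        lastVisited = 0;
        Iterator<T> it = elements.iterator();
        while (evicted < maxEvictions && it.hasNext()) {
            T element = it.next();
            lastVisited++;
            if (shouldEvict.test(element)) {
                it.remove(); // requires a mutable backing collection
                evicted++;
            }
        }
    }

    @Override
    public void evictAfter(Iterable<T> elements, int size) {
        // This sketch does not evict after the window function.
    }
}
```

The sketch assumes the `Iterable` handed to the evictor is backed by a mutable collection; an unmodifiable view would throw `UnsupportedOperationException` from `remove()`.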
Re: [DISCUSS] Enhance Window Evictor in Flink
@Maxim: That's perfect, I didn't think about using Iterator.remove() for that. I'll update the doc. What do you think, Vishnu? This should also cover your before/after case nicely.

@Vishnu: The steps would be these:
- Converge on a design in this discussion
- Add a Jira issue here: https://issues.apache.org/jira/browse/FLINK
- Work on the code and create a pull request on GitHub

The steps are also outlined here http://flink.apache.org/how-to-contribute.html and here http://flink.apache.org/contribute-code.html.

- Aljoscha

On Wed, 6 Jul 2016 at 19:45 Maxim wrote: > The new API forces iteration through every element of the buffer even if a > single value to be evicted. What about implementing Iterator.remove() > method for elements? The API would look like: > > public interface Evictor extends Serializable { > >/** > * Optionally evicts elements. Called before windowing function. > * > * @param elements The elements currently in the pane. Use > Iterator.remove to evict. > * @param size The current number of elements in the pane. > * @param window The {@link Window} > */ >void evictBefore(Iterable elements, int size, EvictorContext ctx); > >/** > * Optionally evicts elements. Called after windowing function. > * > * @param elements The elements currently in the pane. Use > Iterator.remove to evict. > * @param size The current number of elements in the pane. > * @param window The {@link Window} > */ >void evictAfter(Iterable elements, int size, EvictorContext ctx); > } > > Such API allows to abort iteration at any point and evict elements in any > order. > > Thanks, > > Maxim.
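Aljoscha's remark that the proposal also covers the before/after case can be illustrated with a hypothetical firing sequence. This is not Flink's `WindowOperator`, only a sketch assuming a mutable buffer: `evictBefore` runs first, then the window function sees the surviving elements, then `evictAfter` decides what stays in the window for the next firing. `BeforeAfterEvictor` and `FiringSketch.fire` are illustrative names.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

// Hypothetical evictor shape following the proposal in this thread (no context parameter).
interface BeforeAfterEvictor<T> {
    void evictBefore(Iterable<T> elements, int size);

    void evictAfter(Iterable<T> elements, int size);
}

final class FiringSketch {

    // One window firing: evictBefore, then the window function, then evictAfter.
    // The buffer is mutated in place; whatever survives evictAfter stays in the window.
    static <T, R> R fire(List<T> buffer,
                         BeforeAfterEvictor<T> evictor,
                         Function<List<T>, R> windowFunction) {
        evictor.evictBefore(buffer, buffer.size());
        R result = windowFunction.apply(new ArrayList<>(buffer)); // function gets a copy
        evictor.evictAfter(buffer, buffer.size());
        return result;
    }

    private FiringSketch() {}
}
```

Putting all the logic in `evictBefore` reproduces the evict-then-apply order Vishnu describes as the current behaviour; putting it in `evictAfter` gives his point 3. Handing the window function a copy is a design choice of this sketch, so the function cannot change the buffer behind the evictor's back.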
Re: [DISCUSS] FLIP 1 - Flink Improvement Proposal
Yes, big +1. I had actually talked about the same thing with some people as well. I am currently sketching a few FLIPs for things like improvements to the Yarn/Mesos/Kubernetes integration. One thing we should do here is to actually structure the wiki a bit to make it easier to find information and proposals. On Wed, Jul 6, 2016 at 4:24 PM, Ufuk Celebi wrote: > Hey Aljoscha, > > thanks for this proposal. I somehow missed it last week. I like the > idea very much and agree with your assessment about the problems with > the Google Doc approach. > > Regarding the process: I'm also in favour of adopting it from Kafka. I > would not expect any problems with this, but we can post a quick note > to their ML. > > @Matthias: The name works for me. ;-) > > – Ufuk > > On Tue, Jun 28, 2016 at 10:19 PM, Matthias J. Sax > wrote: > > FLIP ?? Really? :D > > > > http://www.maya.tv/en/character/flip > > > > -Matthias > > > > > > On 06/28/2016 06:26 PM, Aljoscha Krettek wrote: > >> I'm proposing to add a formal process for how we deal with (major) > >> improvements to Flink and design docs. This has been mentioned several > >> times recently but we never took any decisive action to actually > implement > >> such a process so here we go. > >> > >> Right now, we have Jira issues and sometimes we have design docs > that we > >> keep in Google Docs. Jamie recently added links to those that he could > find > >> on the mailing list to the Flink wiki: > >> https://cwiki.apache.org/confluence/display/FLINK/Apache+Flink+Home. > The > >> problem with these is that a) the comments on the Google Docs are not > >> reflected in Jira and the mailing list. There has been some very active > >> discussion on some of the docs that most people would never notice. The > >> community therefore might seem less active than it actually is.
b) the > >> documents are not very discoverable; if we had a clearly defined place > >> where we put them and also prominently link to this on the Flink > homepage, > >> this would greatly help people that try to find out about current > >> developments. > >> > >> Kafka has a process like this: > >> > https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Improvement+Proposals > . > >> They call it KIP, for Kafka Improvement Proposal. We could either adapt > >> this for Flink or come up with our own process. Doing the former would > save > >> us a lot of time and I don't think the Kafka community would mind us > >> copying their process. The subject also hints at this: our process > could be > >> called FLIP, for Flink Improvement Proposal. > >> > >> What do you think? Feedback is highly welcome. :-) > >> > >> Cheers, > >> Aljoscha > >> > > >
Re: [DISCUSS] Enhance Window Evictor in Flink
Actually, for such an evictor to be useful, the window should be sorted by some field, usually event time. What do you think about adding a sorted window abstraction? On Wed, Jul 6, 2016 at 11:36 AM, Aljoscha Krettek wrote: > @Maxim: That's perfect I didn't think about using Iterator.remove() for > that. I'll update the doc. What do you think Vishnu? This should also cover > your before/after case nicely. > > @Vishnu: The steps would be these: > - Converge on a design in this discussion > - Add a Jira issue here: https://issues.apache.org/jira/browse/FLINK > - Work on the code an create a pull request on github > > The steps are also outlined here > http://flink.apache.org/how-to-contribute.html and here > http://flink.apache.org/contribute-code.html. > > - > Aljoscha
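Maxim's point about sorting can be made concrete: if the buffer is kept ordered by event time, a time-based evictor can stop at the first element that is new enough, which is exactly the early abort that `Iterator.remove()` enables. Below is a minimal sketch, assuming a hypothetical `Event` type with a `long` timestamp; `SortedEvictionSketch` and its methods are illustrative, not part of Flink.

```java
import java.util.Comparator;
import java.util.Iterator;
import java.util.List;

final class SortedEvictionSketch {

    // Hypothetical element carrying an event timestamp.
    static final class Event {
        final long timestamp;
        final String payload;

        Event(long timestamp, String payload) {
            this.timestamp = timestamp;
            this.payload = payload;
        }
    }

    // Keeps the buffer ordered by event time, i.e. a "sorted window".
    static void sortByEventTime(List<Event> buffer) {
        buffer.sort(Comparator.comparingLong((Event e) -> e.timestamp));
    }

    // Evicts events older than cutoff from a buffer sorted by event time.
    // Stops at the first event that is not older than cutoff, since all later
    // events are at least as new. Returns the number of events inspected.
    static int evictOlderThan(List<Event> buffer, long cutoff) {
        int inspected = 0;
        Iterator<Event> it = buffer.iterator();
        while (it.hasNext()) {
            Event e = it.next();
            inspected++;
            if (e.timestamp >= cutoff) {
                break;
            }
            it.remove();
        }
        return inspected;
    }

    private SortedEvictionSketch() {}
}
```

On an unsorted buffer the early `break` would be wrong, because an old event could follow a newer one; without ordering, the evictor has to scan every element.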
Re: [DISCUSS] FLIP 1 - Flink Improvement Proposal
Jip, that's why I referenced the Kafka process which is also in their wiki. On Wed, 6 Jul 2016 at 21:01 Stephan Ewen wrote: > Yes, big +1 > > I had actually talked about the same thing with some people as well. > > I am currently sketching a few FLIPs for things, like improvements to the > Yarn/Mesos/Kubernetes integration > > > One thing we should do here is to actually structure the wiki a bit to make > it easier to find information and proposals.
Re: [DISCUSS] Enhance Window Evictor in Flink
Thank you, Maxim and Aljoscha. Yes, evictBefore and evictAfter should be able to address point 3. I have one more use case in mind (which I might have to handle in the later stages of the POC): what if `evictAfter` should behave differently depending on the window function? For example, I have a window that got triggered, and my evict function is called after the apply function. In such cases I should be able to decide what to evict based on what the window function found. E.g., let the window have elements of type `case class Item(id: String, type: String)` and let the types be `type1` and `type2`. If the window function finds the sequence `type1 type2 type1`, then evict all elements of type `type2`; if it finds the sequence `type2 type2 type1`, then evict all elements of type `type1`; otherwise don't evict any elements. Is this possible? Or at least let the window function choose between two Evictor functions (one for the success case and one for the failure case). @Maxim: regarding the sorted window, I actually wanted my elements to be sorted not for the eviction but while applying the window function (so I thought this could be done easily). But it would be good to have the window sorted by EventTime. Thanks and Regards, Vishnu Viswanath On Wed, Jul 6, 2016 at 3:55 PM, Maxim wrote: > Actually for such evictor to be useful the window should be sorted by some > field, usually event time. What do you think about adding sorted window > abstraction? > > On Wed, Jul 6, 2016 at 11:36 AM, Aljoscha Krettek > wrote: > > > @Maxim: That's perfect I didn't think about using Iterator.remove() for > > that. I'll update the doc. What do you think Vishnu? This should also > cover > > your before/after case nicely.
> > > > @Vishnu: The steps would be these: > > - Converge on a design in this discussion > > - Add a Jira issue here: https://issues.apache.org/jira/browse/FLINK > > - Work on the code an create a pull request on github > > > > The steps are also outlined here > > http://flink.apache.org/how-to-contribute.html and here > > http://flink.apache.org/contribute-code.html. > > > > - > > Aljoscha > > > > On Wed, 6 Jul 2016 at 19:45 Maxim wrote: > > > > > The new API forces iteration through every element of the buffer even > if > > a > > > single value to be evicted. What about implementing Iterator.remove() > > > method for elements? The API would look like: > > > > > > public interface Evictor extends Serializable { > > > > > >/** > > > * Optionally evicts elements. Called before windowing function. > > > * > > > * @param elements The elements currently in the pane. Use > > > Iterator.remove to evict. > > > * @param size The current number of elements in the pane. > > > * @param window The {@link Window} > > > */ > > >void evictBefore(Iterable elements, int size, EvictorContext > ctx); > > > > > >/** > > > * Optionally evicts elements. Called after windowing function. > > > * > > > * @param elements The elements currently in the pane. Use > > > Iterator.remove to evict. > > > * @param size The current number of elements in the pane. > > > * @param window The {@link Window} > > > */ > > >void evictAfter(Iterable elements, int size, EvictorContext ctx); > > > } > > > > > > Such API allows to abort iteration at any point and evict elements in > any > > > order. > > > > > > Thanks, > > > > > > Maxim. > > > > > > On Wed, Jul 6, 2016 at 9:04 AM, Vishnu Viswanath < > > > vishnu.viswanat...@gmail.com> wrote: > > > > > > > > Hi Aljoscha, > > > > > > > > Thanks. Yes the new interface seems to address points 1 and 2. 
of > > > > > > > > *1) I am having a use case where I have to create a custom Evictor > that > > > > will evict elements from the window based on the value (e.g., if I > have > > > > elements are of case class Item(id: Int, type:String) then evict > > elements > > > > that has type="a"). I believe this is not currently possible.* > > > > *2) this is somewhat related to 1) where there should be an option to > > > evict > > > > elements from anywhere in the window. not only from the beginning of > > the > > > > window. (e.g., apply the delta function to all elements and remove > all > > > > those don't pass. I checked the code and evict method just returns > the > > > > number of elements to be removed and processTriggerResult just skips > > > those > > > > many elements from the beginning. * > > > > *3) Add an option to enables the user to decide if the eviction > should > > > > happen before the apply function or after the apply function. > Currently > > > it > > > > is before the apply function, but I have a use case where I need to > > first > > > > apply the function and evict afterward.* > > > > > > > > I would be interested in contributing to the code base. Please let me > > > know > > > > the steps. > > > > > > > > Thanks and Regards, > > > > Vishnu Viswanath > > > > > > > > On Wed, Jul 6, 2016 at 11:4