Flink optimizer optimizations
Hi, While looking into what kinds of optimizations Flink performs, I found https://cwiki.apache.org/confluence/display/FLINK/Optimizer+Internals. Is it up to date? Also, I could not understand this part: "Reusing of partitionings and sort orders across operators. If one operator leaves the data in partitioned fashion (and or sorted order), the next operator will automatically try and reuse these characteristics. The planning for this is done holistically and can cause earlier operators to pick more expensive algorithms, if they allow for better reusing of sort-order and partitioning." Can you give an example of "earlier operators to pick more expensive algorithms"? Regards
Re: Flink optimizer optimizations
Hmm, I understand now. Thank you, guys :)

On Apr 16, 2016 2:21 PM, "Matthias J. Sax" wrote:
> Sure. WITHOUT.
>
> Thanks. Good catch :)
>
> On 04/16/2016 01:18 PM, Ufuk Celebi wrote:
> > On Sat, Apr 16, 2016 at 1:05 PM, Matthias J. Sax wrote:
> >> (with the need to sort the data, because both
> >> datasets will be sorted on A already). Thus, the overhead of sorting in
> >> the group might pay off in the join.
> >
> > I think you meant to write withOUT the need to sort the data, right?
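The point made in the quoted reply (a sort-based grouping on A can pay off in a later join on A) can be pictured with a toy cost comparison. This is a minimal sketch in plain Scala; the cost numbers and the names `HolisticPlanSketch`, `hashGroupThenJoin` and `sortGroupThenJoin` are invented for illustration and are not taken from Flink's cost model. It assumes only that hash-based grouping is cheaper in isolation than sort-based grouping, and that a sort-merge join needs both inputs sorted on the join key.

```scala
// Toy model of holistic plan costing. All numbers are hypothetical;
// Flink's real optimizer uses its own cost estimates.
object HolisticPlanSketch {
  // Hypothetical per-step costs (arbitrary units).
  val HashGroup = 5 // grouping via hashing: cheap, leaves data unsorted
  val SortGroup = 8 // grouping via sorting on A: dearer, leaves data sorted on A
  val SortInput = 8 // sorting one join input on A
  val MergeOnly = 2 // merging two inputs that are already sorted on A

  // Plan 1: the grouping picks the locally cheapest algorithm, so the join
  // has to sort the grouped side first.
  // (Assumes the other join input is already sorted on A.)
  def hashGroupThenJoin: Int = HashGroup + SortInput + MergeOnly

  // Plan 2: the grouping picks the more expensive sort-based algorithm,
  // and the join reuses that sort order without sorting again.
  def sortGroupThenJoin: Int = SortGroup + MergeOnly

  def main(args: Array[String]): Unit = {
    println(s"hash group, then sort + merge join: $hashGroupThenJoin")
    println(s"sort group, then merge join (order reused): $sortGroupThenJoin")
  }
}
```

Under these made-up costs the plan with the more expensive grouping is cheaper overall, which is the kind of trade-off the wiki passage describes.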
Data locality and scheduler
Hi, I looked at some scheduler documentation but could not find an answer to my question. Suppose I have a big file on a 40-node Hadoop cluster, and since it is a big file, every node holds at least one chunk of it. If I write a Flink job that filters the file with a parallelism of 4 (less than 40), how does data locality work? Do some tasks read chunks from remote nodes? Or does the scheduler keep the maximum parallelism at 4 but schedule tasks on every node? Regards
Re: Data locality and scheduler
Hi, But can't this behaviour cause a lot of network activity? Is there any roadmap or plan to change it?

On Apr 26, 2016 7:06 PM, "Fabian Hueske" wrote:
> Hi,
>
> Flink starts four tasks and then lazily assigns input splits to these tasks
> with locality preference. So each task may consume more than one split.
> This is different from Hadoop MapReduce or Spark which schedule a new task
> for each input split.
> In your case, the four tasks would be scheduled to four of the 40 machines
> and most of the splits will be remotely read.
>
> Best, Fabian
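The behaviour Fabian describes can be illustrated with a small simulation. This is only a sketch in plain Scala under simplifying assumptions (one split per node, tasks requesting splits in round-robin order); `SplitLocalitySketch`, `Split` and `assign` are hypothetical names, and the code is not Flink's actual input split assigner.

```scala
// Toy simulation of lazy, locality-aware split assignment.
// NOT Flink's input split assigner; an illustration only.
object SplitLocalitySketch {
  final case class Split(id: Int, host: Int)

  // Each task repeatedly requests the next split. It prefers a split stored
  // on its own host and otherwise takes any remaining (remote) split.
  // Returns (localReads, remoteReads).
  def assign(splits: Seq[Split], taskHosts: Seq[Int]): (Int, Int) = {
    val remaining = scala.collection.mutable.ArrayBuffer.empty[Split]
    remaining ++= splits
    var local = 0
    var remote = 0
    while (remaining.nonEmpty) {
      for (host <- taskHosts) {
        if (remaining.nonEmpty) {
          val idx = remaining.indexWhere(_.host == host)
          if (idx >= 0) { remaining.remove(idx); local += 1 }
          else { remaining.remove(0); remote += 1 }
        }
      }
    }
    (local, remote)
  }

  def main(args: Array[String]): Unit = {
    val splits = (0 until 40).map(i => Split(i, i)) // one split per node
    val taskHosts = Seq(0, 1, 2, 3)                 // parallelism 4
    println(assign(splits, taskHosts))
  }
}
```

With 40 splits spread over 40 hosts and four tasks, only the four splits that happen to live on the task hosts can be read locally; the remaining 36 are read over the network.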
Dataset split/demultiplex
Hi folks, Is there any way in the DataSet API to split a DataSet[A] into a DataSet[A] and a DataSet[B]? The use case is a custom filter component that we want to implement. We want to direct the input elements for which the predicate returns false to a separate output. We also want to direct input elements that throw an exception to another output (a demultiplexer-like component). Thank you in advance...
Re: Dataset split/demultiplex
Hi Gábor,

Yes, functionally this helps. But in this case I am processing each element twice and sending the whole data set to two different operators. What I am trying to achieve is something like the DataStream split functionality, or a little more. In a filter-like scenario I want to do the pseudo-operation below:

    import org.apache.flink.util.Collector
    import scala.util.control.NonFatal

    // Pseudocode: an operator with three outputs instead of one.
    def function(iter: Iterator[URLOutputData],
                 trueEvents: Collector[URLOutputData],
                 falseEvents: Collector[URLOutputData],
                 errEvents: Collector[URLOutputData]): Unit = {
      iter.foreach { i =>
        try {
          if (predicate(i)) trueEvents.collect(i)
          else falseEvents.collect(i)
        } catch {
          // elements whose predicate throws go to the error output
          case NonFatal(_) => errEvents.collect(i)
        }
      }
    }

Another case: suppose I have an input set of web events coming from different web apps, and I want to split the data set by application category.

Thanks,

On 12 May 2016 at 17:28, Gábor Gévay wrote:
> Hello,
>
> You can split a DataSet into two DataSets with two filters:
>
> val xs: DataSet[A] = ...
> val split1: DataSet[A] = xs.filter(f1)
> val split2: DataSet[A] = xs.filter(f2)
>
> where f1 and f2 are true for those elements that should go into the
> first and second DataSets respectively. So far, the splits will just
> contain elements from the input DataSet, but you can of course apply
> some map after one of the filters.
>
> Does this help?
>
> Best,
> Gábor
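One possible middle ground between the two-filter approach and a true multi-output operator is to evaluate the predicate once per element, tag the result, and then split on the tag. The sketch below shows the idea on plain Scala collections, with `Seq` standing in for a DataSet. The names `DemuxSketch`, `Routed`, `Matched`, `Unmatched`, `Failed`, `tag` and `demux` are hypothetical, and nothing here is Flink API. In Flink this would presumably still need one cheap filter per output after the tagging map.

```scala
import scala.util.control.NonFatal

// Plain-Scala stand-in for a three-way demultiplexer.
// Seq plays the role of a DataSet; nothing here is Flink API.
object DemuxSketch {
  sealed trait Routed[+A]
  final case class Matched[A](value: A) extends Routed[A]
  final case class Unmatched[A](value: A) extends Routed[A]
  final case class Failed[A](value: A) extends Routed[A]

  // Evaluate the predicate exactly once per element and tag the outcome.
  def tag[A](xs: Seq[A])(predicate: A => Boolean): Seq[Routed[A]] =
    xs.map { x =>
      try { if (predicate(x)) Matched(x) else Unmatched(x) }
      catch { case NonFatal(_) => Failed(x) }
    }

  // Split the tagged elements into (true, false, error) outputs.
  def demux[A](xs: Seq[A])(predicate: A => Boolean): (Seq[A], Seq[A], Seq[A]) = {
    val tagged = tag(xs)(predicate)
    (tagged.collect { case Matched(v) => v },
     tagged.collect { case Unmatched(v) => v },
     tagged.collect { case Failed(v) => v })
  }

  def main(args: Array[String]): Unit = {
    // 10 / x throws for x == 0, so 0 lands in the error output.
    println(demux(Seq(0, 1, 2, 5, 20))(x => 10 / x > 3))
  }
}
```

The selection steps after tagging only inspect the tag, so a potentially expensive or failing predicate runs a single time per element.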
Re: Dataset split/demultiplex
Hi, If it just requires implementing a custom operator (I mean, if it does not require changes to the network stack or other engine-level changes), I can try to implement it, since I have been working on the optimizer and plan generation for a month. We are also going to implement our ETL framework on Flink, and this kind of scenario is a good fit and a common requirement in ETL-like flows. If you can point me to the parts of the project I should look at, I can try it. Thanks

On May 12, 2016 6:54 PM, "Aljoscha Krettek" wrote:
> Hi,
> I agree that this would be very nice. Unfortunately Flink does only allow
> one output from an operation right now. Maybe we can extend this somehow
> in the future.
>
> Cheers,
> Aljoscha
DataStream split/select behaviour
Hello everyone, When I use DataStream split/select, it always sends all selected records to the same TaskManager. Is there any reason for this behaviour? Also, is it possible to implement the same split/select behaviour for the DataSet API (without using a different filter for every output)? I found https://issues.apache.org/jira/browse/FLINK-87, but the issue is still open... Thanks...
Re: DataStream split/select behaviour
Sorry, I think I misunderstood the issue. But it seems DataStream partitions the data by some field, and when I select that field only one TaskManager processes the data. I get the same result when I use filter. Below is the code:

    import org.apache.flink.api.java.io.TextInputFormat
    import org.apache.flink.core.fs.Path
    import org.apache.flink.streaming.api.scala._

    case class WikiData(prevID: Option[Int], curID: Int, num: Int,
                        prevTitle: String, curTitle: String, ttype: String)

    object StreamingSelect {
      def main(args: Array[String]): Unit = {
        val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
        //val rootPath = "gs://cpcflink/wikistream/"
        val stream: DataStream[String] = env.createInput(
          new TextInputFormat(new Path("/home/capacman/Data/wiki/2015_01_en_clickstream.tsv")))

        // Parse one tab-separated clickstream line into a WikiData record.
        val wikiStream = stream.map { line =>
          val values = line.split("\t")
          WikiData(
            if (values(0).isEmpty) None else Some(Integer.parseInt(values(0))),
            Integer.parseInt(values(1)),
            Integer.parseInt(values(2)),
            values(3),
            values(4),
            if (values.length < 6) null else values(5)
          )
        }

        // Name the output "14533" for matching records; others get no output name.
        val split = wikiStream
          .split(i => if (i.curID == 14533) List("14533") else List.empty)
        val stream14533 = split.select("14533").map(i => (i.curID, i.num))
        stream14533.writeAsCsv("/home/capacman/Data/wiki/14533")
        env.execute()
      }
    }
Re: incremental Checkpointing , Rocksdb HA
A Cassandra backend would be interesting, especially if Flink could benefit from Cassandra data locality. The Cassandra/Spark integration uses this information to schedule Spark tasks.

On 9 June 2016 at 19:55, Nick Dimiduk wrote:
> You might also consider support for a Bigtable
> backend: HBase/Accumulo/Cassandra. The data model should be similar
> (identical?) to RocksDB and you get HA, recoverability, and support for
> really large state "for free".
>
> On Thursday, June 9, 2016, Chen Qin wrote:
>
> > Hi there,
> >
> > What is the progress on incremental checkpointing? Do the Flink devs plan to
> > work on this, or is there a JIRA to track it? Super interested to know.
> >
> > I am also researching and considering using RocksDBStateBackend without
> > running an HDFS cluster or talking to S3. A primitive idea is to use ZK to
> > store / notify state propagation progress and to propagate state by
> > implementing chain replication on top of YARN-provisioned storage nodes.
> >
> > Thanks,
> > Chen
DagConnection tempMode vs breakPipeline property
Hi everybody, I am trying to understand the Flink optimizer in the DataSet API, but I don't understand the tempMode and breakPipeline properties. The TempMode enum also has a breakPipeline property, but the two are accessed and set in different places. DagConnection.breakPipeline is used to select the network I/O DataExchangeMode. However, I don't understand DagConnection.TempMode or tempMode in Channel. Could somebody explain why there are two different breakPipeline properties, and what the purpose of TempMode is? Thank you in advance.
offheap memory allocation and memory leak bug
Hi, I am running some tests on off-heap memory usage and encountered odd behaviour. My TaskManager heap limit is 12288 MB, and when I set "taskmanager.memory.off-heap: true", each job allocates at most 11673 MB of off-heap memory, which is heap size * 0.95 (the value of taskmanager.memory.fraction). But when I submit a second job, it allocates another 11 GB and does not free the memory, because MaxDirectMemorySize is set via -XX:MaxDirectMemorySize=${TM_MAX_OFFHEAP_SIZE} with TM_MAX_OFFHEAP_SIZE="8388607T". My laptop then starts swapping and the kernel OOM killer kills the TaskManager. If I trigger "Perform GC" from VisualVM between jobs, the direct memory is released, but the TaskManager's memory usage as reported by ps is still around 20 GB (RSS) and 27 GB (virtual size). In that case I could submit my test job a few times without the TaskManager being OOM-killed, but after 10 submissions it was killed again. I don't understand why the JVM's memory usage stays high even after all direct memory has been released. Do you have any idea?

Then I set MaxDirectMemorySize to 12 GB. In this case direct memory was freed without any explicit GC trigger from VisualVM, but the JVM process memory usage was still high, around 20 GB (RSS) and 27 GB (virtual size). After maybe 10 more submissions the TaskManager was killed again. I think this is a bug, and it makes it impossible to reuse TaskManagers without restarting them in standalone mode.
Re: offheap memory allocation and memory leak bug
Hello, I repeated the same test with these configuration values:

> taskmanager.heap.mb: 6500
> taskmanager.memory.off-heap: true
> taskmanager.memory.fraction: 0.9

I set TM_MAX_OFFHEAP_SIZE="6G" in taskmanager.sh. The TaskManager started with:

> capacman 14543 323 56.0 17014744 13731328 pts/1 Sl 16:23 35:25
> /home/capacman/programlama/java/jdk1.7.0_75/bin/java
> -XX:+UseConcMarkSweepGC -XX:+CMSClassUnloadingEnabled -Xms650M -Xmx650M
> -XX:MaxDirectMemorySize=6G -XX:MaxPermSize=256m
> -Dlog.file=/home/capacman/Data/programlama/flink-1.0.3/log/flink-capacman-taskmanager-0-capacman-Aspire-V3-771.log
> -Dlog4j.configuration=file:/home/capacman/Data/programlama/flink-1.0.3/conf/log4j.properties
> -Dlogback.configurationFile=file:/home/capacman/Data/programlama/flink-1.0.3/conf/logback.xml
> -classpath
> /home/capacman/Data/programlama/flink-1.0.3/lib/flink-dist_2.11-1.0.3.jar:/home/capacman/Data/programlama/flink-1.0.3/lib/flink-python_2.11-1.0.3.jar:/home/capacman/Data/programlama/flink-1.0.3/lib/log4j-1.2.17.jar:/home/capacman/Data/programlama/flink-1.0.3/lib/slf4j-log4j12-1.7.7.jar:::
> org.apache.flink.runtime.taskmanager.TaskManager --configDir
> /home/capacman/Data/programlama/flink-1.0.3/conf

but memory usage reaches up to 13 GB. Could somebody explain why the memory usage is so high? I expected it to be at most 8 GB, including some JVM internal overhead.
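The sizes reported in this thread are consistent with a simple split of the configured TaskManager memory: the off-heap part is the total times taskmanager.memory.fraction, and the rest becomes the JVM heap. The sketch below reproduces that arithmetic. It is inferred from the numbers quoted here (11673 MB for 12288 MB at 0.95, and -Xmx650M for 6500 MB at 0.9), not copied from Flink's start-up scripts, and `OffHeapSizingSketch`, `offHeapMb` and `heapMb` are hypothetical names.

```scala
// Arithmetic inferred from the numbers reported in this thread (Flink 1.0.3);
// not a copy of Flink's start-up scripts.
object OffHeapSizingSketch {
  // Off-heap managed memory: total TaskManager memory times the fraction,
  // truncated to whole megabytes.
  def offHeapMb(totalMb: Long, fraction: BigDecimal): Long =
    (BigDecimal(totalMb) * fraction).toLong

  // What remains for the JVM heap (-Xms / -Xmx).
  def heapMb(totalMb: Long, fraction: BigDecimal): Long =
    totalMb - offHeapMb(totalMb, fraction)

  def main(args: Array[String]): Unit = {
    println(offHeapMb(12288, BigDecimal("0.95"))) // first test: 11673
    println(offHeapMb(6500, BigDecimal("0.9")))   // second test: 5850
    println(heapMb(6500, BigDecimal("0.9")))      // second test: 650, matching -Xmx650M
  }
}
```

On this reading, the second test budgets roughly 5850 MB off-heap plus 650 MB heap, which is why a resident size of 13 GB looks unexpected.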
Re: offheap memory allocation and memory leak bug
Hi, I think I found some information regarding this behaviour. In the JVM it is almost impossible to free memory allocated via ByteBuffer.allocateDirect. There is no explicit way to tell the JVM "free this direct ByteBuffer". Some forums say you can free the memory with the method below:

    import java.nio.ByteBuffer

    // Frees direct buffers through the JDK-internal "cleaner" via reflection.
    def releaseBuffers(buffers: List[ByteBuffer]): List[ByteBuffer] = {
      if (buffers.nonEmpty) {
        val cleanerMethod = buffers.head.getClass.getMethod("cleaner")
        cleanerMethod.setAccessible(true)
        buffers.foreach { buffer =>
          val cleaner = cleanerMethod.invoke(buffer)
          val cleanMethod = cleaner.getClass.getMethod("clean")
          cleanMethod.setAccessible(true)
          cleanMethod.invoke(cleaner)
        }
      }
      List.empty[ByteBuffer]
    }

But since cleaner is an internal method, the above is not recommended, does not work on every JVM, and is not supported by Java 9 either. I also ran some tests with this method and the behaviour is not predictable. If the memory was allocated by another thread and that thread exits, the memory is released. The GC actually controls direct memory buffers. If there is no GC activity and memory is allocated and then dereferenced by different threads, memory usage grows beyond the intended limit, the machine starts swapping, and the OS kills the TaskManager. In my tests I saw this behaviour:

Suppose thread A allocates 8 GB of memory and exits, leaving no reference to the allocated memory; then thread B allocates 8 GB of memory and exits, leaving no reference to the allocated memory.

When I look at the direct memory usage in jvisualvm, it looks like below (-Xmx512m -XX:MaxDirectMemorySize=12G):

[image: Inline images 1]

but the RSS of the process is 16 GB. If I call System.gc at that point, RSS drops to 8 GB, but not to the expected level.

This is why the Apache Cassandra developers chose sun.misc.Unsafe ( http://cassandra-user-incubator-apache-org.3065146.n2.nabble.com/Off-heap-caching-through-ByteBuffer-allocateDirect-when-JNA-not-available-td6977711.html ).

I think currently the only way to limit memory usage in Flink, if you want to reuse the same TaskManager across jobs, is "taskmanager.memory.preallocate: true". Since it allocates the memory at startup and never frees it, memory usage stays constant.

PS: Sorry for my English, I am not a native speaker. I hope I explained what I intended to :)
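The direct memory figure that tools such as VisualVM display can also be read programmatically, which makes experiments like the one above easier to repeat. The following is a minimal sketch using the standard `java.lang.management.BufferPoolMXBean`; `DirectMemorySketch` and `directMemoryUsed` are hypothetical names, and the 1 MB allocation is just an example size. What happens after the reference is dropped depends on the JVM and the garbage collector, so the sketch prints the value instead of assuming one.

```scala
import java.lang.management.{BufferPoolMXBean, ManagementFactory}
import java.nio.ByteBuffer

// Reads the JVM's own accounting of direct buffer memory.
object DirectMemorySketch {
  // Bytes currently accounted to the "direct" buffer pool, or 0 if not found.
  def directMemoryUsed(): Long = {
    val it = ManagementFactory.getPlatformMXBeans(classOf[BufferPoolMXBean]).iterator()
    var used = 0L
    while (it.hasNext) {
      val pool = it.next()
      if (pool.getName == "direct") used = pool.getMemoryUsed
    }
    used
  }

  def main(args: Array[String]): Unit = {
    var buffer: ByteBuffer = ByteBuffer.allocateDirect(1 << 20) // 1 MB off-heap
    println(s"while referenced: ${directMemoryUsed()} bytes")

    // Dropping the reference frees nothing by itself; the native memory is
    // only returned once the garbage collector processes the buffer object.
    buffer = null
    System.gc() // a request only; the JVM may or may not collect right away
    println(s"after System.gc(): ${directMemoryUsed()} bytes")
  }
}
```

Note that this pool only covers buffers created through the JVM's direct buffer mechanism, so it does not by itself explain the gap between direct memory usage and process RSS described above.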
Re: offheap memory allocation and memory leak bug
Hi Till, I saw the JIRA issue. Do you want me to upload the input data set as well? If you want, I can prepare a GitHub repo if that would be easier.

On Jun 20, 2016 1:10 PM, "Till Rohrmann" wrote:
> Hi,
>
> your observation sounds like a bug to me and we have to further investigate
> it. I assume that you're running a batch job, right? Could you maybe share
> your complete configuration and the job to reproduce the problem with us?
>
> I think that your investigation that direct buffers are not properly freed
> and garbage collected can be right. I will open a JIRA issue to further
> investigate and solve the problem. Thanks for reporting :-)
>
> At the moment, one way to solve this problem is, as you've already stated,
> to set taskmanager.memory.preallocate: true in your configuration. For
> batch jobs, this should actually improve the runtime performance at the
> cost of a slightly longer start-up time for your TaskManagers.
>
> Cheers,
> Till
Re: DagConnection tempMode vs breakPipeline property
Thank you, Ufuk. Yes, it helps a lot. Thanks for your help.

On Jul 4, 2016 2:02 PM, "Ufuk Celebi" wrote:
> The data exchange mode has been introduced recently as a replacement
> for the pipeline break logic, which was buggy. I'm not too familiar
> with the optimizer, but I would expect everything that goes back to
> the DataExchangeMode to be correct. The rest should be an artifact of
> the old pipeline breaker logic and not be used any more. Does this
> help in any way?
release 1.1 rc2 confusion
Hi, The Flink downloads page says that the latest version is 1.1 and links to 1.1 binaries, but wasn't RC2 voted on only recently? Are those binaries 1.1 or RC2?
Re: [Discuss] FLIP-13 Side Outputs in Flink
Is this only related to the streaming API? This feature could be really useful for ETL scenarios with the DataSet API as well.

On Oct 26, 2016 22:29, "Fabian Hueske" wrote:
> Hi Chen,
>
> thanks for this interesting proposal. I think side output would be a very
> valuable feature to have!
>
> I went over the FLIP and have a few questions.
>
> - Will multiple side outputs of the same type be supported?
> - If I got it right, the FLIP proposes to change the signatures of many
> user-defined functions (FlatMapFunction, WindowFunction, ...). Most of
> these interfaces/classes are annotated with @Public, which means we cannot
> change them in the Flink 1.x release line. What would be alternatives? I
> can think of a) casting the Collector into a RichCollector (as you do in
> your prototype) or b) retrieve the RichCollector from the RuntimeContext
> that a RichFunction provides.
>
> I'm not so familiar with the internals of the DataStream API, so I leave
> comments on that to others.
>
> Best, Fabian
>
> 2016-10-25 18:00 GMT+02:00 Chen Qin :
>
> > Hey folks,
> >
> > Please give feedback on FLIP-13!
> > https://cwiki.apache.org/confluence/display/FLINK/FLIP-13+Side+Outputs+in+Flink
> > JIRA task link to google doc
> > https://issues.apache.org/jira/browse/FLINK-4460
> >
> > Thanks,
> > Chen Qin
Dataset and eager scheduling
Hi all, Our team is currently trying to implement a runtime operator and is also experimenting with the scheduler. We are trying to understand the batch optimizer, but that will take some time. What we want to know is whether changing the batch scheduling mode from LAZY_FROM_SOURCES to EAGER could affect the optimizer. In other words, does the optimizer make any strong assumption that the scheduling mode of batch jobs is always LAZY_FROM_SOURCES? Thanks...
Re: Dataset and eager scheduling
Hi Till, Thank you.

On 2 March 2017 at 18:13, Till Rohrmann wrote:
> Hi CPC,
>
> I think that the optimizer does not take the scheduling mode into account
> when optimizing a Flink job.
>
> Cheers,
> Till
Dataset and select/split functionality
Hi all, We will try to implement select/split functionality for the batch API. We looked at the streaming side and understand how it works there, but it was easier because the streaming side does not include an optimizer. Since adding such a runtime operator will affect the optimizer layer as well, is there a part that you want us to pay particular attention to? Thanks...
Re: Dataset and select/split functionality
Hi Fabian, Thank you for your explanation. Could you also give an example of how the optimizer relies on the assumption that the outputs of a function are replicated? Thank you...

On 3 March 2017 at 13:52, Fabian Hueske wrote:
> Hi CPC,
>
> we had several requests in the past to add this feature. However, adding
> select/split for DataSet is much! more work than you would expect.
> As you pointed out, we have to go through the optimizer, which assumes that
> the outputs of a function are replicated.
> This is pretty much wired in and you would have to touch a lot of code.
>
> I'm sorry, but am not comfortable doing such a big change.
> IMO, the potential gains are not worth the effort of implementation and
> verification and the risk of breaking something.
>
> Best, Fabian