No, there is no size or cardinality estimation happening at the moment.

Best, Fabian
2018-02-19 21:56 GMT+01:00 Darshan Singh <darshan.m...@gmail.com>:

> Thanks, is there a metric or another way to know how much space each
> task/job is taking? Does the execution plan have these details?
>
> Thanks
>
> On Mon, Feb 19, 2018 at 10:54 AM, Fabian Hueske <fhue...@gmail.com> wrote:
>
>> Hi,
>>
>> that's a difficult question without knowing the details of your job.
>> A NoSpaceLeftOnDevice error occurs when a file system is full.
>>
>> This can happen if:
>> - A Flink algorithm writes to disk, e.g., an external sort or the hash
>> table of a hybrid hash join. This can happen for GroupBy, Join, Distinct,
>> or any other operation that requires grouping or joining data. Filters
>> will never spill to disk.
>> - An OutputFormat writes to disk.
>>
>> The data is written to a temp directory that can be configured in the
>> ./conf/flink-conf.yaml file.
>>
>> Did you check how the tasks are distributed across the task managers?
>> The web UI can help to diagnose such problems.
>>
>> Best, Fabian
>>
>> 2018-02-19 11:22 GMT+01:00 Darshan Singh <darshan.m...@gmail.com>:
>>
>>> Thanks, Fabian, for such a detailed explanation.
>>>
>>> I am using a DataSet in between, so I guess the CSV is read once. Now to
>>> my real issue: I have 6 task managers, each with 4 cores, and I have 2
>>> slots per task manager.
>>>
>>> My CSV file is just 1 GB. I create a table from it, transform it to a
>>> DataSet, and then run 15 different filters and extra processing, which
>>> all run almost in parallel.
>>>
>>> However, the job fails with a "no space left on device" error on one of
>>> the task managers. Each task manager has 32 GB of space in /tmp, so I am
>>> not sure why it is running out of space. I do use some joins with other
>>> tables, but those are only a few megabytes.
>>>
>>> So I was assuming that somehow all parallel executions were storing data
>>> in /tmp and filling it up.
>>>
>>> I would like to know what could be filling the space.
>>>
>>> Thanks
>>>
>>> On 19 Feb 2018 10:10 am, "Fabian Hueske" <fhue...@gmail.com> wrote:
>>>
>>> Hi,
>>>
>>> this works as follows:
>>>
>>> - Table API and SQL queries are translated into regular DataSet jobs
>>> (assuming you are running in a batch ExecutionEnvironment).
>>> - A query is translated into a sequence of DataSet operators when you 1)
>>> transform the Table into a DataSet or 2) write it to a TableSink. In both
>>> cases, the optimizer is invoked and recursively goes from the
>>> converted/emitted Table back to its roots, i.e., a TableSource or a
>>> DataSet.
>>>
>>> This means that if you create a Table from a TableSource, apply multiple
>>> filters on it, and write each filter to a TableSink, the CSV file will be
>>> read 10 times, filtered 10 times, and written 10 times. This is not
>>> efficient, because you could also just read the file once and apply all
>>> filters in parallel.
>>> You can do this by converting the Table that you read with a TableSource
>>> into a DataSet and registering the DataSet again as a Table. In that
>>> case, the translations of all TableSinks will stop at the DataSet and not
>>> include the TableSource which reads the file.
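For illustration, a minimal sketch of this Table -> DataSet -> Table round trip, assuming the Flink 1.4-era batch Table API that was current for this thread. The file paths, schema, column names, and filter predicates are made-up placeholders and would need to match your actual data.

import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.java.BatchTableEnvironment;
import org.apache.flink.table.sinks.CsvTableSink;
import org.apache.flink.table.sources.CsvTableSource;
import org.apache.flink.types.Row;

public class SharedScanSketch {

    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        BatchTableEnvironment tEnv = TableEnvironment.getTableEnvironment(env);

        // Hypothetical CSV source with two columns; adjust path and schema to your data.
        CsvTableSource csvSource = CsvTableSource.builder()
                .path("/path/to/input.csv")
                .field("category", BasicTypeInfo.STRING_TYPE_INFO)
                .field("value", BasicTypeInfo.LONG_TYPE_INFO)
                .build();
        tEnv.registerTableSource("csvTable", csvSource);

        // Materialize the source once as a DataSet ...
        DataSet<Row> rows = tEnv.toDataSet(tEnv.scan("csvTable"), Row.class);
        // ... and register that DataSet again as a Table. Sinks translated from
        // "sharedTable" stop at this DataSet instead of going back to the CSV scan.
        tEnv.registerDataSet("sharedTable", rows, "category, value");

        Table shared = tEnv.scan("sharedTable");
        shared.filter("category === 'A'")
              .writeToSink(new CsvTableSink("/path/to/out_a.csv", "|"));
        shared.filter("category === 'B'")
              .writeToSink(new CsvTableSink("/path/to/out_b.csv", "|"));

        env.execute("shared scan sketch");
    }
}

Because both sinks are translated back only as far as the registered DataSet, the CSV scan runs once per job execution instead of once per sink.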
>>>
>>> The following figures illustrate the difference:
>>>
>>> 1) Without DataSet in the middle:
>>>
>>> TableSource -> Filter1 -> TableSink1
>>> TableSource -> Filter2 -> TableSink2
>>> TableSource -> Filter3 -> TableSink3
>>>
>>> 2) With DataSet in the middle:
>>>
>>>              /-> Filter1 -> TableSink1
>>> TableSource -<-> Filter2 -> TableSink2
>>>              \-> Filter3 -> TableSink3
>>>
>>> I'll likely add a feature to internally translate an intermediate Table
>>> to make this a bit easier.
>>> The underlying problem is that the SQL optimizer cannot translate queries
>>> with multiple sinks. Instead, each sink is translated individually, and
>>> the optimizer does not know that common execution paths could be shared.
>>>
>>> Best,
>>> Fabian
>>>
>>> 2018-02-19 2:19 GMT+01:00 Darshan Singh <darshan.m...@gmail.com>:
>>>
>>>> Thanks for the reply.
>>>>
>>>> I guess I am not looking for an alternative. I am trying to understand
>>>> what Flink does in this scenario, and if 10 tasks are going in parallel
>>>> I am sure they will be reading the CSV, as there is no other way.
>>>>
>>>> Thanks
>>>>
>>>> On Mon, Feb 19, 2018 at 12:48 AM, Niclas Hedhman <nic...@hedhman.org>
>>>> wrote:
>>>>
>>>>> Do you really need the large single table created in step 2?
>>>>>
>>>>> If not, what you typically do is have the CSV source first perform the
>>>>> common transformations. Then, depending on whether the 10 outputs have
>>>>> different processing paths or not, you either do a split() for
>>>>> individual processing depending on some criteria, or you just have the
>>>>> sink put each record in a separate table.
>>>>> You have full control, at each step along the transformation path, over
>>>>> whether it can be parallelized or not, and if there are no sequential
>>>>> constraints in your model, you can easily fill all cores on all hosts.
>>>>>
>>>>> Even if you need the step 2 table, I would still just treat that as a
>>>>> split(): a branch ending in a Sink that does the storage there. There is
>>>>> no need to read records from the file over and over again, nor to store
>>>>> them first in the step 2 table and read them out again.
>>>>>
>>>>> Don't ask *me* about what happens in failure scenarios... I have not
>>>>> figured that out myself yet.
>>>>>
>>>>> HTH
>>>>> Niclas
>>>>>
>>>>> On Mon, Feb 19, 2018 at 3:11 AM, Darshan Singh <darshan.m...@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> Hi, I would like to understand the execution model.
>>>>>>
>>>>>> 1. I have a CSV file which is, say, 10 GB.
>>>>>> 2. I created a table from this file.
>>>>>> 3. Now I have created filtered tables on this, say 10 of them.
>>>>>> 4. Now I created a writeToSink for all these 10 filtered tables.
>>>>>>
>>>>>> My first question is: will these 10 filtered tables be written in
>>>>>> parallel (suppose I have 40 cores and set the parallelism to 40 as
>>>>>> well)?
>>>>>>
>>>>>> My next question concerns the table I created from the CSV file, which
>>>>>> is common to all of them: it won't be persisted by Flink internally;
>>>>>> rather, for each of the 10 filtered tables Flink will read the CSV
>>>>>> file, apply the filter, and write to the sink.
>>>>>>
>>>>>> I think that for all 10 filtered tables the CSV will be read again and
>>>>>> again, so in this case it will be read 10 times. Is my understanding
>>>>>> correct, or am I missing something?
>>>>>>
>>>>>> What if in step 2 I change the table to a DataSet and back?
>>>>>>
>>>>>> Thanks
>>>>>
>>>>> --
>>>>> Niclas Hedhman, Software Developer
>>>>> http://polygene.apache.org - New Energy for Java
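For comparison, a rough sketch of the pure DataSet-API approach Niclas describes above: scan the CSV once and attach several filter branches, each ending in its own sink, to the same DataSet. The parallelism, file paths, tuple schema, and filter predicates are invented placeholders for illustration only.

import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;

public class MultiSinkSketch {

    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(12); // e.g., 6 task managers x 2 slots

        // One scan of the CSV file ...
        DataSet<Tuple2<String, Long>> rows = env
                .readCsvFile("/path/to/input.csv")
                .types(String.class, Long.class);

        // ... shared by several filter branches, each ending in its own sink.
        // All branches belong to the same job and reuse the single source scan.
        rows.filter(t -> t.f0.equals("A")).writeAsCsv("/path/to/out_a.csv");
        rows.filter(t -> t.f0.equals("B")).writeAsCsv("/path/to/out_b.csv");
        rows.filter(t -> t.f1 > 100L).writeAsCsv("/path/to/out_large.csv");

        env.execute("one scan, many sinks");
    }
}

All branches run concurrently up to the configured parallelism, and since filters never spill, only grouping, sorting, or join operators would write to the temp directories configured in ./conf/flink-conf.yaml.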