Are there any plans for this in the future? I looked at the plans, and without these stats I am a bit lost about what to look for, e.g., what the pain points are. I can see some very obvious things, but not much more from these plans.

My question is: is there a guide or document that describes what your plans should look like and what to look for in them? Also, I would like to know: if an execution plan is very complex (maybe not expensive, but very complex), is it usually beneficial to save the intermediate datasets/tables and read them back before doing the next steps?

Thanks
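For context on inspecting plans: the batch ExecutionEnvironment can dump its plan as JSON, and the Table API can explain a single Table. Below is a minimal sketch against the Flink 1.4-era APIs discussed in this thread; the table name "myTable" is hypothetical, and as far as I know the JSON output can be rendered with the Flink plan visualizer at https://flink.apache.org/visualizer/.

    import org.apache.flink.api.java.ExecutionEnvironment;
    import org.apache.flink.table.api.Table;
    import org.apache.flink.table.api.TableEnvironment;
    import org.apache.flink.table.api.java.BatchTableEnvironment;

    public class ShowPlans {
        public static void main(String[] args) throws Exception {
            ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
            BatchTableEnvironment tEnv = TableEnvironment.getTableEnvironment(env);

            // ... define sources, transformations, and sinks here ...

            // AST plus optimized plan for one Table; "myTable" is a
            // hypothetical name registered earlier.
            Table t = tEnv.scan("myTable");
            System.out.println(tEnv.explain(t));

            // JSON dump of the full DataSet plan; call this after the
            // sinks are defined, then paste the output into the plan
            // visualizer to render it.
            System.out.println(env.getExecutionPlan());
        }
    }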
On Tue, Feb 20, 2018 at 9:34 AM, Fabian Hueske <fhue...@gmail.com> wrote:

No, there is no size or cardinality estimation happening at the moment.

Best,
Fabian

2018-02-19 21:56 GMT+01:00 Darshan Singh <darshan.m...@gmail.com>:

Thanks. Is there a metric or another way to know how much space each task/job is taking? Does the execution plan have these details?

Thanks

On Mon, Feb 19, 2018 at 10:54 AM, Fabian Hueske <fhue...@gmail.com> wrote:

Hi,

That's a difficult question without knowing the details of your job. A NoSpaceLeftOnDevice error occurs when a file system is full.

This can happen if:
- a Flink algorithm writes to disk, e.g., an external sort or the hash table of a hybrid hash join. This can happen for GroupBy, Join, Distinct, or any other operation that requires grouping or joining data. Filters will never spill to disk.
- an OutputFormat writes to disk.

The data is written to a temp directory, which can be configured in the ./conf/flink-conf.yaml file.

Did you check how the tasks are distributed across the task managers? The web UI can help to diagnose such problems.

Best,
Fabian
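To make the temp-directory point concrete, here is a sketch of the relevant flink-conf.yaml entry. The key name is the Flink 1.4-era one (newer releases renamed it to io.tmp.dirs), and the path is an invented example:

    # Directories for spill files from sorts and hash tables.
    # Multiple directories can be listed; check the configuration docs
    # for your version for the exact separator.
    taskmanager.tmp.dirs: /data/flink-tmp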
2018-02-19 11:22 GMT+01:00 Darshan Singh <darshan.m...@gmail.com>:

Thanks Fabian for such a detailed explanation.

I am using a DataSet in between, so I guess the CSV is read once. Now to my real issue: I have 6 task managers, each with 4 cores, and I have 2 slots per task manager.

My CSV file is just 1 GB. I create a table from it, transform it to a DataSet, and then run 15 different filters and extra processing, which all run almost in parallel.

However, it fails with a "no space left on device" error on one of the task managers. Each task manager has 32 GB of space in /tmp, so I am not sure why it is running out of space. I do use some joins with other tables, but those are a few megabytes.

So I was assuming that somehow all the parallel executions were storing data in /tmp and filling it up.

I would like to know what could be filling the space.

Thanks

On 19 Feb 2018 10:10 am, "Fabian Hueske" <fhue...@gmail.com> wrote:

Hi,

this works as follows:

- Table API and SQL queries are translated into regular DataSet jobs (assuming you are running in a batch ExecutionEnvironment).
- A query is translated into a sequence of DataSet operators when you 1) transform the Table into a DataSet or 2) write it to a TableSink. In both cases, the optimizer is invoked and recursively goes from the converted/emitted Table back to its roots, i.e., a TableSource or a DataSet.

This means that if you create a Table from a TableSource, apply multiple filters on it, and write each filtered Table to a TableSink, the CSV file will be read 10 times, filtered 10 times, and written 10 times. This is not efficient, because you could instead read the file once and apply all filters in parallel.

You can do this by converting the Table that you read with a TableSource into a DataSet and registering the DataSet again as a Table. In that case, the translations of all TableSinks will stop at the DataSet and not include the TableSource that reads the file.

The following figures illustrate the difference:

1) Without DataSet in the middle:

TableSource -> Filter1 -> TableSink1
TableSource -> Filter2 -> TableSink2
TableSource -> Filter3 -> TableSink3

2) With DataSet in the middle:

             /-> Filter1 -> TableSink1
TableSource -+-> Filter2 -> TableSink2
             \-> Filter3 -> TableSink3

I'll likely add a feature to internally translate an intermediate Table to make this a bit easier. The underlying problem is that the SQL optimizer cannot translate queries with multiple sinks. Instead, each sink is translated individually, and the optimizer does not know that common execution paths could be shared.

Best,
Fabian
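A minimal sketch of the pattern Fabian describes, against the Flink 1.4-era batch Table API. The file paths, field names, and filter predicates are invented for illustration; the toDataSet/fromDataSet round trip is the materialization point at which the sink translations stop.

    import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
    import org.apache.flink.api.java.DataSet;
    import org.apache.flink.api.java.ExecutionEnvironment;
    import org.apache.flink.table.api.Table;
    import org.apache.flink.table.api.TableEnvironment;
    import org.apache.flink.table.api.java.BatchTableEnvironment;
    import org.apache.flink.table.sinks.CsvTableSink;
    import org.apache.flink.table.sources.CsvTableSource;
    import org.apache.flink.types.Row;

    public class MaterializeBetweenFilters {
        public static void main(String[] args) throws Exception {
            ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
            BatchTableEnvironment tEnv = TableEnvironment.getTableEnvironment(env);

            // Hypothetical CSV source with two fields.
            CsvTableSource source = CsvTableSource.builder()
                    .path("/path/to/input.csv")
                    .field("id", BasicTypeInfo.LONG_TYPE_INFO)
                    .field("amount", BasicTypeInfo.INT_TYPE_INFO)
                    .build();
            tEnv.registerTableSource("csv", source);

            // Converting to a DataSet and back inserts the materialization
            // point: each sink's translation now stops at this DataSet
            // instead of reaching back to the CSV source.
            DataSet<Row> ds = tEnv.toDataSet(tEnv.scan("csv"), Row.class);
            Table base = tEnv.fromDataSet(ds, "id, amount");

            // All filters now branch off the shared DataSet.
            base.filter("amount > 100").writeToSink(new CsvTableSink("/out/large", "|"));
            base.filter("amount <= 100").writeToSink(new CsvTableSink("/out/small", "|"));

            env.execute("shared-source job");
        }
    }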
2018-02-19 2:19 GMT+01:00 Darshan Singh <darshan.m...@gmail.com>:

Thanks for the reply.

I guess I am not looking for an alternative. I am trying to understand what Flink does in this scenario, and if 10 tasks are going in parallel, I am sure they will be reading the CSV, as there is no other way.

Thanks

On Mon, Feb 19, 2018 at 12:48 AM, Niclas Hedhman <nic...@hedhman.org> wrote:

Do you really need the large single table created in step 2?

If not, what you typically do is have the CSV source first do the common transformations. Then, depending on whether the 10 outputs have different processing paths or not, you either do a split() to do individual processing based on some criteria, or you just have the sink put each record in separate tables. You have full control, at each step along the transformation path, over whether it can be parallelized or not, and if there are no sequential constraints on your model, you can quite easily fill all cores on all hosts.

Even if you need the step 2 table, I would still just treat that as a split(): a branch ending in a sink that does the storage there. There is no need to read records from the file over and over again, nor to store them first in the step 2 table and read them out again.

Don't ask *me* about what happens in failure scenarios... I haven't figured that out myself yet.

HTH
Niclas

On Mon, Feb 19, 2018 at 3:11 AM, Darshan Singh <darshan.m...@gmail.com> wrote:

Hi, I would like to understand the execution model.

1. I have a CSV file which is, say, 10 GB.
2. I created a table from this file.
3. Now I have created filtered tables on this, say 10 of them.
4. Now I created a writeToSink for all these 10 filtered tables.

My question is: will these 10 filtered tables be written in parallel (suppose I have 40 cores and set the parallelism to 40 as well)?

The next question I have is that the table which I created from the CSV file, which is common, won't be persisted by Flink internally; rather, for all 10 filtered tables it will read the CSV file, apply the filter, and write to the sink.

I think that for all 10 filtered tables it will read the CSV again and again, so in this case it will be read 10 times. Is my understanding correct, or am I missing something?

What if in step 2 I change the table to a DataSet and back?

Thanks

--
Niclas Hedhman, Software Developer
http://polygene.apache.org - New Energy for Java
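To make Niclas's suggestion concrete, here is a sketch in the plain DataSet API: the CSV is read and parsed once, and the "step 2" table plus each filtered output are just branches of the same DataSet within one job. Paths, field types, and predicates are invented for illustration.

    import org.apache.flink.api.java.DataSet;
    import org.apache.flink.api.java.ExecutionEnvironment;
    import org.apache.flink.api.java.tuple.Tuple2;

    public class FanOutJob {
        public static void main(String[] args) throws Exception {
            ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

            // Read and parse the CSV once; common transformations go here.
            DataSet<Tuple2<Long, Integer>> base = env
                    .readCsvFile("/path/to/input.csv")
                    .types(Long.class, Integer.class);

            // Optional: persist the full "step 2" table as one more branch.
            base.writeAsCsv("/out/step2-table");

            // Each filtered output is just another branch of the same DataSet.
            base.filter(t -> t.f1 > 100).writeAsCsv("/out/large");
            base.filter(t -> t.f1 <= 100).writeAsCsv("/out/small");

            // One job: the source is scanned once and feeds all branches.
            env.execute("fan-out job");
        }
    }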