Hi Fabian,

thanks for your directions! They worked flawlessly. I am aware of the reduced robustness, but then again my input is only available on each worker and not replicated. In case anyone is wondering, here is how I did it:
https://github.com/robert-schmidtke/hdfs-statistics-adapter/tree/2a456a832c9d84b324a966c431171f761f3444f5
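The gist of the split generation (step 2 in Fabian's mail below) is to create exactly one split per (hostname, local index) pair. A stripped-down sketch follows; class and method names here are illustrative, not the exact code in the repo. Sketches of steps 3 and 4 follow below the quoted thread.

import org.apache.flink.core.io.LocatableInputSplit;

/**
 * Sketch of step 2: one split per TaskManager slot. The hostname pins the
 * split to a worker; the local index is recovered later as
 * splitNumber % slotsPerHost.
 */
public class WorkerLocalSplits {

    public static LocatableInputSplit[] createSplits(String[] hosts, int slotsPerHost) {
        LocatableInputSplit[] splits = new LocatableInputSplit[hosts.length * slotsPerHost];
        for (int h = 0; h < hosts.length; h++) {
            for (int s = 0; s < slotsPerHost; s++) {
                int splitNumber = h * slotsPerHost + s;
                splits[splitNumber] = new LocatableInputSplit(splitNumber, hosts[h]);
            }
        }
        return splits;
    }
}

A custom InputFormat returns these splits from its createInputSplits() method.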
Thanks again!
Robert

On Tue, Dec 27, 2016 at 4:36 PM, Fabian Hueske <fhue...@gmail.com> wrote:

> Hi Robert,
>
> this is indeed a bit tricky to do. The problems lie mostly in the generation of the input splits, the setup of Flink, and the scheduling of tasks.
>
> 1) You have to ensure that at least one DataSource task is scheduled on each worker. The easiest way to do this is a bare-metal setup (no YARN) with a single TaskManager per worker. Each TM should have the same number of slots, and the DataSource should have a parallelism of #TMs * slots. This ensures that the same number of DataSource tasks is started on each worker.
>
> 2) You need to tweak the input split generation. Flink's FileInputFormat assumes that it can access all files to be read via a distributed file system. Your InputFormat should take a parameter for the list of TaskManagers (hostnames or IP addresses) and the number of slots per TM. InputFormat.createInputSplits() should create one input split for each parallel task, and each split should carry a (hostname, local index) pair.
>
> 3) You need to tweak the input split assignment. Provide a custom input split assigner that ensures splits are only handed to the TaskManager they were created for. Otherwise a TaskManager might process the split of another TM, so some data would be read twice while other data is not read at all.
>
> 4) Once a split is assigned to a task, the InputFormat.open() method is called. Based on the local index, the task should decide which files (or parts of files) it needs to read. This decision must be deterministic (depend only on the local index) and ensure that all data (files / parts of files) is read exactly once (you'll need the number of slots per host for that).
>
> You see, this is not trivial. Moreover, such a setup is not flexible, quite fragile, and not fault tolerant: since you need to read local files that are not available anywhere else, your job will fail if a TM goes down.
>
> If possible, I would recommend moving the data into a distributed file system.
>
> Best,
> Fabian
>
> 2016-12-27 13:04 GMT+01:00 Robert Schmidtke <ro.schmid...@gmail.com>:
>
>> Hi everyone,
>>
>> I'm using Flink and/or Hadoop on my cluster, and I'm having them generate log data in each worker node's /local folder (a regular mount point). Now I would like to process these files using Flink, but I'm not quite sure how I could tell Flink to use each worker node's /local folder as the input path, because I'd expect Flink to look in the /local folder of the submitting node only. Do I have to put these files into HDFS, or is there a way to tell Flink that the file:///local URI refers to worker-local data? Thanks in advance for any hints!
>>
>> Best,
>> Robert
>>
>> --
>> My GPG Key ID: 336E2680

--
My GPG Key ID: 336E2680
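P.S. for anyone finding this thread later: here is a minimal sketch of the strict split assigner from Fabian's step 3, written against the single-method InputSplitAssigner interface of the Flink releases current at the time (newer versions add methods to this interface). Names are again illustrative.

import java.util.HashMap;
import java.util.LinkedList;
import java.util.Map;
import java.util.Queue;

import org.apache.flink.core.io.InputSplit;
import org.apache.flink.core.io.InputSplitAssigner;
import org.apache.flink.core.io.LocatableInputSplit;

/**
 * Sketch of step 3: hand each split only to the TaskManager it was created
 * for. Unlike Flink's LocatableInputSplitAssigner, this never falls back to
 * a remote host, so no worker ever reads another worker's files.
 */
public class StrictHostSplitAssigner implements InputSplitAssigner {

    private final Map<String, Queue<LocatableInputSplit>> splitsByHost = new HashMap<>();

    public StrictHostSplitAssigner(LocatableInputSplit[] splits) {
        for (LocatableInputSplit split : splits) {
            // Each split was created with exactly one hostname (see step 2).
            String host = split.getHostnames()[0];
            splitsByHost.computeIfAbsent(host, k -> new LinkedList<>()).add(split);
        }
    }

    @Override
    public synchronized InputSplit getNextInputSplit(String host, int taskIndex) {
        // The host string reported by the requesting TaskManager must match
        // the configured hostnames; some normalization may be needed.
        Queue<LocatableInputSplit> splits = splitsByHost.get(host);
        // Return null (no split) rather than a split belonging to another
        // host, so data is never read on the wrong worker.
        return (splits == null || splits.isEmpty()) ? null : splits.poll();
    }
}

The custom InputFormat returns this assigner from getInputSplitAssigner().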
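And a sketch of the deterministic file selection from step 4; the /local directory and the modulo scheme are assumptions for illustration.

import java.io.File;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/**
 * Sketch of step 4: partition the worker-local files among the local slots
 * using only the split's local index. Every slot lists and sorts the same
 * directory identically, so the selection is deterministic and each file is
 * picked up by exactly one slot per host.
 */
public class LocalFileSelection {

    public static List<File> filesForSlot(int localIndex, int slotsPerHost) {
        File[] all = new File("/local").listFiles();
        if (all == null) {
            return new ArrayList<>();
        }
        Arrays.sort(all); // identical order on every slot
        List<File> mine = new ArrayList<>();
        for (int i = 0; i < all.length; i++) {
            if (i % slotsPerHost == localIndex) {
                mine.add(all[i]);
            }
        }
        return mine;
    }
}

In the custom InputFormat's open(split), you would compute localIndex = split.getSplitNumber() % slotsPerHost, call filesForSlot(localIndex, slotsPerHost), and read exactly those files.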