I did not load the CSV file using the approach you suggested. I was loading it outside the operators (at the beginning of the main method of my class), since the file will definitely be needed by multiple operators. When the file was small, I saw the job registered and started, but when I used a big CSV file, the job never got registered with the task manager (I tried the ‘list’ command and got nothing).

Here’s what I saw with the small(ish) file:

# flink run analytics-flink.jar 19001 minisubs.csv output.csv
loaded 200000 subscribers from csv file
11/02/2015 16:36:59 Job execution switched to status RUNNING.
11/02/2015 16:36:59 Socket Stream -> Flat Map -> Filter -> Map -> Stream Sink(1/1) switched to SCHEDULED
11/02/2015 16:36:59 Socket Stream -> Flat Map -> Filter -> Map -> Stream Sink(1/1) switched to DEPLOYING
11/02/2015 16:36:59 Socket Stream -> Flat Map -> Filter -> Map -> Stream Sink(1/1) switched to RUNNING

And here’s what I saw with the big file:

# flink run analytics-flink.jar 19001 subs.csv output.csv
loaded 1173547 subscribers from csv file

I’m already using the streaming mode. I’m running a single Flink node right now on CentOS 7 using the ‘start-local-streaming.sh’ script.

Thanks,
Ali

On 2015-11-05, 10:22 AM, "Robert Metzger" <rmetz...@apache.org> wrote:

>Okay.
>
>you should be able to implement it as you described initially. I would do
>the transformation in a map() operator of Flink. The RichMapFunction
>provides you with an open() method which is called before the first record
>arrives.
>In the open() method, I would read the csv file(s) from HDFS or another
>file system accessible by all nodes.
>
>Then, you can access the data from the files in the map operator.
>
>In order to utilize the memory best, I would recommend to start Flink in
>the "streaming" mode. (-st argument on YARN). With that enabled, we provide
>more memory to streaming operators.
>Also, I would only expose one processing slot per TaskManager, this way we
>ensure that the files are only read once per TaskManager. (make sure you
>have only one TaskManager per machine).
>
>Why did your previous approach fail? Do you still have the error message?
>
>Regards,
>Robert
>
>On Thu, Nov 5, 2015 at 3:02 PM, Kashmar, Ali <ali.kash...@emc.com> wrote:
>
>> Hi Robert,
>>
>> The CSV file (or files as there will definitely be more than one) can be
>> large (let's say 1 GB). Memory is not an issue though. Each node has at
>> least 64 GB RAM mounted. The CSV files should easily fit in the memory of
>> each node.
>>
>> Regards,
>> Ali
>>
>> On 2015-11-05, 6:30 AM, "Robert Metzger" <rmetz...@apache.org> wrote:
>>
>> >Hi Ali,
>> >
>> >I'm excited to hear that EMC is looking into Apache Flink. I think the
>> >solution to this problem depends on one question: What is the size of the
>> >data in the CSV file compared to the memory you have available in the
>> >cluster?
>> >Would the mapping table from the file fit into the memory of all nodes
>> >running Flink?
>> >
>> >Regards,
>> >Robert
>> >
>> >PS: Did you subscribe to the mailing list? I've CCed you in case you're
>> >not subscribed yet
>> >
>> >On Wed, Nov 4, 2015 at 4:54 PM, Kashmar, Ali <ali.kash...@emc.com> wrote:
>> >
>> >> Hi there,
>> >>
>> >> I'm trying to design and implement a use case in Flink where I'm
>> >> receiving protocol packets over a socket. Each packet has the
>> >> subscriber IMSI in it and a bunch of more data. At the same time, I
>> >> have a csv file with a mapping from IMSI -> subscriber group. I need
>> >> to inject the group into packet and then send it to the sink.
>> >>
>> >> I've tried loading the CSV into a memory map and then accessing the
>> >> map from within the Flink operators but that only works when the CSV
>> >> is very small (a few hundred subscribers). I've tried creating another
>> >> stream for the CSV and connecting the streams but that doesn't yield
>> >> anything as I can't have access to objects from both streams at the
>> >> same time.
>> >>
>> >> How would you guys approach this?
>> >>
>> >> Thanks,
>> >> Ali
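
For reference, the lookup discussed in this thread (load the IMSI -> group mapping once, then inject the group into each packet) might look roughly like the sketch below. It is plain Java with no Flink dependency so that it runs on its own; in a Flink job, load() would presumably be called from the open() method of a RichMapFunction and enrich() from its map() method, as Robert describes. The class name SubscriberLookup, the method names, the "imsi,group" line layout, and the "UNKNOWN" fallback are all assumptions made for illustration, not anything taken from the actual job.

```java
import java.io.BufferedReader;
import java.io.Reader;
import java.io.StringReader;
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper: holds the IMSI -> subscriber group lookup logic.
class SubscriberLookup {

    // Reads lines of the assumed form "imsi,group" into a map.
    // Lines without a comma are skipped. The caller owns (and closes) the reader.
    static Map<String, String> load(Reader source) {
        Map<String, String> groups = new HashMap<>();
        new BufferedReader(source).lines().forEach(line -> {
            String[] parts = line.split(",", 2);
            if (parts.length == 2) {
                groups.put(parts[0].trim(), parts[1].trim());
            }
        });
        return groups;
    }

    // Injects the group into a packet, represented here as a simple CSV string.
    // Subscribers missing from the map get the assumed placeholder "UNKNOWN".
    static String enrich(Map<String, String> groups, String imsi, String payload) {
        String group = groups.getOrDefault(imsi, "UNKNOWN");
        return imsi + "," + group + "," + payload;
    }

    public static void main(String[] args) {
        // In a Flink job the reader would wrap the CSV file on HDFS or another
        // file system visible to all nodes; an in-memory string stands in here.
        Map<String, String> groups = load(new StringReader(
                "310150123456789,gold\n310150987654321,silver\n"));
        System.out.println(enrich(groups, "310150123456789", "payload"));
    }
}
```

Loading inside open() rather than in the main method means the map is built on the TaskManager that runs the operator, instead of being built in the client and shipped along with the job.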