Hi Robert,

I tried the approach you suggested and it works nicely. Thanks!
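For reference, the approach Robert suggested (quoted further down) comes down to this: ship only the file's location with the job, build the IMSI -> group table in open(), which runs once per parallel instance before the first record, and do the lookup in map(). The sketch below simulates that lifecycle in plain Java so it runs without Flink on the classpath. In a real job the class would extend RichMapFunction<String, String> and override open(Configuration). The class name GroupEnricher and the "imsi,group" CSV rows and "imsi,payload" packet format are assumptions for illustration, not details from the thread.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.Serializable;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;

// Plain-Java simulation of the RichMapFunction open()/map() lifecycle (no Flink dependency).
// Hypothetical formats: CSV rows "imsi,group"; packets "imsi,payload".
public class GroupEnricher implements Serializable {
    private static final long serialVersionUID = 1L;

    // Only the path is part of the serialized object, so the job submission stays small.
    private final String csvPath;
    // Built on the worker in open(); transient, so it is never serialized with the job.
    private transient Map<String, String> imsiToGroup;

    public GroupEnricher(String csvPath) {
        this.csvPath = csvPath;
    }

    // Stands in for open(Configuration): called once per parallel instance, before the first record.
    public void open() throws IOException {
        imsiToGroup = new HashMap<>();
        try (BufferedReader in = Files.newBufferedReader(Paths.get(csvPath), StandardCharsets.UTF_8)) {
            String line;
            while ((line = in.readLine()) != null) {
                String[] cols = line.split(",", 2);
                if (cols.length == 2) {
                    imsiToGroup.put(cols[0].trim(), cols[1].trim());
                }
            }
        }
    }

    // Stands in for map(): appends the subscriber group, or "unknown" if the IMSI is not in the table.
    public String map(String packet) {
        String imsi = packet.split(",", 2)[0].trim();
        return packet + "," + imsiToGroup.getOrDefault(imsi, "unknown");
    }

    public static void main(String[] args) throws IOException {
        Path csv = Files.createTempFile("subs", ".csv");
        Files.write(csv, Arrays.asList("001010000000001,gold", "001010000000002,silver"), StandardCharsets.UTF_8);

        GroupEnricher enricher = new GroupEnricher(csv.toString());
        enricher.open();
        System.out.println(enricher.map("001010000000001,payload-a")); // 001010000000001,payload-a,gold
        System.out.println(enricher.map("001010000000009,payload-b")); // 001010000000009,payload-b,unknown

        Files.delete(csv);
    }
}
```

Keeping the table in a transient field and only the path in the serialized function is what avoids pushing the CSV contents through Akka at submission time, which is the failure Robert describes with the frame size limit.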
I have a few more questions, if you don’t mind:

1. Is there a way to retrieve, in one stream, data that’s stored in another stream? I have a location stream that I can use to store the latest subscriber location, and another stream that needs access to the latest subscriber location processed by the location stream. I read a bit about broadcast variables, but they’re only available for DataSets, not DataStreams. Did I miss a way in Flink to do this?

2. We are planning to test this on a Flink cluster of 3 nodes (1 master and 2 slaves).
   a. If I use a socket stream, does each node listen for data on its socket, or only the job manager node? I assume it’s the latter. This is important because I have to figure out how to make the system highly available.
   b. Is there a way to split the aforementioned CSV file across the three nodes in the cluster?

Sorry for bombarding you with questions.

Thanks,
Ali

On 2015-11-05, 10:47 AM, "Robert Metzger" <rmetz...@apache.org> wrote:

>Hi Ali,
>
>great, the start-local-streaming.sh script sounds right.
>
>I can explain why your first approach didn't work:
>
>You were trying to send the CSV files from the Flink client to the cluster
>using our RPC system (Akka). When you submit a job to Flink, we serialize
>all the objects the user created (mappers, sources, ...) and send them to
>the cluster.
>There is a method StreamExecutionEnvironment.fromElements(..) which allows
>users to serialize a few objects along with the job submission. But the
>amount of data you can transfer like this is limited by the Akka frame
>size. In our case I think the default is 10 megabytes.
>Beyond that, Akka will probably just drop or reject the deployment message.
>
>I'm pretty sure the approach I've suggested will resolve the issue.
>
>Please let me know if you need further assistance.
>
>Regards,
>Robert
>
>
>
>On Thu, Nov 5, 2015 at 3:39 PM, Kashmar, Ali <ali.kash...@emc.com> wrote:
>
>> I did not load the CSV file using the approach you suggested.
>> I was loading it outside the operators (at the beginning of the main method of
>> my class), since the file will be needed by multiple operators for sure.
>> When the file was small, I saw the job registered and started, but when I
>> used a big CSV file, the job never got registered with the task manager (I
>> tried the ‘list’ command and got nothing).
>>
>> Here’s what I saw with the small(ish) file:
>>
>> # flink run analytics-flink.jar 19001 minisubs.csv output.csv
>> loaded 200000 subscribers from csv file
>> 11/02/2015 16:36:59 Job execution switched to status RUNNING.
>> 11/02/2015 16:36:59 Socket Stream -> Flat Map -> Filter -> Map -> Stream Sink(1/1) switched to SCHEDULED
>> 11/02/2015 16:36:59 Socket Stream -> Flat Map -> Filter -> Map -> Stream Sink(1/1) switched to DEPLOYING
>> 11/02/2015 16:36:59 Socket Stream -> Flat Map -> Filter -> Map -> Stream Sink(1/1) switched to RUNNING
>>
>>
>> And here’s what I saw with the big file:
>>
>> # flink run analytics-flink.jar 19001 subs.csv output.csv
>> loaded 1173547 subscribers from csv file
>>
>>
>> I’m already using the streaming mode. I’m running a single Flink node
>> right now on CentOS 7 using the ‘start-local-streaming.sh’ script.
>>
>> Thanks,
>> Ali
>>
>> On 2015-11-05, 10:22 AM, "Robert Metzger" <rmetz...@apache.org> wrote:
>>
>> >Okay.
>> >
>> >You should be able to implement it as you described initially. I would do
>> >the transformation in a map() operator of Flink. The RichMapFunction
>> >provides you with an open() method which is called before the first record
>> >arrives.
>> >In the open() method, I would read the CSV file(s) from HDFS or another
>> >file system accessible by all nodes.
>> >
>> >Then, you can access the data from the files in the map operator.
>> >
>> >In order to make the best use of memory, I would recommend starting Flink
>> >in "streaming" mode (-st argument on YARN). With that enabled, we provide
>> >more memory to streaming operators.
>> >Also, I would only expose one processing slot per TaskManager; this way we
>> >ensure that the files are only read once per TaskManager (make sure you
>> >have only one TaskManager per machine).
>> >
>> >Why did your previous approach fail? Do you still have the error message?
>> >
>> >Regards,
>> >Robert
>> >
>> >On Thu, Nov 5, 2015 at 3:02 PM, Kashmar, Ali <ali.kash...@emc.com> wrote:
>> >
>> >> Hi Robert,
>> >>
>> >> The CSV file (or files, as there will definitely be more than one) can be
>> >> large (let’s say 1 GB). Memory is not an issue, though. Each node has at
>> >> least 64 GB of RAM. The CSV files should easily fit in the memory of
>> >> each node.
>> >>
>> >> Regards,
>> >> Ali
>> >>
>> >>
>> >>
>> >> On 2015-11-05, 6:30 AM, "Robert Metzger" <rmetz...@apache.org> wrote:
>> >>
>> >> >Hi Ali,
>> >> >
>> >> >I'm excited to hear that EMC is looking into Apache Flink. I think the
>> >> >solution to this problem depends on one question: What is the size of the
>> >> >data in the CSV file compared to the memory you have available in the
>> >> >cluster?
>> >> >Would the mapping table from the file fit into the memory of all nodes
>> >> >running Flink?
>> >> >
>> >> >Regards,
>> >> >Robert
>> >> >
>> >> >PS: Did you subscribe to the mailing list? I've CCed you in case you're
>> >> >not subscribed yet.
>> >> >
>> >> >On Wed, Nov 4, 2015 at 4:54 PM, Kashmar, Ali <ali.kash...@emc.com> wrote:
>> >> >
>> >> >> Hi there,
>> >> >>
>> >> >> I’m trying to design and implement a use case in Flink where I’m receiving
>> >> >> protocol packets over a socket. Each packet has the subscriber IMSI in it
>> >> >> and a bunch of other data. At the same time, I have a CSV file with a
>> >> >> mapping from IMSI -> subscriber group. I need to inject the group into the
>> >> >> packet and then send it to the sink.
>> >> >>
>> >> >> I’ve tried loading the CSV into an in-memory map and then accessing the map
>> >> >> from within the Flink operators, but that only works when the CSV is very
>> >> >> small (a few hundred subscribers). I’ve also tried creating another stream
>> >> >> for the CSV and connecting the streams, but that doesn’t yield anything, as I
>> >> >> can’t have access to objects from both streams at the same time.
>> >> >>
>> >> >> How would you guys approach this?
>> >> >>
>> >> >> Thanks,
>> >> >> Ali
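Ali’s first follow-up question (a location stream whose latest values another stream needs) and his earlier attempt to connect a CSV stream to the packet stream can both be handled with a single two-input operator: connect() the two streams and apply a CoFlatMapFunction, whose flatMap1 and flatMap2 methods are called on the same operator instance and can therefore share a field. This is one possible approach, not one proposed in the thread. The plain-Java sketch below simulates such an operator without Flink; the class name, the method names onLocation/onPacket, and the "imsi,cellId" and "imsi,payload" record formats are hypothetical. With parallelism greater than one, both inputs would also need to be partitioned by the same key (the IMSI) so that updates and lookups for a subscriber reach the same instance.

```java
import java.util.HashMap;
import java.util.Map;

// Plain-Java simulation of a two-input operator (no Flink dependency). In Flink, the same logic
// would live in a CoFlatMapFunction applied to the connected location and packet streams.
// Hypothetical formats: location updates "imsi,cellId"; packets "imsi,payload".
public class LatestLocationJoin {
    // State shared by both inputs: the latest known cell per IMSI.
    // A plain field like this is not checkpointed; it is lost if the operator restarts.
    private final Map<String, String> latestLocation = new HashMap<>();

    // Input 1 (the role of flatMap1): overwrite the stored location and emit nothing.
    public void onLocation(String update) {
        String[] cols = update.split(",", 2);
        if (cols.length == 2) {
            latestLocation.put(cols[0].trim(), cols[1].trim());
        }
    }

    // Input 2 (the role of flatMap2): attach the most recent location seen so far.
    // A packet that arrives before any update for its IMSI gets "unknown".
    public String onPacket(String packet) {
        String imsi = packet.split(",", 2)[0].trim();
        return packet + "," + latestLocation.getOrDefault(imsi, "unknown");
    }

    public static void main(String[] args) {
        LatestLocationJoin op = new LatestLocationJoin();
        System.out.println(op.onPacket("001010000000001,pkt-1")); // 001010000000001,pkt-1,unknown
        op.onLocation("001010000000001,cell-17");
        System.out.println(op.onPacket("001010000000001,pkt-2")); // 001010000000001,pkt-2,cell-17
        op.onLocation("001010000000001,cell-42");
        System.out.println(op.onPacket("001010000000001,pkt-3")); // 001010000000001,pkt-3,cell-42
    }
}
```

Because both inputs pass through one operator, data from the two streams is available at the same time; which value a packet sees depends on arrival order, as the first packet above shows.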