Hi Robert, Thanks for the help! I’ve managed to implement my use case using your suggested approach of combining the streams.
Just a follow up on 2b) below, I’m not clear on this statement "partition (split) the data stream so that the right protocol packets end up at the right machine”. How do I know which machine the data is ending up at? My understanding is that the Flink program is agnostic of the cluster nodes. Maybe it would help if I explained this use case: 1. Load a CSV file and split it equally, using the ID in the CSV record, across the Flink cluster to be stored in memory (operator’s memory maybe?). This is basically an initialization step. 2. Once 1) is done, read events from a socket (for now) and use the ID in the event to add attributes from the matching CSV record to the event. Store the updated events in a file. Based on those two requirements, what can be accomplished using Flink and what can’t be? Is the stuff that can’t be done in Flink’s roadmap? Thanks, Ali On 2015-11-05, 5:29 PM, "Robert Metzger" <rmetz...@apache.org> wrote: >Hi Ali, > >1. You can connect two streams and then use the co-map operator to consume >data from both streams. I'm not sure how much data arrives from one or the >other stream, but maybe you can store (update) the data in memory. >Read more here >https://ci.apache.org/projects/flink/flink-docs-master/apis/streaming_guid >e.html#datastream-abstraction > >2 a) No, I think all the taskmanager nodes are listening to data. For >making this highly available, I would recommend to let the system which is >producing the data write it to Apache Kafka. Then, consume the data from >Kafka using Flink. >This way you get very good high availability and througput and you don't >have to worry about the sockets. > >2 b) Sure, you can implement the splitting yourself (each mapper reads N >lines of the file) and then partition (split) the data stream so that the >right protocol packets end up at the right machine. >However, if the entire CSV file fits into the entire memory of one >machine, >its probably faster to not split the stream and use each machine to join >the data locally. > >Its really no problem that you're asking questions, that's what the >mailing >list is made for. >I'm looking forward to the next set of questions ;) > >Regards, >Robert > > > >On Thu, Nov 5, 2015 at 9:56 PM, Kashmar, Ali <ali.kash...@emc.com> wrote: > >> Hi Robert, >> >> I tried the approach you suggested and it works nicely. Thanks! >> >> I have a few more questions if you don’t mind: >> >> 1. Is there a way to retrieve in one stream data that's stored in >>another >> stream? I have a location stream that I can use to store the latest >> subscriber location. I have another stream that needs access to the >>latest >> subscriber location processed by the location stream. I read a bit on >> broadcast variables but they’re only available for DataSets, not >> DataStreams. Did I miss a way in Flink to do this? >> >> 2. We are planning to test this on a Flink cluster of 3 nodes (1 master >> and 2 slaves). >> >> a. If I use a socket stream, does each node listen for data on its >> socket or is it only the job manager node? I assume it’s the latter. >>This >> is important because I have to figure out how to make the >> system highly >> available. >> b. Is there a way to split the afore-mentioned CSV file across the >> three nodes in the cluster? >> >> Sorry for bombarding you with questions. >> >> Thanks, >> Ali >> >> >> On 2015-11-05, 10:47 AM, "Robert Metzger" <rmetz...@apache.org> wrote: >> >> >Hi Ali, >> > >> >great, the start-local-streaming.sh script sounds right. >> > >> >I can explain why your first approach didn't work: >> > >> >You were trying to send the CSV files from the Flink client to the >>cluster >> >using our RPC system (Akka). When you submit a job to Flink, we >>serialize >> >all the objects the user created (mappers, sources, ...) and send it to >> >the >> >cluster. >> >There is a method StreamExecutionEnvironment.fromElements(..) which >>allows >> >users to serialize a few objects along with the job submission. But the >> >amount of data you can transfer like this is limited by the Akka frame >> >size. In our case I think the default is 10 megabytes. >> >After that, Akka will probably just drop or reject the deployment >>message. >> > >> >I'm pretty sure the approach I've suggested will resolve the issue. >> > >> >Please let me know if you need further assistance. >> > >> >Regards, >> >Robert >> > >> > >> > >> >On Thu, Nov 5, 2015 at 3:39 PM, Kashmar, Ali <ali.kash...@emc.com> >>wrote: >> > >> >> I did not load the CSV file using the approach you suggested. I was >> >> loading it outside the operators (at the beginning of the main >>method of >> >> my class), since the file will be needed by multiple operators for >>sure. >> >> When the file was small, I saw the job registered and started, but >>when >> >>I >> >> used a big CSV file, the job never got registered with the task >>manager >> >>(I >> >> tried the ‘list' command and got nothing). >> >> >> >> Here’s what I saw with the small(ish) file: >> >> >> >> # flink run analytics-flink.jar 19001 minisubs.csv output.csv >> >> loaded 200000 subscribers from csv file >> >> 11/02/2015 16:36:59 Job execution switched to status RUNNING. >> >> 11/02/2015 16:36:59 Socket Stream -> Flat Map -> Filter -> Map -> >>Stream >> >> Sink(1/1) switched to SCHEDULED >> >> 11/02/2015 16:36:59 Socket Stream -> Flat Map -> Filter -> Map -> >>Stream >> >> Sink(1/1) switched to DEPLOYING >> >> 11/02/2015 16:36:59 Socket Stream -> Flat Map -> Filter -> Map -> >>Stream >> >> Sink(1/1) switched to RUNNING >> >> >> >> >> >> And here’s what I saw with the big file: >> >> >> >> # flink run analytics-flink.jar 19001 subs.csv output.csv >> >> loaded 1173547 subscribers from csv file >> >> >> >> >> >> I’m already using the streaming mode. I’m running a single Flink node >> >> right now on Centos 7 using the ‘start-local-streaming.sh’ script. >> >> >> >> Thanks, >> >> Ali >> >> >> >> On 2015-11-05, 10:22 AM, "Robert Metzger" <rmetz...@apache.org> >>wrote: >> >> >> >> >Okay. >> >> > >> >> >you should be able to implement it as you described initially. I >>would >> >>do >> >> >the transformation in a map() operator of Flink. The RichMapFunction >> >> >provides you with an open() method which is called before the first >> >>record >> >> >arrives. >> >> >In the open() method, I would read the csv file(s) from HDFS or >>another >> >> >file system accessible by all nodes. >> >> > >> >> >Then, you can access the data from the files in the map operator. >> >> > >> >> >In order to utilize the memory best, I would recommend to start >>Flink >> >>in >> >> >the "streaming" mode. (-st argument on YARN). With that enabled, we >> >> >provide >> >> >more memory to streaming operators. >> >> >Also, I would only expose one processing slot per TaskManager, this >> >>way we >> >> >ensure that the files are only read once per TaskManager. (make sure >> >>you >> >> >have only one TaskManager per machine). >> >> > >> >> >Why did your previous approach fail? Do you still have the error >> >>message? >> >> > >> >> >Regards, >> >> >Robert >> >> > >> >> >On Thu, Nov 5, 2015 at 3:02 PM, Kashmar, Ali <ali.kash...@emc.com> >> >>wrote: >> >> > >> >> >> Hi Robert, >> >> >> >> >> >> The CSV file (or files as there will definitely be more than one) >> >>can be >> >> >> large (let¹s say 1 GB). Memory is not an issue though. Each node >>has >> >>at >> >> >> least 64 GB RAM mounted. The CSV files should easily fit in the >> >>memory >> >> >>of >> >> >> each node. >> >> >> >> >> >> Regards, >> >> >> Ali >> >> >> >> >> >> >> >> >> >> >> >> On 2015-11-05, 6:30 AM, "Robert Metzger" <rmetz...@apache.org> >> wrote: >> >> >> >> >> >> >Hi Ali, >> >> >> > >> >> >> >I'm excited to hear that EMC is looking into Apache Flink. I >>think >> >>the >> >> >> >solution to this problem depends on one question: What is the >>size >> >>of >> >> >>the >> >> >> >data in the CSV file compared to the memory you have available in >> >>the >> >> >> >cluster? >> >> >> >Would the mapping table from the file fit into the memory of all >> >>nodes >> >> >> >running Flink? >> >> >> > >> >> >> >Regards, >> >> >> >Robert >> >> >> > >> >> >> >PS: Did you subscribe to the mailing list? I've CCed you in case >> >>you're >> >> >> >not >> >> >> >subscribed yet >> >> >> > >> >> >> >On Wed, Nov 4, 2015 at 4:54 PM, Kashmar, Ali >><ali.kash...@emc.com> >> >> >>wrote: >> >> >> > >> >> >> >> Hi there, >> >> >> >> >> >> >> >> I¹m trying to design and implement a use case in Flink where >>I¹m >> >> >> >>receiving >> >> >> >> protocol packets over a socket. Each packet has the subscriber >> >>IMSI >> >> >>in >> >> >> >>it >> >> >> >> and a bunch of more data. At the same time, I have a csv file >> >>with a >> >> >> >> mapping from IMSI -> subscriber group. I need to inject the >>group >> >> >>into >> >> >> >> packet and then send it to the sink. >> >> >> >> >> >> >> >> I¹ve tried loading the CSV into a memory map and then accessing >> >>the >> >> >>map >> >> >> >> from within the Flink operators but that only works when the >>CSV >> >>is >> >> >>very >> >> >> >> small (a few hundred subscribers). I¹ve tried creating another >> >>stream >> >> >> >>for >> >> >> >> the CSV and connecting the streams but that doesn¹t yield >>anything >> >> >>as I >> >> >> >> can¹t have access to objects from both streams at the same >>time. >> >> >> >> >> >> >> >> How would you guys approach this? >> >> >> >> >> >> >> >> Thanks, >> >> >> >> Ali >> >> >> >> >> >> >> >> >> >> >> >> >> >>