Hi Flavio, I’ll try and answer your questions:
Regarding 1. Why do you need to first read the data from HDFS into Kafka (or another queue)? Using StreamExecutionEnvironment.readFile(FileInputFormat, String, FileProcessingMode, long) you can monitor a directory in HDFS and process the files that are already there as well as any newly arriving files. For batching your output, you could look into the BucketingSink, which writes to files in HDFS (or some other DFS) and starts new files (buckets) based on some criterion, for example the number of processed elements or time. I've put rough sketches of both at the end of this mail.

Regarding 2. I didn't completely understand this part. Could you maybe elaborate a bit, please?

Regarding 3. Yes, I think you can. You would use Async I/O to fire off your queries to Solr/ES; there is a sketch of that below as well.

Best,
Aljoscha

> On 11. May 2017, at 15:06, Flavio Pompermaier <pomperma...@okkam.it> wrote:
>
> Hi to all,
> we have a particular use case where we have a tabular dataset on HDFS (e.g. a CSV)
> that we want to enrich by filling some cells with the content returned by a query
> to a reverse index (e.g. Solr/Elasticsearch).
> Since we want this process to be resilient and scalable, we thought that Flink
> streaming could be a good fit: we can control the "pressure" on the index by
> adding/removing consumers dynamically, and there is automatic error recovery.
>
> Right now we have developed 2 different solutions to the problem:
> 1. Move the dataset from HDFS to a queue/topic (like Kafka or RabbitMQ) and then
>    let the queue consumers do the real job (pull Rows from the queue, enrich them
>    and then persist the enriched Rows). The questions here are:
>    - How to properly manage writing to HDFS? If we read a set of rows, enrich them
>      and need to write the result back to HDFS, is it possible to automatically
>      compact files in order to avoid the "too many small files" problem on HDFS?
>      How do we avoid file name collisions (put each batch of rows into a different
>      file)?
>    - How to control the number of consumers dynamically? Is it possible to change
>      the parallelism once the job has started?
> 2. In order to avoid useless data transfer from HDFS to a queue/topic (since we
>    don't need all the Row fields to create the query - usually only 2/5 fields are
>    needed), we can create a Flink job that puts the queries into a queue/topic and
>    waits for the result. The problems with this approach are:
>    - How to correlate queries with their responses? Creating a single response
>      queue/topic implies that all consumers read all messages (and discard those
>      that are not directed to them), while creating a queue/topic for each sub-task
>      could be expensive (in terms of resources and management - but we don't have
>      any evidence/experience of this, it's just a possible problem).
>    - Maybe we can exploit Flink async I/O somehow...? But how?
>
> Any suggestions/drawbacks on the 2 approaches?
>
> Thanks in advance,
> Flavio
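
For reference, a minimal sketch of the continuous file monitoring, against the Flink 1.2/1.3 Java API. The input path and the 60 second scan interval are just placeholders:

import org.apache.flink.api.java.io.TextInputFormat;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.FileProcessingMode;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// Monitor the input directory: the files already there are processed,
// and newly arriving files are picked up on every scan.
String inputPath = "hdfs:///data/input";   // placeholder path
TextInputFormat format = new TextInputFormat(new Path(inputPath));

DataStream<String> lines = env.readFile(
    format,
    inputPath,
    FileProcessingMode.PROCESS_CONTINUOUSLY,
    60_000L);                              // directory re-scan interval in ms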
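
And the output side with the BucketingSink (from flink-connector-filesystem). The target path, the hourly bucketer and the 128 MB roll size are again only examples, and "enrichedLines" is a hypothetical DataStream<String> holding your enriched rows serialized back to CSV lines:

import org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink;
import org.apache.flink.streaming.connectors.fs.bucketing.DateTimeBucketer;

// Write the enriched rows back to HDFS, rolling part files so that you get
// reasonably sized files instead of many small ones.
BucketingSink<String> sink = new BucketingSink<>("hdfs:///data/enriched"); // placeholder path
sink.setBucketer(new DateTimeBucketer<String>("yyyy-MM-dd--HH")); // one bucket (directory) per hour
sink.setBatchSize(128L * 1024 * 1024);                            // start a new part file after ~128 MB

enrichedLines.addSink(sink);

The part file names include the parallel subtask index and a counter, so you don't get name collisions, and rolling at a size threshold keeps you from producing lots of small files.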
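
For 3., a minimal sketch of the Async I/O part (Flink 1.2+ Java API). "EnrichmentFunction", "queryIndex()" and the "rows" stream are hypothetical; queryIndex() stands in for whatever Solr/ES client call you use to build the query from the few relevant fields and merge the response back into the Row:

import java.util.Collections;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

import org.apache.flink.streaming.api.datastream.AsyncDataStream;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.functions.async.AsyncFunction;
import org.apache.flink.streaming.api.functions.async.collector.AsyncCollector;
import org.apache.flink.types.Row;

public class EnrichmentFunction implements AsyncFunction<Row, Row> {

    @Override
    public void asyncInvoke(Row input, AsyncCollector<Row> collector) {
        // Run the (possibly blocking) index lookup off the main task thread
        // and hand the enriched row to the collector when it completes.
        CompletableFuture
            .supplyAsync(() -> queryIndex(input))
            .thenAccept(enriched -> collector.collect(Collections.singletonList(enriched)));
    }

    private Row queryIndex(Row input) {
        // hypothetical: build the query from the few relevant fields,
        // call Solr/Elasticsearch and copy the returned values into the row
        return input;
    }
}

// The capacity parameter (here 100) caps the number of in-flight requests,
// which is how you control the "pressure" on the index.
DataStream<Row> enriched = AsyncDataStream.unorderedWait(
    rows, new EnrichmentFunction(), 30, TimeUnit.SECONDS, 100);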