Hello, I will describe my use case briefly, in steps, for easier understanding:

1) Currently my job loads data from Parquet files using HadoopInputFormat together with AvroParquetInputFormat. The current approach looks like this:

    AvroParquetInputFormat<GenericRecord> inputFormat = new AvroParquetInputFormat<GenericRecord>();
    AvroParquetInputFormat.setAvroReadSchema(job, schema);
    AvroParquetInputFormat.setUnboundRecordFilter(job, recordFilterClass);
    HadoopInputFormat<Void, GenericRecord> hadoopInputFormat =
        HadoopInputs.createHadoopInput(inputFormat, Void.class, GenericRecord.class, job);
    return environment.createInput(hadoopInputFormat);

2) The data is loaded into a DataSource and, after various transformations, is grouped by my "user_id" key.

3) In a GroupReduceFunction I process the values for a given user.

4) For each group in the reduce function I extract the key (the one used for the earlier grouping) and would like to read additional data (Parquet files from HDFS for the specific key extracted before), which is required for further processing of the grouped data. A sketch of what I have in mind is shown after this list.

5) After the processing inside the reduce function, I would like to store the results in Parquet files using the AvroParquetWriter class.
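To make step 4) more concrete, below is roughly the shape I have in mind. This is only a sketch: I am assuming, just for illustration, that the grouped records arrive as Tuple2<String, GenericRecord> keyed by user_id, that the per-user files can be located from a base path, and that reading them directly with AvroParquetReader inside a RichGroupReduceFunction is even a valid approach, which is part of my question.

    import java.util.ArrayList;
    import java.util.List;

    import org.apache.avro.generic.GenericRecord;
    import org.apache.flink.api.common.functions.RichGroupReduceFunction;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.util.Collector;
    import org.apache.hadoop.fs.Path;
    import org.apache.parquet.avro.AvroParquetReader;
    import org.apache.parquet.hadoop.ParquetReader;

    public class PerUserReducer
            extends RichGroupReduceFunction<Tuple2<String, GenericRecord>, GenericRecord> {

        // HDFS directory that contains the additional Parquet data per user (placeholder)
        private final String basePath;

        public PerUserReducer(String basePath) {
            this.basePath = basePath;
        }

        @Override
        public void reduce(Iterable<Tuple2<String, GenericRecord>> values,
                           Collector<GenericRecord> out) throws Exception {

            // collect the grouped records and remember the key used for the grouping
            String userId = null;
            List<GenericRecord> group = new ArrayList<>();
            for (Tuple2<String, GenericRecord> value : values) {
                userId = value.f0;
                group.add(value.f1);
            }

            // step 4: load the additional Parquet data for this user from HDFS
            List<GenericRecord> additional = new ArrayList<>();
            try (ParquetReader<GenericRecord> reader =
                     AvroParquetReader.<GenericRecord>builder(
                             new Path(basePath + "/" + userId)).build()) {
                GenericRecord record;
                while ((record = reader.read()) != null) {
                    additional.add(record);
                }
            }

            // the actual per-user processing combining `group` and `additional`
            // goes here; results are emitted via the collector
            for (GenericRecord result : group) {
                out.collect(result);
            }
        }
    }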
My question is how the additional data loading inside the reduce function (or any other transformation) can be achieved in step 4). In my perfect scenario I would like to use HadoopInputFormat (just like when loading the initial data in the first step), but I am missing the environment context there (probably?). Is there any way to achieve this, or is this scenario simply wrong and therefore badly designed?
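For completeness, the writing in step 5) is planned roughly like this (again only a sketch; outputPath, schema, and results are placeholders for my actual values):

    import java.io.IOException;
    import java.util.List;

    import org.apache.avro.Schema;
    import org.apache.avro.generic.GenericRecord;
    import org.apache.hadoop.fs.Path;
    import org.apache.parquet.avro.AvroParquetWriter;
    import org.apache.parquet.hadoop.ParquetWriter;

    public class ResultWriter {

        // writes the results produced for one user in the reduce function to a Parquet file
        public static void writeResults(String outputPath, Schema schema,
                                        List<GenericRecord> results) throws IOException {
            try (ParquetWriter<GenericRecord> writer =
                     AvroParquetWriter.<GenericRecord>builder(new Path(outputPath))
                         .withSchema(schema)
                         .build()) {
                for (GenericRecord result : results) {
                    writer.write(result);
                }
            }
        }
    }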