Hi Flavio, sure. This code should be close to what you need:
// needs: java.util.Arrays, org.apache.flink.api.common.functions.MapPartitionFunction,
// org.apache.flink.util.Collector
public static class BatchingMapper implements MapPartitionFunction<String, String[]> {

  int cnt = 0;
  String[] batch = new String[1000];

  @Override
  public void mapPartition(Iterable<String> values, Collector<String[]> out) throws Exception {
    for (String v : values) {
      batch[cnt++] = v;
      if (cnt == 1000) {
        // emit full batch
        out.collect(batch);
        Arrays.fill(batch, null);
        cnt = 0;
      }
    }
    // handle the last, possibly incomplete batch (skip it if it is empty)
    if (cnt > 0) {
      String[] lastBatch = Arrays.copyOf(batch, cnt);
      out.collect(lastBatch);
    }
  }
}

Cheers,
Fabian

2016-11-28 20:44 GMT+01:00 Flavio Pompermaier <pomperma...@okkam.it>:

> Thanks for the support Fabian!
> I think I'll try the tumbling window method; it seems cleaner. Btw, just
> for the sake of completeness, can you show me a brief snippet (also in
> pseudocode) of a mapPartition that groups together elements into chunks of
> size n?
>
> Best,
> Flavio
>
> On Mon, Nov 28, 2016 at 8:24 PM, Fabian Hueske <fhue...@gmail.com> wrote:
>
>> Hi Flavio,
>>
>> I think the easiest solution is to read the CSV file with the
>> CsvInputFormat and use a subsequent MapPartition to batch 1000 rows
>> together.
>> In each partition, you might end up with an incomplete batch.
>> However, I don't see yet how you can feed these batches into the
>> JdbcInputFormat, which does not accept a DataSet as input. You could create
>> a RichMapFunction that contains the logic of the JdbcInputFormat to
>> directly query the database with the input of the MapPartitionOperator.
>>
>> If you want to use the DataStream API, you can use a tumbling count
>> window to group IDs together and query the external database in a
>> subsequent Map operator.
>>
>> Hope this helps,
>> Fabian
>>
>>
>> 2016-11-28 18:32 GMT+01:00 Flavio Pompermaier <pomperma...@okkam.it>:
>>
>>> Hi to all,
>>>
>>> I have a use case where I have to read a huge csv containing ids to
>>> fetch from a table in a db.
>>> The jdbc input format can handle parameterized queries, so I was thinking
>>> to fetch data using 1000 ids at a time. What is the easiest way to divide a
>>> dataset into slices of 1000 ids each (in order to create parameters for my
>>> JDBC input format)? Is that possible?
>>> Or maybe there's an easier solution using the streaming APIs?
>>>
>>> Best,
>>> Flavio
>>>
>>
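For completeness, here is also a rough, untested sketch of the second idea from the quoted mail: a rich function that queries the database directly with each batch of ids. I wrote it as a RichFlatMapFunction (instead of a RichMapFunction) so that every matching row is emitted as its own record. The JDBC URL, credentials, and the table and column names (my_table, id, value) are just placeholders you would replace:

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.util.Collections;

import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.util.Collector;

public class JdbcBatchLookup extends RichFlatMapFunction<String[], Tuple2<String, String>> {

  private transient Connection conn;

  @Override
  public void open(Configuration parameters) throws Exception {
    // placeholder connection details
    conn = DriverManager.getConnection("jdbc:...", "user", "password");
  }

  @Override
  public void flatMap(String[] ids, Collector<Tuple2<String, String>> out) throws Exception {
    // build one IN (...) query per batch; the last batch may hold fewer than 1000 ids
    String placeholders = String.join(",", Collections.nCopies(ids.length, "?"));
    String query = "SELECT id, value FROM my_table WHERE id IN (" + placeholders + ")";
    try (PreparedStatement stmt = conn.prepareStatement(query)) {
      for (int i = 0; i < ids.length; i++) {
        stmt.setString(i + 1, ids[i]);
      }
      try (ResultSet rs = stmt.executeQuery()) {
        while (rs.next()) {
          out.collect(new Tuple2<>(rs.getString("id"), rs.getString("value")));
        }
      }
    }
  }

  @Override
  public void close() throws Exception {
    if (conn != null) {
      conn.close();
    }
  }
}

On the DataSet side you would chain it after the batching step, e.g.
csvIds.mapPartition(new BatchingMapper()).flatMap(new JdbcBatchLookup()).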
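And since you said you'd try the tumbling window method, a minimal, untested sketch of the count-window variant on the DataStream API, assuming the ids arrive as a DataStream<String>. The window function below only groups 1000 ids into an array; the actual lookup would again happen in a subsequent map/flatMap like the one above. One caveat: a count window only fires when it is full, so a trailing incomplete batch is not emitted unless you add a custom trigger.

import java.util.ArrayList;
import java.util.List;

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.functions.windowing.AllWindowFunction;
import org.apache.flink.streaming.api.windowing.windows.GlobalWindow;
import org.apache.flink.util.Collector;

public class IdBatching {

  public static DataStream<String[]> batchIds(DataStream<String> ids) {
    // countWindowAll runs with parallelism 1; use keyBy + countWindow to parallelize
    return ids
        .countWindowAll(1000)
        .apply(new AllWindowFunction<String, String[], GlobalWindow>() {
          @Override
          public void apply(GlobalWindow window, Iterable<String> values, Collector<String[]> out) {
            // collect the ids of one full window into a single array
            List<String> batch = new ArrayList<>();
            for (String id : values) {
              batch.add(id);
            }
            out.collect(batch.toArray(new String[0]));
          }
        });
  }
}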