Hi Flavio,

Sure, this code should be close to what you need:

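// imports go at the top of the enclosing file
import java.util.Arrays;
import org.apache.flink.api.common.functions.MapPartitionFunction;
import org.apache.flink.util.Collector;

public static class BatchingMapper
      implements MapPartitionFunction<String, String[]> {

   private static final int BATCH_SIZE = 1000;

   @Override
   public void mapPartition(Iterable<String> values,
         Collector<String[]> out) throws Exception {
      // local state, so every call starts with an empty batch
      String[] batch = new String[BATCH_SIZE];
      int cnt = 0;
      for (String v : values) {
         batch[cnt++] = v;
         if (cnt == BATCH_SIZE) {
            // emit the full batch and continue with a fresh array,
            // so the emitted one is never modified afterwards
            out.collect(batch);
            batch = new String[BATCH_SIZE];
            cnt = 0;
         }
      }
      // emit the last, possibly incomplete batch (skip it if empty)
      if (cnt > 0) {
         out.collect(Arrays.copyOf(batch, cnt));
      }
   }
}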
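
If you want to try the chunking logic without setting up a Flink job, here is a
minimal plain-Java sketch of the same idea. The class name BatchingSketch and
the method chunk are made up for illustration and are not part of Flink. It
collects the batches into a list instead of a Collector, allocates a fresh
array per batch, and skips an empty trailing batch:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical helper, not part of Flink: groups the input into arrays
// of at most n elements, in encounter order.
final class BatchingSketch {

   static List<String[]> chunk(Iterable<String> values, int n) {
      if (n <= 0) {
         throw new IllegalArgumentException("n must be positive");
      }
      List<String[]> out = new ArrayList<>();
      String[] batch = new String[n];
      int cnt = 0;
      for (String v : values) {
         batch[cnt++] = v;
         if (cnt == n) {
            // batch is full: hand it over and start a new array
            out.add(batch);
            batch = new String[n];
            cnt = 0;
         }
      }
      // last, possibly incomplete batch; nothing is added if it is empty
      if (cnt > 0) {
         out.add(Arrays.copyOf(batch, cnt));
      }
      return out;
   }
}
```

For example, BatchingSketch.chunk(Arrays.asList("a", "b", "c", "d", "e"), 2)
returns three arrays: [a, b], [c, d] and [e].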

Cheers, Fabian

2016-11-28 20:44 GMT+01:00 Flavio Pompermaier <pomperma...@okkam.it>:

> Thanks for the support Fabian!
> I think I'll try the tumbling window method, it seems cleaner. Btw, just
> for the sake of completeness, can you show me a brief snippet (also in
> pseudocode) of a mapPartition that groups together elements into chunks of
> size n?
>
> Best,
> Flavio
>
> On Mon, Nov 28, 2016 at 8:24 PM, Fabian Hueske <fhue...@gmail.com> wrote:
>
>> Hi Flavio,
>>
>> I think the easiest solution is to read the CSV file with the
>> CsvInputFormat and use a subsequent MapPartition to batch 1000 rows
>> together.
>> In each partition, you might end up with an incomplete batch.
>> However, I don't see yet how you can feed these batches into the
>> JdbcInputFormat which does not accept a DataSet as input. You could create
>> a RichMapFunction that contains the logic of the JdbcInputFormat to
>> directly query the database with the input of the MapPartitionOperator.
>>
>> If you want to use the DataStream API, you can use a tumbling count
>> window to group IDs together and query the external database in a
>> subsequent Map operator.
>>
>> Hope this helps,
>> Fabian
>>
>>
>> 2016-11-28 18:32 GMT+01:00 Flavio Pompermaier <pomperma...@okkam.it>:
>>
>>> Hi to all,
>>>
>>> I have a use case where I have to read a huge csv containing ids to
>>> fetch from a table in a db.
>>> The JDBC input format can handle parameterized queries, so I was thinking
>>> of fetching data 1000 ids at a time. What is the easiest way to divide a
>>> dataset into slices of 1000 ids each (in order to create parameters for my
>>> JDBC input format)? Is that possible?
>>> Or maybe there's an easier solution using the streaming APIs?
>>>
>>> Best,
>>> Flavio
>>>
>>
