olivbrau opened a new issue, #3248:
URL: https://github.com/apache/parquet-java/issues/3248
### Describe the usage question you have. Please include as many useful details as possible.
Hello,
I would like some advice on how to use the Java-parquet library as
efficiently as possible.
To export a Parquet file, I noticed there are several possible approaches. I tried several of them, but each time the performance was significantly lower than that of the R function `write_parquet()`, which I used as a benchmark.
I ended up with the approach described below, but I would like to know whether there are things I could improve.
The idea is, if possible, not to use intermediate structures
(`RecordConsumer`, `SimpleGroup`, etc.), which duplicate the data and seem to
slow down the export.
My goal is to use the lowest-level API of Parquet to make the process faster.
The data I want to export is stored in a memory structure (essentially plain
Java arrays):
- I create a `Configuration` object, notably specifying compression settings
- I create a schema (`MessageType`) describing the table I want to export
- I create a `ParquetFileWriter` with default options, assigned to a
`LocalOutputFile`
- I create a `ParquetProperties`, specifying version `2_0`, various options
(dictionaries,
`withPageSize = 8MB`, `withPageRowCountLimit = 1000`)
- I create a `CompressionCodecFactory.BytesInputCompressor`, using SNAPPY
- Then I start writing the data:
- I call `fileWriter.start()`
- I manually compute the number of row groups and how many rows each row
group will contain
- For each row group:
- I create a `ColumnChunkPageWriteStore`
- I create a `ColumnWriteStore` from that `ColumnChunkPageWriteStore`
- I create a `MessageColumnIO`
- I create as many `ColumnWriter` as there are columns to export
- For each column, I call a series of `write()` for each row of the
current row group
- At the end of the row group, I call a series of `endRecord()` on my
`ColumnWriteStore`
- Then I call `startBlock()` on my `fileWriter`, `flush()` on the `ColumnWriteStore`, `flushToFileWriter()` on the `ColumnChunkPageWriteStore`, and finally `endBlock()` (see the condensed sketch right after this list)
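To make the flow easier to follow, here is a condensed sketch of that per-row-group sequence. It is only an illustration: the helper name `writeAllRowGroups`, its parameters and the row counts are mine, the per-row `write()` calls are elided, and the imports are the same as in the full code attached below.
```
// Condensed sketch of the per-row-group flow described above (error handling omitted)
private void writeAllRowGroups(ParquetFileWriter fileWriter,
                               ParquetProperties props,
                               CompressionCodecFactory.BytesInputCompressor compressor,
                               MessageType schema,
                               int nbRowGroup,
                               long rowsPerGroup) throws IOException {
    fileWriter.start();
    MessageColumnIO columnIO = new ColumnIOFactory(true).getColumnIO(schema);
    for (int rg = 0; rg < nbRowGroup; rg++) {
        ColumnChunkPageWriteStore pageStore = new ColumnChunkPageWriteStore(
                compressor, schema, props.getAllocator(),
                props.getColumnIndexTruncateLength(),
                props.getPageWriteChecksumEnabled(),
                fileWriter.getEncryptor(), rg);
        // ColumnChunkPageWriteStore acts as both the PageWriteStore and the BloomFilterWriteStore
        ColumnWriteStore columnStore = props.newColumnWriteStore(schema, pageStore, pageStore);

        for (int c = 0; c < schema.getColumns().size(); c++) {
            ColumnWriter w = columnStore.getColumnWriter(
                    columnIO.getLeaves().get(c).getColumnDescriptor());
            // ... for each row of this group: w.write(value, 0, 1) or w.writeNull(0, 0) ...
        }
        for (long r = 0; r < rowsPerGroup; r++) {
            columnStore.endRecord();
        }

        // Flush: this is where pages are encoded and compressed
        fileWriter.startBlock(rowsPerGroup);
        columnStore.flush();
        pageStore.flushToFileWriter(fileWriter);
        fileWriter.endBlock();
    }
    fileWriter.end(new HashMap<>(0));
}
```
The flush block at the end is the part where, as described below, the encoders and the compressor actually run.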
All of this seems so complex that I wonder if I’m doing things the right
way.
I tried parallelizing the writing phase, but the time-consuming part is the `flush` phase at the end of each row group: that is when the compressors do their work, sequentially. I find it unfortunate that this step cannot be parallelized, as that would save a lot of time.
Moreover, the entire row group is kept in memory (duplicated from the
original data) before flushing.
This consumes a lot of memory, forcing me to use many row groups. But the
more row groups there are,
the larger the final file becomes, as each row group seems to take up a fair
amount of space (headers, etc.) in the Parquet file,
and the export is also significantly slower.
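For a sense of scale, this is the kind of back-of-the-envelope estimate involved (the byte figures below are hypothetical; my attached code does a similar estimate in `estimNbRowForRowGroup()`):
```
// Hypothetical estimate of how many rows one row group buffers in memory
long targetRowGroupBytes  = 128L * 1024 * 1024; // e.g. the 128 MB default row group size
int  estimatedBytesPerRow = 100;                // assumed uncompressed size of one row
long rowsBufferedPerGroup = targetRowGroupBytes / estimatedBytesPerRow; // ~1.3 million rows kept in memory until the flush
```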
I’m attaching my Java code that implements these steps.
Thank you in advance for any advice on how to improve this.
```
package ob.analyseurdata.core.io.tableexport.parquet;
import java.awt.Component;
import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.HashMap;
import java.util.stream.IntStream;
import javax.swing.JOptionPane;
import jsat.utils.concurrent.ParallelUtils;
import ob.analyseurdata.core.data.Column;
import ob.common.data.UtilsData;
import static ob.common.data.UtilsData.DataTypePrimitif.DBL;
import static ob.common.data.UtilsData.DataTypePrimitif.EMPTY;
import static ob.common.data.UtilsData.DataTypePrimitif.GEOM;
import static ob.common.data.UtilsData.DataTypePrimitif.INT;
import static ob.common.data.UtilsData.DataTypePrimitif.STR;
import ob.common.errors.AbortedException;
import ob.common.errors.MyException;
import ob.common.errors.MyInternalException;
import ob.common.ihm.longtask.I_LongTaskManager;
import ob.common.ihm.longtask.LongTask;
import ob.common.utils.Logger;
import ob.common.utils.UtilsTime;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.compress.zlib.ZlibCompressor;
import org.apache.parquet.column.ColumnWriteStore;
import org.apache.parquet.column.ColumnWriter;
import org.apache.parquet.column.ParquetProperties;
import org.apache.parquet.column.values.bloomfilter.BloomFilterWriteStore;
import org.apache.parquet.compression.CompressionCodecFactory;
import org.apache.parquet.hadoop.CodecFactory;
import org.apache.parquet.hadoop.ColumnChunkPageWriteStore;
import org.apache.parquet.hadoop.ParquetFileWriter;
import org.apache.parquet.hadoop.ParquetWriter;
import org.apache.parquet.hadoop.codec.ZstandardCodec;
import org.apache.parquet.io.ColumnIOFactory;
import org.apache.parquet.io.LocalOutputFile;
import org.apache.parquet.io.MessageColumnIO;
import org.apache.parquet.io.OutputFile;
import org.apache.parquet.io.api.Binary;
import org.apache.parquet.schema.LogicalTypeAnnotation;
import org.apache.parquet.schema.MessageType;
import org.apache.parquet.schema.PrimitiveType;
import org.apache.parquet.schema.Types;
import org.apache.parquet.util.AutoCloseables;
public class TableExporterPARQUET_lowLevelAPI extends LongTask {
private final TableExportParamsPARQUET exportParams;
private boolean hasError = false;
private Throwable th;
private ParquetFileWriter fileWriter;
private boolean closed = false;
private boolean aborted = false;
private Configuration conf;
private ParquetProperties props;
    // Here we reproduce the fields of the ParquetWriter + InternalParquetRecordWriter pair
    private CompressionCodecFactory.BytesInputCompressor compressor;
    private MessageType schema;
    private int rowGroupOrdinal = 0; // index of the current row group
    private int recordCount;         // number of records in the current row group
private ColumnWriteStore columnStore;
private ColumnChunkPageWriteStore pageStore;
private BloomFilterWriteStore bloomFilterWriteStore;
private ColumnWriter[] columnWriters;
    public TableExporterPARQUET_lowLevelAPI(TableExportParamsPARQUET exportParams, I_LongTaskManager ltManager) {
super(ltManager, "Export", 0);
this.exportParams = exportParams;
}
public void export(boolean bShowConfirm, Component compParentConfirm) {
        // // Attempt to load the native libs...
        // // downloaded from https://github.com/cdarlint/winutils
        // // but it does not work: NativeCodeLoader complains that it cannot find what it is looking for
        // String old = System.getProperty("java.library.path");
        // System.setProperty("hadoop.home.dir", old + ";D:\\Users\\braultoli\\Desktop\\hadoop-3.3.6\\bin");
        // System.setProperty("java.library.path", old + ";D:\\Users\\braultoli\\Desktop\\hadoop-3.3.6\\bin");
        // System.out.println(System.getProperty("java.library.path"));
        // // ==> the native library only loads when passing -Djava.library.path=D:\Users\braultoli\Desktop\hadoop-3.3.6\bin
        // // however, it does not include the native compressors...
        try {
            int nbColToWrite = exportParams.getNbColToWrite();
            int nbRowToWrite = exportParams.getNbLineToWrite();
            setNbStepTotal(nbRowToWrite);
            setName("Exporting " + exportParams.table.getFullName() + " --> " + exportParams.pathFile);
            //****************************************************
            Logger.log_INFO1("***********************************************************");
            Logger.log_INFO1("Export PARQUET " + exportParams.table.getName());
            Logger.log_INFO2(" -> nb de lignes   : " + nbRowToWrite);
            Logger.log_INFO2(" -> nb de colonnes : " + nbColToWrite);
            // Does the file already exist?
            if (new File(exportParams.pathFile).exists()) {
                if (bShowConfirm) {
                    int res = JOptionPane.showConfirmDialog(compParentConfirm,
                            "Fichier déjà existant : \n" + exportParams.pathFile + "\n\nVoulez-vous le remplacer ?",
                            "Ecraser le fichier ?", JOptionPane.OK_CANCEL_OPTION, JOptionPane.QUESTION_MESSAGE);
                    if (res != JOptionPane.OK_OPTION) {
                        Logger.log_INFO1("Export ANNULE");
                        Logger.log_INFO1("***********************************************************\n");
                        return;
                    }
                }
                Files.deleteIfExists(Path.of(exportParams.pathFile));
            }
            if (nbColToWrite == 0)
                // I don't know whether it is valid to create an empty parquet file...
                // so we raise an error if there is nothing to export
                throw new MyException("Aucune colonne à exporter");
            //************* Export *************************************************
            UtilsTime.startTimer("export parquet");
            taskStarted();
            // Hadoop configuration
            conf = createConfiguration();
            // Table schema
            schema = createSchema();
            // Output file
            fileWriter = createFileWriter();
            // Parquet properties
            props = createEncodingProperties();
            // Codec creation
            compressor = createCompressor(conf, props);
            // Export
            writeData();
        } catch (AbortedException ex) {
            aborted = true;
            throw new MyException("Annulation de l'export par l'utilisateur");
        } catch (OutOfMemoryError ex) {
            hasError = true;
            throw ex; // re-throw
        } catch (IOException | MyException ex) {
            hasError = true;
            throw new MyException("Impossible d'exporter la table", ex);
        } catch (IllegalArgumentException ex) {
            hasError = true;
            throw new MyException("Impossible d'exporter la table à cause d'un paramètre invalide", ex);
        } catch (Throwable thex) {
            hasError = true;
            // any other unhandled error type becomes an internal error
            throw new MyInternalException("Impossible d'exporter la table", thex);
        } finally {
            // We change the name because this can take a while (and is not interruptible) when writing to a network drive
            Logger.log_INFO2(" -> Closing IO streams ...");
            if (fileWriter != null)
                try {
                    fileWriter.close();
                } catch (IOException ex) {}
            taskFinished();
            if (aborted || hasError) {
                // on cancellation, we must delete the file, which is most likely in a corrupted state
                try {
                    Files.deleteIfExists(Path.of(exportParams.pathFile));
                } catch (IOException ex) {
                }
            }
        }
        if (hasError) {
            throw new OutOfMemoryError("Impossible d'exporter la table, pas assez de mémoire.\n-> Essayer de réduire le paramètre RowGroupSize");
        }
        UtilsTime.stopTimer("export parquet");
        Logger.log_INFO2(" -> fin de l'export");
        Logger.log_INFO2(" -> durée de l'export : " + UtilsTime.getDuree_ms("export parquet") + " ms");
        Logger.log_INFO1("***********************************************************");
    }
    private void writeData() throws IOException {
        fileWriter.start();
        int nbLineWrite = exportParams.getNbLineToWrite();
        int nbRowByRowGroup = estimNbRowForRowGroup();
        int nbRowGroup = nbLineWrite / nbRowByRowGroup;
        if (nbRowGroup == 0)
            nbRowGroup = 1;
        Logger.log_INFO2(" - nb row group : " + nbRowGroup);
        int nbCol = exportParams.getNbColToWrite();
        // For each row group
        for (int numRowGroup = 0; numRowGroup < nbRowGroup; numRowGroup++) {
            // Create the row group
            initStore();
            int row1 = ParallelUtils.getStartBlock(nbLineWrite, numRowGroup, nbRowGroup);
            int row2 = ParallelUtils.getEndBlock  (nbLineWrite, numRowGroup, nbRowGroup);
            IntStream stream = ob.common.thread.ParallelUtils.intStream(0, nbCol, exportParams.multiThread);
            stream.forEach(c -> {
                if (!hasError) {
                    exportColCurrentRowGroup(c, row1, row2);
                }
            });
            if (hasError)
                break;
            recordCount += row2 - row1;
            // Declare the records that were added
            for (int i = 0; i < recordCount; i++) {
                columnStore.endRecord();
            }
            // Write the row group and start a new one
            flushRowGroupToStore();
            rowGroupOrdinal++;
            Logger.log_INFO2(" - fin row group n° " + numRowGroup);
        }
        close();
    }
private synchronized void toto() {
long ram = 0;
for (int c = 0; c <columnWriters.length; c++) {
ColumnWriter w = columnWriters[c];
ram += w.getBufferedSizeInMemory();
}
System.out.println("!! RAM utilisée : " + ram);
}
    private void exportColCurrentRowGroup(int c, int row1, int row2) {
        Column col = exportParams.vColToExport.get(c);
        ColumnWriter w = columnWriters[c];
        try {
            switch (col.getDataType()) {
                case DBL -> {
                    if (exportParams.doubleAsFloat) {
                        for (int i = row1; i < row2; i++) {
                            if (hasError) break;
                            int rowToWrite = exportParams.rowsToExport.getRow_0b(i);
                            double val = col.getValue_DBL_fast(rowToWrite);
                            if (!UtilsData.isNull(val))
                                w.write((float) val, 0, 1);
                            else
                                // To write null in parquet, write with a definitionLevel of 0
                                w.writeNull(0, 0);
                        }
                    } else {
                        for (int i = row1; i < row2; i++) {
                            if (hasError) break;
                            int rowToWrite = exportParams.rowsToExport.getRow_0b(i);
                            double val = col.getValue_DBL_fast(rowToWrite);
                            if (!UtilsData.isNull(val))
                                w.write(val, 0, 1);
                            else
                                // To write null in parquet, write with a definitionLevel of 0
                                w.writeNull(0, 0);
                        }
                    }
                }
                case INT -> {
                    for (int i = row1; i < row2; i++) {
                        if (hasError) break;
                        int rowToWrite = exportParams.rowsToExport.getRow_0b(i);
                        int val = col.getValue_INT_fast(rowToWrite);
                        if (!UtilsData.isNull(val))
                            w.write(val, 0, 1);
                        else
                            // To write null in parquet, write with a definitionLevel of 0
                            w.writeNull(0, 0);
                    }
                }
                case STR -> {
                    String lastVal = null;
                    Binary lastBinary = null;
                    for (int i = row1; i < row2; i++) {
                        if (hasError) break;
                        int rowToWrite = exportParams.rowsToExport.getRow_0b(i);
                        String val = col.getValue_STR_fast(rowToWrite);
                        if (val == null)
                            throw new MyException("String null non encore gérées pour l'export au format parquet");
                        // tiny String cache when 2 consecutive values are equal
                        if (val.equals(lastVal))
                            w.write(lastBinary, 0, 1);
                        else {
                            lastVal = val;
                            lastBinary = Binary.fromString(val);
                            w.write(lastBinary, 0, 1);
                        }
                    }
                }
                case EMPTY -> {
                    // write int-typed nulls (EMPTY columns were declared as INT32 in the schema)
                    for (int i = row1; i < row2; i++) {
                        if (hasError) break;
                        // To write null in parquet, write anything, but with a definitionLevel of 0
                        w.writeNull(0, 0);
                    }
                }
                default -> {
                    throw new MyException("Type de colonne non pris en charge pour un export en parquet");
                }
            }
        } catch (OutOfMemoryError err) {
            hasError = true;
            w.close(); // free memory, allowing ZAZ to survive if possible
        }
    }
private int estimNbRowForRowGroup() {
int sizeByte1Row = exportParams.getSize1row();
int nbRow = (int) (exportParams.sizeRowGroup_B / sizeByte1Row);
        if (nbRow < 4) // pathological case
            nbRow = 4;
return nbRow;
}
    // To be called for each new row group.
    // Somewhat slow, so be careful not to make the row groups too small
private void initStore() {
ColumnChunkPageWriteStore columnChunkPageWriteStore = new
ColumnChunkPageWriteStore(
compressor,
schema,
props.getAllocator(),
props.getColumnIndexTruncateLength(),
props.getPageWriteChecksumEnabled(),
fileWriter.getEncryptor(),
rowGroupOrdinal);
pageStore = columnChunkPageWriteStore;
bloomFilterWriteStore = columnChunkPageWriteStore;
columnStore = props.newColumnWriteStore(schema, pageStore,
bloomFilterWriteStore);
MessageColumnIO columnIO = new
ColumnIOFactory(true).getColumnIO(schema);
        // create the ColumnWriters
int nbCol = exportParams.getNbColToWrite();
this.columnWriters = new ColumnWriter[nbCol];
for (int c = 0; c <nbCol; c++) {
ColumnWriter w =
columnStore.getColumnWriter(columnIO.getLeaves().get(c).getColumnDescriptor());
columnWriters[c] = w;
}
}
public void close() throws IOException {
if (!closed) {
try {
if (aborted)
return;
                //flushRowGroupToStore(); // already done, actually
fileWriter.end(new HashMap<>(0));
} finally {
AutoCloseables.uncheckedClose(columnStore,
pageStore, bloomFilterWriteStore, fileWriter);
closed = true;
}
}
}
    // Write the row group
private void flushRowGroupToStore() throws IOException {
try {
if (recordCount > 0) {
fileWriter.startBlock(recordCount);
columnStore.flush();
pageStore.flushToFileWriter(fileWriter);
fileWriter.endBlock();
recordCount = 0;
}
} finally {
AutoCloseables.uncheckedClose(columnStore, pageStore,
bloomFilterWriteStore);
columnStore = null;
pageStore = null;
bloomFilterWriteStore = null;
}
}
private MessageType createSchema() {
Types.MessageTypeBuilder schemaBuilder = Types.buildMessage();
for (int i = 0; i <exportParams.getNbColToWrite(); i++) {
Column col = exportParams.vColToExport.get(i);
switch (col.getDataType()) {
                // we use optional so that null values can be written
case DBL -> {
if (exportParams.doubleAsFloat)
schemaBuilder.optional(PrimitiveType.PrimitiveTypeName.FLOAT).named(col.getName());
else
schemaBuilder.optional(PrimitiveType.PrimitiveTypeName.DOUBLE).named(col.getName());
}
case INT -> {
schemaBuilder.optional(PrimitiveType.PrimitiveTypeName.INT32).named(col.getName());
}
case STR -> {
schemaBuilder.optional(PrimitiveType.PrimitiveTypeName.BINARY).as(LogicalTypeAnnotation.stringType()).named(col.getName());
}
case EMPTY -> {
                    // we use INT32 by default
schemaBuilder.optional(PrimitiveType.PrimitiveTypeName.INT32).named(col.getName());
}
default -> {
throw new MyException("Type de colonne
non pris en charge pour un export en parquet");
}
}
}
return schemaBuilder.named("schema");
}
    private Configuration createConfiguration() throws IOException {
        // Hadoop configuration
        Configuration config = new Configuration();
        // This is the object where the compression levels of the various codecs can be specified
        // GZIP:
        if (exportParams.compressionLevel <= 9) // may be higher for other codecs (e.g. ZSTD)
            config.set("zlib.compress.level", ZlibCompressor.CompressionLevel.values()[exportParams.compressionLevel].name());
            // cf. ZlibCompressor.CompressionLevel
        // ZSTD:
        config.setInt(ZstandardCodec.PARQUET_COMPRESS_ZSTD_LEVEL, exportParams.compressionLevel);
        //config.setInt(ZstandardCodec.PARQUET_COMPRESS_ZSTD_WORKERS, 8); // what is this for??? setting 8 instead of 0 seems to slow things down
        return config;
    }
private ParquetFileWriter createFileWriter() throws IOException {
        // we must use LocalOutputFile so that the hadoop binaries are not needed
        OutputFile out = new LocalOutputFile(new File(exportParams.pathFile).toPath());
return new ParquetFileWriter(out, schema,
ParquetFileWriter.Mode.OVERWRITE,
ParquetWriter.DEFAULT_BLOCK_SIZE,ParquetWriter.MAX_PADDING_SIZE_DEFAULT,
ParquetProperties.DEFAULT_COLUMN_INDEX_TRUNCATE_LENGTH,
ParquetProperties.DEFAULT_STATISTICS_TRUNCATE_LENGTH,
false//ParquetProperties.DEFAULT_PAGE_WRITE_CHECKSUM_ENABLED
);
}
private ParquetProperties createEncodingProperties() {
ParquetProperties.Builder encodingPropsBuilder =
ParquetProperties.builder();
encodingPropsBuilder.withWriterVersion(ParquetProperties.WriterVersion.PARQUET_2_0);
        encodingPropsBuilder.withStatisticsEnabled(false); // doesn't speed things up?? to re-check
        // check whether this is worth it...
        // with our data (e.g. LF) this setting does not seem to reduce the size unless values are forced to FLOAT
        encodingPropsBuilder.withByteStreamSplitEncoding(exportParams.bByteSplitting);
        // Dictionary
        encodingPropsBuilder.withDictionaryEncoding(exportParams.bWithDictionnary);
        encodingPropsBuilder.withDictionaryPageSize(exportParams.dictionnaryMaxSize_B);
        // per-column specific settings
        for (Column col : exportParams.vColWithDictionnary)
            encodingPropsBuilder.withDictionaryEncoding(col.getName(), true);
        for (Column col : exportParams.vColNoDictionnary)
            encodingPropsBuilder.withDictionaryEncoding(col.getName(), false);
        encodingPropsBuilder.withPageSize(exportParams.pageSize_B);
        encodingPropsBuilder.withPageRowCountLimit(exportParams.pageRowCountLimit);
        encodingPropsBuilder.withMinRowCountForPageSizeCheck(1000); // the default of 100 slows things down a bit?
        // The row group size is managed by ourselves in writeData()
        // -> handled approximately but efficiently while writing, so no need to configure it here
        //encodingPropsBuilder.withRowGroupSize(128l * 1024 * 1024); // 128MB = default
        // We complete with additional properties,
        // but these require an already built properties object... (meh, but fine)
        // this is the case for a custom ValuesWriterFactory, which needs a properties
        // object that has already been built in order to create the ValueWriters
ParquetProperties propsTemp = encodingPropsBuilder.build();
encodingPropsBuilder = ParquetProperties.copy(propsTemp);
return encodingPropsBuilder.build();
}
    private CompressionCodecFactory.BytesInputCompressor createCompressor(Configuration conf, ParquetProperties props) {
        // ... SNAPPY
}
}
```
### Component(s)
Benchmark, Core