As per the javadoc, estimateStatistics does not take pushed operators into
account, so is there a reason why the Iceberg reader implements it at all?
I wonder if it would help to make it configurable and return a default value.

/**
 * A mix in interface for {@link DataSourceReader}. Data source readers can implement this
 * interface to report statistics to Spark.
 *
 * As of Spark 2.4, statistics are reported to the optimizer before any operator is pushed to the
 * DataSourceReader. Implementations that return more accurate statistics based on pushed operators
 * will not improve query performance until the planner can push operators before getting stats.
 */
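Something like the sketch below is what I have in mind. To be clear, this is
only a rough sketch, not existing Iceberg code: reportStatistics is a made-up
flag that would have to come from a new read option.

// Rough sketch: short-circuit stats behind a hypothetical read option so
// planning does not have to walk every manifest just to report statistics.
@Override
public Statistics estimateStatistics() {
  if (!reportStatistics) { // hypothetical flag, not an existing option
    // Conservative defaults: report size and row count as unknown/huge so
    // Spark does not make overly optimistic plans (e.g. broadcast joins).
    return new Stats(Long.MAX_VALUE, Long.MAX_VALUE);
  }

  long sizeInBytes = 0L;
  long numRows = 0L;

  for (CombinedScanTask task : tasks()) {
    for (FileScanTask file : task.files()) {
      sizeInBytes += file.length();
      numRows += file.file().recordCount();
    }
  }

  return new Stats(sizeInBytes, numRows);
}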
On Fri, Jul 17, 2020 at 12:35 PM Sud <sudssf2...@gmail.com> wrote:

> OK, after adding more instrumentation I see that Reader::estimateStatistics
> may be the culprit.
>
> It looks like estimating the stats performs what amounts to a full table
> scan plan, and that is why it is so slow. Does anyone know if it is
> possible to avoid Reader::estimateStatistics?
>
> Also, does estimateStatistics use the appropriate filters? I have seen the
> same issue where performing unions was very slow and estimateStatistics
> used to take a lot of time.
>
> @Override
> public Statistics estimateStatistics() {
>   long sizeInBytes = 0L;
>   long numRows = 0L;
>
>   for (CombinedScanTask task : tasks()) {
>     for (FileScanTask file : task.files()) {
>       sizeInBytes += file.length();
>       numRows += file.file().recordCount();
>     }
>   }
>
>   return new Stats(sizeInBytes, numRows);
> }
>
> private List<CombinedScanTask> tasks() {
>   if (tasks == null) {
>     TableScan scan = table
>         .newScan()
>         .caseSensitive(caseSensitive)
>         .project(lazySchema());
>
>     if (snapshotId != null) {
>       scan = scan.useSnapshot(snapshotId);
>     }
>
>     if (asOfTimestamp != null) {
>       scan = scan.asOfTime(asOfTimestamp);
>     }
>
>     if (splitSize != null) {
>       scan = scan.option(TableProperties.SPLIT_SIZE, splitSize.toString());
>     }
>
>     if (splitLookback != null) {
>       scan = scan.option(TableProperties.SPLIT_LOOKBACK, splitLookback.toString());
>     }
>
>     if (splitOpenFileCost != null) {
>       scan = scan.option(TableProperties.SPLIT_OPEN_FILE_COST, splitOpenFileCost.toString());
>     }
>
>     if (filterExpressions != null) {
>       for (Expression filter : filterExpressions) {
>         scan = scan.filter(filter);
>       }
>     }
>
>     // planTasks() opens and reads every matching manifest, which is what
>     // makes both estimateStatistics() and planInputPartitions() expensive.
>     try (CloseableIterable<CombinedScanTask> tasksIterable = scan.planTasks()) {
>       this.tasks = Lists.newArrayList(tasksIterable);
>     } catch (IOException e) {
>       throw new RuntimeIOException(e, "Failed to close table scan: %s", scan);
>     }
>   }
>
>   return tasks;
> }
>
> On Fri, Jul 17, 2020 at 9:35 AM Sud <sudssf2...@gmail.com> wrote:
>
>> Thanks @Jingsong for the reply.
>>
>> One additional data point about the table: it is an Avro table generated
>> from stream ingestion, and we expect a couple of thousand snapshots to be
>> created daily.
>>
>> We are using the appendsBetween API, and I think any compaction operation
>> will break it, but I will take a look at compacting data files and
>> manifest files.
>>
>> If I understand what you mentioned above, is the ManifestGroup reader the
>> slow part? I wonder if it would help to add a debug log and a timer for
>> some of these operations.
>>
>> I will keep you updated.
>>
>> --
>> Thanks
>>
>> On Fri, Jul 17, 2020 at 1:50 AM Jingsong Li <jingsongl...@gmail.com> wrote:
>>
>>> Hi Sud,
>>>
>>> A batch read of an Iceberg table should only read the latest snapshot.
>>> I think what is happening in this case is that your large tables have a
>>> large number of manifest files.
>>>
>>> 1. The simple way is to reduce the number of manifest files (see the
>>> sketch below):
>>> - You can try `Actions.rewriteManifests` (thanks Anton) to compact
>>> manifest files.
>>> - If there are too many small data files, leading to too many manifest
>>> files even after compacting, you can try `Actions.rewriteDataFiles`
>>> (thanks jerryshao) to compact data files.
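>>> A rough sketch of both actions; check the Actions API in your Iceberg
>>> version, since the option methods may differ:
>>>
>>> // Compact manifest files, then data files. `table` is the loaded Table.
>>> Actions.forTable(table)
>>>     .rewriteManifests()
>>>     .execute();
>>>
>>> Actions.forTable(table)
>>>     .rewriteDataFiles()
>>>     .execute();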
>>> 2. Another way is to parallelize the opening of the manifest readers.
>>> Your stack trace looks like the thread is stuck in
>>> `DataFileReader.openReader`; at a minimum, `open` needs to read the magic
>>> bytes from the input stream, and that looks slow (the input stream has to
>>> read at least one IO block).
>>> So can we parallelize `DataFileReader.openReader`? We should make
>>> `ManifestGroup.entries` return a parallel iterable, roughly like the
>>> sketch below.
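>>> This is a sketch of the idea only, not the real ManifestGroup code. It
>>> assumes Iceberg's shared worker pool (sized by the
>>> `iceberg.worker.num-threads` system property, if I remember correctly):
>>>
>>> // Open each manifest on the worker pool, so the slow first read of the
>>> // Avro header happens concurrently for many manifests instead of one at
>>> // a time on the thread that submits tasks.
>>> // `io` is the table's FileIO; `manifests` are the matching manifest files.
>>> ExecutorService workerPool = ThreadPools.getWorkerPool();
>>> List<Future<ManifestReader>> readers = Lists.newArrayList();
>>> for (ManifestFile manifest : manifests) {
>>>   readers.add(workerPool.submit(() ->
>>>       ManifestReader.read(io.newInputFile(manifest.path()))));
>>> }
>>> // Downstream code can then consume entries as each open completes,
>>> // overlapping the per-manifest IO instead of serializing it.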
>>>
>>> Best,
>>> Jingsong
>>>
>>> On Fri, Jul 17, 2020 at 7:23 AM Sud <sudssf2...@gmail.com> wrote:
>>>
>>>> Hi Iceberg devs,
>>>>
>>>> We are trying to root-cause an issue where the driver gets stuck when
>>>> trying to read comparatively large tables (> 2000 snapshots).
>>>>
>>>> When I look at the thread dump of the driver's main thread, I see that
>>>> the thread is stuck planning tasks. I also noticed that the
>>>> iceberg-worker-pool is idle and mostly one thread is active.
>>>> Has anyone faced a similar issue? What parameters can be used to
>>>> optimize reads for tables with a large number of snapshots (with
>>>> smaller data files)?
>>>>
>>>> java.net.SocketInputStream.socketRead0(Native Method)
>>>> java.net.SocketInputStream.socketRead(SocketInputStream.java:116)
>>>> java.net.SocketInputStream.read(SocketInputStream.java:171)
>>>> java.net.SocketInputStream.read(SocketInputStream.java:141)
>>>> sun.security.ssl.InputRecord.readFully(InputRecord.java:465)
>>>> sun.security.ssl.InputRecord.read(InputRecord.java:503)
>>>> sun.security.ssl.SSLSocketImpl.readRecord(SSLSocketImpl.java:990) => holding Monitor(java.lang.Object@580832623})
>>>> sun.security.ssl.SSLSocketImpl.readDataRecord(SSLSocketImpl.java:948)
>>>> sun.security.ssl.AppInputStream.read(AppInputStream.java:105) => holding Monitor(sun.security.ssl.AppInputStream@894541691})
>>>> org.apache.http.impl.io.SessionInputBufferImpl.streamRead(SessionInputBufferImpl.java:137)
>>>> org.apache.http.impl.io.SessionInputBufferImpl.fillBuffer(SessionInputBufferImpl.java:153)
>>>> org.apache.http.impl.io.SessionInputBufferImpl.readLine(SessionInputBufferImpl.java:280)
>>>> org.apache.http.impl.conn.DefaultHttpResponseParser.parseHead(DefaultHttpResponseParser.java:138)
>>>> org.apache.http.impl.conn.DefaultHttpResponseParser.parseHead(DefaultHttpResponseParser.java:56)
>>>> org.apache.http.impl.io.AbstractMessageParser.parse(AbstractMessageParser.java:259)
>>>> org.apache.http.impl.DefaultBHttpClientConnection.receiveResponseHeader(DefaultBHttpClientConnection.java:163)
>>>> org.apache.http.impl.conn.CPoolProxy.receiveResponseHeader(CPoolProxy.java:157)
>>>> org.apache.http.protocol.HttpRequestExecutor.doReceiveResponse(HttpRequestExecutor.java:273)
>>>> com.amazonaws.http.protocol.SdkHttpRequestExecutor.doReceiveResponse(SdkHttpRequestExecutor.java:82)
>>>> org.apache.http.protocol.HttpRequestExecutor.execute(HttpRequestExecutor.java:125)
>>>> org.apache.http.impl.execchain.MainClientExec.execute(MainClientExec.java:272)
>>>> org.apache.http.impl.execchain.ProtocolExec.execute(ProtocolExec.java:186)
>>>> org.apache.http.impl.client.InternalHttpClient.doExecute(InternalHttpClient.java:185)
>>>> org.apache.http.impl.client.CloseableHttpClient.execute(CloseableHttpClient.java:83)
>>>> org.apache.http.impl.client.CloseableHttpClient.execute(CloseableHttpClient.java:56)
>>>> com.amazonaws.http.apache.client.impl.SdkHttpClient.execute(SdkHttpClient.java:72)
>>>> com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeOneRequest(AmazonHttpClient.java:1297)
>>>> com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeHelper(AmazonHttpClient.java:1113)
>>>> com.amazonaws.http.AmazonHttpClient$RequestExecutor.doExecute(AmazonHttpClient.java:770)
>>>> com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeWithTimer(AmazonHttpClient.java:744)
>>>> com.amazonaws.http.AmazonHttpClient$RequestExecutor.execute(AmazonHttpClient.java:726)
>>>> com.amazonaws.http.AmazonHttpClient$RequestExecutor.access$500(AmazonHttpClient.java:686)
>>>> com.amazonaws.http.AmazonHttpClient$RequestExecutionBuilderImpl.execute(AmazonHttpClient.java:668)
>>>> com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:532)
>>>> com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:512)
>>>> com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:4926)
>>>> com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:4872)
>>>> com.amazonaws.services.s3.AmazonS3Client.getObject(AmazonS3Client.java:1472)
>>>> org.apache.hadoop.fs.s3a.S3AInputStream.reopen(S3AInputStream.java:148)
>>>> org.apache.hadoop.fs.s3a.S3AInputStream.lazySeek(S3AInputStream.java:281)
>>>> org.apache.hadoop.fs.s3a.S3AInputStream.read(S3AInputStream.java:364) => holding Monitor(org.apache.hadoop.fs.s3a.S3AInputStream@1577065760})
>>>> java.io.DataInputStream.read(DataInputStream.java:149)
>>>> org.apache.iceberg.hadoop.HadoopStreams$HadoopSeekableInputStream.read(HadoopStreams.java:112)
>>>> org.apache.iceberg.avro.AvroIO$AvroInputStreamAdapter.read(AvroIO.java:106)
>>>> org.apache.iceberg.shaded.org.apache.avro.file.DataFileReader.openReader(DataFileReader.java:55)
>>>> org.apache.iceberg.avro.AvroIterable.newFileReader(AvroIterable.java:94)
>>>> org.apache.iceberg.avro.AvroIterable.getMetadata(AvroIterable.java:66)
>>>> org.apache.iceberg.ManifestReader.<init>(ManifestReader.java:141)
>>>> org.apache.iceberg.ManifestReader.read(ManifestReader.java:119)
>>>> org.apache.iceberg.ManifestGroup.lambda$entries$13(ManifestGroup.java:212)
>>>> org.apache.iceberg.ManifestGroup$$Lambda$89/1042679820.apply(Unknown Source)
>>>> org.apache.iceberg.shaded.com.google.common.collect.Iterators$6.transform(Iterators.java:783)
>>>> org.apache.iceberg.shaded.com.google.common.collect.TransformedIterator.next(TransformedIterator.java:47)
>>>> org.apache.iceberg.shaded.com.google.common.collect.TransformedIterator.next(TransformedIterator.java:47)
>>>> org.apache.iceberg.util.ParallelIterable$ParallelIterator.submitNextTask(ParallelIterable.java:113)
>>>> org.apache.iceberg.util.ParallelIterable$ParallelIterator.checkTasks(ParallelIterable.java:100)
>>>> org.apache.iceberg.util.ParallelIterable$ParallelIterator.hasNext(ParallelIterable.java:137) => holding Monitor(org.apache.iceberg.util.ParallelIterable$ParallelIterator@1724178871})
>>>> org.apache.iceberg.shaded.com.google.common.collect.TransformedIterator.hasNext(TransformedIterator.java:42)
>>>> org.apache.iceberg.shaded.com.google.common.collect.TransformedIterator.hasNext(TransformedIterator.java:42)
>>>> org.apache.iceberg.shaded.com.google.common.collect.Iterators$ConcatenatedIterator.getTopMetaIterator(Iterators.java:1309)
>>>> org.apache.iceberg.shaded.com.google.common.collect.Iterators$ConcatenatedIterator.hasNext(Iterators.java:1325)
>>>> org.apache.iceberg.util.BinPacking$PackingIterator.next(BinPacking.java:111)
>>>> org.apache.iceberg.util.BinPacking$PackingIterator.next(BinPacking.java:87)
>>>> org.apache.iceberg.io.CloseableIterable$3$1.next(CloseableIterable.java:94)
>>>> org.apache.iceberg.shaded.com.google.common.collect.Iterators.addAll(Iterators.java:356)
>>>> org.apache.iceberg.shaded.com.google.common.collect.Lists.newArrayList(Lists.java:143)
>>>> org.apache.iceberg.shaded.com.google.common.collect.Lists.newArrayList(Lists.java:130)
>>>> org.apache.iceberg.spark.source.Reader.tasks(Reader.java:295)
>>>> org.apache.iceberg.spark.source.Reader.planInputPartitions(Reader.java:196)
>>>> org.apache.spark.sql.execution.datasources.v2.DataSourceV2ScanExec.partitions$lzycompute(DataSourceV2ScanExec.scala:76) => holding Monitor(org.apache.spark.sql.execution.datasources.v2.DataSourceV2ScanExec@853913013})
>>>> org.apache.spark.sql.execution.datasources.v2.DataSourceV2ScanExec.partitions(DataSourceV2ScanExec.scala:75)
>>>> org.apache.spark.sql.execution.datasources.v2.DataSourceV2ScanExec.outputPartitioning(DataSourceV2ScanExec.scala:65)
>>>>
>>>> --
>>>> Thanks
>>>
>>>
>>> --
>>> Best, Jingsong Lee
>>