Hey, great question. I just caught up on the other thread, but let me provide some context here.

Spark uses this stats estimation to determine whether or not to broadcast. If we returned a default value, Spark wouldn't be able to use Iceberg tables in broadcast joins. Even though Spark 2.4 won't push filters down before calling this, it is still better in many cases to return the size of the entire table. I think it would make sense to be able to disable this with a table property, for when you know that the table is going to be too large to broadcast un-filtered. Also, this is fixed in Spark 3, because we added rules to push filters into the scan before stats are returned.
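For illustration, here is a minimal sketch of that table-property idea, built on the Reader code quoted below. The property name "read.stats.skip-estimate" is made up for this sketch; it is not a real Iceberg table property.

@Override
public Statistics estimateStatistics() {
  // Hypothetical opt-out: the table owner declares that the table can
  // never be broadcast un-filtered, so skip the expensive estimate.
  // "read.stats.skip-estimate" is an illustrative name only.
  boolean skipEstimate = Boolean.parseBoolean(
      table.properties().getOrDefault("read.stats.skip-estimate", "false"));
  if (skipEstimate) {
    // Long.MAX_VALUE keeps Spark from ever choosing a broadcast join.
    return new Stats(Long.MAX_VALUE, Long.MAX_VALUE);
  }

  // Otherwise, fall through to the existing full estimate.
  long sizeInBytes = 0L;
  long numRows = 0L;
  for (CombinedScanTask task : tasks()) {
    for (FileScanTask file : task.files()) {
      sizeInBytes += file.length();
      numRows += file.file().recordCount();
    }
  }
  return new Stats(sizeInBytes, numRows);
}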
On Fri, Jul 17, 2020 at 1:01 PM Sud <sudssf2...@gmail.com> wrote:

> As per the javadoc, estimateStatistics does not take into account any
> operators. Any reason why the Iceberg reader implements this? I wonder
> if it would help to make it configurable and return a default value.
>
> /**
>  * A mix in interface for {@link DataSourceReader}. Data source readers
>  * can implement this interface to report statistics to Spark.
>  *
>  * As of Spark 2.4, statistics are reported to the optimizer before any
>  * operator is pushed to the DataSourceReader. Implementations that
>  * return more accurate statistics based on pushed operators will not
>  * improve query performance until the planner can push operators
>  * before getting stats.
>  */
>
> On Fri, Jul 17, 2020 at 12:35 PM Sud <sudssf2...@gmail.com> wrote:
>
>> OK, after adding more instrumentation I see that
>> Reader::estimateStatistics may be the culprit.
>>
>> It looks like stats estimation may be performing a full-table
>> estimate, and that's why it is so slow. Does anyone know if it is
>> possible to avoid Reader::estimateStatistics?
>>
>> Also, does estimateStatistics use the appropriate filters? I have seen
>> the same issue when performing unions: they were very slow, and
>> estimateStatistics used to take a lot of time.
>>
>> @Override
>> public Statistics estimateStatistics() {
>>   long sizeInBytes = 0L;
>>   long numRows = 0L;
>>
>>   for (CombinedScanTask task : tasks()) {
>>     for (FileScanTask file : task.files()) {
>>       sizeInBytes += file.length();
>>       numRows += file.file().recordCount();
>>     }
>>   }
>>
>>   return new Stats(sizeInBytes, numRows);
>> }
>>
>> private List<CombinedScanTask> tasks() {
>>   if (tasks == null) {
>>     TableScan scan = table
>>         .newScan()
>>         .caseSensitive(caseSensitive)
>>         .project(lazySchema());
>>
>>     if (snapshotId != null) {
>>       scan = scan.useSnapshot(snapshotId);
>>     }
>>
>>     if (asOfTimestamp != null) {
>>       scan = scan.asOfTime(asOfTimestamp);
>>     }
>>
>>     if (splitSize != null) {
>>       scan = scan.option(TableProperties.SPLIT_SIZE, splitSize.toString());
>>     }
>>
>>     if (splitLookback != null) {
>>       scan = scan.option(TableProperties.SPLIT_LOOKBACK, splitLookback.toString());
>>     }
>>
>>     if (splitOpenFileCost != null) {
>>       scan = scan.option(TableProperties.SPLIT_OPEN_FILE_COST, splitOpenFileCost.toString());
>>     }
>>
>>     if (filterExpressions != null) {
>>       for (Expression filter : filterExpressions) {
>>         scan = scan.filter(filter);
>>       }
>>     }
>>
>>     try (CloseableIterable<CombinedScanTask> tasksIterable = scan.planTasks()) {
>>       this.tasks = Lists.newArrayList(tasksIterable);
>>     } catch (IOException e) {
>>       throw new RuntimeIOException(e, "Failed to close table scan: %s", scan);
>>     }
>>   }
>>
>>   return tasks;
>> }
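The tasks() method above is why this is expensive: scan.planTasks() opens and reads every matching manifest just to compute stats. For reference, here is a sketch of a cheaper default that skips planning by reading the snapshot summary. It assumes the summary carries the "total-records" and "total-files-size" entries, which writers are not guaranteed to populate, so the full estimate stays as the fallback; it also ignores pushed filters and time travel, so it is only suitable as an un-filtered default.

private Statistics estimateFromSnapshotSummary() {
  // Sketch only: derive stats from snapshot summary metadata instead of
  // planning the scan, which avoids opening any manifest files.
  Snapshot snapshot = table.currentSnapshot();
  if (snapshot != null) {
    Map<String, String> summary = snapshot.summary();
    String totalRecords = summary.get("total-records");
    String totalSize = summary.get("total-files-size");
    if (totalRecords != null && totalSize != null) {
      return new Stats(Long.parseLong(totalSize), Long.parseLong(totalRecords));
    }
  }
  // Summary entries missing (or empty table): fall back to the existing
  // manifest-reading estimate.
  return estimateStatistics();
}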
>> On Fri, Jul 17, 2020 at 9:35 AM Sud <sudssf2...@gmail.com> wrote:
>>
>>> Thanks @Jingsong for the reply.
>>>
>>> Yes, one additional data point about the table: it is an Avro table
>>> generated from stream ingestion. We expect a couple of thousand
>>> snapshots to be created daily.
>>>
>>> We are using the appendsBetween API, and I think any compaction
>>> operation will break it, but I will take a look at compacting data
>>> files and manifest files.
>>>
>>> If I understand what you mentioned above, is the ManifestGroup reader
>>> slow? I wonder if it would help to add a debug log and a timer for
>>> some of these operations.
>>>
>>> I will keep you updated.
>>>
>>> --
>>> Thanks
>>>
>>> On Fri, Jul 17, 2020 at 1:50 AM Jingsong Li <jingsongl...@gmail.com>
>>> wrote:
>>>
>>>> Hi Sud,
>>>>
>>>> A batch read of an Iceberg table should just read the latest
>>>> snapshot. I think what is happening in this case is that your large
>>>> tables have a large number of manifest files.
>>>>
>>>> 1. The simple way is to reduce the number of manifest files:
>>>> - To reduce the manifest file count, you can try
>>>> `Actions.rewriteManifests` (thanks Anton) to compact manifest files.
>>>> - If there are too many small data files, leading to too many
>>>> manifest files even after compacting, you can try
>>>> `Actions.rewriteDataFiles` (thanks jerryshao) to compact data files
>>>> (a sketch of both calls follows after this message).
>>>>
>>>> 2. Another way is to parallelize the opening of the manifest readers.
>>>> Your stack looks like the thread is stuck in
>>>> `DataFileReader.openReader`; at a minimum, `open` needs to read the
>>>> magic bytes from the input stream, and this looks slow (the input
>>>> stream has to read at least one IO block).
>>>> So can we parallelize `DataFileReader.openReader`? We should make
>>>> `ManifestGroup.entries` return a parallel iterable.
>>>>
>>>> Best,
>>>> Jingsong
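For reference, a sketch of the two compaction calls Jingsong mentions, assuming the Actions API from the iceberg-spark module of this era; treat the method names and the 128 MB target size as illustrative, not definitive, and verify them against your Iceberg version.

import org.apache.iceberg.Table;
import org.apache.iceberg.actions.Actions;

public class CompactIcebergTable {
  public static void compact(Table table) {
    // Merge many small manifest files into fewer, larger ones.
    Actions.forTable(table).rewriteManifests().execute();

    // Merge small data files; 128 MB is an illustrative target size.
    Actions.forTable(table)
        .rewriteDataFiles()
        .targetSizeInBytes(128L * 1024 * 1024)
        .execute();
  }
}

One caveat, as Sud notes above: rewrite actions commit replace snapshots rather than appends, so an incremental pipeline built on appendsBetween should be tested against a compacted table before adopting this.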
>>>> On Fri, Jul 17, 2020 at 7:23 AM Sud <sudssf2...@gmail.com> wrote:
>>>>
>>>>> Hi Iceberg devs,
>>>>>
>>>>> We are trying to root-cause an issue where the driver gets stuck
>>>>> when trying to read comparatively large tables (> 2000 snapshots).
>>>>>
>>>>> When I looked at the thread dump of the driver's main thread, I saw
>>>>> that the thread is stuck in planning tasks. I also noticed that the
>>>>> iceberg-worker-pool is idle and mostly one thread is active.
>>>>> Has anyone faced a similar issue? What parameters can be used to
>>>>> optimize reads for tables with a large number of snapshots (with
>>>>> small data files)?
>>>>>
>>>>> java.net.SocketInputStream.socketRead0(Native Method)
>>>>> java.net.SocketInputStream.socketRead(SocketInputStream.java:116)
>>>>> java.net.SocketInputStream.read(SocketInputStream.java:171)
>>>>> java.net.SocketInputStream.read(SocketInputStream.java:141)
>>>>> sun.security.ssl.InputRecord.readFully(InputRecord.java:465)
>>>>> sun.security.ssl.InputRecord.read(InputRecord.java:503)
>>>>> sun.security.ssl.SSLSocketImpl.readRecord(SSLSocketImpl.java:990) => holding Monitor(java.lang.Object@580832623)
>>>>> sun.security.ssl.SSLSocketImpl.readDataRecord(SSLSocketImpl.java:948)
>>>>> sun.security.ssl.AppInputStream.read(AppInputStream.java:105) => holding Monitor(sun.security.ssl.AppInputStream@894541691)
>>>>> org.apache.http.impl.io.SessionInputBufferImpl.streamRead(SessionInputBufferImpl.java:137)
>>>>> org.apache.http.impl.io.SessionInputBufferImpl.fillBuffer(SessionInputBufferImpl.java:153)
>>>>> org.apache.http.impl.io.SessionInputBufferImpl.readLine(SessionInputBufferImpl.java:280)
>>>>> org.apache.http.impl.conn.DefaultHttpResponseParser.parseHead(DefaultHttpResponseParser.java:138)
>>>>> org.apache.http.impl.conn.DefaultHttpResponseParser.parseHead(DefaultHttpResponseParser.java:56)
>>>>> org.apache.http.impl.io.AbstractMessageParser.parse(AbstractMessageParser.java:259)
>>>>> org.apache.http.impl.DefaultBHttpClientConnection.receiveResponseHeader(DefaultBHttpClientConnection.java:163)
>>>>> org.apache.http.impl.conn.CPoolProxy.receiveResponseHeader(CPoolProxy.java:157)
>>>>> org.apache.http.protocol.HttpRequestExecutor.doReceiveResponse(HttpRequestExecutor.java:273)
>>>>> com.amazonaws.http.protocol.SdkHttpRequestExecutor.doReceiveResponse(SdkHttpRequestExecutor.java:82)
>>>>> org.apache.http.protocol.HttpRequestExecutor.execute(HttpRequestExecutor.java:125)
>>>>> org.apache.http.impl.execchain.MainClientExec.execute(MainClientExec.java:272)
>>>>> org.apache.http.impl.execchain.ProtocolExec.execute(ProtocolExec.java:186)
>>>>> org.apache.http.impl.client.InternalHttpClient.doExecute(InternalHttpClient.java:185)
>>>>> org.apache.http.impl.client.CloseableHttpClient.execute(CloseableHttpClient.java:83)
>>>>> org.apache.http.impl.client.CloseableHttpClient.execute(CloseableHttpClient.java:56)
>>>>> com.amazonaws.http.apache.client.impl.SdkHttpClient.execute(SdkHttpClient.java:72)
>>>>> com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeOneRequest(AmazonHttpClient.java:1297)
>>>>> com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeHelper(AmazonHttpClient.java:1113)
>>>>> com.amazonaws.http.AmazonHttpClient$RequestExecutor.doExecute(AmazonHttpClient.java:770)
>>>>> com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeWithTimer(AmazonHttpClient.java:744)
>>>>> com.amazonaws.http.AmazonHttpClient$RequestExecutor.execute(AmazonHttpClient.java:726)
>>>>> com.amazonaws.http.AmazonHttpClient$RequestExecutor.access$500(AmazonHttpClient.java:686)
>>>>> com.amazonaws.http.AmazonHttpClient$RequestExecutionBuilderImpl.execute(AmazonHttpClient.java:668)
>>>>> com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:532)
>>>>> com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:512)
>>>>> com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:4926)
>>>>> com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:4872)
>>>>> com.amazonaws.services.s3.AmazonS3Client.getObject(AmazonS3Client.java:1472)
>>>>> org.apache.hadoop.fs.s3a.S3AInputStream.reopen(S3AInputStream.java:148)
>>>>> org.apache.hadoop.fs.s3a.S3AInputStream.lazySeek(S3AInputStream.java:281)
>>>>> org.apache.hadoop.fs.s3a.S3AInputStream.read(S3AInputStream.java:364) => holding Monitor(org.apache.hadoop.fs.s3a.S3AInputStream@1577065760)
>>>>> java.io.DataInputStream.read(DataInputStream.java:149)
>>>>> org.apache.iceberg.hadoop.HadoopStreams$HadoopSeekableInputStream.read(HadoopStreams.java:112)
>>>>> org.apache.iceberg.avro.AvroIO$AvroInputStreamAdapter.read(AvroIO.java:106)
>>>>> org.apache.iceberg.shaded.org.apache.avro.file.DataFileReader.openReader(DataFileReader.java:55)
>>>>> org.apache.iceberg.avro.AvroIterable.newFileReader(AvroIterable.java:94)
>>>>> org.apache.iceberg.avro.AvroIterable.getMetadata(AvroIterable.java:66)
>>>>> org.apache.iceberg.ManifestReader.<init>(ManifestReader.java:141)
>>>>> org.apache.iceberg.ManifestReader.read(ManifestReader.java:119)
>>>>> org.apache.iceberg.ManifestGroup.lambda$entries$13(ManifestGroup.java:212)
>>>>> org.apache.iceberg.ManifestGroup$$Lambda$89/1042679820.apply(Unknown Source)
>>>>> org.apache.iceberg.shaded.com.google.common.collect.Iterators$6.transform(Iterators.java:783)
>>>>> org.apache.iceberg.shaded.com.google.common.collect.TransformedIterator.next(TransformedIterator.java:47)
>>>>> org.apache.iceberg.shaded.com.google.common.collect.TransformedIterator.next(TransformedIterator.java:47)
>>>>> org.apache.iceberg.util.ParallelIterable$ParallelIterator.submitNextTask(ParallelIterable.java:113)
>>>>> org.apache.iceberg.util.ParallelIterable$ParallelIterator.checkTasks(ParallelIterable.java:100)
>>>>> org.apache.iceberg.util.ParallelIterable$ParallelIterator.hasNext(ParallelIterable.java:137) => holding Monitor(org.apache.iceberg.util.ParallelIterable$ParallelIterator@1724178871)
>>>>> org.apache.iceberg.shaded.com.google.common.collect.TransformedIterator.hasNext(TransformedIterator.java:42)
>>>>> org.apache.iceberg.shaded.com.google.common.collect.TransformedIterator.hasNext(TransformedIterator.java:42)
>>>>> org.apache.iceberg.shaded.com.google.common.collect.Iterators$ConcatenatedIterator.getTopMetaIterator(Iterators.java:1309)
>>>>> org.apache.iceberg.shaded.com.google.common.collect.Iterators$ConcatenatedIterator.hasNext(Iterators.java:1325)
>>>>> org.apache.iceberg.util.BinPacking$PackingIterator.next(BinPacking.java:111)
>>>>> org.apache.iceberg.util.BinPacking$PackingIterator.next(BinPacking.java:87)
>>>>> org.apache.iceberg.io.CloseableIterable$3$1.next(CloseableIterable.java:94)
>>>>> org.apache.iceberg.shaded.com.google.common.collect.Iterators.addAll(Iterators.java:356)
>>>>> org.apache.iceberg.shaded.com.google.common.collect.Lists.newArrayList(Lists.java:143)
>>>>> org.apache.iceberg.shaded.com.google.common.collect.Lists.newArrayList(Lists.java:130)
>>>>> org.apache.iceberg.spark.source.Reader.tasks(Reader.java:295)
>>>>> org.apache.iceberg.spark.source.Reader.planInputPartitions(Reader.java:196)
>>>>> org.apache.spark.sql.execution.datasources.v2.DataSourceV2ScanExec.partitions$lzycompute(DataSourceV2ScanExec.scala:76) => holding Monitor(org.apache.spark.sql.execution.datasources.v2.DataSourceV2ScanExec@853913013)
>>>>> org.apache.spark.sql.execution.datasources.v2.DataSourceV2ScanExec.partitions(DataSourceV2ScanExec.scala:75)
>>>>> org.apache.spark.sql.execution.datasources.v2.DataSourceV2ScanExec.outputPartitioning(DataSourceV2ScanExec.scala:65)
>>>>>
>>>>> --
>>>>> Thanks
>>>>
>>>> --
>>>> Best, Jingsong Lee

--
Ryan Blue
Software Engineer
Netflix