Thanks Ryan and Jingsong. I will add one more TODO to see if we can use Spark to parallelize estimation even when predicate pushdown is done (Spark does this for file-system-based tables), and possibly for the manifest readers as well.

I will try to submit a PR upstream for adding the options, and will create issues for the TODOs.
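To make the option concrete, here is a rough sketch of the change I have in mind for Reader.estimateStatistics (the current implementation is quoted further down in this thread). The property name "read.stats.enabled" is just a placeholder for illustration, not an existing Iceberg property; the real name can be settled in the PR:

@Override
public Statistics estimateStatistics() {
  // Placeholder property name -- to be decided in the PR. Assumes
  // org.apache.iceberg.util.PropertyUtil is imported.
  boolean statsEnabled = PropertyUtil.propertyAsBoolean(
      table.properties(), "read.stats.enabled", true);

  if (!statsEnabled) {
    // Report a very large size so Spark never picks this table for a
    // broadcast join, without planning a full scan just to size it.
    return new Stats(Long.MAX_VALUE, Long.MAX_VALUE);
  }

  // Unchanged from the current implementation quoted below.
  long sizeInBytes = 0L;
  long numRows = 0L;
  for (CombinedScanTask task : tasks()) {
    for (FileScanTask file : task.files()) {
      sizeInBytes += file.length();
      numRows += file.file().recordCount();
    }
  }
  return new Stats(sizeInBytes, numRows);
}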
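And for anyone following along, my reading of the compaction route Jingsong suggested below is roughly the following (assuming the Actions API from the iceberg-spark module; I still need to confirm this does not break our appendsBetween-based incremental reads):

import org.apache.iceberg.actions.Actions;

// Compact many small manifests into fewer, larger ones.
Actions.forTable(table)
    .rewriteManifests()
    .execute();

// If small data files are the root cause, compact those as well.
Actions.forTable(table)
    .rewriteDataFiles()
    .execute();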
--
Thanks

On Fri, Jul 17, 2020 at 9:25 PM Jingsong Li <jingsongl...@gmail.com> wrote:

> Thanks Sud for the in-depth debugging. And thanks Ryan for the explanation.
>
> +1 to have a table property to disable stats estimation.
>
> IIUC, the difference between stats estimation and a scan with filters is
> mainly in the partition filters: Iceberg uses filter pushdown to complete
> partition pruning, so the stats estimation sees all partitions of the
> table even though it actually only needs to read a small number of them.
>
> - A scan with partition filters only needs to read the manifest list
> files and do partition pruning, selecting only the manifests that contain
> the specific partitions.
> - A scan without partition filters needs to read all manifest files.
>
> Best,
> Jingsong
>
> On Sat, Jul 18, 2020 at 6:13 AM Ryan Blue <rb...@netflix.com.invalid> wrote:
>
>> Hey, great question. I just caught up on the other thread, but let me
>> provide some context here.
>>
>> Spark uses the stats estimation here to determine whether or not to
>> broadcast. If we returned a default value, then Spark wouldn't be able
>> to use Iceberg tables in broadcast joins. Even though Spark won't push
>> filters down before calling this in 2.4, it is still better in many
>> cases to return the size of the entire table.
>>
>> I think it would make sense to be able to disable this using a table
>> property, when you know that the table is going to be too large to
>> broadcast un-filtered.
>>
>> Also, this is fixed in Spark 3 because we added rules to push filters
>> into the scan before stats are returned.
>>
>> On Fri, Jul 17, 2020 at 1:01 PM Sud <sudssf2...@gmail.com> wrote:
>>
>>> As per the Javadoc, estimateStatistics does not take any operators
>>> into account, so is there a reason why the Iceberg reader implements
>>> this? I wonder if it would help to make it configurable and return a
>>> default value.
>>>
>>> /**
>>>  * A mix in interface for {@link DataSourceReader}. Data source readers
>>>  * can implement this interface to report statistics to Spark.
>>>  *
>>>  * As of Spark 2.4, statistics are reported to the optimizer before any
>>>  * operator is pushed to the DataSourceReader. Implementations that
>>>  * return more accurate statistics based on pushed operators will not
>>>  * improve query performance until the planner can push operators
>>>  * before getting stats.
>>>  */
>>>
>>> On Fri, Jul 17, 2020 at 12:35 PM Sud <sudssf2...@gmail.com> wrote:
>>>
>>>> OK, after adding more instrumentation I can see that
>>>> Reader::estimateStatistics may be the culprit.
>>>>
>>>> It looks like the stats estimation may be performing a full-table
>>>> estimate, and that is why it is so slow. Does anyone know if it is
>>>> possible to avoid Reader::estimateStatistics?
>>>>
>>>> Also, does estimateStatistics use the appropriate filters? I have
>>>> seen the same issue where performing unions was very slow and
>>>> estimateStatistics used to take a lot of time.
>>>>
>>>> @Override
>>>> public Statistics estimateStatistics() {
>>>>   long sizeInBytes = 0L;
>>>>   long numRows = 0L;
>>>>
>>>>   for (CombinedScanTask task : tasks()) {
>>>>     for (FileScanTask file : task.files()) {
>>>>       sizeInBytes += file.length();
>>>>       numRows += file.file().recordCount();
>>>>     }
>>>>   }
>>>>
>>>>   return new Stats(sizeInBytes, numRows);
>>>> }
>>>>
>>>> private List<CombinedScanTask> tasks() {
>>>>   if (tasks == null) {
>>>>     TableScan scan = table
>>>>         .newScan()
>>>>         .caseSensitive(caseSensitive)
>>>>         .project(lazySchema());
>>>>
>>>>     if (snapshotId != null) {
>>>>       scan = scan.useSnapshot(snapshotId);
>>>>     }
>>>>
>>>>     if (asOfTimestamp != null) {
>>>>       scan = scan.asOfTime(asOfTimestamp);
>>>>     }
>>>>
>>>>     if (splitSize != null) {
>>>>       scan = scan.option(TableProperties.SPLIT_SIZE, splitSize.toString());
>>>>     }
>>>>
>>>>     if (splitLookback != null) {
>>>>       scan = scan.option(TableProperties.SPLIT_LOOKBACK, splitLookback.toString());
>>>>     }
>>>>
>>>>     if (splitOpenFileCost != null) {
>>>>       scan = scan.option(TableProperties.SPLIT_OPEN_FILE_COST, splitOpenFileCost.toString());
>>>>     }
>>>>
>>>>     if (filterExpressions != null) {
>>>>       for (Expression filter : filterExpressions) {
>>>>         scan = scan.filter(filter);
>>>>       }
>>>>     }
>>>>
>>>>     try (CloseableIterable<CombinedScanTask> tasksIterable = scan.planTasks()) {
>>>>       this.tasks = Lists.newArrayList(tasksIterable);
>>>>     } catch (IOException e) {
>>>>       throw new RuntimeIOException(e, "Failed to close table scan: %s", scan);
>>>>     }
>>>>   }
>>>>
>>>>   return tasks;
>>>> }
>>>>
>>>> On Fri, Jul 17, 2020 at 9:35 AM Sud <sudssf2...@gmail.com> wrote:
>>>>
>>>>> Thanks @Jingsong for the reply.
>>>>>
>>>>> Yes, one additional data point about the table: it is an Avro table
>>>>> generated from stream ingestion, and we expect a couple of thousand
>>>>> snapshots to be created daily.
>>>>>
>>>>> We are using the appendsBetween API, and I think any compaction
>>>>> operation will break that API, but I will take a look at compacting
>>>>> data files and manifest files.
>>>>>
>>>>> If I understand what you mentioned above, is the ManifestGroup reader
>>>>> slow? I wonder if it would help to add a debug log and a timer around
>>>>> some of these operations.
>>>>>
>>>>> I will keep you updated.
>>>>>
>>>>> --
>>>>> Thanks
>>>>>
>>>>> On Fri, Jul 17, 2020 at 1:50 AM Jingsong Li <jingsongl...@gmail.com> wrote:
>>>>>
>>>>>> Hi Sud,
>>>>>>
>>>>>> A batch read of an Iceberg table should just read the latest
>>>>>> snapshot. I think the problem in this case is that your large tables
>>>>>> have a large number of manifest files.
>>>>>>
>>>>>> 1. The simple way is to reduce the number of manifest files:
>>>>>> - To reduce the manifest file count, you can try
>>>>>> `Actions.rewriteManifests` (thanks Anton) to compact manifest files.
>>>>>> - If there are too many small data files, leading to too many
>>>>>> manifest files even after compacting, you can try
>>>>>> `Actions.rewriteDataFiles` (thanks jerryshao) to compact data files.
>>>>>>
>>>>>> 2. Another way is to parallelize the opening of the manifest
>>>>>> readers. Your stack looks like the thread is stuck in
>>>>>> `DataFileReader.openReader`; at a minimum it needs to read the magic
>>>>>> bytes from the input stream in `open`, and this looks slow (the
>>>>>> input stream needs to read at least one IO block).
>>>>>> So can we make `DataFileReader.openReader` parallel? We should make
>>>>>> `ManifestGroup.entries` return a parallel iterable.
>>>>>>
>>>>>> Best,
>>>>>> Jingsong
>>>>>>
>>>>>> On Fri, Jul 17, 2020 at 7:23 AM Sud <sudssf2...@gmail.com> wrote:
>>>>>>
>>>>>>> Hi Iceberg devs,
>>>>>>>
>>>>>>> We are trying to root-cause an issue where the driver gets stuck
>>>>>>> when trying to read comparatively large tables (> 2000 snapshots).
>>>>>>>
>>>>>>> When I look at the thread dump of the driver's main thread, I see
>>>>>>> that it is stuck in planning tasks. I also noticed that the
>>>>>>> iceberg-worker-pool is idle and mostly one thread is active.
>>>>>>> Has anyone faced a similar issue? What parameters can be used to
>>>>>>> optimize reads for tables with a large number of snapshots (with
>>>>>>> smaller data files)?
>>>>>>>
>>>>>>> java.net.SocketInputStream.socketRead0(Native Method)
>>>>>>> java.net.SocketInputStream.socketRead(SocketInputStream.java:116)
>>>>>>> java.net.SocketInputStream.read(SocketInputStream.java:171)
>>>>>>> java.net.SocketInputStream.read(SocketInputStream.java:141)
>>>>>>> sun.security.ssl.InputRecord.readFully(InputRecord.java:465)
>>>>>>> sun.security.ssl.InputRecord.read(InputRecord.java:503)
>>>>>>> sun.security.ssl.SSLSocketImpl.readRecord(SSLSocketImpl.java:990) => holding Monitor(java.lang.Object@580832623)
>>>>>>> sun.security.ssl.SSLSocketImpl.readDataRecord(SSLSocketImpl.java:948)
>>>>>>> sun.security.ssl.AppInputStream.read(AppInputStream.java:105) => holding Monitor(sun.security.ssl.AppInputStream@894541691)
>>>>>>> org.apache.http.impl.io.SessionInputBufferImpl.streamRead(SessionInputBufferImpl.java:137)
>>>>>>> org.apache.http.impl.io.SessionInputBufferImpl.fillBuffer(SessionInputBufferImpl.java:153)
>>>>>>> org.apache.http.impl.io.SessionInputBufferImpl.readLine(SessionInputBufferImpl.java:280)
>>>>>>> org.apache.http.impl.conn.DefaultHttpResponseParser.parseHead(DefaultHttpResponseParser.java:138)
>>>>>>> org.apache.http.impl.conn.DefaultHttpResponseParser.parseHead(DefaultHttpResponseParser.java:56)
>>>>>>> org.apache.http.impl.io.AbstractMessageParser.parse(AbstractMessageParser.java:259)
>>>>>>> org.apache.http.impl.DefaultBHttpClientConnection.receiveResponseHeader(DefaultBHttpClientConnection.java:163)
>>>>>>> org.apache.http.impl.conn.CPoolProxy.receiveResponseHeader(CPoolProxy.java:157)
>>>>>>> org.apache.http.protocol.HttpRequestExecutor.doReceiveResponse(HttpRequestExecutor.java:273)
>>>>>>> com.amazonaws.http.protocol.SdkHttpRequestExecutor.doReceiveResponse(SdkHttpRequestExecutor.java:82)
>>>>>>> org.apache.http.protocol.HttpRequestExecutor.execute(HttpRequestExecutor.java:125)
>>>>>>> org.apache.http.impl.execchain.MainClientExec.execute(MainClientExec.java:272)
>>>>>>> org.apache.http.impl.execchain.ProtocolExec.execute(ProtocolExec.java:186)
>>>>>>> org.apache.http.impl.client.InternalHttpClient.doExecute(InternalHttpClient.java:185)
>>>>>>> org.apache.http.impl.client.CloseableHttpClient.execute(CloseableHttpClient.java:83)
>>>>>>> org.apache.http.impl.client.CloseableHttpClient.execute(CloseableHttpClient.java:56)
>>>>>>> com.amazonaws.http.apache.client.impl.SdkHttpClient.execute(SdkHttpClient.java:72)
>>>>>>> com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeOneRequest(AmazonHttpClient.java:1297)
>>>>>>> com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeHelper(AmazonHttpClient.java:1113)
>>>>>>> com.amazonaws.http.AmazonHttpClient$RequestExecutor.doExecute(AmazonHttpClient.java:770)
>>>>>>> com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeWithTimer(AmazonHttpClient.java:744)
>>>>>>> com.amazonaws.http.AmazonHttpClient$RequestExecutor.execute(AmazonHttpClient.java:726)
>>>>>>> com.amazonaws.http.AmazonHttpClient$RequestExecutor.access$500(AmazonHttpClient.java:686)
>>>>>>> com.amazonaws.http.AmazonHttpClient$RequestExecutionBuilderImpl.execute(AmazonHttpClient.java:668)
>>>>>>> com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:532)
>>>>>>> com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:512)
>>>>>>> com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:4926)
>>>>>>> com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:4872)
>>>>>>> com.amazonaws.services.s3.AmazonS3Client.getObject(AmazonS3Client.java:1472)
>>>>>>> org.apache.hadoop.fs.s3a.S3AInputStream.reopen(S3AInputStream.java:148)
>>>>>>> org.apache.hadoop.fs.s3a.S3AInputStream.lazySeek(S3AInputStream.java:281)
>>>>>>> org.apache.hadoop.fs.s3a.S3AInputStream.read(S3AInputStream.java:364) => holding Monitor(org.apache.hadoop.fs.s3a.S3AInputStream@1577065760)
>>>>>>> java.io.DataInputStream.read(DataInputStream.java:149)
>>>>>>> org.apache.iceberg.hadoop.HadoopStreams$HadoopSeekableInputStream.read(HadoopStreams.java:112)
>>>>>>> org.apache.iceberg.avro.AvroIO$AvroInputStreamAdapter.read(AvroIO.java:106)
>>>>>>> org.apache.iceberg.shaded.org.apache.avro.file.DataFileReader.openReader(DataFileReader.java:55)
>>>>>>> org.apache.iceberg.avro.AvroIterable.newFileReader(AvroIterable.java:94)
>>>>>>> org.apache.iceberg.avro.AvroIterable.getMetadata(AvroIterable.java:66)
>>>>>>> org.apache.iceberg.ManifestReader.<init>(ManifestReader.java:141)
>>>>>>> org.apache.iceberg.ManifestReader.read(ManifestReader.java:119)
>>>>>>> org.apache.iceberg.ManifestGroup.lambda$entries$13(ManifestGroup.java:212)
>>>>>>> org.apache.iceberg.ManifestGroup$$Lambda$89/1042679820.apply(Unknown Source)
>>>>>>> org.apache.iceberg.shaded.com.google.common.collect.Iterators$6.transform(Iterators.java:783)
>>>>>>> org.apache.iceberg.shaded.com.google.common.collect.TransformedIterator.next(TransformedIterator.java:47)
>>>>>>> org.apache.iceberg.shaded.com.google.common.collect.TransformedIterator.next(TransformedIterator.java:47)
>>>>>>> org.apache.iceberg.util.ParallelIterable$ParallelIterator.submitNextTask(ParallelIterable.java:113)
>>>>>>> org.apache.iceberg.util.ParallelIterable$ParallelIterator.checkTasks(ParallelIterable.java:100)
>>>>>>> org.apache.iceberg.util.ParallelIterable$ParallelIterator.hasNext(ParallelIterable.java:137) => holding Monitor(org.apache.iceberg.util.ParallelIterable$ParallelIterator@1724178871)
>>>>>>> org.apache.iceberg.shaded.com.google.common.collect.TransformedIterator.hasNext(TransformedIterator.java:42)
>>>>>>> org.apache.iceberg.shaded.com.google.common.collect.TransformedIterator.hasNext(TransformedIterator.java:42)
>>>>>>> org.apache.iceberg.shaded.com.google.common.collect.Iterators$ConcatenatedIterator.getTopMetaIterator(Iterators.java:1309)
>>>>>>> org.apache.iceberg.shaded.com.google.common.collect.Iterators$ConcatenatedIterator.hasNext(Iterators.java:1325)
>>>>>>> org.apache.iceberg.util.BinPacking$PackingIterator.next(BinPacking.java:111)
>>>>>>> org.apache.iceberg.util.BinPacking$PackingIterator.next(BinPacking.java:87)
>>>>>>> org.apache.iceberg.io.CloseableIterable$3$1.next(CloseableIterable.java:94)
>>>>>>> org.apache.iceberg.shaded.com.google.common.collect.Iterators.addAll(Iterators.java:356)
>>>>>>> org.apache.iceberg.shaded.com.google.common.collect.Lists.newArrayList(Lists.java:143)
>>>>>>> org.apache.iceberg.shaded.com.google.common.collect.Lists.newArrayList(Lists.java:130)
>>>>>>> org.apache.iceberg.spark.source.Reader.tasks(Reader.java:295)
>>>>>>> org.apache.iceberg.spark.source.Reader.planInputPartitions(Reader.java:196)
>>>>>>> org.apache.spark.sql.execution.datasources.v2.DataSourceV2ScanExec.partitions$lzycompute(DataSourceV2ScanExec.scala:76) => holding Monitor(org.apache.spark.sql.execution.datasources.v2.DataSourceV2ScanExec@853913013)
>>>>>>> org.apache.spark.sql.execution.datasources.v2.DataSourceV2ScanExec.partitions(DataSourceV2ScanExec.scala:75)
>>>>>>> org.apache.spark.sql.execution.datasources.v2.DataSourceV2ScanExec.outputPartitioning(DataSourceV2ScanExec.scala:65)
>>>>>>>
>>>>>>> --
>>>>>>> Thanks
>>>>>>
>>>>>> --
>>>>>> Best, Jingsong Lee
>>>>>
>> --
>> Ryan Blue
>> Software Engineer
>> Netflix
>
> --
> Best, Jingsong Lee