Thanks, Sud, for the in-depth debugging, and thanks, Ryan, for the explanation.

+1 to having a table property to disable stats estimation.

IIUC, the difference between stats estimation and a scan with filters is
mainly the partition filters: Iceberg relies on filter push-down to do
partition pruning, so stats estimation sees all partitions of the table even
though the query may only need to read a small number of them (a rough sketch
follows the list below):

- A scan with partition filters only needs to read the manifest list, do
partition pruning, and open only the manifests that contain the matching
partitions.
- A scan without partition filters needs to read all manifest files.
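
For illustration, a minimal sketch of the two scan shapes (`table` is an
already-loaded org.apache.iceberg.Table, and the partition column
`event_date` and its value are hypothetical):

import org.apache.iceberg.TableScan;
import org.apache.iceberg.expressions.Expressions;

// Scan with a partition filter: the manifest list's partition summaries are
// used for pruning, so only manifests that may contain matching partitions
// are opened.
TableScan filtered = table.newScan()
    .filter(Expressions.equal("event_date", "2020-07-17"));

// Scan without filters (what stats estimation plans in Spark 2.4): every
// manifest file in the current snapshot must be read.
TableScan unfiltered = table.newScan();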

Best,
Jingsong

On Sat, Jul 18, 2020 at 6:13 AM Ryan Blue <rb...@netflix.com.invalid> wrote:

> Hey, great question. I just caught up on the other thread, but let me
> provide some context here.
>
> Spark uses the stats estimation here to determine whether or not to
> broadcast. If we returned a default value, then Spark wouldn't be able to
> use Iceberg tables in broadcast joins. Even though Spark won't push filters
> down before calling this in 2.4, it is still better in many cases to return
> the size of the entire table.
>
> I think it would make sense to be able to disable this using a table
> property, for when you know that the table is going to be too large to
> broadcast unfiltered.
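>
> A minimal sketch of what that escape hatch could look like in the reader (the
> property name "read.stats.estimate-enabled" is hypothetical, not an existing
> Iceberg property):
>
> @Override
> public Statistics estimateStatistics() {
>   // Hypothetical opt-out: skip scan planning when the table disables it.
>   boolean enabled = Boolean.parseBoolean(
>       table.properties().getOrDefault("read.stats.estimate-enabled", "true"));
>   if (!enabled) {
>     // Report the table as too large to broadcast instead of planning the scan.
>     return new Stats(Long.MAX_VALUE, Long.MAX_VALUE);
>   }
>
>   long sizeInBytes = 0L;
>   long numRows = 0L;
>   for (CombinedScanTask task : tasks()) {
>     for (FileScanTask file : task.files()) {
>       sizeInBytes += file.length();
>       numRows += file.file().recordCount();
>     }
>   }
>   return new Stats(sizeInBytes, numRows);
> }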
>
> Also, this is fixed in Spark 3 because we added rules to push filters into
> the scan before stats are returned.
>
> On Fri, Jul 17, 2020 at 1:01 PM Sud <sudssf2...@gmail.com> wrote:
>
>> As per the Javadoc, estimateStatistics does not take any pushed operators
>> into account, so is there a reason why the Iceberg reader implements this? I
>> wonder if it would help to make it configurable and return a default value.
>>
>> /**
>>  * A mix in interface for {@link DataSourceReader}. Data source readers can
>>  * implement this interface to report statistics to Spark.
>>  *
>>  * As of Spark 2.4, statistics are reported to the optimizer before any
>>  * operator is pushed to the DataSourceReader. Implementations that return
>>  * more accurate statistics based on pushed operators will not improve query
>>  * performance until the planner can push operators before getting stats.
>>  */
>>
>>
>>
>> On Fri, Jul 17, 2020 at 12:35 PM Sud <sudssf2...@gmail.com> wrote:
>>
>>> OK, after adding more instrumentation I see that
>>> Reader::estimateStatistics may be the culprit.
>>>
>>> It looks like stats estimation may be planning the entire table, and
>>> that's why it is so slow. Does anyone know if it is possible to
>>> avoid Reader::estimateStatistics?
>>>
>>> Also, does estimateStatistics use the appropriate filters? (I have seen the
>>> same issue where performing unions was very slow and estimateStatistics
>>> used to take a lot of time.)
>>>
>>> @Override
>>> public Statistics estimateStatistics() {
>>>   long sizeInBytes = 0L;
>>>   long numRows = 0L;
>>>
>>>   for (CombinedScanTask task : tasks()) {
>>>     for (FileScanTask file : task.files()) {
>>>       sizeInBytes += file.length();
>>>       numRows += file.file().recordCount();
>>>     }
>>>   }
>>>
>>>   return new Stats(sizeInBytes, numRows);
>>> }
>>>
>>> private List<CombinedScanTask> tasks() {
>>>   if (tasks == null) {
>>>     TableScan scan = table
>>>         .newScan()
>>>         .caseSensitive(caseSensitive)
>>>         .project(lazySchema());
>>>
>>>     if (snapshotId != null) {
>>>       scan = scan.useSnapshot(snapshotId);
>>>     }
>>>
>>>     if (asOfTimestamp != null) {
>>>       scan = scan.asOfTime(asOfTimestamp);
>>>     }
>>>
>>>     if (splitSize != null) {
>>>       scan = scan.option(TableProperties.SPLIT_SIZE, splitSize.toString());
>>>     }
>>>
>>>     if (splitLookback != null) {
>>>       scan = scan.option(TableProperties.SPLIT_LOOKBACK, splitLookback.toString());
>>>     }
>>>
>>>     if (splitOpenFileCost != null) {
>>>       scan = scan.option(TableProperties.SPLIT_OPEN_FILE_COST, splitOpenFileCost.toString());
>>>     }
>>>
>>>     if (filterExpressions != null) {
>>>       for (Expression filter : filterExpressions) {
>>>         scan = scan.filter(filter);
>>>       }
>>>     }
>>>
>>>     try (CloseableIterable<CombinedScanTask> tasksIterable = scan.planTasks()) {
>>>       this.tasks = Lists.newArrayList(tasksIterable);
>>>     } catch (IOException e) {
>>>       throw new RuntimeIOException(e, "Failed to close table scan: %s", scan);
>>>     }
>>>   }
>>>
>>>   return tasks;
>>> }
>>>
>>>
>>> On Fri, Jul 17, 2020 at 9:35 AM Sud <sudssf2...@gmail.com> wrote:
>>>
>>>> Thanks, @Jingsong, for the reply.
>>>>
>>>> Yes, one additional data point about the table:
>>>> it is an Avro table generated from stream ingestion, and we expect
>>>> a couple of thousand snapshots to be created daily.
>>>>
>>>> We are using the appendsBetween API, and I think any compaction
>>>> operation will break that API, but I will take a look at compacting
>>>> data files and manifest files.
>>>>
>>>> If I understand what you mentioned above correctly, is the ManifestGroup
>>>> reader the slow part? I wonder if it would help to add a debug log and a
>>>> timer around some of these operations.
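>>>>
>>>> For example, a minimal sketch of the kind of timing instrumentation I have
>>>> in mind, wrapped around the existing planTasks() call in Reader::tasks (the
>>>> LOG field is assumed):
>>>>
>>>> long start = System.nanoTime();
>>>> try (CloseableIterable<CombinedScanTask> tasksIterable = scan.planTasks()) {
>>>>   this.tasks = Lists.newArrayList(tasksIterable);
>>>> } catch (IOException e) {
>>>>   throw new RuntimeIOException(e, "Failed to close table scan: %s", scan);
>>>> }
>>>> LOG.debug("Planned {} tasks in {} ms", tasks.size(),
>>>>     TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start));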
>>>>
>>>> I will keep you updated
>>>>
>>>> --
>>>> Thanks
>>>>
>>>> On Fri, Jul 17, 2020 at 1:50 AM Jingsong Li <jingsongl...@gmail.com>
>>>> wrote:
>>>>
>>>>> Hi Sud,
>>>>>
>>>>> A batch read of an Iceberg table should just read the latest snapshot.
>>>>> I think the issue in this case is that your large tables have a large
>>>>> number of manifest files.
>>>>>
>>>>> 1. The simple way is to reduce the number of manifest files (a sketch of
>>>>> both actions follows below):
>>>>> - To reduce the manifest file count, you can try
>>>>> `Actions.rewriteManifests` (thanks, Anton) to compact manifest files.
>>>>> - If there are too many small data files, leading to too many manifest
>>>>> files even after compaction, you can try `Actions.rewriteDataFiles`
>>>>> (thanks, jerryshao) to compact data files.
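>>>>>
>>>>> A minimal sketch of invoking both actions (the exact builder options
>>>>> depend on your Iceberg version; `spark` and `table` are assumed to
>>>>> already exist):
>>>>>
>>>>> import org.apache.iceberg.actions.Actions;
>>>>>
>>>>> // Compact small manifest files into fewer, larger manifests.
>>>>> Actions.forTable(spark, table)
>>>>>     .rewriteManifests()
>>>>>     .execute();
>>>>>
>>>>> // Compact small data files, which also reduces the number of manifest entries.
>>>>> Actions.forTable(spark, table)
>>>>>     .rewriteDataFiles()
>>>>>     .execute();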
>>>>>
>>>>> 2. Another way is to parallelize the opening of the manifest readers.
>>>>> Your stack shows the thread stuck in `DataFileReader.openReader`; at a
>>>>> minimum, it needs to read the magic bytes from the input stream in `open`,
>>>>> and that looks slow (the input stream needs to read at least one IO block).
>>>>> So can we parallelize `DataFileReader.openReader`? We could make
>>>>> `ManifestGroup.entries` return a parallel iterable, roughly as sketched
>>>>> below.
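>>>>>
>>>>> A rough sketch of that idea inside ManifestGroup (this is internal code,
>>>>> so types like ManifestEntry are package-private and the exact read call
>>>>> may differ by version):
>>>>>
>>>>> // Each inner iterable lazily opens and reads one manifest file.
>>>>> Iterable<CloseableIterable<ManifestEntry>> readers =
>>>>>     Iterables.transform(manifests, manifest ->
>>>>>         ManifestReader.read(io.newInputFile(manifest.path())).entries());
>>>>>
>>>>> // ParallelIterable submits each inner iterable to the worker pool, so
>>>>> // manifests are opened concurrently instead of one at a time on the driver.
>>>>> Iterable<ManifestEntry> entries =
>>>>>     new ParallelIterable<>(readers, ThreadPools.getWorkerPool());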
>>>>>
>>>>> Best,
>>>>> Jingsong
>>>>>
>>>>> On Fri, Jul 17, 2020 at 7:23 AM Sud <sudssf2...@gmail.com> wrote:
>>>>>
>>>>>> Hi Iceberg devs,
>>>>>>
>>>>>> We are trying to root-cause an issue where the driver gets stuck when
>>>>>> trying to read comparatively large tables (> 2000 snapshots).
>>>>>>
>>>>>> When I look at the thread dump of the driver's main thread, I see that
>>>>>> the thread is stuck planning tasks. I also noticed that the
>>>>>> iceberg-worker-pool is idle and mostly one thread is active.
>>>>>> Has anyone faced a similar issue? What parameters can be used to
>>>>>> optimize reads for tables with a large number of snapshots (and smaller
>>>>>> data files)?
>>>>>>
>>>>>>
>>>>>> java.net.SocketInputStream.socketRead0(Native Method)
>>>>>> java.net.SocketInputStream.socketRead(SocketInputStream.java:116)
>>>>>> java.net.SocketInputStream.read(SocketInputStream.java:171)
>>>>>> java.net.SocketInputStream.read(SocketInputStream.java:141)
>>>>>> sun.security.ssl.InputRecord.readFully(InputRecord.java:465)
>>>>>> sun.security.ssl.InputRecord.read(InputRecord.java:503)
>>>>>> sun.security.ssl.SSLSocketImpl.readRecord(SSLSocketImpl.java:990) => 
>>>>>> holding Monitor(java.lang.Object@580832623})
>>>>>> sun.security.ssl.SSLSocketImpl.readDataRecord(SSLSocketImpl.java:948)
>>>>>> sun.security.ssl.AppInputStream.read(AppInputStream.java:105) => holding 
>>>>>> Monitor(sun.security.ssl.AppInputStream@894541691})
>>>>>> org.apache.http.impl.io.SessionInputBufferImpl.streamRead(SessionInputBufferImpl.java:137)
>>>>>> org.apache.http.impl.io.SessionInputBufferImpl.fillBuffer(SessionInputBufferImpl.java:153)
>>>>>> org.apache.http.impl.io.SessionInputBufferImpl.readLine(SessionInputBufferImpl.java:280)
>>>>>> org.apache.http.impl.conn.DefaultHttpResponseParser.parseHead(DefaultHttpResponseParser.java:138)
>>>>>> org.apache.http.impl.conn.DefaultHttpResponseParser.parseHead(DefaultHttpResponseParser.java:56)
>>>>>> org.apache.http.impl.io.AbstractMessageParser.parse(AbstractMessageParser.java:259)
>>>>>> org.apache.http.impl.DefaultBHttpClientConnection.receiveResponseHeader(DefaultBHttpClientConnection.java:163)
>>>>>> org.apache.http.impl.conn.CPoolProxy.receiveResponseHeader(CPoolProxy.java:157)
>>>>>> org.apache.http.protocol.HttpRequestExecutor.doReceiveResponse(HttpRequestExecutor.java:273)
>>>>>> com.amazonaws.http.protocol.SdkHttpRequestExecutor.doReceiveResponse(SdkHttpRequestExecutor.java:82)
>>>>>> org.apache.http.protocol.HttpRequestExecutor.execute(HttpRequestExecutor.java:125)
>>>>>> org.apache.http.impl.execchain.MainClientExec.execute(MainClientExec.java:272)
>>>>>> org.apache.http.impl.execchain.ProtocolExec.execute(ProtocolExec.java:186)
>>>>>> org.apache.http.impl.client.InternalHttpClient.doExecute(InternalHttpClient.java:185)
>>>>>> org.apache.http.impl.client.CloseableHttpClient.execute(CloseableHttpClient.java:83)
>>>>>> org.apache.http.impl.client.CloseableHttpClient.execute(CloseableHttpClient.java:56)
>>>>>> com.amazonaws.http.apache.client.impl.SdkHttpClient.execute(SdkHttpClient.java:72)
>>>>>> com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeOneRequest(AmazonHttpClient.java:1297)
>>>>>> com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeHelper(AmazonHttpClient.java:1113)
>>>>>> com.amazonaws.http.AmazonHttpClient$RequestExecutor.doExecute(AmazonHttpClient.java:770)
>>>>>> com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeWithTimer(AmazonHttpClient.java:744)
>>>>>> com.amazonaws.http.AmazonHttpClient$RequestExecutor.execute(AmazonHttpClient.java:726)
>>>>>> com.amazonaws.http.AmazonHttpClient$RequestExecutor.access$500(AmazonHttpClient.java:686)
>>>>>> com.amazonaws.http.AmazonHttpClient$RequestExecutionBuilderImpl.execute(AmazonHttpClient.java:668)
>>>>>> com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:532)
>>>>>> com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:512)
>>>>>> com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:4926)
>>>>>> com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:4872)
>>>>>> com.amazonaws.services.s3.AmazonS3Client.getObject(AmazonS3Client.java:1472)
>>>>>> org.apache.hadoop.fs.s3a.S3AInputStream.reopen(S3AInputStream.java:148)
>>>>>> org.apache.hadoop.fs.s3a.S3AInputStream.lazySeek(S3AInputStream.java:281)
>>>>>> org.apache.hadoop.fs.s3a.S3AInputStream.read(S3AInputStream.java:364) => 
>>>>>> holding Monitor(org.apache.hadoop.fs.s3a.S3AInputStream@1577065760})
>>>>>> java.io.DataInputStream.read(DataInputStream.java:149)
>>>>>> org.apache.iceberg.hadoop.HadoopStreams$HadoopSeekableInputStream.read(HadoopStreams.java:112)
>>>>>> org.apache.iceberg.avro.AvroIO$AvroInputStreamAdapter.read(AvroIO.java:106)
>>>>>> org.apache.iceberg.shaded.org.apache.avro.file.DataFileReader.openReader(DataFileReader.java:55)
>>>>>> org.apache.iceberg.avro.AvroIterable.newFileReader(AvroIterable.java:94)
>>>>>> org.apache.iceberg.avro.AvroIterable.getMetadata(AvroIterable.java:66)
>>>>>> org.apache.iceberg.ManifestReader.<init>(ManifestReader.java:141)
>>>>>> org.apache.iceberg.ManifestReader.read(ManifestReader.java:119)
>>>>>> org.apache.iceberg.ManifestGroup.lambda$entries$13(ManifestGroup.java:212)
>>>>>> org.apache.iceberg.ManifestGroup$$Lambda$89/1042679820.apply(Unknown 
>>>>>> Source)
>>>>>> org.apache.iceberg.shaded.com.google.common.collect.Iterators$6.transform(Iterators.java:783)
>>>>>> org.apache.iceberg.shaded.com.google.common.collect.TransformedIterator.next(TransformedIterator.java:47)
>>>>>> org.apache.iceberg.shaded.com.google.common.collect.TransformedIterator.next(TransformedIterator.java:47)
>>>>>> org.apache.iceberg.util.ParallelIterable$ParallelIterator.submitNextTask(ParallelIterable.java:113)
>>>>>> org.apache.iceberg.util.ParallelIterable$ParallelIterator.checkTasks(ParallelIterable.java:100)
>>>>>> org.apache.iceberg.util.ParallelIterable$ParallelIterator.hasNext(ParallelIterable.java:137)
>>>>>>  => holding 
>>>>>> Monitor(org.apache.iceberg.util.ParallelIterable$ParallelIterator@1724178871})
>>>>>> org.apache.iceberg.shaded.com.google.common.collect.TransformedIterator.hasNext(TransformedIterator.java:42)
>>>>>> org.apache.iceberg.shaded.com.google.common.collect.TransformedIterator.hasNext(TransformedIterator.java:42)
>>>>>> org.apache.iceberg.shaded.com.google.common.collect.Iterators$ConcatenatedIterator.getTopMetaIterator(Iterators.java:1309)
>>>>>> org.apache.iceberg.shaded.com.google.common.collect.Iterators$ConcatenatedIterator.hasNext(Iterators.java:1325)
>>>>>> org.apache.iceberg.util.BinPacking$PackingIterator.next(BinPacking.java:111)
>>>>>> org.apache.iceberg.util.BinPacking$PackingIterator.next(BinPacking.java:87)
>>>>>> org.apache.iceberg.io.CloseableIterable$3$1.next(CloseableIterable.java:94)
>>>>>> org.apache.iceberg.shaded.com.google.common.collect.Iterators.addAll(Iterators.java:356)
>>>>>> org.apache.iceberg.shaded.com.google.common.collect.Lists.newArrayList(Lists.java:143)
>>>>>> org.apache.iceberg.shaded.com.google.common.collect.Lists.newArrayList(Lists.java:130)
>>>>>> org.apache.iceberg.spark.source.Reader.tasks(Reader.java:295)
>>>>>> org.apache.iceberg.spark.source.Reader.planInputPartitions(Reader.java:196)
>>>>>> org.apache.spark.sql.execution.datasources.v2.DataSourceV2ScanExec.partitions$lzycompute(DataSourceV2ScanExec.scala:76)
>>>>>>  => holding 
>>>>>> Monitor(org.apache.spark.sql.execution.datasources.v2.DataSourceV2ScanExec@853913013})
>>>>>> org.apache.spark.sql.execution.datasources.v2.DataSourceV2ScanExec.partitions(DataSourceV2ScanExec.scala:75)
>>>>>> org.apache.spark.sql.execution.datasources.v2.DataSourceV2ScanExec.outputPartitioning(DataSourceV2ScanExec.scala:65)
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>> --
>>>>>> Thanks
>>>>>>
>>>>>
>>>>>
>>>>> --
>>>>> Best, Jingsong Lee
>>>>>
>>>>
>
> --
> Ryan Blue
> Software Engineer
> Netflix
>


-- 
Best, Jingsong Lee
