Thanks Ryan and Jingsong. I will add one more TODO to see if we can use
Spark to parallelize estimation even when predicate pushdown is done
(Spark already does this for file-system-based tables), and possibly for
manifest readers as well. A rough sketch of the idea is below.

I will try to submit a PR upstream for adding the options and will create
issues for the TODOs.
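
A minimal sketch of the idea, assuming we can hand the manifest paths to
Spark and do the per-manifest summation on executors instead of on the
driver. ParallelEstimateSketch and readManifestStats are hypothetical,
not existing Iceberg code:

import java.util.List;
import org.apache.spark.api.java.JavaSparkContext;
import scala.Tuple2;

class ParallelEstimateSketch {
  // Hypothetical helper: open one manifest and return
  // (total data file size in bytes, total record count) for its entries.
  static Tuple2<Long, Long> readManifestStats(String manifestPath) {
    // ... read the manifest and sum DataFile.fileSizeInBytes() and
    // DataFile.recordCount(); omitted here.
    return new Tuple2<>(0L, 0L);
  }

  static Tuple2<Long, Long> estimate(JavaSparkContext sc,
                                     List<String> manifestPaths) {
    // One task per manifest, so the driver no longer reads them one by one.
    return sc.parallelize(manifestPaths, manifestPaths.size())
        .map(ParallelEstimateSketch::readManifestStats)
        .reduce((a, b) -> new Tuple2<>(a._1() + b._1(), a._2() + b._2()));
  }
}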


--
Thanks

On Fri, Jul 17, 2020 at 9:25 PM Jingsong Li <jingsongl...@gmail.com> wrote:

> Thanks Sud for the in-depth debugging, and thanks Ryan for the explanation.
>
> +1 to have a table property to disable stats estimation.
>
> IIUC, the difference between stats estimation and a scan with filters is
> mainly in the partition filters:
> Iceberg relies on filter pushdown to do partition pruning, so the stats
> estimation sees all partitions of the table even though only a small
> number of partitions actually need to be read.
>
> - A scan with partition filters only needs to read the manifest list
> file, do partition pruning there, and then open only the manifests that
> contain the matching partitions (a small sketch follows below).
> - A scan without partition filters needs to read all manifest files.
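>
> For example, a filtered scan (just an illustrative snippet; "event_date"
> is a hypothetical partition column) lets the planner skip whole manifests
> using the partition summaries in the manifest list:
>
> TableScan scan = table.newScan()
>     .filter(Expressions.equal("event_date", "2020-07-17"));
> // planFiles()/planTasks() can now prune manifests before opening them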
>
> Best,
> Jingsong
>
> On Sat, Jul 18, 2020 at 6:13 AM Ryan Blue <rb...@netflix.com.invalid>
> wrote:
>
>> Hey, great question. I just caught up on the other thread, but let me
>> provide some context here.
>>
>> Spark uses the stats estimation here to determine whether or not to
>> broadcast. If we returned a default value, then Spark wouldn't be able to
>> use Iceberg tables in broadcast joins. Even though Spark won't push filters
>> down before calling this in 2.4, it is still better in many cases to return
>> the size of the entire table.
>>
>> I think it would make sense to be able to disable this using a table
>> property, for the case where you know that the table is going to be too
>> large to broadcast unfiltered. A sketch of what that could look like is
>> below.
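>>
>> Nothing decided here; the property name "read.stats-estimation.enabled"
>> is hypothetical, and Stats and table are the existing pieces of the
>> Spark Reader quoted further down the thread:
>>
>> @Override
>> public Statistics estimateStatistics() {
>>   boolean enabled = PropertyUtil.propertyAsBoolean(
>>       table.properties(), "read.stats-estimation.enabled", true);
>>   if (!enabled) {
>>     // report a huge size so Spark never chooses to broadcast this table
>>     return new Stats(Long.MAX_VALUE, Long.MAX_VALUE);
>>   }
>>
>>   // otherwise, fall through to the existing per-task summation
>>   long sizeInBytes = 0L;
>>   long numRows = 0L;
>>   for (CombinedScanTask task : tasks()) {
>>     for (FileScanTask file : task.files()) {
>>       sizeInBytes += file.length();
>>       numRows += file.file().recordCount();
>>     }
>>   }
>>   return new Stats(sizeInBytes, numRows);
>> }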
>>
>> Also, this is fixed in Spark 3 because we added rules to push filters
>> into the scan before stats are returned.
>>
>> On Fri, Jul 17, 2020 at 1:01 PM Sud <sudssf2...@gmail.com> wrote:
>>
>>> As per the javadoc, estimateStatistics does not take any pushed
>>> operators into account, so is there a reason why the Iceberg reader
>>> implements this? I wonder if it would help to make it configurable and
>>> return a default value.
>>>
>>> /**
>>>  * A mix in interface for {@link DataSourceReader}. Data source readers
>>>  * can implement this interface to report statistics to Spark.
>>>  *
>>>  * As of Spark 2.4, statistics are reported to the optimizer before any
>>>  * operator is pushed to the DataSourceReader. Implementations that
>>>  * return more accurate statistics based on pushed operators will not
>>>  * improve query performance until the planner can push operators
>>>  * before getting stats.
>>>  */
>>>
>>>
>>>
>>> On Fri, Jul 17, 2020 at 12:35 PM Sud <sudssf2...@gmail.com> wrote:
>>>
>>>> OK, after adding more instrumentation I see that
>>>> Reader::estimateStatistics may be the culprit.
>>>>
>>>> It looks like the stats estimation is effectively doing a full table
>>>> estimate, and that's why it is so slow. Does anyone know if it is
>>>> possible to avoid Reader::estimateStatistics?
>>>>
>>>> Also, does estimateStatistics use the appropriate filters? I have seen
>>>> the same issue when performing unions: they were very slow, and
>>>> estimateStatistics used to take a lot of time.
>>>>
>>>> @Override
>>>> public Statistics estimateStatistics() {
>>>>   long sizeInBytes = 0L;
>>>>   long numRows = 0L;
>>>>
>>>>   for (CombinedScanTask task : tasks()) {
>>>>     for (FileScanTask file : task.files()) {
>>>>       sizeInBytes += file.length();
>>>>       numRows += file.file().recordCount();
>>>>     }
>>>>   }
>>>>
>>>>   return new Stats(sizeInBytes, numRows);
>>>> }
>>>>
>>>> private List<CombinedScanTask> tasks() {
>>>>   if (tasks == null) {
>>>>     TableScan scan = table
>>>>         .newScan()
>>>>         .caseSensitive(caseSensitive)
>>>>         .project(lazySchema());
>>>>
>>>>     if (snapshotId != null) {
>>>>       scan = scan.useSnapshot(snapshotId);
>>>>     }
>>>>
>>>>     if (asOfTimestamp != null) {
>>>>       scan = scan.asOfTime(asOfTimestamp);
>>>>     }
>>>>
>>>>     if (splitSize != null) {
>>>>       scan = scan.option(TableProperties.SPLIT_SIZE, splitSize.toString());
>>>>     }
>>>>
>>>>     if (splitLookback != null) {
>>>>       scan = scan.option(TableProperties.SPLIT_LOOKBACK, 
>>>> splitLookback.toString());
>>>>     }
>>>>
>>>>     if (splitOpenFileCost != null) {
>>>>       scan = scan.option(TableProperties.SPLIT_OPEN_FILE_COST, 
>>>> splitOpenFileCost.toString());
>>>>     }
>>>>
>>>>     if (filterExpressions != null) {
>>>>       for (Expression filter : filterExpressions) {
>>>>         scan = scan.filter(filter);
>>>>       }
>>>>     }
>>>>
>>>>     try (CloseableIterable<CombinedScanTask> tasksIterable = 
>>>> scan.planTasks()) {
>>>>       this.tasks = Lists.newArrayList(tasksIterable);
>>>>     }  catch (IOException e) {
>>>>       throw new RuntimeIOException(e, "Failed to close table scan: %s", 
>>>> scan);
>>>>     }
>>>>   }
>>>>
>>>>   return tasks;
>>>> }
>>>>
>>>>
>>>> On Fri, Jul 17, 2020 at 9:35 AM Sud <sudssf2...@gmail.com> wrote:
>>>>
>>>>> Thanks @Jingsong for the reply.
>>>>>
>>>>> Yes, one additional data point about the table: it is an Avro table
>>>>> generated from stream ingestion, and we expect a couple of thousand
>>>>> snapshots to be created daily.
>>>>>
>>>>> We are using the appendsBetween API, and I think any compaction
>>>>> operation will break that usage, but I will take a look at compacting
>>>>> data files and manifest files.
>>>>>
>>>>> If I understand what you mentioned above, is the ManifestGroup reader
>>>>> the slow part? I wonder if it would help to add a debug log and a
>>>>> timer around some of these operations, e.g. something like the snippet
>>>>> below.
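>>>>>
>>>>> // Just an illustrative snippet: time the planning call inside
>>>>> // Reader::tasks() with Guava's Stopwatch. The names assume the Reader
>>>>> // code quoted above; LOG is a hypothetical logger field.
>>>>> Stopwatch planTimer = Stopwatch.createStarted();
>>>>> try (CloseableIterable<CombinedScanTask> tasksIterable = scan.planTasks()) {
>>>>>   this.tasks = Lists.newArrayList(tasksIterable);
>>>>> } catch (IOException e) {
>>>>>   throw new RuntimeIOException(e, "Failed to close table scan: %s", scan);
>>>>> }
>>>>> LOG.debug("planTasks took {} ms", planTimer.elapsed(TimeUnit.MILLISECONDS));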
>>>>>
>>>>> I will keep you updated
>>>>>
>>>>> --
>>>>> Thanks
>>>>>
>>>>> On Fri, Jul 17, 2020 at 1:50 AM Jingsong Li <jingsongl...@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> Hi Sud,
>>>>>>
>>>>>> A batch read of an Iceberg table should just read the latest
>>>>>> snapshot.
>>>>>> I think the issue in this case is that your large tables have a large
>>>>>> number of manifest files.
>>>>>>
>>>>>> 1. The simple way is to reduce the number of manifest files:
>>>>>> - To reduce the number of manifest files, you can try
>>>>>> `Actions.rewriteManifests` (thanks Anton) to compact manifest files.
>>>>>> - If there are too many small data files, leading to too many
>>>>>> manifest files even after compacting, you can try
>>>>>> `Actions.rewriteDataFiles` (thanks jerryshao) to compact data files.
>>>>>> (A rough usage sketch follows below.)
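>>>>>>
>>>>>> Roughly like this (just a sketch from memory; check the actions
>>>>>> module for the exact options):
>>>>>>
>>>>>> // compact small manifests into larger ones
>>>>>> Actions.forTable(table)
>>>>>>     .rewriteManifests()
>>>>>>     .execute();
>>>>>>
>>>>>> // compact small data files, which also reduces manifest entries
>>>>>> Actions.forTable(table)
>>>>>>     .rewriteDataFiles()
>>>>>>     .targetSizeInBytes(512 * 1024 * 1024)  // e.g. 512 MB target files
>>>>>>     .execute();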
>>>>>>
>>>>>> 2. Another way is to parallelize the opening of the manifest readers.
>>>>>> Your stack looks like the thread is stuck in
>>>>>> `DataFileReader.openReader`; at a minimum it needs to read the magic
>>>>>> bytes from the input stream in `open`, and this looks slow (the input
>>>>>> stream needs to read at least one IO block).
>>>>>> So can we make `DataFileReader.openReader` run in parallel? We should
>>>>>> make `ManifestGroup.entries` return a parallel iterable, along the
>>>>>> lines of the sketch below.
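>>>>>>
>>>>>> Not the real ManifestGroup internals, just an illustration of the
>>>>>> idea: move the "open manifest and read its entries" work onto the
>>>>>> worker pool so the planning thread is not blocked on one S3 read at a
>>>>>> time (readEntries is a hypothetical helper):
>>>>>>
>>>>>> ExecutorService pool = ThreadPools.getWorkerPool();
>>>>>> List<Future<List<DataFile>>> futures = new ArrayList<>();
>>>>>> for (ManifestFile manifest : manifests) {
>>>>>>   // submit the open + read, instead of doing it on the iterating thread
>>>>>>   futures.add(pool.submit(() -> readEntries(manifest)));
>>>>>> }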
>>>>>>
>>>>>> Best,
>>>>>> Jingsong
>>>>>>
>>>>>> On Fri, Jul 17, 2020 at 7:23 AM Sud <sudssf2...@gmail.com> wrote:
>>>>>>
>>>>>>> Hi Iceberg devs,
>>>>>>>
>>>>>>> We are trying to root cause an issue where the driver gets stuck when
>>>>>>> trying to read comparatively large tables (> 2000 snapshots).
>>>>>>>
>>>>>>> When I look at the thread dump of the driver's main thread, I see
>>>>>>> that the thread is stuck planning tasks. I also noticed that the
>>>>>>> iceberg-worker-pool is mostly idle, with only one thread active.
>>>>>>> Has anyone faced a similar issue? What parameters can be used to
>>>>>>> optimize reads for tables with a large number of snapshots (with
>>>>>>> smaller data files)?
>>>>>>>
>>>>>>>
>>>>>>> java.net.SocketInputStream.socketRead0(Native Method)
>>>>>>> java.net.SocketInputStream.socketRead(SocketInputStream.java:116)
>>>>>>> java.net.SocketInputStream.read(SocketInputStream.java:171)
>>>>>>> java.net.SocketInputStream.read(SocketInputStream.java:141)
>>>>>>> sun.security.ssl.InputRecord.readFully(InputRecord.java:465)
>>>>>>> sun.security.ssl.InputRecord.read(InputRecord.java:503)
>>>>>>> sun.security.ssl.SSLSocketImpl.readRecord(SSLSocketImpl.java:990) => 
>>>>>>> holding Monitor(java.lang.Object@580832623})
>>>>>>> sun.security.ssl.SSLSocketImpl.readDataRecord(SSLSocketImpl.java:948)
>>>>>>> sun.security.ssl.AppInputStream.read(AppInputStream.java:105) => 
>>>>>>> holding Monitor(sun.security.ssl.AppInputStream@894541691})
>>>>>>> org.apache.http.impl.io.SessionInputBufferImpl.streamRead(SessionInputBufferImpl.java:137)
>>>>>>> org.apache.http.impl.io.SessionInputBufferImpl.fillBuffer(SessionInputBufferImpl.java:153)
>>>>>>> org.apache.http.impl.io.SessionInputBufferImpl.readLine(SessionInputBufferImpl.java:280)
>>>>>>> org.apache.http.impl.conn.DefaultHttpResponseParser.parseHead(DefaultHttpResponseParser.java:138)
>>>>>>> org.apache.http.impl.conn.DefaultHttpResponseParser.parseHead(DefaultHttpResponseParser.java:56)
>>>>>>> org.apache.http.impl.io.AbstractMessageParser.parse(AbstractMessageParser.java:259)
>>>>>>> org.apache.http.impl.DefaultBHttpClientConnection.receiveResponseHeader(DefaultBHttpClientConnection.java:163)
>>>>>>> org.apache.http.impl.conn.CPoolProxy.receiveResponseHeader(CPoolProxy.java:157)
>>>>>>> org.apache.http.protocol.HttpRequestExecutor.doReceiveResponse(HttpRequestExecutor.java:273)
>>>>>>> com.amazonaws.http.protocol.SdkHttpRequestExecutor.doReceiveResponse(SdkHttpRequestExecutor.java:82)
>>>>>>> org.apache.http.protocol.HttpRequestExecutor.execute(HttpRequestExecutor.java:125)
>>>>>>> org.apache.http.impl.execchain.MainClientExec.execute(MainClientExec.java:272)
>>>>>>> org.apache.http.impl.execchain.ProtocolExec.execute(ProtocolExec.java:186)
>>>>>>> org.apache.http.impl.client.InternalHttpClient.doExecute(InternalHttpClient.java:185)
>>>>>>> org.apache.http.impl.client.CloseableHttpClient.execute(CloseableHttpClient.java:83)
>>>>>>> org.apache.http.impl.client.CloseableHttpClient.execute(CloseableHttpClient.java:56)
>>>>>>> com.amazonaws.http.apache.client.impl.SdkHttpClient.execute(SdkHttpClient.java:72)
>>>>>>> com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeOneRequest(AmazonHttpClient.java:1297)
>>>>>>> com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeHelper(AmazonHttpClient.java:1113)
>>>>>>> com.amazonaws.http.AmazonHttpClient$RequestExecutor.doExecute(AmazonHttpClient.java:770)
>>>>>>> com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeWithTimer(AmazonHttpClient.java:744)
>>>>>>> com.amazonaws.http.AmazonHttpClient$RequestExecutor.execute(AmazonHttpClient.java:726)
>>>>>>> com.amazonaws.http.AmazonHttpClient$RequestExecutor.access$500(AmazonHttpClient.java:686)
>>>>>>> com.amazonaws.http.AmazonHttpClient$RequestExecutionBuilderImpl.execute(AmazonHttpClient.java:668)
>>>>>>> com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:532)
>>>>>>> com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:512)
>>>>>>> com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:4926)
>>>>>>> com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:4872)
>>>>>>> com.amazonaws.services.s3.AmazonS3Client.getObject(AmazonS3Client.java:1472)
>>>>>>> org.apache.hadoop.fs.s3a.S3AInputStream.reopen(S3AInputStream.java:148)
>>>>>>> org.apache.hadoop.fs.s3a.S3AInputStream.lazySeek(S3AInputStream.java:281)
>>>>>>> org.apache.hadoop.fs.s3a.S3AInputStream.read(S3AInputStream.java:364) 
>>>>>>> => holding Monitor(org.apache.hadoop.fs.s3a.S3AInputStream@1577065760})
>>>>>>> java.io.DataInputStream.read(DataInputStream.java:149)
>>>>>>> org.apache.iceberg.hadoop.HadoopStreams$HadoopSeekableInputStream.read(HadoopStreams.java:112)
>>>>>>> org.apache.iceberg.avro.AvroIO$AvroInputStreamAdapter.read(AvroIO.java:106)
>>>>>>> org.apache.iceberg.shaded.org.apache.avro.file.DataFileReader.openReader(DataFileReader.java:55)
>>>>>>> org.apache.iceberg.avro.AvroIterable.newFileReader(AvroIterable.java:94)
>>>>>>> org.apache.iceberg.avro.AvroIterable.getMetadata(AvroIterable.java:66)
>>>>>>> org.apache.iceberg.ManifestReader.<init>(ManifestReader.java:141)
>>>>>>> org.apache.iceberg.ManifestReader.read(ManifestReader.java:119)
>>>>>>> org.apache.iceberg.ManifestGroup.lambda$entries$13(ManifestGroup.java:212)
>>>>>>> org.apache.iceberg.ManifestGroup$$Lambda$89/1042679820.apply(Unknown 
>>>>>>> Source)
>>>>>>> org.apache.iceberg.shaded.com.google.common.collect.Iterators$6.transform(Iterators.java:783)
>>>>>>> org.apache.iceberg.shaded.com.google.common.collect.TransformedIterator.next(TransformedIterator.java:47)
>>>>>>> org.apache.iceberg.shaded.com.google.common.collect.TransformedIterator.next(TransformedIterator.java:47)
>>>>>>> org.apache.iceberg.util.ParallelIterable$ParallelIterator.submitNextTask(ParallelIterable.java:113)
>>>>>>> org.apache.iceberg.util.ParallelIterable$ParallelIterator.checkTasks(ParallelIterable.java:100)
>>>>>>> org.apache.iceberg.util.ParallelIterable$ParallelIterator.hasNext(ParallelIterable.java:137)
>>>>>>>  => holding 
>>>>>>> Monitor(org.apache.iceberg.util.ParallelIterable$ParallelIterator@1724178871})
>>>>>>> org.apache.iceberg.shaded.com.google.common.collect.TransformedIterator.hasNext(TransformedIterator.java:42)
>>>>>>> org.apache.iceberg.shaded.com.google.common.collect.TransformedIterator.hasNext(TransformedIterator.java:42)
>>>>>>> org.apache.iceberg.shaded.com.google.common.collect.Iterators$ConcatenatedIterator.getTopMetaIterator(Iterators.java:1309)
>>>>>>> org.apache.iceberg.shaded.com.google.common.collect.Iterators$ConcatenatedIterator.hasNext(Iterators.java:1325)
>>>>>>> org.apache.iceberg.util.BinPacking$PackingIterator.next(BinPacking.java:111)
>>>>>>> org.apache.iceberg.util.BinPacking$PackingIterator.next(BinPacking.java:87)
>>>>>>> org.apache.iceberg.io.CloseableIterable$3$1.next(CloseableIterable.java:94)
>>>>>>> org.apache.iceberg.shaded.com.google.common.collect.Iterators.addAll(Iterators.java:356)
>>>>>>> org.apache.iceberg.shaded.com.google.common.collect.Lists.newArrayList(Lists.java:143)
>>>>>>> org.apache.iceberg.shaded.com.google.common.collect.Lists.newArrayList(Lists.java:130)
>>>>>>> org.apache.iceberg.spark.source.Reader.tasks(Reader.java:295)
>>>>>>> org.apache.iceberg.spark.source.Reader.planInputPartitions(Reader.java:196)
>>>>>>> org.apache.spark.sql.execution.datasources.v2.DataSourceV2ScanExec.partitions$lzycompute(DataSourceV2ScanExec.scala:76)
>>>>>>>  => holding 
>>>>>>> Monitor(org.apache.spark.sql.execution.datasources.v2.DataSourceV2ScanExec@853913013})
>>>>>>> org.apache.spark.sql.execution.datasources.v2.DataSourceV2ScanExec.partitions(DataSourceV2ScanExec.scala:75)
>>>>>>> org.apache.spark.sql.execution.datasources.v2.DataSourceV2ScanExec.outputPartitioning(DataSourceV2ScanExec.scala:65)
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> --
>>>>>>> Thanks
>>>>>>>
>>>>>>
>>>>>>
>>>>>> --
>>>>>> Best, Jingsong Lee
>>>>>>
>>>>>
>>
>> --
>> Ryan Blue
>> Software Engineer
>> Netflix
>>
>
>
> --
> Best, Jingsong Lee
>
