OK, after adding more instrumentation, I see that Reader::estimateStatistics
may be a culprit.

It looks like the estimated stats are computed by planning the full table
scan, and that is why it is so slow. Does anyone know if it is possible to
avoid Reader::estimateStatistics?

Also, does estimateStatistics use the appropriate filters? I have seen the
same issue when performing unions: they were very slow, and
estimateStatistics used to take a lot of time.

@Override
public Statistics estimateStatistics() {
  long sizeInBytes = 0L;
  long numRows = 0L;

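  // tasks() plans the entire scan, so this estimate reads every matching
  // manifest; that full planning pass is the expensive part on large tables.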
  for (CombinedScanTask task : tasks()) {
    for (FileScanTask file : task.files()) {
      sizeInBytes += file.length();
      numRows += file.file().recordCount();
    }
  }

  return new Stats(sizeInBytes, numRows);
}

private List<CombinedScanTask> tasks() {
  if (tasks == null) {
    TableScan scan = table
        .newScan()
        .caseSensitive(caseSensitive)
        .project(lazySchema());

    if (snapshotId != null) {
      scan = scan.useSnapshot(snapshotId);
    }

    if (asOfTimestamp != null) {
      scan = scan.asOfTime(asOfTimestamp);
    }

    if (splitSize != null) {
      scan = scan.option(TableProperties.SPLIT_SIZE, splitSize.toString());
    }

    if (splitLookback != null) {
      scan = scan.option(TableProperties.SPLIT_LOOKBACK, splitLookback.toString());
    }

    if (splitOpenFileCost != null) {
      scan = scan.option(TableProperties.SPLIT_OPEN_FILE_COST, splitOpenFileCost.toString());
    }

    if (filterExpressions != null) {
      for (Expression filter : filterExpressions) {
        scan = scan.filter(filter);
      }
    }

    try (CloseableIterable<CombinedScanTask> tasksIterable = scan.planTasks()) {
      this.tasks = Lists.newArrayList(tasksIterable);
    } catch (IOException e) {
      throw new RuntimeIOException(e, "Failed to close table scan: %s", scan);
    }
  }

  return tasks;
}
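
As a possible workaround (just a sketch against the code above, not something
I have tested): if the estimate could fall back to the snapshot summary when
no filters are pushed down, the full planTasks() pass could be skipped. The
summary property names ("total-records", "total-files-size") and the fallback
logic below are assumptions on my part and may depend on the Iceberg version:

@Override
public Statistics estimateStatistics() {
  // Needs java.util.Map and org.apache.iceberg.Snapshot imports.
  // (Ignores asOfTimestamp for brevity.)
  Snapshot snapshot = snapshotId != null
      ? table.snapshot(snapshotId)
      : table.currentSnapshot();

  // Only trust the summary when no filters are pushed down; with filters the
  // totals would overestimate the scan.
  if (snapshot != null && snapshot.summary() != null && filterExpressions == null) {
    Map<String, String> summary = snapshot.summary();
    String totalRecords = summary.get("total-records");   // assumed property name
    String totalSize = summary.get("total-files-size");   // assumed property name
    if (totalRecords != null && totalSize != null) {
      return new Stats(Long.parseLong(totalSize), Long.parseLong(totalRecords));
    }
  }

  // Fall back to the existing task-based estimate.
  long sizeInBytes = 0L;
  long numRows = 0L;
  for (CombinedScanTask task : tasks()) {
    for (FileScanTask file : task.files()) {
      sizeInBytes += file.length();
      numRows += file.file().recordCount();
    }
  }
  return new Stats(sizeInBytes, numRows);
}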


On Fri, Jul 17, 2020 at 9:35 AM Sud <sudssf2...@gmail.com> wrote:

> Thanks @Jingsong for the reply.
>
> Yes, one additional data point about the table:
> it is an Avro table generated from stream ingestion, and we expect a
> couple of thousand snapshots to be created daily.
>
> We are using the appendsBetween API, and I think any compaction operation
> will break it, but I will take a look at compacting data files and
> manifest files.
>
> If I understand what you mentioned above correctly, is the ManifestGroup
> reader the slow part? I wonder if it would help to add a debug log and a
> timer for some of these operations.
>
> I will keep you updated.
>
> --
> Thanks
>
> On Fri, Jul 17, 2020 at 1:50 AM Jingsong Li <jingsongl...@gmail.com>
> wrote:
>
>> Hi Sud,
>>
>> The batch read of the Iceberg table should just read the latest snapshot.
>> I think the issue here is that your large tables have a large number of
>> manifest files.
>>
>> 1. The simple way is to reduce the number of manifest files:
>> - To reduce the manifest file count, you can try
>> `Actions.rewriteManifests` (thanks Anton) to compact manifest files.
>> - If there are too many small data files, leading to too many manifest
>> files even after compacting, you can try `Actions.rewriteDataFiles`
>> (thanks jerryshao) to compact data files.
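>>
>> For reference, a minimal sketch of invoking those actions from Spark (the
>> exact entry points may differ across Iceberg versions, so treat this as
>> an assumption rather than the definitive API):
>>
>> import org.apache.iceberg.Table;
>> import org.apache.iceberg.actions.Actions;
>>
>> Table table = ...;  // the table loaded from your catalog
>>
>> // Compact many small manifest files into fewer, larger ones.
>> Actions.forTable(table).rewriteManifests().execute();
>>
>> // Compact small data files, which also reduces manifest counts over time.
>> Actions.forTable(table).rewriteDataFiles().execute();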
>>
>> 2. Another way is to parallelize the opening of the manifest readers.
>> Your stack trace shows the thread is stuck in `DataFileReader.openReader`;
>> at minimum it has to read the magic bytes from the input stream in `open`,
>> and that looks slow (the input stream needs to read at least one IO block).
>> So can we parallelize `DataFileReader.openReader`? We should make
>> `ManifestGroup.entries` return a parallel iterable.
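>>
>> To illustrate that idea (this is not the actual ManifestGroup code; the
>> readEntries(...) helper below is hypothetical and stands in for opening
>> one manifest via ManifestReader.read):
>>
>> import java.util.ArrayList;
>> import java.util.List;
>> import java.util.concurrent.ExecutorService;
>> import java.util.concurrent.Future;
>> import org.apache.iceberg.DataFile;
>> import org.apache.iceberg.ManifestFile;
>> import org.apache.iceberg.util.ThreadPools;
>>
>> static List<DataFile> planInParallel(List<ManifestFile> manifests) throws Exception {
>>   // Iceberg's shared worker pool.
>>   ExecutorService workerPool = ThreadPools.getWorkerPool();
>>   List<Future<List<DataFile>>> opening = new ArrayList<>();
>>   for (ManifestFile manifest : manifests) {
>>     // Submit each open so the blocking "read magic bytes" calls overlap.
>>     opening.add(workerPool.submit(() -> readEntries(manifest)));
>>   }
>>   List<DataFile> entries = new ArrayList<>();
>>   for (Future<List<DataFile>> future : opening) {
>>     entries.addAll(future.get());
>>   }
>>   return entries;
>> }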
>>
>> Best,
>> Jingsong
>>
>> On Fri, Jul 17, 2020 at 7:23 AM Sud <sudssf2...@gmail.com> wrote:
>>
>>> Hi Iceberg devs,
>>>
>>> We are trying to root-cause an issue where the driver gets stuck when
>>> trying to read comparatively large tables (> 2000 snapshots).
>>>
>>> Looking at the thread dump of the driver's main thread, I see that it is
>>> stuck planning tasks. I also noticed that the iceberg-worker-pool is
>>> mostly idle, with only one thread active.
>>> Has anyone faced a similar issue? What parameters can be used to
>>> optimize reads for tables with a large number of snapshots (with smaller
>>> data files)?
>>>
>>>
>>> java.net.SocketInputStream.socketRead0(Native Method)
>>> java.net.SocketInputStream.socketRead(SocketInputStream.java:116)
>>> java.net.SocketInputStream.read(SocketInputStream.java:171)
>>> java.net.SocketInputStream.read(SocketInputStream.java:141)
>>> sun.security.ssl.InputRecord.readFully(InputRecord.java:465)
>>> sun.security.ssl.InputRecord.read(InputRecord.java:503)
>>> sun.security.ssl.SSLSocketImpl.readRecord(SSLSocketImpl.java:990) => 
>>> holding Monitor(java.lang.Object@580832623})
>>> sun.security.ssl.SSLSocketImpl.readDataRecord(SSLSocketImpl.java:948)
>>> sun.security.ssl.AppInputStream.read(AppInputStream.java:105) => holding 
>>> Monitor(sun.security.ssl.AppInputStream@894541691})
>>> org.apache.http.impl.io.SessionInputBufferImpl.streamRead(SessionInputBufferImpl.java:137)
>>> org.apache.http.impl.io.SessionInputBufferImpl.fillBuffer(SessionInputBufferImpl.java:153)
>>> org.apache.http.impl.io.SessionInputBufferImpl.readLine(SessionInputBufferImpl.java:280)
>>> org.apache.http.impl.conn.DefaultHttpResponseParser.parseHead(DefaultHttpResponseParser.java:138)
>>> org.apache.http.impl.conn.DefaultHttpResponseParser.parseHead(DefaultHttpResponseParser.java:56)
>>> org.apache.http.impl.io.AbstractMessageParser.parse(AbstractMessageParser.java:259)
>>> org.apache.http.impl.DefaultBHttpClientConnection.receiveResponseHeader(DefaultBHttpClientConnection.java:163)
>>> org.apache.http.impl.conn.CPoolProxy.receiveResponseHeader(CPoolProxy.java:157)
>>> org.apache.http.protocol.HttpRequestExecutor.doReceiveResponse(HttpRequestExecutor.java:273)
>>> com.amazonaws.http.protocol.SdkHttpRequestExecutor.doReceiveResponse(SdkHttpRequestExecutor.java:82)
>>> org.apache.http.protocol.HttpRequestExecutor.execute(HttpRequestExecutor.java:125)
>>> org.apache.http.impl.execchain.MainClientExec.execute(MainClientExec.java:272)
>>> org.apache.http.impl.execchain.ProtocolExec.execute(ProtocolExec.java:186)
>>> org.apache.http.impl.client.InternalHttpClient.doExecute(InternalHttpClient.java:185)
>>> org.apache.http.impl.client.CloseableHttpClient.execute(CloseableHttpClient.java:83)
>>> org.apache.http.impl.client.CloseableHttpClient.execute(CloseableHttpClient.java:56)
>>> com.amazonaws.http.apache.client.impl.SdkHttpClient.execute(SdkHttpClient.java:72)
>>> com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeOneRequest(AmazonHttpClient.java:1297)
>>> com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeHelper(AmazonHttpClient.java:1113)
>>> com.amazonaws.http.AmazonHttpClient$RequestExecutor.doExecute(AmazonHttpClient.java:770)
>>> com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeWithTimer(AmazonHttpClient.java:744)
>>> com.amazonaws.http.AmazonHttpClient$RequestExecutor.execute(AmazonHttpClient.java:726)
>>> com.amazonaws.http.AmazonHttpClient$RequestExecutor.access$500(AmazonHttpClient.java:686)
>>> com.amazonaws.http.AmazonHttpClient$RequestExecutionBuilderImpl.execute(AmazonHttpClient.java:668)
>>> com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:532)
>>> com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:512)
>>> com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:4926)
>>> com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:4872)
>>> com.amazonaws.services.s3.AmazonS3Client.getObject(AmazonS3Client.java:1472)
>>> org.apache.hadoop.fs.s3a.S3AInputStream.reopen(S3AInputStream.java:148)
>>> org.apache.hadoop.fs.s3a.S3AInputStream.lazySeek(S3AInputStream.java:281)
>>> org.apache.hadoop.fs.s3a.S3AInputStream.read(S3AInputStream.java:364) => 
>>> holding Monitor(org.apache.hadoop.fs.s3a.S3AInputStream@1577065760})
>>> java.io.DataInputStream.read(DataInputStream.java:149)
>>> org.apache.iceberg.hadoop.HadoopStreams$HadoopSeekableInputStream.read(HadoopStreams.java:112)
>>> org.apache.iceberg.avro.AvroIO$AvroInputStreamAdapter.read(AvroIO.java:106)
>>> org.apache.iceberg.shaded.org.apache.avro.file.DataFileReader.openReader(DataFileReader.java:55)
>>> org.apache.iceberg.avro.AvroIterable.newFileReader(AvroIterable.java:94)
>>> org.apache.iceberg.avro.AvroIterable.getMetadata(AvroIterable.java:66)
>>> org.apache.iceberg.ManifestReader.<init>(ManifestReader.java:141)
>>> org.apache.iceberg.ManifestReader.read(ManifestReader.java:119)
>>> org.apache.iceberg.ManifestGroup.lambda$entries$13(ManifestGroup.java:212)
>>> org.apache.iceberg.ManifestGroup$$Lambda$89/1042679820.apply(Unknown Source)
>>> org.apache.iceberg.shaded.com.google.common.collect.Iterators$6.transform(Iterators.java:783)
>>> org.apache.iceberg.shaded.com.google.common.collect.TransformedIterator.next(TransformedIterator.java:47)
>>> org.apache.iceberg.shaded.com.google.common.collect.TransformedIterator.next(TransformedIterator.java:47)
>>> org.apache.iceberg.util.ParallelIterable$ParallelIterator.submitNextTask(ParallelIterable.java:113)
>>> org.apache.iceberg.util.ParallelIterable$ParallelIterator.checkTasks(ParallelIterable.java:100)
>>> org.apache.iceberg.util.ParallelIterable$ParallelIterator.hasNext(ParallelIterable.java:137)
>>>  => holding 
>>> Monitor(org.apache.iceberg.util.ParallelIterable$ParallelIterator@1724178871})
>>> org.apache.iceberg.shaded.com.google.common.collect.TransformedIterator.hasNext(TransformedIterator.java:42)
>>> org.apache.iceberg.shaded.com.google.common.collect.TransformedIterator.hasNext(TransformedIterator.java:42)
>>> org.apache.iceberg.shaded.com.google.common.collect.Iterators$ConcatenatedIterator.getTopMetaIterator(Iterators.java:1309)
>>> org.apache.iceberg.shaded.com.google.common.collect.Iterators$ConcatenatedIterator.hasNext(Iterators.java:1325)
>>> org.apache.iceberg.util.BinPacking$PackingIterator.next(BinPacking.java:111)
>>> org.apache.iceberg.util.BinPacking$PackingIterator.next(BinPacking.java:87)
>>> org.apache.iceberg.io.CloseableIterable$3$1.next(CloseableIterable.java:94)
>>> org.apache.iceberg.shaded.com.google.common.collect.Iterators.addAll(Iterators.java:356)
>>> org.apache.iceberg.shaded.com.google.common.collect.Lists.newArrayList(Lists.java:143)
>>> org.apache.iceberg.shaded.com.google.common.collect.Lists.newArrayList(Lists.java:130)
>>> org.apache.iceberg.spark.source.Reader.tasks(Reader.java:295)
>>> org.apache.iceberg.spark.source.Reader.planInputPartitions(Reader.java:196)
>>> org.apache.spark.sql.execution.datasources.v2.DataSourceV2ScanExec.partitions$lzycompute(DataSourceV2ScanExec.scala:76)
>>>  => holding 
>>> Monitor(org.apache.spark.sql.execution.datasources.v2.DataSourceV2ScanExec@853913013})
>>> org.apache.spark.sql.execution.datasources.v2.DataSourceV2ScanExec.partitions(DataSourceV2ScanExec.scala:75)
>>> org.apache.spark.sql.execution.datasources.v2.DataSourceV2ScanExec.outputPartitioning(DataSourceV2ScanExec.scala:65)
>>>
>>>
>>>
>>>
>>> --
>>> Thanks
>>>
>>
>>
>> --
>> Best, Jingsong Lee
>>
>
