This is an automated email from the ASF dual-hosted git repository.
curth pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-adbc.git
The following commit(s) were added to refs/heads/main by this push:
new ba2efd0d8 feat(csharp/src/Drivers/Databricks): Fix for older DBR
versions incorrect ResultFormat (#3020)
ba2efd0d8 is described below
commit ba2efd0d8533374d357393ad3b3a916e36721eb2
Author: Todd Meng <[email protected]>
AuthorDate: Mon Jun 30 19:07:31 2025 -0700
feat(csharp/src/Drivers/Databricks): Fix for older DBR versions incorrect
ResultFormat (#3020)
This fix is needed because of a bug on the runtime side:
> Sometimes TSparkDirectResults.TFetchResultsResp.TRowSet could be
non-link responses (ARROW_BASED_SET), but still
TSparkDirectResults.TGetResultSetMetadataResp.ResultFormat ==
URL_BASED_SET
This PR works around that by inspecting the results directly. Because
DirectResults are not always available, when they are absent we make an
initial FetchResults call to decide whether to use `CloudFetchReader` or
the Arrow-based `DatabricksReader`. `CloudFetchReader` is chosen only if
the result contains links.
The following test does not pass on DBR 11.3 or 10.4 without this commit:
dotnet test --filter "FullyQualifiedName~LZ4DecompressionCapabilityTest"
## Remaining work:
Status polling needs to be updated to start earlier; ideally it should be
managed by the new DatabricksCompositeReader.
---
.../CloudFetch/CloudFetchDownloadManager.cs | 4 +-
.../Databricks/CloudFetch/CloudFetchReader.cs | 7 +-
.../CloudFetch/CloudFetchResultFetcher.cs | 22 +++-
.../Databricks/DatabricksCompositeReader.cs | 115 +++++++++++++++++
.../src/Drivers/Databricks/DatabricksConnection.cs | 25 +---
csharp/src/Drivers/Databricks/DatabricksReader.cs | 14 +--
.../CloudFetch/CloudFetchResultFetcherTest.cs | 139 ++++++++++++++++++++-
7 files changed, 289 insertions(+), 37 deletions(-)
diff --git
a/csharp/src/Drivers/Databricks/CloudFetch/CloudFetchDownloadManager.cs
b/csharp/src/Drivers/Databricks/CloudFetch/CloudFetchDownloadManager.cs
index 97190dfec..484f1af25 100644
--- a/csharp/src/Drivers/Databricks/CloudFetch/CloudFetchDownloadManager.cs
+++ b/csharp/src/Drivers/Databricks/CloudFetch/CloudFetchDownloadManager.cs
@@ -22,6 +22,7 @@ using System.Net.Http;
using System.Threading;
using System.Threading.Tasks;
using Apache.Arrow.Adbc.Drivers.Apache.Hive2;
+using Apache.Hive.Service.Rpc.Thrift;
namespace Apache.Arrow.Adbc.Drivers.Databricks.CloudFetch
{
@@ -59,7 +60,7 @@ namespace Apache.Arrow.Adbc.Drivers.Databricks.CloudFetch
/// <param name="statement">The HiveServer2 statement.</param>
/// <param name="schema">The Arrow schema.</param>
/// <param name="isLz4Compressed">Whether the results are LZ4
compressed.</param>
- public CloudFetchDownloadManager(DatabricksStatement statement, Schema
schema, bool isLz4Compressed, HttpClient httpClient)
+ public CloudFetchDownloadManager(DatabricksStatement statement, Schema
schema, TFetchResultsResp? initialResults, bool isLz4Compressed, HttpClient
httpClient)
{
_statement = statement ?? throw new
ArgumentNullException(nameof(statement));
_schema = schema ?? throw new
ArgumentNullException(nameof(schema));
@@ -193,6 +194,7 @@ namespace Apache.Arrow.Adbc.Drivers.Databricks.CloudFetch
// Initialize the result fetcher with URL management capabilities
_resultFetcher = new CloudFetchResultFetcher(
_statement,
+ initialResults,
_memoryManager,
_downloadQueue,
DefaultFetchBatchSize,
diff --git a/csharp/src/Drivers/Databricks/CloudFetch/CloudFetchReader.cs
b/csharp/src/Drivers/Databricks/CloudFetch/CloudFetchReader.cs
index d5a48d58f..397d249cc 100644
--- a/csharp/src/Drivers/Databricks/CloudFetch/CloudFetchReader.cs
+++ b/csharp/src/Drivers/Databricks/CloudFetch/CloudFetchReader.cs
@@ -23,6 +23,7 @@ using System.Threading.Tasks;
using Apache.Arrow.Adbc.Drivers.Apache;
using Apache.Arrow.Adbc.Tracing;
using Apache.Arrow.Ipc;
+using Apache.Hive.Service.Rpc.Thrift;
namespace Apache.Arrow.Adbc.Drivers.Databricks.CloudFetch
{
@@ -46,7 +47,7 @@ namespace Apache.Arrow.Adbc.Drivers.Databricks.CloudFetch
/// <param name="statement">The Databricks statement.</param>
/// <param name="schema">The Arrow schema.</param>
/// <param name="isLz4Compressed">Whether the results are LZ4
compressed.</param>
- public CloudFetchReader(DatabricksStatement statement, Schema schema,
bool isLz4Compressed, HttpClient httpClient)
+ public CloudFetchReader(DatabricksStatement statement, Schema schema,
TFetchResultsResp? initialResults, bool isLz4Compressed, HttpClient httpClient)
: base(statement, schema, isLz4Compressed)
{
// Check if prefetch is enabled
@@ -67,14 +68,14 @@ namespace Apache.Arrow.Adbc.Drivers.Databricks.CloudFetch
// Initialize the download manager
if (isPrefetchEnabled)
{
- downloadManager = new CloudFetchDownloadManager(statement,
schema, isLz4Compressed, httpClient);
+ downloadManager = new CloudFetchDownloadManager(statement,
schema, initialResults, isLz4Compressed, httpClient);
downloadManager.StartAsync().Wait();
}
else
{
// For now, we only support the prefetch implementation
// This flag is reserved for future use if we need to support
a non-prefetch mode
- downloadManager = new CloudFetchDownloadManager(statement,
schema, isLz4Compressed, httpClient);
+ downloadManager = new CloudFetchDownloadManager(statement,
schema, initialResults, isLz4Compressed, httpClient);
downloadManager.StartAsync().Wait();
}
}
diff --git
a/csharp/src/Drivers/Databricks/CloudFetch/CloudFetchResultFetcher.cs
b/csharp/src/Drivers/Databricks/CloudFetch/CloudFetchResultFetcher.cs
index 3167fd93a..b90ba6c32 100644
--- a/csharp/src/Drivers/Databricks/CloudFetch/CloudFetchResultFetcher.cs
+++ b/csharp/src/Drivers/Databricks/CloudFetch/CloudFetchResultFetcher.cs
@@ -32,6 +32,7 @@ namespace Apache.Arrow.Adbc.Drivers.Databricks.CloudFetch
internal class CloudFetchResultFetcher : ICloudFetchResultFetcher
{
private readonly IHiveServer2Statement _statement;
+ private readonly TFetchResultsResp? _initialResults;
private readonly ICloudFetchMemoryBufferManager _memoryManager;
private readonly BlockingCollection<IDownloadResult> _downloadQueue;
private readonly SemaphoreSlim _fetchLock = new SemaphoreSlim(1, 1);
@@ -57,6 +58,7 @@ namespace Apache.Arrow.Adbc.Drivers.Databricks.CloudFetch
/// <param name="clock">Clock implementation for time operations. If
null, uses system clock.</param>
public CloudFetchResultFetcher(
IHiveServer2Statement statement,
+ TFetchResultsResp? initialResults,
ICloudFetchMemoryBufferManager memoryManager,
BlockingCollection<IDownloadResult> downloadQueue,
long batchSize,
@@ -64,6 +66,7 @@ namespace Apache.Arrow.Adbc.Drivers.Databricks.CloudFetch
IClock? clock = null)
{
_statement = statement ?? throw new
ArgumentNullException(nameof(statement));
+ _initialResults = initialResults;
_memoryManager = memoryManager ?? throw new
ArgumentNullException(nameof(memoryManager));
_downloadQueue = downloadQueue ?? throw new
ArgumentNullException(nameof(downloadQueue));
_batchSize = batchSize;
@@ -210,7 +213,9 @@ namespace Apache.Arrow.Adbc.Drivers.Databricks.CloudFetch
try
{
// Process direct results first, if available
- if (_statement.HasDirectResults &&
_statement.DirectResults?.ResultSet?.Results?.ResultLinks?.Count > 0)
+ if (_statement.HasDirectResults &&
+
(_statement.DirectResults?.ResultSet?.Results?.ResultLinks?.Count > 0 ||
+ _initialResults?.Results?.ResultLinks?.Count > 0))
{
// Yield execution so the download queue doesn't get
blocked before downloader is started
await Task.Yield();
@@ -328,7 +333,18 @@ namespace Apache.Arrow.Adbc.Drivers.Databricks.CloudFetch
private void ProcessDirectResultsAsync(CancellationToken
cancellationToken)
{
- List<TSparkArrowResultLink> resultLinks =
_statement.DirectResults!.ResultSet.Results.ResultLinks;
+ TFetchResultsResp fetchResults;
+ if (_statement.HasDirectResults &&
_statement.DirectResults?.ResultSet?.Results?.ResultLinks?.Count > 0)
+ {
+ fetchResults = _statement.DirectResults!.ResultSet;
+ }
+ else
+ {
+ fetchResults = _initialResults!;
+ }
+
+ List<TSparkArrowResultLink> resultLinks =
fetchResults.Results.ResultLinks;
+
long maxOffset = 0;
// Process each link
@@ -350,7 +366,7 @@ namespace Apache.Arrow.Adbc.Drivers.Databricks.CloudFetch
_startOffset = maxOffset;
// Update whether there are more results
- _hasMoreResults = _statement.DirectResults!.ResultSet.HasMoreRows;
+ _hasMoreResults = fetchResults.HasMoreRows;
}
}
}
diff --git a/csharp/src/Drivers/Databricks/DatabricksCompositeReader.cs
b/csharp/src/Drivers/Databricks/DatabricksCompositeReader.cs
new file mode 100644
index 000000000..495aedeea
--- /dev/null
+++ b/csharp/src/Drivers/Databricks/DatabricksCompositeReader.cs
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+using System;
+using System.Collections.Generic;
+using System.Net.Http;
+using System.Threading;
+using System.Threading.Tasks;
+using Apache.Arrow;
+using Apache.Arrow.Adbc;
+using Apache.Arrow.Adbc.Drivers.Apache;
+using Apache.Arrow.Adbc.Drivers.Apache.Hive2;
+using Apache.Arrow.Adbc.Drivers.Databricks.CloudFetch;
+using Apache.Arrow.Adbc.Tracing;
+using Apache.Arrow.Ipc;
+using Apache.Hive.Service.Rpc.Thrift;
+
+namespace Apache.Arrow.Adbc.Drivers.Databricks
+{
+ /// <summary>
+ /// A composite reader for Databricks that delegates to either
CloudFetchReader or DatabricksReader
+ /// based on CloudFetch configuration and result set characteristics.
+ /// </summary>
+ internal sealed class DatabricksCompositeReader : TracingReader
+ {
+ private static readonly string s_assemblyName =
ApacheUtility.GetAssemblyName(typeof(DatabricksCompositeReader));
+ private static readonly string s_assemblyVersion =
ApacheUtility.GetAssemblyVersion(typeof(DatabricksCompositeReader));
+
+ public override string AssemblyName => s_assemblyName;
+
+ public override string AssemblyVersion => s_assemblyVersion;
+
+ public override Schema Schema { get { return _schema; } }
+
+ private BaseDatabricksReader? _activeReader;
+ private readonly DatabricksStatement _statement;
+ private readonly Schema _schema;
+ private readonly bool _isLz4Compressed;
+ private readonly TlsProperties _tlsOptions;
+ private readonly HiveServer2ProxyConfigurator _proxyConfigurator;
+
+ /// <summary>
+ /// Initializes a new instance of the <see
cref="DatabricksCompositeReader"/> class.
+ /// </summary>
+ /// <param name="statement">The Databricks statement.</param>
+ /// <param name="schema">The Arrow schema.</param>
+ /// <param name="isLz4Compressed">Whether the results are LZ4
compressed.</param>
+ /// <param name="httpClient">The HTTP client for CloudFetch
operations.</param>
+ internal DatabricksCompositeReader(DatabricksStatement statement,
Schema schema, bool isLz4Compressed, TlsProperties tlsOptions,
HiveServer2ProxyConfigurator proxyConfigurator): base(statement)
+ {
+ _statement = statement ?? throw new
ArgumentNullException(nameof(statement));
+ _schema = schema ?? throw new
ArgumentNullException(nameof(schema));
+ _isLz4Compressed = isLz4Compressed;
+ _tlsOptions = tlsOptions;
+ _proxyConfigurator = proxyConfigurator;
+
+ // use direct results if available
+ if (_statement.HasDirectResults && _statement.DirectResults !=
null && _statement.DirectResults.__isset.resultSet)
+ {
+ _activeReader =
DetermineReader(_statement.DirectResults.ResultSet);
+ }
+ }
+
+ private BaseDatabricksReader DetermineReader(TFetchResultsResp
initialResults)
+ {
+ // if it has links, use cloud fetch
+ if (initialResults.__isset.results &&
+ initialResults.Results.__isset.resultLinks &&
+ initialResults.Results.ResultLinks?.Count > 0)
+ {
+ HttpClient cloudFetchHttpClient = new
HttpClient(HiveServer2TlsImpl.NewHttpClientHandler(_tlsOptions,
_proxyConfigurator));
+ return new CloudFetchReader(_statement, _schema,
initialResults, _isLz4Compressed, cloudFetchHttpClient);
+ }
+ else
+ {
+ return new DatabricksReader(_statement, _schema,
initialResults, _isLz4Compressed);
+ }
+ }
+
+ /// <summary>
+ /// Reads the next record batch from the active reader.
+ /// </summary>
+ /// <param name="cancellationToken">The cancellation token.</param>
+ /// <returns>The next record batch, or null if there are no more
batches.</returns>
+ public override async ValueTask<RecordBatch?>
ReadNextRecordBatchAsync(CancellationToken cancellationToken = default)
+ {
+ // Initialize the active reader if not already done
+ if (_activeReader == null)
+ {
+ // if no reader, we did not have direct results
+ // Make a FetchResults call to get the initial result set
+ // and determine the reader based on the result set
+ TFetchResultsReq request = new
TFetchResultsReq(this._statement.OperationHandle!,
TFetchOrientation.FETCH_NEXT, this._statement.BatchSize);
+ TFetchResultsResp response = await
this._statement.Connection.Client!.FetchResults(request, cancellationToken);
+ _activeReader = DetermineReader(response);
+ }
+
+ return await
_activeReader.ReadNextRecordBatchAsync(cancellationToken);
+ }
+ }
+}
diff --git a/csharp/src/Drivers/Databricks/DatabricksConnection.cs
b/csharp/src/Drivers/Databricks/DatabricksConnection.cs
index 2ffa7b552..18beb5c2c 100644
--- a/csharp/src/Drivers/Databricks/DatabricksConnection.cs
+++ b/csharp/src/Drivers/Databricks/DatabricksConnection.cs
@@ -326,8 +326,6 @@ namespace Apache.Arrow.Adbc.Drivers.Databricks
internal override IArrowArrayStream NewReader<T>(T statement, Schema
schema, TGetResultSetMetadataResp? metadataResp = null)
{
- // Get result format from metadata response if available
- TSparkRowSetType resultFormat = TSparkRowSetType.ARROW_BASED_SET;
bool isLz4Compressed = false;
DatabricksStatement? databricksStatement = statement as
DatabricksStatement;
@@ -337,29 +335,12 @@ namespace Apache.Arrow.Adbc.Drivers.Databricks
throw new InvalidOperationException("Cannot obtain a reader
for Databricks");
}
- if (metadataResp != null)
+ if (metadataResp != null && metadataResp.__isset.lz4Compressed)
{
- if (metadataResp.__isset.resultFormat)
- {
- resultFormat = metadataResp.ResultFormat;
- }
-
- if (metadataResp.__isset.lz4Compressed)
- {
- isLz4Compressed = metadataResp.Lz4Compressed;
- }
+ isLz4Compressed = metadataResp.Lz4Compressed;
}
- // Choose the appropriate reader based on the result format
- if (resultFormat == TSparkRowSetType.URL_BASED_SET)
- {
- HttpClient cloudFetchHttpClient = new
HttpClient(HiveServer2TlsImpl.NewHttpClientHandler(TlsOptions,
_proxyConfigurator));
- return new CloudFetchReader(databricksStatement, schema,
isLz4Compressed, cloudFetchHttpClient);
- }
- else
- {
- return new DatabricksReader(databricksStatement, schema,
isLz4Compressed);
- }
+ return new DatabricksCompositeReader(databricksStatement, schema,
isLz4Compressed, TlsOptions, _proxyConfigurator);
}
internal override SchemaParser SchemaParser => new
DatabricksSchemaParser();
diff --git a/csharp/src/Drivers/Databricks/DatabricksReader.cs
b/csharp/src/Drivers/Databricks/DatabricksReader.cs
index bcf5cc269..13ca37e7d 100644
--- a/csharp/src/Drivers/Databricks/DatabricksReader.cs
+++ b/csharp/src/Drivers/Databricks/DatabricksReader.cs
@@ -35,18 +35,18 @@ namespace Apache.Arrow.Adbc.Drivers.Databricks
int index;
IArrowReader? reader;
- public DatabricksReader(DatabricksStatement statement, Schema schema,
bool isLz4Compressed) : base(statement, schema, isLz4Compressed)
+ public DatabricksReader(DatabricksStatement statement, Schema schema,
TFetchResultsResp? initialResults, bool isLz4Compressed) : base(statement,
schema, isLz4Compressed)
{
// If we have direct results, initialize the batches from them
if (statement.HasDirectResults)
{
this.batches =
statement.DirectResults!.ResultSet.Results.ArrowBatches;
-
- if (!statement.DirectResults.ResultSet.HasMoreRows)
- {
- this.hasNoMoreRows = true;
- return;
- }
+ this.hasNoMoreRows =
!statement.DirectResults.ResultSet.HasMoreRows;
+ }
+ else if (initialResults != null)
+ {
+ this.batches = initialResults.Results.ArrowBatches;
+ this.hasNoMoreRows = !initialResults.HasMoreRows;
}
}
diff --git
a/csharp/test/Drivers/Databricks/CloudFetch/CloudFetchResultFetcherTest.cs
b/csharp/test/Drivers/Databricks/CloudFetch/CloudFetchResultFetcherTest.cs
index d70a378e0..7c9de8371 100644
--- a/csharp/test/Drivers/Databricks/CloudFetch/CloudFetchResultFetcherTest.cs
+++ b/csharp/test/Drivers/Databricks/CloudFetch/CloudFetchResultFetcherTest.cs
@@ -427,6 +427,131 @@ namespace
Apache.Arrow.Adbc.Tests.Drivers.Databricks.CloudFetch
#endregion
+ #region Initial Results Tests
+
+ [Fact]
+ public async Task InitialResults_ProcessesInitialResultsCorrectly()
+ {
+ // Arrange
+ var initialResultLinks = new List<TSparkArrowResultLink>
+ {
+ CreateTestResultLink(0, 100, "http://test.com/initial1", 3600),
+ CreateTestResultLink(100, 100, "http://test.com/initial2",
3600)
+ };
+
+ var initialResults =
CreateFetchResultsResponse(initialResultLinks, false);
+ var fetcherWithInitialResults =
CreateResultFetcherWithInitialResults(initialResults);
+
+ // Act
+ await fetcherWithInitialResults.StartAsync(CancellationToken.None);
+
+ // Wait for the fetcher to process the initial results
+ await Task.Delay(200);
+
+ // Assert
+ // The download queue should contain our initial result links
+ Assert.True(_downloadQueue.Count >= initialResultLinks.Count,
+ $"Expected at least {initialResultLinks.Count} items in queue,
but found {_downloadQueue.Count}");
+
+ // Take all items from the queue and verify they match our initial
result links
+ var downloadResults = new List<IDownloadResult>();
+ while (_downloadQueue.TryTake(out var result))
+ {
+ // Skip the end of results guard
+ if (result == EndOfResultsGuard.Instance)
+ {
+ continue;
+ }
+ downloadResults.Add(result);
+ }
+
+ Assert.Equal(initialResultLinks.Count, downloadResults.Count);
+
+ // Verify each download result has the correct link
+ for (int i = 0; i < initialResultLinks.Count; i++)
+ {
+ Assert.Equal(initialResultLinks[i].FileLink,
downloadResults[i].Link.FileLink);
+ Assert.Equal(initialResultLinks[i].StartRowOffset,
downloadResults[i].Link.StartRowOffset);
+ Assert.Equal(initialResultLinks[i].RowCount,
downloadResults[i].Link.RowCount);
+ }
+
+ // Verify the fetcher completed
+ Assert.True(fetcherWithInitialResults.IsCompleted);
+ Assert.False(fetcherWithInitialResults.HasMoreResults);
+
+ // Cleanup
+ await fetcherWithInitialResults.StopAsync();
+ }
+
+ [Fact]
+ public async Task InitialResults_WithMoreRows_ContinuesFetching()
+ {
+ // Arrange
+ var initialResultLinks = new List<TSparkArrowResultLink>
+ {
+ CreateTestResultLink(0, 100, "http://test.com/initial1", 3600)
+ };
+
+ var additionalResultLinks = new List<TSparkArrowResultLink>
+ {
+ CreateTestResultLink(100, 100, "http://test.com/additional1",
3600)
+ };
+
+ // Initial results indicate more rows are available
+ var initialResults =
CreateFetchResultsResponse(initialResultLinks, true);
+ var fetcherWithInitialResults =
CreateResultFetcherWithInitialResults(initialResults);
+
+ // Setup mock for additional fetch
+ SetupMockClientFetchResults(additionalResultLinks, false);
+
+ // Act
+ await fetcherWithInitialResults.StartAsync(CancellationToken.None);
+
+ // Wait for the fetcher to process all results
+ await Task.Delay(300);
+
+ // Assert
+ // The download queue should contain both initial and additional
result links
+ var expectedCount = initialResultLinks.Count +
additionalResultLinks.Count;
+ Assert.True(_downloadQueue.Count >= expectedCount,
+ $"Expected at least {expectedCount} items in queue, but found
{_downloadQueue.Count}");
+
+ // Take all items from the queue
+ var downloadResults = new List<IDownloadResult>();
+ while (_downloadQueue.TryTake(out var result))
+ {
+ // Skip the end of results guard
+ if (result == EndOfResultsGuard.Instance)
+ {
+ continue;
+ }
+ downloadResults.Add(result);
+ }
+
+ Assert.Equal(expectedCount, downloadResults.Count);
+
+ // Verify the fetcher completed
+ Assert.True(fetcherWithInitialResults.IsCompleted);
+ Assert.False(fetcherWithInitialResults.HasMoreResults);
+
+ // Cleanup
+ await fetcherWithInitialResults.StopAsync();
+ }
+
+ private CloudFetchResultFetcherWithMockClock
CreateResultFetcherWithInitialResults(TFetchResultsResp initialResults)
+ {
+ return new CloudFetchResultFetcherWithMockClock(
+ _mockStatement.Object,
+ initialResults,
+ _mockMemoryManager.Object,
+ _downloadQueue,
+ 100, // batchSize
+ _mockClock,
+ 60); // expirationBufferSeconds
+ }
+
+ #endregion
+
#region Helper Methods
private TSparkArrowResultLink CreateTestResultLink(long
startRowOffset, int rowCount, string fileLink, int expirySeconds)
@@ -515,7 +640,19 @@ namespace
Apache.Arrow.Adbc.Tests.Drivers.Databricks.CloudFetch
long batchSize,
IClock clock,
int expirationBufferSeconds = 60)
- : base(statement, memoryManager, downloadQueue, batchSize,
expirationBufferSeconds, clock)
+ : base(statement, null, memoryManager, downloadQueue, batchSize,
expirationBufferSeconds, clock)
+ {
+ }
+
+ public CloudFetchResultFetcherWithMockClock(
+ IHiveServer2Statement statement,
+ TFetchResultsResp? initialResults,
+ ICloudFetchMemoryBufferManager memoryManager,
+ BlockingCollection<IDownloadResult> downloadQueue,
+ long batchSize,
+ IClock clock,
+ int expirationBufferSeconds = 60)
+ : base(statement, initialResults, memoryManager, downloadQueue,
batchSize, expirationBufferSeconds, clock)
{
}
}