This is an automated email from the ASF dual-hosted git repository.

curth pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-adbc.git


The following commit(s) were added to refs/heads/main by this push:
     new ba2efd0d8 feat(csharp/src/Drivers/Databricks): Fix for older DBR 
versions incorrect ResultFormat (#3020)
ba2efd0d8 is described below

commit ba2efd0d8533374d357393ad3b3a916e36721eb2
Author: Todd Meng <[email protected]>
AuthorDate: Mon Jun 30 19:07:31 2025 -0700

    feat(csharp/src/Drivers/Databricks): Fix for older DBR versions incorrect 
ResultFormat (#3020)
    
    This is because there is a bug on the runtime side:
    
    > Sometimes TSparkDirectResults.TFetchResultsResp.TRowSet could be
    non-link responses (ARROW_BASED_SET), but still
    TSparkDirectResults.TGetResultSetMetadataResp.ResultFormat ==
    URL_BASED_SET
    
    This PR gets around that by directly inspecting the results. Since we do
    not always have DirectResults, we make an additional FetchResult call to
    determine whether to use `CloudFetchReader` or arrow-based
    `DatabricksReader`. Only if the result contains links does it decide to
    use `CloudFetchReader`.
    
    The following test does not pass on DBR 11.3 and 10.4 until this commit:
    dotnet test --filter "FullyQualifiedName~LZ4DecompressionCapabilityTest"
    
    
    
    ## Remaining work:
    Need to update status polling to start earlier; ideally this
    should be managed by the new DatabricksCompositeReader
---
 .../CloudFetch/CloudFetchDownloadManager.cs        |   4 +-
 .../Databricks/CloudFetch/CloudFetchReader.cs      |   7 +-
 .../CloudFetch/CloudFetchResultFetcher.cs          |  22 +++-
 .../Databricks/DatabricksCompositeReader.cs        | 115 +++++++++++++++++
 .../src/Drivers/Databricks/DatabricksConnection.cs |  25 +---
 csharp/src/Drivers/Databricks/DatabricksReader.cs  |  14 +--
 .../CloudFetch/CloudFetchResultFetcherTest.cs      | 139 ++++++++++++++++++++-
 7 files changed, 289 insertions(+), 37 deletions(-)

diff --git 
a/csharp/src/Drivers/Databricks/CloudFetch/CloudFetchDownloadManager.cs 
b/csharp/src/Drivers/Databricks/CloudFetch/CloudFetchDownloadManager.cs
index 97190dfec..484f1af25 100644
--- a/csharp/src/Drivers/Databricks/CloudFetch/CloudFetchDownloadManager.cs
+++ b/csharp/src/Drivers/Databricks/CloudFetch/CloudFetchDownloadManager.cs
@@ -22,6 +22,7 @@ using System.Net.Http;
 using System.Threading;
 using System.Threading.Tasks;
 using Apache.Arrow.Adbc.Drivers.Apache.Hive2;
+using Apache.Hive.Service.Rpc.Thrift;
 
 namespace Apache.Arrow.Adbc.Drivers.Databricks.CloudFetch
 {
@@ -59,7 +60,7 @@ namespace Apache.Arrow.Adbc.Drivers.Databricks.CloudFetch
         /// <param name="statement">The HiveServer2 statement.</param>
         /// <param name="schema">The Arrow schema.</param>
         /// <param name="isLz4Compressed">Whether the results are LZ4 
compressed.</param>
-        public CloudFetchDownloadManager(DatabricksStatement statement, Schema 
schema, bool isLz4Compressed, HttpClient httpClient)
+        public CloudFetchDownloadManager(DatabricksStatement statement, Schema 
schema, TFetchResultsResp? initialResults, bool isLz4Compressed, HttpClient 
httpClient)
         {
             _statement = statement ?? throw new 
ArgumentNullException(nameof(statement));
             _schema = schema ?? throw new 
ArgumentNullException(nameof(schema));
@@ -193,6 +194,7 @@ namespace Apache.Arrow.Adbc.Drivers.Databricks.CloudFetch
             // Initialize the result fetcher with URL management capabilities
             _resultFetcher = new CloudFetchResultFetcher(
                 _statement,
+                initialResults,
                 _memoryManager,
                 _downloadQueue,
                 DefaultFetchBatchSize,
diff --git a/csharp/src/Drivers/Databricks/CloudFetch/CloudFetchReader.cs 
b/csharp/src/Drivers/Databricks/CloudFetch/CloudFetchReader.cs
index d5a48d58f..397d249cc 100644
--- a/csharp/src/Drivers/Databricks/CloudFetch/CloudFetchReader.cs
+++ b/csharp/src/Drivers/Databricks/CloudFetch/CloudFetchReader.cs
@@ -23,6 +23,7 @@ using System.Threading.Tasks;
 using Apache.Arrow.Adbc.Drivers.Apache;
 using Apache.Arrow.Adbc.Tracing;
 using Apache.Arrow.Ipc;
+using Apache.Hive.Service.Rpc.Thrift;
 
 namespace Apache.Arrow.Adbc.Drivers.Databricks.CloudFetch
 {
@@ -46,7 +47,7 @@ namespace Apache.Arrow.Adbc.Drivers.Databricks.CloudFetch
         /// <param name="statement">The Databricks statement.</param>
         /// <param name="schema">The Arrow schema.</param>
         /// <param name="isLz4Compressed">Whether the results are LZ4 
compressed.</param>
-        public CloudFetchReader(DatabricksStatement statement, Schema schema, 
bool isLz4Compressed, HttpClient httpClient)
+        public CloudFetchReader(DatabricksStatement statement, Schema schema, 
TFetchResultsResp? initialResults, bool isLz4Compressed, HttpClient httpClient)
             : base(statement, schema, isLz4Compressed)
         {
             // Check if prefetch is enabled
@@ -67,14 +68,14 @@ namespace Apache.Arrow.Adbc.Drivers.Databricks.CloudFetch
             // Initialize the download manager
             if (isPrefetchEnabled)
             {
-                downloadManager = new CloudFetchDownloadManager(statement, 
schema, isLz4Compressed, httpClient);
+                downloadManager = new CloudFetchDownloadManager(statement, 
schema, initialResults, isLz4Compressed, httpClient);
                 downloadManager.StartAsync().Wait();
             }
             else
             {
                 // For now, we only support the prefetch implementation
                 // This flag is reserved for future use if we need to support 
a non-prefetch mode
-                downloadManager = new CloudFetchDownloadManager(statement, 
schema, isLz4Compressed, httpClient);
+                downloadManager = new CloudFetchDownloadManager(statement, 
schema, initialResults, isLz4Compressed, httpClient);
                 downloadManager.StartAsync().Wait();
             }
         }
diff --git 
a/csharp/src/Drivers/Databricks/CloudFetch/CloudFetchResultFetcher.cs 
b/csharp/src/Drivers/Databricks/CloudFetch/CloudFetchResultFetcher.cs
index 3167fd93a..b90ba6c32 100644
--- a/csharp/src/Drivers/Databricks/CloudFetch/CloudFetchResultFetcher.cs
+++ b/csharp/src/Drivers/Databricks/CloudFetch/CloudFetchResultFetcher.cs
@@ -32,6 +32,7 @@ namespace Apache.Arrow.Adbc.Drivers.Databricks.CloudFetch
     internal class CloudFetchResultFetcher : ICloudFetchResultFetcher
     {
         private readonly IHiveServer2Statement _statement;
+        private readonly TFetchResultsResp? _initialResults;
         private readonly ICloudFetchMemoryBufferManager _memoryManager;
         private readonly BlockingCollection<IDownloadResult> _downloadQueue;
         private readonly SemaphoreSlim _fetchLock = new SemaphoreSlim(1, 1);
@@ -57,6 +58,7 @@ namespace Apache.Arrow.Adbc.Drivers.Databricks.CloudFetch
         /// <param name="clock">Clock implementation for time operations. If 
null, uses system clock.</param>
         public CloudFetchResultFetcher(
             IHiveServer2Statement statement,
+            TFetchResultsResp? initialResults,
             ICloudFetchMemoryBufferManager memoryManager,
             BlockingCollection<IDownloadResult> downloadQueue,
             long batchSize,
@@ -64,6 +66,7 @@ namespace Apache.Arrow.Adbc.Drivers.Databricks.CloudFetch
             IClock? clock = null)
         {
             _statement = statement ?? throw new 
ArgumentNullException(nameof(statement));
+            _initialResults = initialResults;
             _memoryManager = memoryManager ?? throw new 
ArgumentNullException(nameof(memoryManager));
             _downloadQueue = downloadQueue ?? throw new 
ArgumentNullException(nameof(downloadQueue));
             _batchSize = batchSize;
@@ -210,7 +213,9 @@ namespace Apache.Arrow.Adbc.Drivers.Databricks.CloudFetch
             try
             {
                 // Process direct results first, if available
-                if (_statement.HasDirectResults && 
_statement.DirectResults?.ResultSet?.Results?.ResultLinks?.Count > 0)
+                if (_statement.HasDirectResults &&
+                    
(_statement.DirectResults?.ResultSet?.Results?.ResultLinks?.Count > 0 ||
+                    _initialResults?.Results?.ResultLinks?.Count > 0))
                 {
                     // Yield execution so the download queue doesn't get 
blocked before downloader is started
                     await Task.Yield();
@@ -328,7 +333,18 @@ namespace Apache.Arrow.Adbc.Drivers.Databricks.CloudFetch
 
         private void ProcessDirectResultsAsync(CancellationToken 
cancellationToken)
         {
-            List<TSparkArrowResultLink> resultLinks = 
_statement.DirectResults!.ResultSet.Results.ResultLinks;
+            TFetchResultsResp fetchResults;
+            if (_statement.HasDirectResults && 
_statement.DirectResults?.ResultSet?.Results?.ResultLinks?.Count > 0)
+            {
+                fetchResults = _statement.DirectResults!.ResultSet;
+            }
+            else
+            {
+                fetchResults = _initialResults!;
+            }
+
+            List<TSparkArrowResultLink> resultLinks = 
fetchResults.Results.ResultLinks;
+
             long maxOffset = 0;
 
             // Process each link
@@ -350,7 +366,7 @@ namespace Apache.Arrow.Adbc.Drivers.Databricks.CloudFetch
             _startOffset = maxOffset;
 
             // Update whether there are more results
-            _hasMoreResults = _statement.DirectResults!.ResultSet.HasMoreRows;
+            _hasMoreResults = fetchResults.HasMoreRows;
         }
     }
 }
diff --git a/csharp/src/Drivers/Databricks/DatabricksCompositeReader.cs 
b/csharp/src/Drivers/Databricks/DatabricksCompositeReader.cs
new file mode 100644
index 000000000..495aedeea
--- /dev/null
+++ b/csharp/src/Drivers/Databricks/DatabricksCompositeReader.cs
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+using System;
+using System.Collections.Generic;
+using System.Net.Http;
+using System.Threading;
+using System.Threading.Tasks;
+using Apache.Arrow;
+using Apache.Arrow.Adbc;
+using Apache.Arrow.Adbc.Drivers.Apache;
+using Apache.Arrow.Adbc.Drivers.Apache.Hive2;
+using Apache.Arrow.Adbc.Drivers.Databricks.CloudFetch;
+using Apache.Arrow.Adbc.Tracing;
+using Apache.Arrow.Ipc;
+using Apache.Hive.Service.Rpc.Thrift;
+
+namespace Apache.Arrow.Adbc.Drivers.Databricks
+{
+    /// <summary>
+    /// A composite reader for Databricks that delegates to either 
CloudFetchReader or DatabricksReader
+    /// based on CloudFetch configuration and result set characteristics.
+    /// </summary>
+    internal sealed class DatabricksCompositeReader : TracingReader
+    {
+        private static readonly string s_assemblyName = 
ApacheUtility.GetAssemblyName(typeof(DatabricksCompositeReader));
+        private static readonly string s_assemblyVersion = 
ApacheUtility.GetAssemblyVersion(typeof(DatabricksCompositeReader));
+
+        public override string AssemblyName => s_assemblyName;
+
+        public override string AssemblyVersion => s_assemblyVersion;
+
+        public override Schema Schema { get { return _schema; } }
+
+        private BaseDatabricksReader? _activeReader;
+        private readonly DatabricksStatement _statement;
+        private readonly Schema _schema;
+        private readonly bool _isLz4Compressed;
+        private readonly TlsProperties _tlsOptions;
+        private readonly HiveServer2ProxyConfigurator _proxyConfigurator;
+
+        /// <summary>
+        /// Initializes a new instance of the <see 
cref="DatabricksCompositeReader"/> class.
+        /// </summary>
+        /// <param name="statement">The Databricks statement.</param>
+        /// <param name="schema">The Arrow schema.</param>
+        /// <param name="isLz4Compressed">Whether the results are LZ4 
compressed.</param>
+        /// <param name="httpClient">The HTTP client for CloudFetch 
operations.</param>
+        internal DatabricksCompositeReader(DatabricksStatement statement, 
Schema schema, bool isLz4Compressed, TlsProperties tlsOptions, 
HiveServer2ProxyConfigurator proxyConfigurator): base(statement)
+        {
+            _statement = statement ?? throw new 
ArgumentNullException(nameof(statement));
+            _schema = schema ?? throw new 
ArgumentNullException(nameof(schema));
+            _isLz4Compressed = isLz4Compressed;
+            _tlsOptions = tlsOptions;
+            _proxyConfigurator = proxyConfigurator;
+
+            // use direct results if available
+            if (_statement.HasDirectResults && _statement.DirectResults != 
null && _statement.DirectResults.__isset.resultSet)
+            {
+                _activeReader = 
DetermineReader(_statement.DirectResults.ResultSet);
+            }
+        }
+
+        private BaseDatabricksReader DetermineReader(TFetchResultsResp 
initialResults)
+        {
+            // if it has links, use cloud fetch
+            if (initialResults.__isset.results &&
+                initialResults.Results.__isset.resultLinks &&
+                initialResults.Results.ResultLinks?.Count > 0)
+            {
+                HttpClient cloudFetchHttpClient = new 
HttpClient(HiveServer2TlsImpl.NewHttpClientHandler(_tlsOptions, 
_proxyConfigurator));
+                return new CloudFetchReader(_statement, _schema, 
initialResults, _isLz4Compressed, cloudFetchHttpClient);
+            }
+            else
+            {
+                return new DatabricksReader(_statement, _schema, 
initialResults, _isLz4Compressed);
+            }
+        }
+
+        /// <summary>
+        /// Reads the next record batch from the active reader.
+        /// </summary>
+        /// <param name="cancellationToken">The cancellation token.</param>
+        /// <returns>The next record batch, or null if there are no more 
batches.</returns>
+        public override async ValueTask<RecordBatch?> 
ReadNextRecordBatchAsync(CancellationToken cancellationToken = default)
+        {
+            // Initialize the active reader if not already done
+            if (_activeReader == null)
+            {
+                // if no reader, we did not have direct results
+                // Make a FetchResults call to get the initial result set
+                // and determine the reader based on the result set
+                TFetchResultsReq request = new 
TFetchResultsReq(this._statement.OperationHandle!, 
TFetchOrientation.FETCH_NEXT, this._statement.BatchSize);
+                TFetchResultsResp response = await 
this._statement.Connection.Client!.FetchResults(request, cancellationToken);
+                _activeReader = DetermineReader(response);
+            }
+
+            return await 
_activeReader.ReadNextRecordBatchAsync(cancellationToken);
+        }
+    }
+}
diff --git a/csharp/src/Drivers/Databricks/DatabricksConnection.cs 
b/csharp/src/Drivers/Databricks/DatabricksConnection.cs
index 2ffa7b552..18beb5c2c 100644
--- a/csharp/src/Drivers/Databricks/DatabricksConnection.cs
+++ b/csharp/src/Drivers/Databricks/DatabricksConnection.cs
@@ -326,8 +326,6 @@ namespace Apache.Arrow.Adbc.Drivers.Databricks
 
         internal override IArrowArrayStream NewReader<T>(T statement, Schema 
schema, TGetResultSetMetadataResp? metadataResp = null)
         {
-            // Get result format from metadata response if available
-            TSparkRowSetType resultFormat = TSparkRowSetType.ARROW_BASED_SET;
             bool isLz4Compressed = false;
 
             DatabricksStatement? databricksStatement = statement as 
DatabricksStatement;
@@ -337,29 +335,12 @@ namespace Apache.Arrow.Adbc.Drivers.Databricks
                 throw new InvalidOperationException("Cannot obtain a reader 
for Databricks");
             }
 
-            if (metadataResp != null)
+            if (metadataResp != null && metadataResp.__isset.lz4Compressed)
             {
-                if (metadataResp.__isset.resultFormat)
-                {
-                    resultFormat = metadataResp.ResultFormat;
-                }
-
-                if (metadataResp.__isset.lz4Compressed)
-                {
-                    isLz4Compressed = metadataResp.Lz4Compressed;
-                }
+                isLz4Compressed = metadataResp.Lz4Compressed;
             }
 
-            // Choose the appropriate reader based on the result format
-            if (resultFormat == TSparkRowSetType.URL_BASED_SET)
-            {
-                HttpClient cloudFetchHttpClient = new 
HttpClient(HiveServer2TlsImpl.NewHttpClientHandler(TlsOptions, 
_proxyConfigurator));
-                return new CloudFetchReader(databricksStatement, schema, 
isLz4Compressed, cloudFetchHttpClient);
-            }
-            else
-            {
-                return new DatabricksReader(databricksStatement, schema, 
isLz4Compressed);
-            }
+            return new DatabricksCompositeReader(databricksStatement, schema, 
isLz4Compressed, TlsOptions, _proxyConfigurator);
         }
 
         internal override SchemaParser SchemaParser => new 
DatabricksSchemaParser();
diff --git a/csharp/src/Drivers/Databricks/DatabricksReader.cs 
b/csharp/src/Drivers/Databricks/DatabricksReader.cs
index bcf5cc269..13ca37e7d 100644
--- a/csharp/src/Drivers/Databricks/DatabricksReader.cs
+++ b/csharp/src/Drivers/Databricks/DatabricksReader.cs
@@ -35,18 +35,18 @@ namespace Apache.Arrow.Adbc.Drivers.Databricks
         int index;
         IArrowReader? reader;
 
-        public DatabricksReader(DatabricksStatement statement, Schema schema, 
bool isLz4Compressed) : base(statement, schema, isLz4Compressed)
+        public DatabricksReader(DatabricksStatement statement, Schema schema, 
TFetchResultsResp? initialResults, bool isLz4Compressed) : base(statement, 
schema, isLz4Compressed)
         {
             // If we have direct results, initialize the batches from them
             if (statement.HasDirectResults)
             {
                 this.batches = 
statement.DirectResults!.ResultSet.Results.ArrowBatches;
-
-                if (!statement.DirectResults.ResultSet.HasMoreRows)
-                {
-                    this.hasNoMoreRows = true;
-                    return;
-                }
+                this.hasNoMoreRows = 
!statement.DirectResults.ResultSet.HasMoreRows;
+            }
+            else if (initialResults != null)
+            {
+                this.batches = initialResults.Results.ArrowBatches;
+                this.hasNoMoreRows = !initialResults.HasMoreRows;
             }
         }
 
diff --git 
a/csharp/test/Drivers/Databricks/CloudFetch/CloudFetchResultFetcherTest.cs 
b/csharp/test/Drivers/Databricks/CloudFetch/CloudFetchResultFetcherTest.cs
index d70a378e0..7c9de8371 100644
--- a/csharp/test/Drivers/Databricks/CloudFetch/CloudFetchResultFetcherTest.cs
+++ b/csharp/test/Drivers/Databricks/CloudFetch/CloudFetchResultFetcherTest.cs
@@ -427,6 +427,131 @@ namespace 
Apache.Arrow.Adbc.Tests.Drivers.Databricks.CloudFetch
 
         #endregion
 
+        #region Initial Results Tests
+
+        [Fact]
+        public async Task InitialResults_ProcessesInitialResultsCorrectly()
+        {
+            // Arrange
+            var initialResultLinks = new List<TSparkArrowResultLink>
+            {
+                CreateTestResultLink(0, 100, "http://test.com/initial1", 3600),
+                CreateTestResultLink(100, 100, "http://test.com/initial2", 
3600)
+            };
+
+            var initialResults = 
CreateFetchResultsResponse(initialResultLinks, false);
+            var fetcherWithInitialResults = 
CreateResultFetcherWithInitialResults(initialResults);
+
+            // Act
+            await fetcherWithInitialResults.StartAsync(CancellationToken.None);
+
+            // Wait for the fetcher to process the initial results
+            await Task.Delay(200);
+
+            // Assert
+            // The download queue should contain our initial result links
+            Assert.True(_downloadQueue.Count >= initialResultLinks.Count,
+                $"Expected at least {initialResultLinks.Count} items in queue, 
but found {_downloadQueue.Count}");
+
+            // Take all items from the queue and verify they match our initial 
result links
+            var downloadResults = new List<IDownloadResult>();
+            while (_downloadQueue.TryTake(out var result))
+            {
+                // Skip the end of results guard
+                if (result == EndOfResultsGuard.Instance)
+                {
+                    continue;
+                }
+                downloadResults.Add(result);
+            }
+
+            Assert.Equal(initialResultLinks.Count, downloadResults.Count);
+
+            // Verify each download result has the correct link
+            for (int i = 0; i < initialResultLinks.Count; i++)
+            {
+                Assert.Equal(initialResultLinks[i].FileLink, 
downloadResults[i].Link.FileLink);
+                Assert.Equal(initialResultLinks[i].StartRowOffset, 
downloadResults[i].Link.StartRowOffset);
+                Assert.Equal(initialResultLinks[i].RowCount, 
downloadResults[i].Link.RowCount);
+            }
+
+            // Verify the fetcher completed
+            Assert.True(fetcherWithInitialResults.IsCompleted);
+            Assert.False(fetcherWithInitialResults.HasMoreResults);
+
+            // Cleanup
+            await fetcherWithInitialResults.StopAsync();
+        }
+
+        [Fact]
+        public async Task InitialResults_WithMoreRows_ContinuesFetching()
+        {
+            // Arrange
+            var initialResultLinks = new List<TSparkArrowResultLink>
+            {
+                CreateTestResultLink(0, 100, "http://test.com/initial1", 3600)
+            };
+
+            var additionalResultLinks = new List<TSparkArrowResultLink>
+            {
+                CreateTestResultLink(100, 100, "http://test.com/additional1", 
3600)
+            };
+
+            // Initial results indicate more rows are available
+            var initialResults = 
CreateFetchResultsResponse(initialResultLinks, true);
+            var fetcherWithInitialResults = 
CreateResultFetcherWithInitialResults(initialResults);
+
+            // Setup mock for additional fetch
+            SetupMockClientFetchResults(additionalResultLinks, false);
+
+            // Act
+            await fetcherWithInitialResults.StartAsync(CancellationToken.None);
+
+            // Wait for the fetcher to process all results
+            await Task.Delay(300);
+
+            // Assert
+            // The download queue should contain both initial and additional 
result links
+            var expectedCount = initialResultLinks.Count + 
additionalResultLinks.Count;
+            Assert.True(_downloadQueue.Count >= expectedCount,
+                $"Expected at least {expectedCount} items in queue, but found 
{_downloadQueue.Count}");
+
+            // Take all items from the queue
+            var downloadResults = new List<IDownloadResult>();
+            while (_downloadQueue.TryTake(out var result))
+            {
+                // Skip the end of results guard
+                if (result == EndOfResultsGuard.Instance)
+                {
+                    continue;
+                }
+                downloadResults.Add(result);
+            }
+
+            Assert.Equal(expectedCount, downloadResults.Count);
+
+            // Verify the fetcher completed
+            Assert.True(fetcherWithInitialResults.IsCompleted);
+            Assert.False(fetcherWithInitialResults.HasMoreResults);
+
+            // Cleanup
+            await fetcherWithInitialResults.StopAsync();
+        }
+
+        private CloudFetchResultFetcherWithMockClock 
CreateResultFetcherWithInitialResults(TFetchResultsResp initialResults)
+        {
+            return new CloudFetchResultFetcherWithMockClock(
+                _mockStatement.Object,
+                initialResults,
+                _mockMemoryManager.Object,
+                _downloadQueue,
+                100, // batchSize
+                _mockClock,
+                60); // expirationBufferSeconds
+        }
+
+        #endregion
+
         #region Helper Methods
 
         private TSparkArrowResultLink CreateTestResultLink(long 
startRowOffset, int rowCount, string fileLink, int expirySeconds)
@@ -515,7 +640,19 @@ namespace 
Apache.Arrow.Adbc.Tests.Drivers.Databricks.CloudFetch
             long batchSize,
             IClock clock,
             int expirationBufferSeconds = 60)
-            : base(statement, memoryManager, downloadQueue, batchSize, 
expirationBufferSeconds, clock)
+            : base(statement, null, memoryManager, downloadQueue, batchSize, 
expirationBufferSeconds, clock)
+        {
+        }
+
+        public CloudFetchResultFetcherWithMockClock(
+            IHiveServer2Statement statement,
+            TFetchResultsResp? initialResults,
+            ICloudFetchMemoryBufferManager memoryManager,
+            BlockingCollection<IDownloadResult> downloadQueue,
+            long batchSize,
+            IClock clock,
+            int expirationBufferSeconds = 60)
+            : base(statement, initialResults, memoryManager, downloadQueue, 
batchSize, expirationBufferSeconds, clock)
         {
         }
     }

Reply via email to