gerlowskija commented on a change in pull request #120: URL: https://github.com/apache/solr/pull/120#discussion_r641534935
########## File path: solr/contrib/blob-repository/src/java/org/apache/solr/blob/backup/BlobBackupRepository.java ########## @@ -0,0 +1,351 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.solr.blob.backup; + +import com.google.common.base.Preconditions; +import com.google.common.base.Strings; +import org.apache.lucene.index.CorruptIndexException; +import org.apache.lucene.store.Directory; +import org.apache.lucene.store.IOContext; +import org.apache.lucene.store.IndexInput; +import org.apache.lucene.store.IndexOutput; +import org.apache.solr.blob.client.BlobNotFoundException; +import org.apache.solr.blob.client.BlobStorageClient; +import org.apache.solr.common.SolrException; +import org.apache.solr.common.util.NamedList; +import org.apache.solr.core.backup.repository.BackupRepository; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.lang.invoke.MethodHandles; +import java.net.URI; +import java.net.URISyntaxException; +import java.nio.file.Paths; +import java.time.Duration; +import java.time.Instant; +import java.util.Collection; +import java.util.Objects; +import java.util.Set; +import java.util.stream.Collectors; + +/** + * A concrete implementation of {@link BackupRepository} interface supporting backup/restore of Solr indexes to a blob store like S3, GCS. 
+ */ +public class BlobBackupRepository implements BackupRepository { + + private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); + + private static final int CHUNK_SIZE = 16 * 1024 * 1024; // 16 MBs + static final String BLOB_SCHEME = "blob"; + + private NamedList<String> config; + private BlobStorageClient client; + + @Override + @SuppressWarnings({"rawtypes", "unchecked"}) + public void init(NamedList args) { + this.config = (NamedList<String>) args; + BlobBackupRepositoryConfig backupConfig = new BlobBackupRepositoryConfig(this.config); + + // If a client was already created, close it to avoid any resource leak + if (client != null) { + client.close(); + } + + this.client = backupConfig.buildClient(); + } + + @Override + @SuppressWarnings("unchecked") + public <T> T getConfigProperty(String name) { + return (T) this.config.get(name); + } + + @Override + public URI createURI(String location) { + Preconditions.checkArgument(!Strings.isNullOrEmpty(location)); + URI result; + try { + result = new URI(location); + if (!result.isAbsolute()) { + if (location.startsWith("/")) { + return new URI(BLOB_SCHEME, null, location, null); + } else { + return new URI(BLOB_SCHEME, null, "/" + location, null); + } + } + } catch (URISyntaxException ex) { + throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, ex); + } + + return result; + } + + @Override + public URI resolve(URI baseUri, String... pathComponents) { + Objects.requireNonNull(baseUri); + Preconditions.checkArgument(baseUri.isAbsolute()); Review comment: [0] I love the rigorous input-checking you guys are doing throughout here. [0] I wonder whether some of these URI assumptions will hold up across other blob stores. I know GCS supports "relative looking" URIs (and in fact the GCS-mock used today for tests actually requires them!) ########## File path: solr/contrib/blob-repository/src/java/org/apache/solr/blob/client/LocalStorageClient.java ########## @@ -0,0 +1,154 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.solr.blob.client; + +import org.apache.commons.io.FileUtils; + +import java.io.*; +import java.nio.file.*; +import java.util.Collection; + +/** + * Primarily for testing purposes. + */ +public class LocalStorageClient implements BlobStorageClient { Review comment: [Q] I'm a little confused about the purpose of the various BlobStorageClients, and when each should be used. What's the purpose of "local" when tests already have an (also local) S3Mock implementation? Is this truly only ever for testing purposes or is there a reason a user might want to actually use this instead of LocalFileSystemRepository? If the former, could it live in "src/test" instead of "src/java"? 
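Back on the URI handling for a second — going just off the `createURI` logic quoted above (and the `blob:/x` expectation in `testURI` further down), bare locations get pushed onto the `blob` scheme with a leading slash. A tiny standalone sketch of that behavior; the class wrapper and sample values are mine, and the real method wraps `URISyntaxException` in a `SolrException` rather than rethrowing:

```java
import java.net.URI;
import java.net.URISyntaxException;

public class CreateUriSketch {
    // Same normalization as the quoted createURI: relative locations are given
    // the "blob" scheme and a leading slash; absolute URIs pass through as-is.
    static URI createURI(String location) throws URISyntaxException {
        URI result = new URI(location);
        if (!result.isAbsolute()) {
            String path = location.startsWith("/") ? location : "/" + location;
            return new URI("blob", null, path, null);
        }
        return result;
    }

    public static void main(String[] args) throws URISyntaxException {
        System.out.println(createURI("x"));                   // blob:/x
        System.out.println(createURI("/backups/coll1"));      // blob:/backups/coll1
        System.out.println(createURI("blob://bucket/coll1")); // unchanged: already absolute
    }
}
```

The flip side is `resolve`, where `Preconditions.checkArgument(baseUri.isAbsolute())` means a relative-looking base URI never gets that same normalization — that's the spot I'd expect the GCS-style URIs mentioned above to trip over, if I'm reading it right.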
########## File path: solr/contrib/blob-repository/src/java/org/apache/solr/blob/backup/BlobBackupRepositoryConfig.java ########## @@ -0,0 +1,109 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.solr.blob.backup; + +import com.google.common.base.Strings; +import org.apache.solr.blob.client.*; +import org.apache.solr.common.util.NamedList; + +import java.util.Locale; +import java.util.Map; + +/** + * Class representing the {@code backup} blob config bundle specified in solr.xml. All user-provided config can be + * overridden via environment variables (use uppercase, with '_' instead of '.'), see {@link BlobBackupRepositoryConfig#toEnvVar}. + */ +public class BlobBackupRepositoryConfig { + + public static final String PROVIDER_TYPE = "blob.store.provider.type"; + public static final String LOCAL_STORAGE_ROOT = "blob.store.localStorageRoot"; + public static final String BUCKET_NAME = "blob.store.bucket.name"; + public static final String REGION = "blob.store.region"; + public static final String PROXY_HOST = "blob.store.proxy.host"; Review comment: [0] One small hurdle to having a level of abstraction across all our blob-based repositories is in configuration. Many of these config properties make sense for both S3 and GCS, but not all of them. Some are S3 specific. Conversely, the GCS support has some props that are GCS specific. There are ways to make all this explicit in the ref-guide without too much mess, but it's a documentation problem that largely disappears if we don't try for the uniform "blob repository" abstraction. ########## File path: solr/contrib/blob-repository/build.gradle ########## @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + +apply plugin: 'java-library' + +description = 'S3 Blob Repository' Review comment: [0] If we go the "all blob stores live in this contrib" route, then we'll have to grep around for text like this that's overly specific to S3. 
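One more thought on the config hunk above: the env-var convention described in the javadoc ("uppercase, with '_' instead of '.'") presumably boils down to something like the sketch below. `toEnvVar` itself isn't in this hunk, so the helper names here and the precedence (environment variable wins over the solr.xml value) are just my reading of the docs, not the actual implementation:

```java
import java.util.Locale;

import org.apache.solr.common.util.NamedList;

public class EnvVarOverrideSketch {
    // "blob.store.proxy.host" -> "BLOB_STORE_PROXY_HOST", per the documented convention.
    static String toEnvVar(String propertyName) {
        return propertyName.toUpperCase(Locale.ROOT).replace('.', '_');
    }

    // Hypothetical lookup: an environment variable, if present, overrides the
    // value provided in solr.xml via the NamedList.
    static String lookup(NamedList<String> config, String propertyName) {
        String fromEnv = System.getenv(toEnvVar(propertyName));
        return fromEnv != null ? fromEnv : config.get(propertyName);
    }
}
```

So e.g. setting `BLOB_STORE_REGION` in the environment would win over a `blob.store.region` entry in solr.xml — which also helps with the provider-specific settings question above, since deployments can keep those values out of shared config.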
########## File path: solr/contrib/blob-repository/src/java/org/apache/solr/blob/backup/BlobBackupRepositoryConfig.java ########## @@ -0,0 +1,109 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.solr.blob.backup; + +import com.google.common.base.Strings; +import org.apache.solr.blob.client.*; +import org.apache.solr.common.util.NamedList; + +import java.util.Locale; +import java.util.Map; + +/** + * Class representing the {@code backup} blob config bundle specified in solr.xml. All user-provided config can be + * overridden via environment variables (use uppercase, with '_' instead of '.'), see {@link BlobBackupRepositoryConfig#toEnvVar}. Review comment: [+1] I love the uniformity of this approach, that all config props can be set by env-var. It's something I didn't do with the GCS repository impl and that now looks like a defect in hindsight. ########## File path: solr/contrib/blob-repository/src/java/org/apache/solr/blob/client/S3OutputStream.java ########## @@ -0,0 +1,250 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.solr.blob.client; + +import com.amazonaws.AmazonClientException; +import com.amazonaws.event.ProgressEvent; +import com.amazonaws.event.SyncProgressListener; +import com.amazonaws.services.s3.AmazonS3; +import com.amazonaws.services.s3.model.*; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.ByteArrayInputStream; +import java.io.IOException; +import java.io.OutputStream; +import java.lang.invoke.MethodHandles; +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.List; + +/** + * Implementation is adapted from https://github.com/confluentinc/kafka-connect-storage-cloud/blob/master/kafka-connect-s3/src/main/java/io/confluent/connect/s3/storage/S3OutputStream.java Review comment: [0] Looks like kafka-connect-storage-cloud is licensed under a license I haven't seen before called the "Confluent Community License". 
If this code goes forward a committer will probably have to check with someone more knowledgeable about whether these license terms are ok. They look OK on an initial glance, mostly leaving this comment here as a reminder to myself to follow up later. ########## File path: solr/contrib/blob-repository/src/java/org/apache/solr/blob/backup/BlobIndexInput.java ########## @@ -0,0 +1,112 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.solr.blob.backup; + +import org.apache.lucene.store.BufferedIndexInput; + +import java.io.EOFException; +import java.io.IOException; +import java.io.InputStream; +import java.nio.ByteBuffer; +import java.util.Locale; + +class BlobIndexInput extends BufferedIndexInput { Review comment: [Q] The code in this class all LGTM, but I've gotta admit that I don't quite get its purpose. Nothing in here seems terribly specific to blob stores over other repository implementations. Is there a reason you wrote this class instead of having BlobBackupRepository use the existing `BufferedChecksumIndexInput`? I feel like I'm just overlooking some important detail there... ########## File path: solr/contrib/blob-repository/src/test/org/apache/solr/blob/backup/BlobBackupRepositoryTest.java ########## @@ -0,0 +1,366 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.solr.blob.backup; + +import com.adobe.testing.s3mock.junit4.S3MockRule; +import com.amazonaws.services.s3.AmazonS3; +import com.carrotsearch.randomizedtesting.annotations.ParametersFactory; +import com.google.common.base.Strings; +import org.apache.commons.io.FileUtils; +import org.apache.lucene.store.*; +import org.apache.solr.blob.client.AdobeMockS3StorageClient; +import org.apache.solr.blob.client.BlobstoreProviderType; +import org.apache.solr.cloud.api.collections.AbstractBackupRepositoryTest; +import org.apache.solr.common.util.NamedList; +import org.apache.solr.core.backup.repository.BackupRepository; +import org.junit.*; +import org.junit.rules.TemporaryFolder; + +import java.io.File; +import java.io.IOException; +import java.io.InputStream; +import java.net.URI; +import java.net.URISyntaxException; +import java.nio.charset.StandardCharsets; +import java.util.Arrays; + +import static com.carrotsearch.randomizedtesting.RandomizedTest.$; +import static com.carrotsearch.randomizedtesting.RandomizedTest.$$; +import static org.apache.solr.blob.backup.BlobBackupRepository.BLOB_SCHEME; + +public class BlobBackupRepositoryTest extends AbstractBackupRepositoryTest { + + private static final String BUCKET_NAME = BlobBackupRepositoryTest.class.getSimpleName(); + + @Rule + public TemporaryFolder temporaryFolder = new TemporaryFolder(); + + @ClassRule + public static final S3MockRule S3_MOCK_RULE = S3MockRule.builder() + .silent() + .withInitialBuckets(BUCKET_NAME) + .withHttpPort(AdobeMockS3StorageClient.DEFAULT_MOCK_S3_PORT) + .build(); + + @ParametersFactory(argumentFormatting = "%1$s") + public static Iterable<Object[]> parameters() { + return Arrays.asList($$($(BlobstoreProviderType.LOCAL), $(BlobstoreProviderType.S3MOCK))); Review comment: [+1] Neat - I had a vague recollection that there was some sort of issue preventing use of parameterization in SolrTestCaseJ4-based tests, but evidently not. Def going to use this pattern elsewhere. ########## File path: solr/contrib/blob-repository/src/test/org/apache/solr/blob/backup/BlobBackupRepositoryTest.java ########## @@ -0,0 +1,366 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.solr.blob.backup; + +import com.adobe.testing.s3mock.junit4.S3MockRule; +import com.amazonaws.services.s3.AmazonS3; +import com.carrotsearch.randomizedtesting.annotations.ParametersFactory; +import com.google.common.base.Strings; +import org.apache.commons.io.FileUtils; +import org.apache.lucene.store.*; +import org.apache.solr.blob.client.AdobeMockS3StorageClient; +import org.apache.solr.blob.client.BlobstoreProviderType; +import org.apache.solr.cloud.api.collections.AbstractBackupRepositoryTest; +import org.apache.solr.common.util.NamedList; +import org.apache.solr.core.backup.repository.BackupRepository; +import org.junit.*; +import org.junit.rules.TemporaryFolder; + +import java.io.File; +import java.io.IOException; +import java.io.InputStream; +import java.net.URI; +import java.net.URISyntaxException; +import java.nio.charset.StandardCharsets; +import java.util.Arrays; + +import static com.carrotsearch.randomizedtesting.RandomizedTest.$; +import static com.carrotsearch.randomizedtesting.RandomizedTest.$$; +import static org.apache.solr.blob.backup.BlobBackupRepository.BLOB_SCHEME; + +public class BlobBackupRepositoryTest extends AbstractBackupRepositoryTest { + + private static final String BUCKET_NAME = BlobBackupRepositoryTest.class.getSimpleName(); + + @Rule + public TemporaryFolder temporaryFolder = new TemporaryFolder(); + + @ClassRule + public static final S3MockRule S3_MOCK_RULE = S3MockRule.builder() + .silent() + .withInitialBuckets(BUCKET_NAME) + .withHttpPort(AdobeMockS3StorageClient.DEFAULT_MOCK_S3_PORT) + .build(); + + @ParametersFactory(argumentFormatting = "%1$s") + public static Iterable<Object[]> parameters() { + return Arrays.asList($$($(BlobstoreProviderType.LOCAL), $(BlobstoreProviderType.S3MOCK))); + } + + private final BlobstoreProviderType type; + + public BlobBackupRepositoryTest(BlobstoreProviderType type) { + this.type = type; + } + + /** + * Sent by {@link org.apache.solr.handler.ReplicationHandler}, ensure we don't choke on the bare URI. + */ + @Test + public void testURI() { + try (BlobBackupRepository repo = getRepository()) { + URI uri = repo.createURI("x"); + assertEquals(BLOB_SCHEME, uri.getScheme()); + assertEquals("/x", uri.getPath()); + assertEquals("blob:/x", uri.toString()); + + } + } + + @Test + public void testLocalDirectoryFunctions() throws Exception { + + try (BlobBackupRepository repo = getRepository()) { + + URI path = new URI("/test"); + repo.createDirectory(path); + assertTrue(repo.exists(path)); + assertEquals(BackupRepository.PathType.DIRECTORY, repo.getPathType(path)); + assertEquals(repo.listAll(path).length, 0); + + URI subDir = new URI("/test/dir"); + repo.createDirectory(subDir); + assertTrue(repo.exists(subDir)); + assertEquals(BackupRepository.PathType.DIRECTORY, repo.getPathType(subDir)); + assertEquals(repo.listAll(subDir).length, 0); + + assertEquals(repo.listAll(path).length, 1); + + repo.deleteDirectory(path); + assertFalse(repo.exists(path)); + assertFalse(repo.exists(subDir)); + } + } + + /** + * Check resolving paths. 
+ */ + @Test + public void testResolve() throws Exception { + + BlobBackupRepository repo = new BlobBackupRepository(); + + // Add single element to root + assertEquals(new URI("blob:/root/path"), + repo.resolve(new URI("blob:/root"), "path")); + + // Root ends with '/' + assertEquals(new URI("blob://root/path"), + repo.resolve(new URI("blob://root/"), "path")); + assertEquals(new URI("blob://root/path"), + repo.resolve(new URI("blob://root///"), "path")); + + // Add to a sub-element + assertEquals(new URI("blob://root/path1/path2"), + repo.resolve(new URI("blob://root/path1"), "path2")); + + // Add two elements to root + assertEquals(new URI("blob://root/path1/path2"), + repo.resolve(new URI("blob://root"), "path1", "path2")); + + // Add compound elements + assertEquals(new URI("blob:/root/path1/path2/path3"), + repo.resolve(new URI("blob:/root"), "path1/path2", "path3")); + + // Check URIs with an authority + assertEquals(new URI("blob://auth/path"), + repo.resolve(new URI("blob://auth"), "path")); + assertEquals(new URI("blob://auth/path1/path2"), + repo.resolve(new URI("blob://auth/path1"), "path2")); + } + + /** + * Check + * - pushing a file to the repo (backup). + * - pulling a file from the repo (restore). + */ + @Test + public void testCopyFiles() throws Exception { + + // basic test with a small file + String content = "Test to push a backup"; + doTestCopyFileFrom(content); + doTestCopyFileTo(content); + + // copy a 10Mb file + content += Strings.repeat("1234567890", 1024 * 1024); + doTestCopyFileFrom(content); + doTestCopyFileTo(content); + } + + /** + * Check copying a file to the repo (backup). + * Specified content is used for the file. + */ + private void doTestCopyFileFrom(String content) throws Exception { + + try (BlobBackupRepository repo = getRepository()) { + + // A file on the local disk (another storage than the local blob) + File tmp = temporaryFolder.newFolder(); + FileUtils.write(new File(tmp, "from-file"), content, StandardCharsets.UTF_8); + + Directory sourceDir = new NIOFSDirectory(tmp.toPath()); + repo.copyIndexFileFrom(sourceDir, "from-file", new URI("blob://to-folder"), "to-file"); + + // Sanity check: we do have different files + File actualSource = new File(tmp, "from-file"); + File actualDest = pullBlob("to-folder/to-file"); + assertNotEquals(actualSource, actualDest); + + // Check the copied content + assertTrue(actualDest.isFile()); + assertTrue(FileUtils.contentEquals(actualSource, actualDest)); + } + } + + /** + * Check retrieving a file from the repo (restore). + * Specified content is used for the file. + */ + private void doTestCopyFileTo(String content) throws Exception { + + try (BlobBackupRepository repo = getRepository()) { + + // Local folder for destination + File tmp = temporaryFolder.newFolder(); + Directory destDir = new NIOFSDirectory(tmp.toPath()); + + // Directly create a file in blob storage + pushBlob("from-file", content); + + repo.copyIndexFileTo(new URI("blob:///"), "from-file", destDir, "to-file"); + + // Sanity check: we do have different files + File actualSource = pullBlob("from-file"); + File actualDest = new File(tmp, "to-file"); + assertNotEquals(actualSource, actualDest); + + // Check the copied content + assertTrue(actualDest.isFile()); + assertTrue(FileUtils.contentEquals(actualSource, actualDest)); + } + } + + /** + * Check reading input with random access stream. 
+ */ + @Test + public void testRandomAccessInput() throws Exception { + + // Test with a short text that fills in the buffer + String content = "This is the content of my blob"; + doRandomAccessTest(content, content.indexOf("content")); + + // Large text, we force to refill the buffer + String blank = Strings.repeat(" ", 5 * BufferedIndexInput.BUFFER_SIZE); + content = "This is a super large" + blank + "content"; + doRandomAccessTest(content, content.indexOf("content")); + } + + /** + * Check implementation of {@link BlobBackupRepository#openInput(URI, String, IOContext)}. + * Open an index input and seek to an absolute position. + * + * <p>We use specified text. It must has the word "content" at given position. + */ + private void doRandomAccessTest(String content, int position) throws Exception { + + try (BlobBackupRepository repo = getRepository()) { + File tmp = temporaryFolder.newFolder(); + + // Open an index input on a file + File subdir = new File(tmp, "my-repo"); + FileUtils.write(new File(subdir, "content"), content, StandardCharsets.UTF_8); + repo.copyIndexFileFrom(new NIOFSDirectory(tmp.getAbsoluteFile().toPath()), "my-repo/content", new URI("blob://my-repo"), "content"); + IndexInput input = repo.openInput(new URI("blob://my-repo"), "content", IOContext.DEFAULT); + + byte[] buffer = new byte[100]; + + // Read 4 bytes + input.readBytes(buffer, 0, 4); + assertEquals("This", new String(buffer, 0, 4, StandardCharsets.UTF_8)); + + // Seek to the work 'content' and read it + input.seek(position); + input.readBytes(buffer, 0, 7); + assertEquals("content", new String(buffer, 0, 7, StandardCharsets.UTF_8)); + } + } + + /** + * Check we gracefully fail when seeking before current position of the stream. + */ + @Test + public void testBackwardRandomAccess() throws Exception { + + try (BlobBackupRepository repo = getRepository()) { + + // Open an index input on a file + String blank = Strings.repeat(" ", 5 * BufferedIndexInput.BUFFER_SIZE); + String content = "This is the file " + blank + "content"; + + pushBlob("/content", content); + IndexInput input = repo.openInput(new URI("blob:///"), "content", IOContext.DEFAULT); + + // Read twice the size of the internal buffer, so first bytes are not in the buffer anymore + byte[] buffer = new byte[BufferedIndexInput.BUFFER_SIZE * 2]; + input.readBytes(buffer, 0, BufferedIndexInput.BUFFER_SIZE * 2); + + // Seek back to the 5th byte. + // It is not any more in the internal buffer, so we should fail + IOException exception = assertThrows(IOException.class, () -> input.seek(5)); + assertEquals("Cannot seek backward", exception.getMessage()); + } + } + + /** + * Initialize a blog repository based on local or S3 blob storage. Review comment: [0] Typo: blog->blob ########## File path: solr/contrib/blob-repository/src/java/org/apache/solr/blob/client/S3StorageClient.java ########## @@ -0,0 +1,475 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.solr.blob.client; + +import com.amazonaws.AmazonClientException; +import com.amazonaws.AmazonServiceException; +import com.amazonaws.ClientConfiguration; +import com.amazonaws.Protocol; +import com.amazonaws.regions.Regions; +import com.amazonaws.services.s3.AmazonS3; +import com.amazonaws.services.s3.AmazonS3ClientBuilder; +import com.amazonaws.services.s3.model.*; +import com.amazonaws.services.s3.model.DeleteObjectsRequest.KeyVersion; +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Preconditions; +import com.google.common.base.Strings; +import com.google.common.collect.ImmutableSet; +import com.google.common.collect.Lists; +import org.apache.commons.io.input.ClosedInputStream; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.InputStream; +import java.io.OutputStream; +import java.lang.invoke.MethodHandles; +import java.util.*; +import java.util.stream.Collectors; + + +/** + * Creates a {@link BlobStorageClient} for communicating with AWS S3. Utilizes the default credential provider chain; + * reference <a href="https://docs.aws.amazon.com/sdk-for-java/v1/developer-guide/credentials.html">AWS SDK docs</a> for + * details on where this client will fetch credentials from, and the order of precedence. + */ +public class S3StorageClient implements BlobStorageClient { + + private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); + + // S3 has a hard limit of 1000 keys per batch delete request + private static final int MAX_KEYS_PER_BATCH_DELETE = 1000; + + // Metadata name used to identify flag directory entries in S3 + private static final String BLOB_DIR_HEADER = "x_is_directory"; + + // Error messages returned by S3 for a key not found. + private static final ImmutableSet<String> NOT_FOUND_CODES = ImmutableSet.of("NoSuchKey", "404 Not Found"); + + private final AmazonS3 s3Client; + + /** + * The S3 bucket where we write all of our blobs to. 
+ */ + private final String bucketName; + + public S3StorageClient(String bucketName, String region, String proxyHost, int proxyPort) { + this(createInternalClient(region, proxyHost, proxyPort), bucketName); + } + + @VisibleForTesting + S3StorageClient(AmazonS3 s3Client, String bucketName) { + this.s3Client = s3Client; + this.bucketName = bucketName; + } + + private static AmazonS3 createInternalClient(String region, String proxyHost, int proxyPort) { + ClientConfiguration clientConfig = new ClientConfiguration() + .withProtocol(Protocol.HTTPS); + + // If configured, add proxy + if (!Strings.isNullOrEmpty(proxyHost)) { + clientConfig.setProxyHost(proxyHost); + if (proxyPort > 0) { + clientConfig.setProxyPort(proxyPort); + } + } + + /* + * Default s3 client builder loads credentials from disk and handles token refreshes + */ + return AmazonS3ClientBuilder.standard() + .enablePathStyleAccess() + .withClientConfiguration(clientConfig) + .withRegion(Regions.fromName(region)) + .build(); + } + + /** + * Create Directory in Blob Store + * + * @param path Directory Path in Blob Store + */ + @Override + public void createDirectory(String path) throws BlobException { + path = sanitizedPath(path, false); + + if (!parentDirectoryExist(path)) { + createDirectory(path.substring(0, path.lastIndexOf(BLOB_FILE_PATH_DELIMITER))); + //TODO see https://issues.apache.org/jira/browse/SOLR-15359 +// throw new BlobException("Parent directory doesn't exist, path=" + path); + } + + ObjectMetadata objectMetadata = new ObjectMetadata(); + objectMetadata.addUserMetadata(BLOB_DIR_HEADER, "true"); + objectMetadata.setContentLength(0); + + // Create empty blob object with header + final InputStream im = ClosedInputStream.CLOSED_INPUT_STREAM; + + try { + PutObjectRequest putRequest = new PutObjectRequest(bucketName, path, im, objectMetadata); + s3Client.putObject(putRequest); + } catch (AmazonClientException ase) { + throw handleAmazonException(ase); + } + } + + /** + * Delete files in Blob Store. Deletion order is not guaranteed. + */ + @Override + public void delete(Collection<String> paths) throws BlobException { Review comment: [0] The de-duplication you're enforcing here by copying things into a Set is great. To take it a step further - it seems like logic that'd make sense for all BlobStorageClient implementations. Would it make any sense to change the interface here to take in a Set and move the conversion logic up into BlobRepository so that all BSC implementations benefit from it without duplicating the code? ########## File path: solr/server/etc/security.policy ########## @@ -106,6 +106,8 @@ grant { permission java.lang.RuntimePermission "writeFileDescriptor"; // needed by hadoop http permission java.lang.RuntimePermission "getProtectionDomain"; + // needed by aws s3 sdk + permission java.lang.RuntimePermission "accessClassInPackage.jdk.internal.reflect"; Review comment: [0] Note to self to look into this a bit more. ########## File path: solr/contrib/blob-repository/src/java/org/apache/solr/blob/client/S3OutputStream.java ########## @@ -0,0 +1,250 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.solr.blob.client; + +import com.amazonaws.AmazonClientException; +import com.amazonaws.event.ProgressEvent; +import com.amazonaws.event.SyncProgressListener; +import com.amazonaws.services.s3.AmazonS3; +import com.amazonaws.services.s3.model.*; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.ByteArrayInputStream; +import java.io.IOException; +import java.io.OutputStream; +import java.lang.invoke.MethodHandles; +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.List; + +/** + * Implementation is adapted from https://github.com/confluentinc/kafka-connect-storage-cloud/blob/master/kafka-connect-s3/src/main/java/io/confluent/connect/s3/storage/S3OutputStream.java + */ +public class S3OutputStream extends OutputStream { + private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); + + // 16 MB. Part sizes must be between 5MB to 5GB. + // https://docs.aws.amazon.com/AmazonS3/latest/dev/qfacts.html + static final int PART_SIZE = 16777216; + static final int MIN_PART_SIZE = 5242880; + + private final AmazonS3 s3Client; + private final String bucketName; + private final String key; + private final SyncProgressListener progressListener; + private volatile boolean closed; + private final ByteBuffer buffer; + private MultipartUpload multiPartUpload; + + public S3OutputStream(AmazonS3 s3Client, String key, String bucketName) { + this.s3Client = s3Client; + this.bucketName = bucketName; + this.key = key; + this.closed = false; + this.buffer = ByteBuffer.allocate(PART_SIZE); + this.progressListener = new ConnectProgressListener(); + this.multiPartUpload = null; + + if (log.isDebugEnabled()) { + log.debug("Created S3OutputStream for bucketName '{}' key '{}'", bucketName, key); + } + } + + @Override + public void write(int b) throws IOException { + if (closed) { + throw new IOException("Stream closed"); + } + + buffer.put((byte) b); + + // If the buffer is now full, push it to remote S3. 
+ if (!buffer.hasRemaining()) { + uploadPart(false); + } + } + + @Override + public void write(byte[] b, int off, int len) throws IOException { + if (closed) { + throw new IOException("Stream closed"); + } + + if (b == null) { + throw new NullPointerException(); + } else if (outOfRange(off, b.length) || len < 0 || outOfRange(off + len, b.length)) { + throw new IndexOutOfBoundsException(); + } else if (len == 0) { + return; + } + + if (buffer.remaining() <= len) { + int firstPart = buffer.remaining(); + buffer.put(b, off, firstPart); + uploadPart(false); + write(b, off + firstPart, len - firstPart); + } else { + buffer.put(b, off, len); + } + } + + private static boolean outOfRange(int off, int len) { + return off < 0 || off > len; + } + + private void uploadPart(boolean isLastPart) throws IOException { + + int size = buffer.position(); + + if (size == 0) { + // nothing to upload + return; + } + + if (multiPartUpload == null) { + if (log.isDebugEnabled()) { + log.debug("New multi-part upload for bucketName '{}' key '{}'", bucketName, key); + } + multiPartUpload = newMultipartUpload(); + } + try { + multiPartUpload.uploadPart(new ByteArrayInputStream(buffer.array()), size, isLastPart); + } catch (Exception e) { + if (multiPartUpload != null) { + multiPartUpload.abort(); + if (log.isDebugEnabled()) { + log.debug("Multipart upload aborted for bucketName '{}' key '{}'.", bucketName, key); + } + } + throw new BlobException("Part upload failed: ", e); + } + + // reset the buffer for eventual next write operation + buffer.clear(); + } + + @Override + public void flush() throws IOException { + if (closed) { + throw new IOException("Stream closed"); + } + + // Flush is possible only if we have more data than the required part size + // If buffer size is lower than than, just skip + if (buffer.position() >= MIN_PART_SIZE) { + uploadPart(false); + } + } + + @Override + public void close() throws IOException { + if (closed) { + return; + } + + // flush first + uploadPart(true); + + if (multiPartUpload != null) { + multiPartUpload.complete(); + multiPartUpload = null; + } + + closed = true; + } + + private MultipartUpload newMultipartUpload() throws IOException { + InitiateMultipartUploadRequest initRequest = new InitiateMultipartUploadRequest( + bucketName, + key, + new ObjectMetadata()); + + try { + return new MultipartUpload(s3Client.initiateMultipartUpload(initRequest).getUploadId()); + } catch (AmazonClientException e) { + throw S3StorageClient.handleAmazonException(e); + } + } + + // Dummy listener for now, just logs the event progress. + private static class ConnectProgressListener extends SyncProgressListener { + public void progressChanged(ProgressEvent progressEvent) { + if (log.isDebugEnabled()) { + log.debug("Progress event {}", progressEvent); Review comment: [+1] Very cool ########## File path: solr/contrib/blob-repository/src/java/org/apache/solr/blob/client/S3StorageClient.java ########## @@ -0,0 +1,475 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.solr.blob.client; + +import com.amazonaws.AmazonClientException; +import com.amazonaws.AmazonServiceException; +import com.amazonaws.ClientConfiguration; +import com.amazonaws.Protocol; +import com.amazonaws.regions.Regions; +import com.amazonaws.services.s3.AmazonS3; +import com.amazonaws.services.s3.AmazonS3ClientBuilder; +import com.amazonaws.services.s3.model.*; +import com.amazonaws.services.s3.model.DeleteObjectsRequest.KeyVersion; +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Preconditions; +import com.google.common.base.Strings; +import com.google.common.collect.ImmutableSet; +import com.google.common.collect.Lists; +import org.apache.commons.io.input.ClosedInputStream; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.InputStream; +import java.io.OutputStream; +import java.lang.invoke.MethodHandles; +import java.util.*; +import java.util.stream.Collectors; + + +/** + * Creates a {@link BlobStorageClient} for communicating with AWS S3. Utilizes the default credential provider chain; + * reference <a href="https://docs.aws.amazon.com/sdk-for-java/v1/developer-guide/credentials.html">AWS SDK docs</a> for + * details on where this client will fetch credentials from, and the order of precedence. + */ +public class S3StorageClient implements BlobStorageClient { + + private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); + + // S3 has a hard limit of 1000 keys per batch delete request + private static final int MAX_KEYS_PER_BATCH_DELETE = 1000; + + // Metadata name used to identify flag directory entries in S3 + private static final String BLOB_DIR_HEADER = "x_is_directory"; + + // Error messages returned by S3 for a key not found. + private static final ImmutableSet<String> NOT_FOUND_CODES = ImmutableSet.of("NoSuchKey", "404 Not Found"); + + private final AmazonS3 s3Client; + + /** + * The S3 bucket where we write all of our blobs to. 
+ */ + private final String bucketName; + + public S3StorageClient(String bucketName, String region, String proxyHost, int proxyPort) { + this(createInternalClient(region, proxyHost, proxyPort), bucketName); + } + + @VisibleForTesting + S3StorageClient(AmazonS3 s3Client, String bucketName) { + this.s3Client = s3Client; + this.bucketName = bucketName; + } + + private static AmazonS3 createInternalClient(String region, String proxyHost, int proxyPort) { + ClientConfiguration clientConfig = new ClientConfiguration() + .withProtocol(Protocol.HTTPS); + + // If configured, add proxy + if (!Strings.isNullOrEmpty(proxyHost)) { + clientConfig.setProxyHost(proxyHost); + if (proxyPort > 0) { + clientConfig.setProxyPort(proxyPort); + } + } + + /* + * Default s3 client builder loads credentials from disk and handles token refreshes + */ + return AmazonS3ClientBuilder.standard() + .enablePathStyleAccess() + .withClientConfiguration(clientConfig) + .withRegion(Regions.fromName(region)) + .build(); + } + + /** + * Create Directory in Blob Store + * + * @param path Directory Path in Blob Store + */ + @Override + public void createDirectory(String path) throws BlobException { + path = sanitizedPath(path, false); + + if (!parentDirectoryExist(path)) { + createDirectory(path.substring(0, path.lastIndexOf(BLOB_FILE_PATH_DELIMITER))); + //TODO see https://issues.apache.org/jira/browse/SOLR-15359 +// throw new BlobException("Parent directory doesn't exist, path=" + path); + } + + ObjectMetadata objectMetadata = new ObjectMetadata(); + objectMetadata.addUserMetadata(BLOB_DIR_HEADER, "true"); + objectMetadata.setContentLength(0); + + // Create empty blob object with header + final InputStream im = ClosedInputStream.CLOSED_INPUT_STREAM; + + try { + PutObjectRequest putRequest = new PutObjectRequest(bucketName, path, im, objectMetadata); + s3Client.putObject(putRequest); + } catch (AmazonClientException ase) { + throw handleAmazonException(ase); + } + } + + /** + * Delete files in Blob Store. Deletion order is not guaranteed. + */ + @Override + public void delete(Collection<String> paths) throws BlobException { + Set<String> entries = new HashSet<>(); + for (String path : paths) { + entries.add(sanitizedPath(path, true)); + } + + deleteBlobs(entries); + } + + @Override + public void deleteDirectory(String path) throws BlobException { + path = sanitizedPath(path, false); + + List<String> entries = new ArrayList<>(); + entries.add(path); + + // Get all the files and subdirectories + entries.addAll(listAll(path)); + + deleteObjects(entries); + } + + /** + * List all the Files and Sub directories in Path + * + * @param path Path to Directory in Blob Store + * @return Files and Sub directories in Path + */ + @Override + public String[] listDir(String path) throws BlobException { + path = sanitizedPath(path, false); + + String prefix = path.equals("/") ? path : path + BLOB_FILE_PATH_DELIMITER; + ListObjectsRequest listRequest = new ListObjectsRequest() + .withBucketName(bucketName) + .withPrefix(prefix) + .withDelimiter(BLOB_FILE_PATH_DELIMITER); + + List<String> entries = new ArrayList<>(); + try { + ObjectListing objectListing = s3Client.listObjects(listRequest); + + while (true) { + List<String> files = objectListing.getObjectSummaries().stream() + .map(S3ObjectSummary::getKey) + // This filtering is needed only for S3mock. Real S3 does not ignore the trailing '/' in the prefix. 
+ .filter(s -> s.startsWith(prefix)) + .map(s -> s.substring(prefix.length())) + .collect(Collectors.toList()); + + entries.addAll(files); + + if (objectListing.isTruncated()) { + objectListing = s3Client.listNextBatchOfObjects(objectListing); + } else { + break; + } + } + return entries.toArray(new String[0]); + } catch (AmazonClientException ase) { + throw handleAmazonException(ase); + } + } + + /** + * Check if path exists + * + * @param path to File/Directory in Blob Store + * @return true if path exists otherwise false + */ + @Override + public boolean pathExists(String path) throws BlobException { + path = sanitizedPath(path, false); + + // for root return true + if ("/".equals(path)) { + return true; + } + + try { + return s3Client.doesObjectExist(bucketName, path); + } catch (AmazonClientException ase) { + throw handleAmazonException(ase); + } + } + + /** + * Check if path is directory + * + * @param path to File/Directory in Blob Store + * @return true if path is directory otherwise false + */ + @Override + public boolean isDirectory(String path) throws BlobException { + path = sanitizedPath(path, false); + + try { + ObjectMetadata objectMetadata = s3Client.getObjectMetadata(bucketName, path); + String blobDirHeaderVal = objectMetadata.getUserMetaDataOf(BLOB_DIR_HEADER); + + return !Strings.isNullOrEmpty(blobDirHeaderVal) && blobDirHeaderVal.equalsIgnoreCase("true"); + } catch (AmazonClientException ase) { + throw handleAmazonException(ase); + } + } + + /** + * Get length of File in Bytes + * + * @param path to File in Blob Store + * @return length of File + */ + @Override + public long length(String path) throws BlobException { + path = sanitizedPath(path, true); + try { + ObjectMetadata objectMetadata = s3Client.getObjectMetadata(bucketName, path); + String blobDirHeaderVal = objectMetadata.getUserMetaDataOf(BLOB_DIR_HEADER); + + if (Strings.isNullOrEmpty(blobDirHeaderVal) || !blobDirHeaderVal.equalsIgnoreCase("true")) { + return objectMetadata.getContentLength(); + } + throw new BlobException("Path is Directory"); + } catch (AmazonClientException ase) { + throw handleAmazonException(ase); + } + } + + /** + * Get InputStream of File for Read + * + * @param path to File in Blob Store + * @return InputStream for File + */ + @Override + public InputStream pullStream(String path) throws BlobException { + path = sanitizedPath(path, true); + + try { + S3Object requestedObject = s3Client.getObject(bucketName, path); + // This InputStream instance needs to be closed by the caller + return requestedObject.getObjectContent(); + } catch (AmazonClientException ase) { + throw handleAmazonException(ase); + } + } + + /** + * Get OutputStream of File for Write. Caller needs to close the stream + * + * @param path to File in Blob Store + * @return OutputStream for File + */ + @Override + public OutputStream pushStream(String path) throws BlobException { + path = sanitizedPath(path, true); + + if (!parentDirectoryExist(path)) { + throw new BlobException("Parent directory doesn't exist"); + } + + try { + return new S3OutputStream(s3Client, path, bucketName); + } catch (AmazonClientException ase) { + throw handleAmazonException(ase); + } + } + + public BlobstoreProviderType getStorageProvider() { + return BlobstoreProviderType.S3; + } + + @Override + public void close() { + s3Client.shutdown(); + } + + /** + * Batch delete blob files from the blob store. + * + * @param entries collection of blob file keys to the files to be deleted. 
+ **/ + private void deleteBlobs(Collection<String> entries) throws BlobException { Review comment: [0] ditto, re: should the Collection de-duplication be moved up into BlobRepository so all StorageClient impl's can benefit from it? ########## File path: solr/contrib/blob-repository/src/test/org/apache/solr/blob/backup/BlobBackupRepositoryTest.java ########## @@ -0,0 +1,366 @@ +/* Review comment: [+1] The unit-test coverage in the following files is laudable. The attention to thoroughness is awesome and appreciated! [-1] That said, there's one checkbox that's missing I think - integration-level testing. We have a base class (AbstractIncrementalBackupTest) that does a passable job here that gets extended by each of the existing BackupRepository impls. We should probably create a `S3IncrementalBackupTest` extending this, similar to `GCSIncrementalBackupTest` (as one example).
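Circling back to the de-duplication "ditto" above with a rough shape of what I mean: whether it lives in BlobBackupRepository as suggested or as a default method on the interface, the idea is the same — here's the default-method flavor since it's the shortest to show. To be clear, this is only a sketch: the real `BlobStorageClient` declares many more operations, and the `Set`-based overload is my invention, not something in the PR.

```java
package org.apache.solr.blob.client;

import java.util.Collection;
import java.util.HashSet;
import java.util.Set;

// Sketch only: the real interface declares many more operations; delete(Set)
// is a hypothetical overload used to illustrate hoisting the de-duplication.
public interface BlobStorageClient {

    // Callers keep the existing Collection-based signature; duplicates are dropped once here.
    default void delete(Collection<String> paths) throws BlobException {
        delete(new HashSet<>(paths));
    }

    // Implementations (S3, local, ...) only have to handle unique keys.
    void delete(Set<String> uniqueKeys) throws BlobException;
}
```

One wrinkle: `sanitizedPath` still runs per-implementation, so two entries that only differ before sanitization (say, with and without a leading slash) would survive this hoisted de-dup — probably fine, but worth keeping in mind if the interface gets reshuffled.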