Thanks Austin and Luke for replying to my message.
I did some experiments; these are my code snippets.

Maven:

<beam.version>2.21.0</beam.version>

<dependency>
    <groupId>com.google.cloud</groupId>
    <artifactId>google-cloud-spanner-jdbc</artifactId>
    <version>1.15.0</version>
</dependency>
<dependency>
    <groupId>com.google.cloud</groupId>
    <artifactId>google-cloud-spanner</artifactId>
    <version>1.56.0</version>
</dependency>

Java code:

public class SpannerJdbcToCsvText {

    private static final Logger LOG = LoggerFactory.getLogger(SpannerJdbcToCsvText.class);

    public interface SpannerToTextOptions
            extends PipelineOptions,
                    SpannerReadOptions,
                    JavascriptTextTransformerOptions,
                    FilesystemWriteOptions {}

    public static void main(String[] args) {
        LOG.info("Starting pipeline setup");
        PipelineOptionsFactory.register(SpannerToTextOptions.class);
        SpannerToTextOptions options =
                PipelineOptionsFactory.fromArgs(args).withValidation().as(SpannerToTextOptions.class);
        FileSystems.setDefaultPipelineOptions(options);
        Pipeline pipeline = Pipeline.create(options);

        // ----- block 1 start ---------------------
        // Block 1 prints to the logs on my local Mac desktop.
        List<String> list = new ArrayList<>();
        try {
            String projectId = "projectId";
            String instanceId = "instanceId";
            String databaseId = "databaseId";
            String credentialsFile = "/my/mac/local/path/credentials.json";
            try (Connection connection =
                    DriverManager.getConnection(
                            String.format(
                                    "jdbc:cloudspanner:/projects/%s/instances/%s/databases/%s?credentials=%s",
                                    projectId, instanceId, databaseId, credentialsFile))) {
                try (Statement statement = connection.createStatement()) {
                    try (ResultSet rs = statement.executeQuery("SELECT name FROM t2")) {
                        while (rs.next()) {
                            list.add(rs.getString(1));
                            LOG.info("print outside get value: " + rs.getString(1));
                        }
                    }
                }
            }
        } catch (Exception e) {
            LOG.error("", e);
        }
        PCollection<String> results =
                pipeline
                        .apply(Create.of(list).withType(TypeDescriptor.of(String.class)))
                        .setCoder(StringUtf8Coder.of());
        // ----- block 1 end -----------------------

        // ----- block 2 start ---------------------
        // Block 2 prints in the server logs.
        results.apply("print value", ParDo.of(new MapFn()));
        // ----- block 2 end -----------------------

        pipeline.run();
        LOG.info("Completed pipeline setup");
    }
}

I ran the program like this:

mvn compile exec:java \
    -Dexec.mainClass=com.stubhub.de.dataflow.batch.SpannerJdbcToCsvText \
    -Dexec.args="--runner=DataflowRunner \
    --region=us-central1"
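MapFn is not shown above; it is just a DoFn that logs each element. A minimal sketch, with the implementation assumed from the "in mapfn - get value:" lines in the worker logs below:

import org.apache.beam.sdk.transforms.DoFn;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

// Assumed implementation: log each element and pass it through unchanged.
public class MapFn extends DoFn<String, String> {

    private static final Logger LOG = LoggerFactory.getLogger(MapFn.class);

    @ProcessElement
    public void processElement(ProcessContext c) {
        // This line shows up in the Dataflow worker logs, not locally,
        // because ParDo transforms execute on the workers.
        LOG.info("in mapfn - get value:" + c.element());
        c.output(c.element());
    }
}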
These logs were printed in my local console:

org.apache.beam.runners.dataflow.DataflowRunner - PipelineOptions.filesToStage was not specified. Defaulting to files from the classpath: will stage 351 files. Enable logging at DEBUG level to see which files will be staged.
com.stubhub.de.dataflow.batch.SpannerJdbcToCsvText - print outside get value: myname
com.stubhub.de.dataflow.batch.SpannerJdbcToCsvText - print outside get value: 2
com.stubhub.de.dataflow.batch.SpannerJdbcToCsvText - print outside get value: 3
com.stubhub.de.dataflow.batch.SpannerJdbcToCsvText - print outside get value: 4
org.apache.beam.runners.dataflow.DataflowRunner - Executing pipeline on the Dataflow Service, which will have billing implications related to Google Compute Engine usage and other Google Cloud Services.
org.apache.beam.runners.dataflow.util.PackageUtil - Uploading 351 files from PipelineOptions.filesToStage to staging location to prepare for execution.
org.apache.beam.runners.dataflow.util.PackageUtil - Uploading /Users/shengyang/ws/Stubhub-DataPlatform.dataworks-ingestion/target/classes to gs://dataflow-staging-us-central1-661544897337/temp/staging/classes-KrjSD-Y0s4i28kG-XmiBiw.jar

These logs were printed on the GCP servers:

2020-06-30 09:44:57.483 HKT Finished processing stage F0 with 0 errors in 0.28 seconds
2020-06-30 09:44:59.600 HKT Starting MapTask stage s01
2020-06-30 09:45:00.916 HKT in mapfn - get value:myname
2020-06-30 09:45:00.934 HKT Finished processing stage s01 with 0 errors in 1.333 seconds
2020-06-30 09:45:03.025 HKT Starting MapTask stage s01
2020-06-30 09:45:03.046 HKT in mapfn - get value:4
2020-06-30 09:45:03.047 HKT Finished processing stage s01 with 0 errors in 0.022 seconds
2020-06-30 09:45:05.148 HKT Starting MapTask stage s01
2020-06-30 09:45:05.166 HKT in mapfn - get value:2
2020-06-30 09:45:05.176 HKT Finished processing stage s01 with 0 errors in 0.028 seconds

Why does the Spanner JDBC call (block 1) happen on my local machine during the compile phase, while MapFn (block 2) happens on the server side? I expected all of it to happen on the server side.

At 2020-06-30 00:17:51, "Luke Cwik" <lc...@google.com> wrote:

The intent is that you grant permissions to the account that is running the Dataflow job to the resources you want it to access in project B before you start the pipeline. This allows for much finer-grained access control and the ability to revoke permissions without having to disable an entire account. I would take a look at the general IAM and security documentation within GCP [1] or open up a support case with GCP requesting guidance.

1: https://cloud.google.com/iam

On Sun, Jun 28, 2020 at 8:56 AM Austin Bennett <whatwouldausti...@gmail.com> wrote:

I haven't tried it yet, but it looks like the connection string asks for the project to be specified. Based on that (and cross-project access working in other circumstances), I would imagine it will work, but...? Give it a try! One tricky place might be ensuring proper permissions in both projects (and without being too open).

On Sat, Jun 27, 2020, 5:46 AM Sheng Yang <liff...@163.com> wrote:

Hi,

I am working on Beam using the Dataflow engine. Recently I have been working on reading Spanner data from a different project: say I run my Beam Dataflow job in GCP project A, but the Spanner database is in GCP project B. I searched all the documents but can't find any documentation about SpannerIO reading data with a custom credential key file. Right now I am considering JdbcIO, because it accepts custom credentials as parameters and Spanner also has a JDBC API [1]. Is there something wrong in my description, or am I considering the correct approach?

String url =
    "jdbc:cloudspanner:/projects/my_project_id/"
        + "instances/my_instance_id/"
        + "databases/my_database_name"
        + "?credentials=/home/cloudspanner-keys/my-key.json"
        + ";autocommit=false";
try (Connection connection = DriverManager.getConnection(url)) {
    try (ResultSet rs =
            connection
                .createStatement()
                .executeQuery("SELECT SingerId, AlbumId, MarketingBudget FROM Albums")) {
        while (rs.next()) {
            Long singerId = rs.getLong(1);
        }
    }
}

[1]: https://github.com/googleapis/java-spanner-jdbc

Thanks,
Sheng
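For reference, a minimal, untested sketch of the JdbcIO approach mentioned above, assuming beam-sdks-java-io-jdbc and the google-cloud-spanner-jdbc driver (com.google.cloud.spanner.jdbc.JdbcDriver) are on the classpath. Unlike block 1 earlier in the thread, a JdbcIO read executes on the Dataflow workers as part of the pipeline, not on the submitting machine; note that the credentials path is therefore resolved on the workers, so a file that only exists locally will not be found there.

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.io.jdbc.JdbcIO;
import org.apache.beam.sdk.values.PCollection;

public class SpannerJdbcIOReadSketch {

    public static void main(String[] args) {
        Pipeline pipeline = Pipeline.create();

        // Placeholder IDs and key path; the credentials file must be
        // readable from the Dataflow workers, not just the local machine.
        String url =
                "jdbc:cloudspanner:/projects/my_project_id/"
                        + "instances/my_instance_id/"
                        + "databases/my_database_name"
                        + "?credentials=/path/visible/to/workers/my-key.json";

        PCollection<String> names =
                pipeline.apply(
                        JdbcIO.<String>read()
                                .withDataSourceConfiguration(
                                        JdbcIO.DataSourceConfiguration.create(
                                                "com.google.cloud.spanner.jdbc.JdbcDriver", url))
                                .withQuery("SELECT name FROM t2")
                                .withRowMapper(rs -> rs.getString(1))
                                .withCoder(StringUtf8Coder.of()));

        // 'names' can then feed the rest of the pipeline, e.g. a logging DoFn.
        pipeline.run();
    }
}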