Natan HP created FLINK-27395: -------------------------------- Summary: IllegalStateException: Could not find policy 'pick_first'. on Flink Application Key: FLINK-27395 URL: https://issues.apache.org/jira/browse/FLINK-27395 Project: Flink Issue Type: Bug Components: Connectors / Google Cloud PubSub Affects Versions: 1.14.4, 1.14.2 Environment: # Minikube
{noformat} ➜ minikube version minikube version: v1.25.2 commit: 362d5fdc0a3dbee389b3d3f1034e8023e72bd3a7 {noformat} # Apache Flink Docker Image {noformat} apache/flink:1.14.4-scala_2.11{noformat} Reporter: Natan HP I got this exception on flink taskmanager, but I can see that the data is successfully published in the pub sub. Here is the log: {noformat} 2022-04-25 07:53:44,293 INFO org.apache.flink.runtime.taskmanager.Task [] - Sink: Pub-sub-sink (1/2)#0 (cffc263c60c6c0c8c56092a8386601eb) switched from INITIALIZING to RUNNING. Apr 25, 2022 7:53:45 AM io.grpc.internal.ManagedChannelImpl$1 uncaughtException SEVERE: [Channel<1>: (pubsub.googleapis.com:443)] Uncaught exception in the SynchronizationContext. Panic! java.lang.IllegalStateException: Could not find policy 'pick_first'. Make sure its implementation is either registered to LoadBalancerRegistry or included in META-INF/services/io.grpc.LoadBalancerProvider from your jar files. at io.grpc.internal.AutoConfiguredLoadBalancerFactory$AutoConfiguredLoadBalancer.<init>(AutoConfiguredLoadBalancerFactory.java:94) at io.grpc.internal.AutoConfiguredLoadBalancerFactory.newLoadBalancer(AutoConfiguredLoadBalancerFactory.java:65) at io.grpc.internal.ManagedChannelImpl.exitIdleMode(ManagedChannelImpl.java:375) at io.grpc.internal.ManagedChannelImpl$ChannelTransportProvider$1ExitIdleModeForTransport.run(ManagedChannelImpl.java:469) at io.grpc.SynchronizationContext.drain(SynchronizationContext.java:95) at io.grpc.SynchronizationContext.execute(SynchronizationContext.java:127) at io.grpc.internal.ManagedChannelImpl$ChannelTransportProvider.get(ManagedChannelImpl.java:473) at io.grpc.internal.ClientCallImpl.startInternal(ClientCallImpl.java:253) at io.grpc.internal.ClientCallImpl.start(ClientCallImpl.java:210) at io.grpc.ForwardingClientCall.start(ForwardingClientCall.java:32) at com.google.api.gax.grpc.GrpcHeaderInterceptor$1.start(GrpcHeaderInterceptor.java:94) at io.grpc.stub.ClientCalls.startCall(ClientCalls.java:314) at io.grpc.stub.ClientCalls.asyncUnaryRequestCall(ClientCalls.java:288) at io.grpc.stub.ClientCalls.futureUnaryCall(ClientCalls.java:200) at com.google.api.gax.grpc.GrpcDirectCallable.futureCall(GrpcDirectCallable.java:58) at com.google.api.gax.grpc.GrpcUnaryRequestParamCallable.futureCall(GrpcUnaryRequestParamCallable.java:65) at com.google.api.gax.grpc.GrpcExceptionCallable.futureCall(GrpcExceptionCallable.java:64) at com.google.api.gax.rpc.AttemptCallable.call(AttemptCallable.java:86) at com.google.api.gax.rpc.RetryingCallable.futureCall(RetryingCallable.java:63) at com.google.api.gax.rpc.RetryingCallable.futureCall(RetryingCallable.java:41) at com.google.api.gax.tracing.TracedBatchingCallable.futureCall(TracedBatchingCallable.java:82) at com.google.api.gax.rpc.BatchingCallable.futureCall(BatchingCallable.java:79) at com.google.api.gax.rpc.UnaryCallable$1.futureCall(UnaryCallable.java:126) at com.google.api.gax.rpc.UnaryCallable.futureCall(UnaryCallable.java:87) at com.google.cloud.pubsub.v1.Publisher.publishCall(Publisher.java:425) at com.google.cloud.pubsub.v1.Publisher.publishOutstandingBatch(Publisher.java:471) at com.google.cloud.pubsub.v1.Publisher.publishAllWithoutInflight(Publisher.java:399) at com.google.cloud.pubsub.v1.Publisher.access$1100(Publisher.java:88) at com.google.cloud.pubsub.v1.Publisher$2.run(Publisher.java:326) at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) at java.util.concurrent.FutureTask.run(FutureTask.java:266) at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180) at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) at java.lang.Thread.run(Thread.java:750) {noformat} The code sample: {code:java} SinkFunction<String> pubsubSink = PubSubSink.newBuilder() .withSerializationSchema((SerializationSchema<String>) s -> s.getBytes(StandardCharsets.UTF_8)) .withProjectName("<project-name>") .withTopicName("<topic-name>") .build(); dataStream.addSink(pubsubSink) .name("Pub-sub-sink"); {code} I use Maven Assembly Plugin to create the uber JAR: {noformat} <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-assembly-plugin</artifactId> <version>2.6</version> <configuration> <archive> <manifest> <mainClass>org.example.flink.Main</mainClass> </manifest> </archive> <descriptorRefs> <descriptorRef>jar-with-dependencies</descriptorRef> </descriptorRefs> </configuration> <executions> <execution> <id>make-assembly</id> <phase>package</phase> <goals> <goal>single</goal> </goals> </execution> </executions> </plugin>{noformat} The content of the JAR: {noformat} ➜ jar tf MyApp.jar | grep io.grpc.LoadBalancerProvider io/grpc/LoadBalancerProvider$UnknownConfig.class META-INF/services/io.grpc.LoadBalancerProvider io/grpc/LoadBalancerProvider.class ➜ jar tf MyApp.jar | grep io.grpc.NameResolverProvider io/grpc/NameResolverProvider.class META-INF/services/io.grpc.NameResolverProvider {noformat} What I've tried to solve this: # Downgrading version to 1.14.2 {noformat} <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-connector-gcp-pubsub_2.12</artifactId> <version>1.14.2</version> </dependency>{noformat} # Using maven shade plugin (along side maven assembly plugin) with the following config as suggedted in [here|[https://github.com/googleapis/google-cloud-java/issues/4700#issuecomment-474739796]:] {noformat} <transformer implementation="org.apache.maven.plugins.shade.resource.IncludeResourceTransformer"> <resource>META-INF/services</resource> <file>io.grpc.LoadBalancerProvider</file> </transformer> <transformer implementation="org.apache.maven.plugins.shade.resource.IncludeResourceTransformer"> <resource>META-INF/services</resource> <file>io.grpc.NameResolverProvider</file> </transformer>{noformat} 3. Creating files inside META-INF/services as suggested in [here|https://github.com/googleapis/google-cloud-java/issues/4700#issuecomment-477658832]: {noformat} Data-Feeder-Simulation/FlinkSample/src/main/resources/META-INF/services on master [⇡!+?] ➜ ls io.grpc.LoadBalancerProvider io.grpc.NameResolverProviderData-Feeder-Simulation/FlinkSample/src/main/resources/META-INF/services on master [⇡!+?] ➜ cat io.grpc.LoadBalancerProvider io.grpc.internal.PickFirstLoadBalancerProvider Data-Feeder-Simulation/FlinkSample/src/main/resources/META-INF/services on master [⇡!+?] ➜ cat io.grpc.NameResolverProvider io.grpc.internal.DnsNameResolverProvider {noformat} -- This message was sent by Atlassian Jira (v8.20.7#820007)