Hi Itamar,

for further debugging it would be helpful to get the full logs of Flink and more information about your environment.

Since I'm not too familiar with Flink's PubSub connector, I have pulled in Richard (original author), Becket and Robert (both helped with reviewing and merging this connector). They might know what's going on.
The problem looks a bit similar to [1]. Maybe it would help to upgrade to a newer google-cloud-pubsub version than 1.62.0. I assume that the others might know more about it.

[1] https://github.com/googleapis/google-cloud-java/issues/3648

Cheers,
Till

On Mon, Jan 13, 2020 at 12:19 PM Itamar Syn-Hershko <ita...@bigdataboutique.com> wrote:

> Hi all,
>
> We are trying to use the PubSub source with a very minimal and basic Flink
> application as a POC, and getting the following error consistently every
> couple of seconds. What am I missing?
>
> ```
> io.grpc.internal.ManagedChannelOrphanWrapper$ManagedChannelReference cleanQueue
> SEVERE: *~*~*~ Channel ManagedChannelImpl{logId=5, target=pubsub.googleapis.com:443} was not shutdown properly!!! ~*~*~*
>     Make sure to call shutdown()/shutdownNow() and wait until awaitTermination() returns true.
> java.lang.RuntimeException: ManagedChannel allocation site
>     at io.grpc.internal.ManagedChannelOrphanWrapper$ManagedChannelReference.<init>(ManagedChannelOrphanWrapper.java:103)
>     at io.grpc.internal.ManagedChannelOrphanWrapper.<init>(ManagedChannelOrphanWrapper.java:53)
>     at io.grpc.internal.ManagedChannelOrphanWrapper.<init>(ManagedChannelOrphanWrapper.java:44)
>     at io.grpc.internal.AbstractManagedChannelImplBuilder.build(AbstractManagedChannelImplBuilder.java:419)
>     at org.apache.flink.streaming.connectors.gcp.pubsub.DefaultPubSubSubscriberFactory.getSubscriber(DefaultPubSubSubscriberFactory.java:55)
>     at org.apache.flink.streaming.connectors.gcp.pubsub.PubSubSource.createAndSetPubSubSubscriber(PubSubSource.java:178)
>     at org.apache.flink.streaming.connectors.gcp.pubsub.PubSubSource.open(PubSubSource.java:100)
>     at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)
>     at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:102)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:532)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:396)
>     at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:705)
>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
>     at java.base/java.lang.Thread.run(Thread.java:834)
> ```
>
> Thanks!
>
> --
> Itamar Syn-Hershko
> ita...@bigdataboutique.com
> https://bigdataboutique.com
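
For reference, the gRPC warning in the quoted log asks for a specific sequence: call shutdown() (or shutdownNow()) and wait until awaitTermination() returns true. Below is a minimal sketch of that sequence, not the connector's actual code. `Terminable`, `ChannelShutdownSketch` and `FakeChannel` are hypothetical names made up for illustration; `Terminable` only mirrors the three method names from the warning so the example compiles without gRPC on the classpath (the real `io.grpc.ManagedChannel` returns the channel from `shutdown()`/`shutdownNow()`, so a thin adapter would be needed). Note that, judging by the allocation site in the trace, the channel is created inside `DefaultPubSubSubscriberFactory`, so this is probably not something application code can apply directly.

```java
import java.util.concurrent.TimeUnit;

/** Hypothetical stand-in for the three channel methods named in the gRPC warning. */
interface Terminable {
    void shutdown();
    void shutdownNow();
    boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException;
}

final class ChannelShutdownSketch {

    /**
     * Requests an orderly shutdown, waits, and escalates to a forced shutdown
     * if the resource has not terminated in time.
     * Returns true only if termination was confirmed.
     */
    static boolean shutdownAndAwait(Terminable channel, long timeout, TimeUnit unit) {
        channel.shutdown();
        try {
            if (channel.awaitTermination(timeout, unit)) {
                return true;
            }
            // Orderly shutdown did not finish in time: force it and wait once more.
            channel.shutdownNow();
            return channel.awaitTermination(timeout, unit);
        } catch (InterruptedException e) {
            // Force the shutdown and preserve the interrupt status for the caller.
            channel.shutdownNow();
            Thread.currentThread().interrupt();
            return false;
        }
    }

    /** Fake used only to exercise the helper; it never blocks. */
    static final class FakeChannel implements Terminable {
        private final boolean needsForce;
        private boolean terminated;
        boolean forced;

        FakeChannel(boolean needsForce) {
            this.needsForce = needsForce;
        }

        @Override
        public void shutdown() {
            // A "stubborn" fake only terminates after shutdownNow().
            terminated = !needsForce;
        }

        @Override
        public void shutdownNow() {
            forced = true;
            terminated = true;
        }

        @Override
        public boolean awaitTermination(long timeout, TimeUnit unit) {
            return terminated;
        }
    }
}
```

In an operator that owns such a channel, a helper like this would typically be called from its close/cleanup path, so that the channel is terminated before the reference is dropped and the orphan check has nothing to report.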