Hi Itamar,

for further debugging it would be helpful to get the full logs of Flink and
more information about your environment. Since I'm not too familiar with
Flink's PubSub connector, I have pulled in Richard (original author),
Becket and Robert (both helped with reviewing and merging this connector).
They might know what's going on.

The problem looks a bit similar to [1]. Maybe it would help to upgrade to a
newer google-cloud-pubsub version than 1.62.0. I assume that the others
might know more about it.

[1] https://github.com/googleapis/google-cloud-java/issues/3648

Cheers,
Till

On Mon, Jan 13, 2020 at 12:19 PM Itamar Syn-Hershko <
ita...@bigdataboutique.com> wrote:

> Hi all,
>
> We are trying to use the PubSub source with a very minimal and basic Flink
> application as a POC, and getting the following error consistently every
> couple of seconds. What am I missing?
>
> ```
> io.grpc.internal.ManagedChannelOrphanWrapper$ManagedChannelReference
> cleanQueue
> SEVERE: *~*~*~ Channel ManagedChannelImpl{logId=5, target=
> pubsub.googleapis.com:443} was not shutdown properly!!! ~*~*~*
>     Make sure to call shutdown()/shutdownNow() and wait until
> awaitTermination() returns true.
> java.lang.RuntimeException: ManagedChannel allocation site
> at
> io.grpc.internal.ManagedChannelOrphanWrapper$ManagedChannelReference.<init>(ManagedChannelOrphanWrapper.java:103)
> at
> io.grpc.internal.ManagedChannelOrphanWrapper.<init>(ManagedChannelOrphanWrapper.java:53)
> at
> io.grpc.internal.ManagedChannelOrphanWrapper.<init>(ManagedChannelOrphanWrapper.java:44)
> at
> io.grpc.internal.AbstractManagedChannelImplBuilder.build(AbstractManagedChannelImplBuilder.java:419)
> at
> org.apache.flink.streaming.connectors.gcp.pubsub.DefaultPubSubSubscriberFactory.getSubscriber(DefaultPubSubSubscriberFactory.java:55)
> at
> org.apache.flink.streaming.connectors.gcp.pubsub.PubSubSource.createAndSetPubSubSubscriber(PubSubSource.java:178)
> at
> org.apache.flink.streaming.connectors.gcp.pubsub.PubSubSource.open(PubSubSource.java:100)
> at
> org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)
> at
> org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:102)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:532)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:396)
> at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:705)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
> at java.base/java.lang.Thread.run(Thread.java:834)
> ```
>
> Thanks!
>
> --
>
> [image: logo] <https://bigdataboutique.com/>
> Itamar Syn-Hershko
>
>
> ita...@bigdataboutique.com
> https://bigdataboutique.com
> <https://www.linkedin.com/in/itamar-syn-hershko-78b25013>
> <https://twitter.com/synhershko>
> <https://www.youtube.com/channel/UCBHr7lM2u6SCWPJvcKug-Yg>
>

Reply via email to