haf edited a comment on issue #6594: [FLINK-9311] [pubsub] Added PubSub source connector with support for checkpointing (ATLEAST_ONCE)
URL: https://github.com/apache/flink/pull/6594#issuecomment-481330654

When I run this code with Flink 1.7, I get this critical error when things crash:

```
Apr 09, 2019 6:44:17 PM io.grpc.internal.ManagedChannelOrphanWrapper$ManagedChannelReference cleanQueue
SEVERE: *~*~*~ Channel ManagedChannelImpl{logId=5, target=pubsub.googleapis.com:443} was not shutdown properly!!! ~*~*~*
    Make sure to call shutdown()/shutdownNow() and wait until awaitTermination() returns true.
java.lang.RuntimeException: ManagedChannel allocation site
	at io.grpc.internal.ManagedChannelOrphanWrapper$ManagedChannelReference.<init>(ManagedChannelOrphanWrapper.java:103)
	at io.grpc.internal.ManagedChannelOrphanWrapper.<init>(ManagedChannelOrphanWrapper.java:53)
	at io.grpc.internal.ManagedChannelOrphanWrapper.<init>(ManagedChannelOrphanWrapper.java:44)
	at io.grpc.internal.AbstractManagedChannelImplBuilder.build(AbstractManagedChannelImplBuilder.java:419)
	at com.google.api.gax.grpc.InstantiatingGrpcChannelProvider.createSingleChannel(InstantiatingGrpcChannelProvider.java:222)
	at com.google.api.gax.grpc.InstantiatingGrpcChannelProvider.createChannel(InstantiatingGrpcChannelProvider.java:164)
	at com.google.api.gax.grpc.InstantiatingGrpcChannelProvider.getTransportChannel(InstantiatingGrpcChannelProvider.java:156)
	at com.google.api.gax.rpc.ClientContext.create(ClientContext.java:157)
	at com.google.cloud.pubsub.v1.stub.GrpcSubscriberStub.create(GrpcSubscriberStub.java:260)
	at okr.sources.DefaultPubSubSubscriberFactory.getSubscriber(DefaultPubSubSubscriberFactory.java:30)
	at okr.sources.PubSubSource.open(PubSubSource.java:96)
	at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)
	at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:102)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:424)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:290)
	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:704)
	at java.lang.Thread.run(Thread.java:748)
```

While it's bad that my code crashes, I would not want this message in my production logs anyway. Perhaps the subscriber should be disposed of differently when the rest of the pipeline fails?

Another similar crash/stacktrace:

```
18:58:33,803 WARN  okr.sources.PubSubSource - Still waiting for subscriber to terminate.
java.lang.InterruptedException
	at java.util.concurrent.locks.AbstractQueuedSynchronizer.tryAcquireSharedNanos(AbstractQueuedSynchronizer.java:1326)
	at java.util.concurrent.CountDownLatch.await(CountDownLatch.java:277)
	at io.grpc.internal.ManagedChannelImpl.awaitTermination(ManagedChannelImpl.java:752)
	at io.grpc.internal.ForwardingManagedChannel.awaitTermination(ForwardingManagedChannel.java:57)
	at io.grpc.internal.ManagedChannelOrphanWrapper.awaitTermination(ManagedChannelOrphanWrapper.java:70)
	at com.google.api.gax.grpc.GrpcTransportChannel.awaitTermination(GrpcTransportChannel.java:89)
	at com.google.api.gax.core.BackgroundResourceAggregation.awaitTermination(BackgroundResourceAggregation.java:82)
	at com.google.cloud.pubsub.v1.stub.GrpcSubscriberStub.awaitTermination(GrpcSubscriberStub.java:560)
	at okr.sources.PubSubSource.shutdownSubscriber(PubSubSource.java:192)
	at okr.sources.PubSubSource.run(PubSubSource.java:139)
	at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:94)
	at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:58)
	at org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:99)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:300)
	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:704)
	at java.lang.Thread.run(Thread.java:748)
```

```
java.lang.NullPointerException
	at org.apache.flink.streaming.api.functions.source.MessageAcknowledgingSourceBase.close(MessageAcknowledgingSourceBase.java:173)
	at org.apache.flink.streaming.api.functions.source.MultipleIdsMessageAcknowledgingSourceBase.close(MultipleIdsMessageAcknowledgingSourceBase.java:98)
	at org.apache.flink.api.common.functions.util.FunctionUtils.closeFunction(FunctionUtils.java:43)
	at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.dispose(AbstractUdfStreamOperator.java:117)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.disposeAllOperators(StreamTask.java:477)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:378)
	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:704)
	at java.lang.Thread.run(Thread.java:748)
```

```
Caused by: java.io.IOException: Too many open files
	at sun.nio.ch.IOUtil.makePipe(Native Method)
	at sun.nio.ch.KQueueSelectorImpl.<init>(KQueueSelectorImpl.java:84)
	at sun.nio.ch.KQueueSelectorProvider.openSelector(KQueueSelectorProvider.java:42)
	at java.nio.channels.Selector.open(Selector.java:227)
	at org.apache.http.impl.nio.reactor.AbstractMultiworkerIOReactor.<init>(AbstractMultiworkerIOReactor.java:142)
	... 24 more
```
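
To make the suggestion about disposing the subscriber differently a bit more concrete, here is a minimal sketch of a shutdown routine that tolerates a missing resource and an interrupted task thread. It is not the connector's actual code: `GracefulShutdown` and `shutdownQuietly` are hypothetical names, and a plain `ExecutorService` stands in for the gRPC channel/subscriber stub so that the example runs without extra dependencies. The assumption is that the real resource offers the same `shutdown()`/`shutdownNow()`/`awaitTermination()` calls that the SEVERE log message above refers to.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

/** Sketch only: hypothetical helper, with ExecutorService as a stand-in resource. */
public final class GracefulShutdown {

    private GracefulShutdown() {}

    /**
     * Shuts the resource down, escalating to shutdownNow() on timeout or
     * interruption. Returns true if the resource is terminated afterwards.
     */
    public static boolean shutdownQuietly(ExecutorService resource, long timeout, TimeUnit unit) {
        if (resource == null) {
            // open() may have failed before the resource was created,
            // so there is nothing to release.
            return true;
        }
        resource.shutdown();
        try {
            if (resource.awaitTermination(timeout, unit)) {
                return true;
            }
            // Orderly shutdown timed out: force it and wait once more.
            resource.shutdownNow();
            return resource.awaitTermination(timeout, unit);
        } catch (InterruptedException e) {
            // The task thread was cancelled while waiting: force the shutdown
            // instead of abandoning the resource, then restore the flag.
            resource.shutdownNow();
            Thread.currentThread().interrupt();
            return resource.isTerminated();
        }
    }

    public static void main(String[] args) {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        pool.submit(() -> { });
        System.out.println(shutdownQuietly(pool, 5, TimeUnit.SECONDS));
        System.out.println(shutdownQuietly(null, 5, TimeUnit.SECONDS));
    }
}
```

Whether forcing `shutdownNow()` on interruption is acceptable depends on what should happen to unacknowledged messages, so treat that branch as one possible design choice rather than a recommendation.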