Hi All,
  I ran into an exception after deployed our app in EMR. It seems like the
connection to Kinesis failed. Any one got Flink KinesisConnector working in
EMR?

Release label:emr-5.11.0
Hadoop distribution:Amazon 2.7.3
Applications:Flink 1.3.2

java.lang.IllegalStateException: Socket not created by this factory
at org.apache.http.util.Asserts.check(Asserts.java:34)
at
org.apache.http.conn.ssl.SSLSocketFactory.isSecure(SSLSocketFactory.java:435)
at
org.apache.http.impl.conn.DefaultClientConnectionOperator.openConnection(DefaultClientConnectionOperator.java:186)
at
org.apache.http.impl.conn.ManagedClientConnectionImpl.open(ManagedClientConnectionImpl.java:326)
at
org.apache.http.impl.client.DefaultRequestDirector.tryConnect(DefaultRequestDirector.java:610)
at
org.apache.http.impl.client.DefaultRequestDirector.execute(DefaultRequestDirector.java:445)
at
org.apache.http.impl.client.AbstractHttpClient.doExecute(AbstractHttpClient.java:835)
at
org.apache.http.impl.client.CloseableHttpClient.execute(CloseableHttpClient.java:83)
at
org.apache.http.impl.client.CloseableHttpClient.execute(CloseableHttpClient.java:56)
at
org.apache.flink.kinesis.shaded.com.amazonaws.http.AmazonHttpClient.executeOneRequest(AmazonHttpClient.java:837)
at
org.apache.flink.kinesis.shaded.com.amazonaws.http.AmazonHttpClient.executeHelper(AmazonHttpClient.java:607)
at
org.apache.flink.kinesis.shaded.com.amazonaws.http.AmazonHttpClient.doExecute(AmazonHttpClient.java:376)
at
org.apache.flink.kinesis.shaded.com.amazonaws.http.AmazonHttpClient.executeWithTimer(AmazonHttpClient.java:338)
at
org.apache.flink.kinesis.shaded.com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:287)
at
org.apache.flink.kinesis.shaded.com.amazonaws.services.kinesis.AmazonKinesisClient.doInvoke(AmazonKinesisClient.java:1940)
at
org.apache.flink.kinesis.shaded.com.amazonaws.services.kinesis.AmazonKinesisClient.invoke(AmazonKinesisClient.java:1910)
at
org.apache.flink.kinesis.shaded.com.amazonaws.services.kinesis.AmazonKinesisClient.describeStream(AmazonKinesisClient.java:656)
at
org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxy.describeStream(KinesisProxy.java:361)
at
org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxy.getShardsOfStream(KinesisProxy.java:323)
at
org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxy.getShardList(KinesisProxy.java:231)
at
org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher.discoverNewShardsToSubscribe(KinesisDataFetcher.java:430)
at
org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer.run(FlinkKinesisConsumer.java:202)
at
org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:87)
at
org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:55)
at
org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:95)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:263)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702)
at java.lang.Thread.run(Thread.java:748)

Thanks,
Tao

Reply via email to