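Try this piece of code:

import com.amazonaws.auth.DefaultAWSCredentialsProviderChain
import com.amazonaws.services.kinesis.AmazonKinesisClient
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.Seconds
import org.apache.spark.streaming.kinesis.KinesisUtils

// DefaultAWSCredentialsProviderChain reads the JVM system properties
// aws.accessKeyId and aws.secretKey. AWS_ACCESS_KEY_ID and AWS_SECRET_KEY
// are only read as environment variables, not as system properties.
System.setProperty("aws.accessKeyId", "access_key")
System.setProperty("aws.secretKey", "secret")

val streamName = "mystream"
val endpointUrl = "https://kinesis.us-east-1.amazonaws.com/"

val kinesisClient = new AmazonKinesisClient(new DefaultAWSCredentialsProviderChain())
kinesisClient.setEndpoint(endpointUrl)

// Create one receiver stream per shard.
val numShards = kinesisClient.describeStream(streamName).getStreamDescription().getShards().size()
val numStreams = numShards
val kinesisCheckpointInterval = Seconds(10)

// ssc is your existing StreamingContext.
val kinesisStreams = (0 until numStreams).map { i =>
  KinesisUtils.createStream(ssc, streamName, endpointUrl, kinesisCheckpointInterval,
    InitialPositionInStream.LATEST, StorageLevel.MEMORY_AND_DISK_2)
}

/* Union all the streams */
val unionStreams = ssc.union(kinesisStreams)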
unionStreams.print()

Thanks
Best Regards

On Tue, Jan 20, 2015 at 12:51 PM, Hafiz Mujadid <hafizmujadi...@gmail.com> wrote:

> Hi all!
>
> I am trying to use kinesis and spark streaming together. So when I execute
> program I get exception com.amazonaws.AmazonClientException: Unable to load
> AWS credentials from any provider in the chain
>
> Here is my piece of code
>
> val credentials = new BasicAWSCredentials(KinesisProperties.AWS_ACCESS_KEY_ID,
>   KinesisProperties.AWS_SECRET_KEY)
>
> var kinesisClient: AmazonKinesisClient = new AmazonKinesisClient(credentials)
>
> kinesisClient.setEndpoint(KinesisProperties.KINESIS_ENDPOINT_URL,
>   KinesisProperties.KINESIS_SERVICE_NAME,
>   KinesisProperties.KINESIS_REGION_ID)
> System.setProperty("aws.accessKeyId", KinesisProperties.AWS_ACCESS_KEY_ID)
> System.setProperty("aws.secretKey", KinesisProperties.AWS_SECRET_KEY)
> System.setProperty("AWS_ACCESS_KEY_ID", KinesisProperties.AWS_ACCESS_KEY_ID)
> System.setProperty("AWS_SECRET_KEY", KinesisProperties.AWS_SECRET_KEY)
> val numShards = kinesisClient.describeStream(KinesisProperties.MY_STREAM_NAME)
>   .getStreamDescription().getShards().size()
> val numStreams = numShards
> val ssc = StreamingHelper.getStreamingInstance(
>   new Duration(KinesisProperties.KINESIS_CHECKPOINT_INTERVAL))
> ssc.addStreamingListener(new MyStreamListener)
> val kinesisStreams = (0 until numStreams).map { i =>
>   KinesisUtils.createStream(ssc,
>     KinesisProperties.MY_STREAM_NAME,
>     KinesisProperties.KINESIS_ENDPOINT_URL,
>     new Duration(KinesisProperties.KINESIS_CHECKPOINT_INTERVAL),
>     InitialPositionInStream.TRIM_HORIZON,
>     StorageLevel.MEMORY_AND_DISK_2)
> }
> /* Union all the streams */
> val unionStreams = ssc.union(kinesisStreams)
> val tmp_stream = unionStreams.map(byteArray => new String(byteArray))
> val data = tmp_stream.window(Seconds(KinesisProperties.WINDOW_INTERVAL),
>   Seconds(KinesisProperties.SLIDING_INTERVAL))
> data.foreachRDD((rdd: RDD[String], time: Time) => {
>   if (rdd.take(1).size == 1) {
>     rdd.saveAsTextFile(KinesisProperties.Sink + time.milliseconds)
>   }
> })
> ssc.start()
> ssc.awaitTermination()
>
> Any suggestion?
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/com-amazonaws-AmazonClientException-Unable-to-load-AWS-credentials-from-any-provider-in-the-chain-tp21255.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
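
A note on checking where the credentials are actually visible. As far as I know, the default provider chain in the AWS SDK for Java looks at the environment variables AWS_ACCESS_KEY_ID and AWS_SECRET_KEY first and then at the system properties aws.accessKeyId and aws.secretKey. A system property that happens to be named AWS_ACCESS_KEY_ID is not consulted. Also, System.setProperty only affects the JVM it runs in, so if the Kinesis receiver resolves credentials in an executor JVM, properties set in the driver will not be seen there. I have not verified that this is the cause in your setup. The sketch below is a small diagnostic under those assumptions. CredentialSourceCheck, visibleSources and currentJvm are hypothetical names, not part of Spark or the AWS SDK, and the check only mirrors the lookup names listed above rather than calling the SDK.

```scala
object CredentialSourceCheck {
  // Lookup names as I understand the AWS SDK for Java default chain (assumption).
  val EnvAccessKey = "AWS_ACCESS_KEY_ID"
  val EnvSecretKey = "AWS_SECRET_KEY"
  val SysAccessKey = "aws.accessKeyId"
  val SysSecretKey = "aws.secretKey"

  // Returns the credential sources that have both keys set, in chain order.
  // The lookups are passed in so the logic can be exercised without
  // touching the real environment.
  def visibleSources(
      props: String => Option[String],
      env: String => Option[String]): Seq[String] = {
    def present(lookup: String => Option[String], key: String): Boolean =
      lookup(key).exists(_.trim.nonEmpty)

    val fromEnv =
      if (present(env, EnvAccessKey) && present(env, EnvSecretKey)) Seq("environment variables")
      else Seq.empty[String]
    val fromProps =
      if (present(props, SysAccessKey) && present(props, SysSecretKey)) Seq("system properties")
      else Seq.empty[String]
    fromEnv ++ fromProps
  }

  // Checks the JVM this code is running in (driver or executor).
  def currentJvm(): Seq[String] =
    visibleSources(k => Option(System.getProperty(k)), k => sys.env.get(k))
}
```

Calling CredentialSourceCheck.currentJvm() in the driver, and again inside a task (for example inside a map over a small parallelized collection), should show whether the executors see any of these sources. If the executor side comes back empty, exporting the two environment variables on the worker nodes is one option to try.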