Hyukjin,
This is what I have so far. I haven’t used Dataset yet, and maybe I don’t need to.
var df: DataFrame = null
for (message <- messages) {
  val bodyRdd = sc.parallelize(message.getBody() :: Nil)
  val fileDf = sqlContext.read.json(bodyRdd)
    .select(
      $"Records.s3.bucket.name".as("bucket"),
      $"Records.s3.object.key".as("key")
    )
  if (df != null) {
    df = df.unionAll(fileDf)
  } else {
    df = fileDf
  }
}
df.show
Each column comes back as an array. I just need to combine the bucket and key
values to make the S3 URLs, and then download the file at each URL. That is what
I need help with next.
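
One way to combine the two arrays, as a minimal sketch in plain Scala: pair each bucket name with the key at the same position and build a URL from each pair. The `S3Urls` object, the `toUrls` name, and the `s3://` scheme are assumptions for illustration (a Hadoop setup may expect `s3n://` or `s3a://` instead). The key is URL-decoded because S3 event notifications URL-encode object keys.

```scala
import java.net.URLDecoder

// Hypothetical helper, not part of Spark or the AWS SDK.
object S3Urls {
  // Pair each bucket name with the key at the same index and build a URL.
  // zip stops at the shorter input, so unmatched trailing entries are dropped.
  def toUrls(buckets: Seq[String], keys: Seq[String]): Seq[String] =
    buckets.zip(keys).map { case (bucket, key) =>
      // Keys arrive URL-encoded in the notification (a space becomes "+").
      s"s3://$bucket/${URLDecoder.decode(key, "UTF-8")}"
    }
}
```

For a row of df, the two arrays could presumably be read with row.getSeq[String](0) and row.getSeq[String](1) and passed to toUrls; that wiring is not tested here.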
Thanks,
Ben
> On Apr 17, 2016, at 7:38 AM, Hyukjin Kwon <[email protected]> wrote:
>
> Hi!
>
> Personally, I don't think it necessarily needs to be DataSet for your goal.
>
> Just select the fields under "s3" from the DataFrame loaded by
> sqlContext.read.json().
>
> You can call printSchema() to check the nested schema and then select the
> data.
>
> Also, I guess (from your code) you are trying to send a request, fetch the
> response on the driver side, and then send each message to the executor side.
> I suspect that would put really heavy overhead on the driver.
> Holden,
>
> If I were to use DataSets, then I would essentially do this:
>
> val receiveMessageRequest = new ReceiveMessageRequest(myQueueUrl)
> val messages = sqs.receiveMessage(receiveMessageRequest).getMessages()
> for (message <- messages.asScala) {
>   val files = sqlContext.read.json(message.getBody())
> }
>
> Can I simply do files.toDS(), or do I have to create a schema using a case
> class File and apply it with as[File]? If I have to apply a schema, then how
> would I create it based on the JSON structure below, especially the nested
> elements?
>
> Thanks,
> Ben
>
>
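
On the case class question above, a sketch of classes that mirror only the nested fields needed here (bucket name and object key) might look like the following. The class names and the `Notifications` helper are hypothetical; the field names follow the JSON quoted below. `object` is a reserved word in Scala, so that field is written in backticks. Whether a particular Spark version's encoder accepts these classes through `as[Notification]` is an assumption that has not been verified here.

```scala
// Hypothetical case classes mirroring only the parts of the S3 event JSON
// needed to locate a file; every other field is left out.
case class Bucket(name: String)
case class S3Object(key: String)
// `object` is a Scala keyword, so the field name must be backquoted.
case class S3(bucket: Bucket, `object`: S3Object)
case class Record(s3: S3)
case class Notification(Records: Seq[Record])

object Notifications {
  // Flatten a notification into (bucket, key) pairs, one per record.
  def bucketKeyPairs(n: Notification): Seq[(String, String)] =
    n.Records.map(r => (r.s3.bucket.name, r.s3.`object`.key))
}
```

The nesting of the classes follows the nesting of the JSON objects one to one, which is the general pattern for describing a nested structure with case classes.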
>> On Apr 14, 2016, at 3:46 PM, Holden Karau <[email protected]> wrote:
>>
>> You could certainly use RDDs for that. You might also find it easier to use
>> a Dataset: select the fields you need to construct the URL to fetch, then
>> use the map function.
>>
>> On Thu, Apr 14, 2016 at 12:01 PM, Benjamin Kim <[email protected]> wrote:
>> I was wondering what would be the best way to use JSON in Spark/Scala. I need
>> to look up values of fields in a collection of records to form a URL and
>> download the file at that location. I was thinking an RDD would be perfect
>> for this. I just want to hear from others who might have more experience
>> with this. Below is the actual JSON structure that I am trying to use for the
>> S3 bucket and key values of each "record" within "Records".
>>
>> {
>>   "Records":[
>>     {
>>       "eventVersion":"2.0",
>>       "eventSource":"aws:s3",
>>       "awsRegion":"us-east-1",
>>       "eventTime":The time, in ISO-8601 format, for example, 1970-01-01T00:00:00.000Z, when S3 finished processing the request,
>>       "eventName":"event-type",
>>       "userIdentity":{
>>         "principalId":"Amazon-customer-ID-of-the-user-who-caused-the-event"
>>       },
>>       "requestParameters":{
>>         "sourceIPAddress":"ip-address-where-request-came-from"
>>       },
>>       "responseElements":{
>>         "x-amz-request-id":"Amazon S3 generated request ID",
>>         "x-amz-id-2":"Amazon S3 host that processed the request"
>>       },
>>       "s3":{
>>         "s3SchemaVersion":"1.0",
>>         "configurationId":"ID found in the bucket notification configuration",
>>         "bucket":{
>>           "name":"bucket-name",
>>           "ownerIdentity":{
>>             "principalId":"Amazon-customer-ID-of-the-bucket-owner"
>>           },
>>           "arn":"bucket-ARN"
>>         },
>>         "object":{
>>           "key":"object-key",
>>           "size":object-size,
>>           "eTag":"object eTag",
>>           "versionId":"object version if bucket is versioning-enabled, otherwise null",
>>           "sequencer": "a string representation of a hexadecimal value used to determine event sequence, only used with PUTs and DELETEs"
>>         }
>>       }
>>     },
>>     {
>>       // Additional events
>>     }
>>   ]
>> }
>>
>> Thanks
>> Ben
>>
>> --
>> Cell : 425-233-8271
>> Twitter: https://twitter.com/holdenkarau