[ 
https://issues.apache.org/jira/browse/HIVE-21720?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ShrekerNil updated HIVE-21720:
------------------------------
    Description: 
I'm new to Hive, and when I used Flume to sink data into Hive, the following error occurred:

2019-05-11 09:50:31,183 (hive-shive-call-runner-0) [ERROR - 
org.apache.hadoop.hive.ql.exec.DDLTask.failed(DDLTask.java:512)] 
org.apache.hadoop.hive.ql.metadata.HiveException: {color:#f79232}partition spec 
is invalid; field collection does not exist or is empty{color}
 at 
org.apache.hadoop.hive.ql.metadata.Partition.createMetaPartitionObject(Partition.java:130)
 at 
org.apache.hadoop.hive.ql.metadata.Hive.convertAddSpecToMetaPartition(Hive.java:1662)
 at org.apache.hadoop.hive.ql.metadata.Hive.createPartitions(Hive.java:1638)
 at org.apache.hadoop.hive.ql.exec.DDLTask.addPartitions(DDLTask.java:900)
 at org.apache.hadoop.hive.ql.exec.DDLTask.execute(DDLTask.java:339)
 at org.apache.hadoop.hive.ql.exec.Task.executeTask(Task.java:160)
 at org.apache.hadoop.hive.ql.exec.TaskRunner.runSequential(TaskRunner.java:88)
 at org.apache.hadoop.hive.ql.Driver.launchTask(Driver.java:1638)
 at org.apache.hadoop.hive.ql.Driver.execute(Driver.java:1397)
 at org.apache.hadoop.hive.ql.Driver.runInternal(Driver.java:1183)
 at org.apache.hadoop.hive.ql.Driver.run(Driver.java:1049)
 at org.apache.hadoop.hive.ql.Driver.run(Driver.java:1039)
 at 
org.apache.hive.hcatalog.streaming.HiveEndPoint$ConnectionImpl.runDDL(HiveEndPoint.java:404)
 at 
org.apache.hive.hcatalog.streaming.HiveEndPoint$ConnectionImpl.createPartitionIfNotExists(HiveEndPoint.java:372)
 at 
org.apache.hive.hcatalog.streaming.HiveEndPoint$ConnectionImpl.<init>(HiveEndPoint.java:276)
 at 
org.apache.hive.hcatalog.streaming.HiveEndPoint$ConnectionImpl.<init>(HiveEndPoint.java:243)
 at 
org.apache.hive.hcatalog.streaming.HiveEndPoint.newConnectionImpl(HiveEndPoint.java:180)
 at 
org.apache.hive.hcatalog.streaming.HiveEndPoint.newConnection(HiveEndPoint.java:157)
 at 
org.apache.hive.hcatalog.streaming.HiveEndPoint.newConnection(HiveEndPoint.java:110)
 at org.apache.flume.sink.hive.HiveWriter$8.call(HiveWriter.java:379)
 at org.apache.flume.sink.hive.HiveWriter$8.call(HiveWriter.java:376)
 at org.apache.flume.sink.hive.HiveWriter$11.call(HiveWriter.java:428)
 at java.util.concurrent.FutureTask.run(FutureTask.java:266)
 at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
 at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
 at java.lang.Thread.run(Thread.java:748)

 

Other errors follow the one above:

org.apache.flume.sink.hive.{color:#f79232}HiveWriter$ConnectException: Failed 
connecting to EndPoint \{metaStoreUri='thrift://172.25.48.146:9083', 
database='test', table='user_increase', partitionVals=[, 190511] }{color}
 at org.apache.flume.sink.hive.HiveWriter.<init>(HiveWriter.java:99)
 at org.apache.flume.sink.hive.HiveSink.getOrCreateWriter(HiveSink.java:343)
 at org.apache.flume.sink.hive.HiveSink.drainOneBatch(HiveSink.java:295)
 at org.apache.flume.sink.hive.HiveSink.process(HiveSink.java:253)
 at 
org.apache.flume.sink.DefaultSinkProcessor.process(DefaultSinkProcessor.java:67)
 at org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:145)
 at java.lang.Thread.run(Thread.java:748)
 Caused by: org.apache.hive.hcatalog.streaming.StreamingException: partition 
values=[, 190511]. Unable to get path for end point: [, 190511]
 at 
org.apache.hive.hcatalog.streaming.AbstractRecordWriter.getPathForEndPoint(AbstractRecordWriter.java:162)
 at 
org.apache.hive.hcatalog.streaming.AbstractRecordWriter.<init>(AbstractRecordWriter.java:66)
 at 
org.apache.hive.hcatalog.streaming.StrictJsonWriter.<init>(StrictJsonWriter.java:49)
 at 
org.apache.flume.sink.hive.HiveJsonSerializer.createRecordWriter(HiveJsonSerializer.java:54)
 at org.apache.flume.sink.hive.HiveWriter.<init>(HiveWriter.java:89)
 ... 6 more
 Caused by: NoSuchObjectException(message:partition values=[, 190511])
 at 
org.apache.hadoop.hive.metastore.api.ThriftHiveMetastore$get_partition_result$get_partition_resultStandardScheme.read(ThriftHiveMetastore.java:56077)
 at 
org.apache.hadoop.hive.metastore.api.ThriftHiveMetastore$get_partition_result$get_partition_resultStandardScheme.read(ThriftHiveMetastore.java:56045)
 at 
org.apache.hadoop.hive.metastore.api.ThriftHiveMetastore$get_partition_result.read(ThriftHiveMetastore.java:55976)
 at org.apache.thrift.TServiceClient.receiveBase(TServiceClient.java:78)
 at 
org.apache.hadoop.hive.metastore.api.ThriftHiveMetastore$Client.recv_get_partition(ThriftHiveMetastore.java:1776)
 at 
org.apache.hadoop.hive.metastore.api.ThriftHiveMetastore$Client.get_partition(ThriftHiveMetastore.java:1761)
 at 
org.apache.hadoop.hive.metastore.HiveMetaStoreClient.getPartition(HiveMetaStoreClient.java:1112)
 at 
org.apache.hive.hcatalog.streaming.AbstractRecordWriter.getPathForEndPoint(AbstractRecordWriter.java:157)
 ... 10 more

 

Kafka data:

{
 "platform":"platform-ios",
 "system":"system-ios",
 "brand":"apple",
 "model":"ios12",
 "collection":"wechart",
 "create_time":"20190509",
 "content":{
 "new_user":1,
 "full_link":"http://xxx.ooo.xom/sdjdjdjdjdjd";,
 "link_creator":"shreker",
 "target_enterprise_id":"thisismyid",
 "authorize_time":"1557405918974",
 "open_id":"nicebaby",
 "phone":"15809257921",
 "userprofile_id":"thisismyid",
 "join_time":"1557405990921",
 "join_result":"1",
 "failure_reason":""
 }
}

 

*Hive table creation statement:*

CREATE TABLE IF NOT EXISTS user_increase(
 platform STRING COMMENT "platform info - ios android",
 system STRING COMMENT "system info - platform + version",
 brand STRING COMMENT "phone brand",
 model STRING COMMENT "phone model",
 content STRING COMMENT "json string of statistics"
 ) COMMENT "wechat user increase data in sharing"
 PARTITIONED BY (collection STRING COMMENT "a flag to distinguish statistics catalogs in the common flow",
 create_time STRING COMMENT "timestamp when data just reported")
 CLUSTERED BY (platform) INTO 2 BUCKETS
 ROW FORMAT DELIMITED FIELDS TERMINATED BY '\001'
 LINES TERMINATED BY '\n'
 STORED AS ORC
 TBLPROPERTIES ('transactional'='true');
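
If I read the stack trace correctly (HiveEndPoint$ConnectionImpl.createPartitionIfNotExists -> DDLTask.addPartitions), the streaming connection tries to add a partition built from partitionVals=[, 190511]. My own rough reconstruction of what it attempts (not the exact DDL Hive generates) would be something like:

ALTER TABLE test.user_increase ADD IF NOT EXISTS PARTITION (collection='', create_time='190511');
-- fails with "partition spec is invalid; field collection does not exist or is empty"
-- because the value for the first partition column (collection) arrives empty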

 

Configuration:
 # The configuration file needs to define the sources,
 # the channels and the sinks.
 # Sources, channels and sinks are defined per agent,
 # in this case called 'agent'

#source: skafka sink: shive channel: k2h
 agent.sources = skafka
 agent.sinks = shive
 agent.channels = k2h
 #skafka --> k2h
 agent.sources.skafka.channels = k2h
 #k2h --> shive
 agent.sinks.shive.channel = k2h

#describe the source
 agent.sources.skafka.type = org.apache.flume.source.kafka.KafkaSource
 agent.sources.skafka.batchSize = 5000
 agent.sources.skafka.batchDurationMillis = 2000
 agent.sources.skafka.kafka.bootstrap.servers = 172.25.48.176:9092
 agent.sources.skafka.kafka.topics = soft-device-statis

#use a channel which buffers events in memory
 agent.channels.k2h.type = memory
 agent.channels.k2h.capacity = 1000
 agent.channels.k2h.transactionCapacity = 100

#sinks type
 agent.sinks.shive.type = hive
 agent.sinks.shive.hive.metastore = thrift://172.25.48.146:9083
 agent.sinks.shive.hive.database = test
 agent.sinks.shive.hive.table = user_increase
 agent.sinks.shive.hive.partition = %\{platform},%Y%m%d
 agent.sinks.shive.useLocalTimeStamp = true
 agent.sinks.shive.round = true
 agent.sinks.shive.roundValue = 10
 agent.sinks.shive.roundUnit = minute
 agent.sinks.shive.serializer = json
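 
 My guess is that the two values in hive.partition map positionally to the table's partition columns (collection, then create_time), and that %{platform} refers to a Flume event header named "platform", which does not appear to be set by the Kafka source, so the first partition value ends up empty. A hypothetical sketch of what I think the mapping should look like, assuming a header named "collection" is populated upstream (e.g. by an interceptor that copies it out of the JSON body):
 
 # hypothetical: one value per partition column, in the order declared in PARTITIONED BY
 # assumes an interceptor has copied the JSON "collection" field into a header named collection
 agent.sinks.shive.hive.partition = %{collection},%Y%m%d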
  

Any ideas for me?

 

Thanks.

 


> HiveException: partition spec is invalid; field <partition> does not exist or 
> is empty
> --------------------------------------------------------------------------------------
>
>                 Key: HIVE-21720
>                 URL: https://issues.apache.org/jira/browse/HIVE-21720
>             Project: Hive
>          Issue Type: Bug
>         Environment: apache-flume-1.7.0-bin
>            Reporter: ShrekerNil
>            Priority: Major
>



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
