jiangbiao910 opened a new issue, #6260:
URL: https://github.com/apache/hudi/issues/6260
Environment Description
Hudi version : 0.11.0
Flink version : 1.13.1
Hive version : 2.1.1-cdh6.2.0
Hadoop version : 3.0.0-cdh6.2.0
Storage (HDFS/S3/GCS..) : HDFS
Running on Docker? (yes/no) : no
When I use **COW**, the error log is:
Caused by: java.util.**NoSuchElementException: No value present in Option**
at org.apache.hudi.common.util.Option.get(Option.java:89)
~[hudi-flink-bundle_2.11.jar:0.11.1]
at
org.apache.hudi.io.HoodieMergeHandle.<init>(HoodieMergeHandle.java:120)
~[hudi-flink-bundle_2.11.jar:0.11.1]
at org.apache.hudi.io.FlinkMergeHandle.<init>(FlinkMergeHandle.java:70)
~[hudi-flink-bundle_2.11.jar:0.11.1]
at
org.apache.hudi.client.HoodieFlinkWriteClient.getOrCreateWriteHandle(HoodieFlinkWriteClient.java:491)
~[hudi-flink-bundle_2.11.jar:0.11.1]
at
org.apache.hudi.client.HoodieFlinkWriteClient.upsert(HoodieFlinkWriteClient.java:141)
~[hudi-flink-bundle_2.11.jar:0.11.1]
at
org.apache.hudi.sink.StreamWriteFunction.lambda$initWriteFunction$1(StreamWriteFunction.java:184)
~[hudi-flink-bundle_2.11.jar:0.11.1]
at
org.apache.hudi.sink.StreamWriteFunction.lambda$flushRemaining$7(StreamWriteFunction.java:461)
~[hudi-flink-bundle_2.11.jar:0.11.1]
at java.util.LinkedHashMap$LinkedValues.forEach(LinkedHashMap.java:608)
~[?:1.8.0_181]
at
org.apache.hudi.sink.StreamWriteFunction.flushRemaining(StreamWriteFunction.java:454)
~[hudi-flink-bundle_2.11.jar:0.11.1]
at
org.apache.hudi.sink.StreamWriteFunction.snapshotState(StreamWriteFunction.java:131)
~[hudi-flink-bundle_2.11.jar:0.11.1]
at
org.apache.hudi.sink.bucket.BucketStreamWriteFunction.snapshotState(BucketStreamWriteFunction.java:100)
~[hudi-flink-bundle_2.11.jar:0.11.1]
at
org.apache.hudi.sink.common.AbstractStreamWriteFunction.snapshotState(AbstractStreamWriteFunction.java:157)
~[hudi-flink-bundle_2.11.jar:0.11.1]
at
org.apache.flink.streaming.util.functions.StreamingFunctionUtils.trySnapshotFunctionState(StreamingFunctionUtils.java:118)
~[flink-dist_2.11-1.13.1.jar:1.13.1]
at
org.apache.flink.streaming.util.functions.StreamingFunctionUtils.snapshotFunctionState(StreamingFunctionUtils.java:99)
~[flink-dist_2.11-1.13.1.jar:1.13.1]
at
org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.snapshotState(AbstractUdfStreamOperator.java:89)
~[flink-dist_2.11-1.13.1.jar:1.13.1]
at
org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.snapshotState(StreamOperatorStateHandler.java:218)
~[flink-dist_2.11-1.13.1.jar:1.13.1]
at
org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.snapshotState(StreamOperatorStateHandler.java:169)
~[flink-dist_2.11-1.13.1.jar:1.13.1]
at
org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:371)
~[flink-dist_2.11-1.13.1.jar:1.13.1]
at
org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.checkpointStreamOperator(SubtaskCheckpointCoordinatorImpl.java:706)
~[flink-dist_2.11-1.13.1.jar:1.13.1]
at
org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.buildOperatorSnapshotFutures(SubtaskCheckpointCoordinatorImpl.java:627)
~[flink-dist_2.11-1.13.1.jar:1.13.1]
at
org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.takeSnapshotSync(SubtaskCheckpointCoordinatorImpl.java:590)
~[flink-dist_2.11-1.13.1.jar:1.13.1]
at
org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.checkpointState(SubtaskCheckpointCoordinatorImpl.java:312)
~[flink-dist_2.11-1.13.1.jar:1.13.1]
at
org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$performCheckpoint$8(StreamTask.java:1086)
~[flink-dist_2.11-1.13.1.jar:1.13.1]
at
org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:50)
~[flink-dist_2.11-1.13.1.jar:1.13.1]
at
org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(StreamTask.java:1070)
~[flink-dist_2.11-1.13.1.jar:1.13.1]
at
org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointOnBarrier(StreamTask.java:1026)
~[flink-dist_2.11-1.13.1.jar:1.13.1]
at
org.apache.flink.streaming.runtime.io.checkpointing.CheckpointBarrierHandler.notifyCheckpoint(CheckpointBarrierHandler.java:122)
~[flink-dist_2.11-1.13.1.jar:1.13.1]
at
org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler.triggerCheckpoint(SingleCheckpointBarrierHandler.java:250)
~[flink-dist_2.11-1.13.1.jar:1.13.1]
at
org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler.access$100(SingleCheckpointBarrierHandler.java:61)
~[flink-dist_2.11-1.13.1.jar:1.13.1]
at
org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler$ControllerImpl.triggerGlobalCheckpoint(SingleCheckpointBarrierHandler.java:428)
~[flink-dist_2.11-1.13.1.jar:1.13.1]
at
org.apache.flink.streaming.runtime.io.checkpointing.AbstractAlignedBarrierHandlerState.barrierReceived(AbstractAlignedBarrierHandlerState.java:61)
~[flink-dist_2.11-1.13.1.jar:1.13.1]
at
org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler.processBarrier(SingleCheckpointBarrierHandler.java:227)
~[flink-dist_2.11-1.13.1.jar:1.13.1]
at
org.apache.flink.streaming.runtime.io.checkpointing.CheckpointedInputGate.handleEvent(CheckpointedInputGate.java:180)
~[flink-dist_2.11-1.13.1.jar:1.13.1]
at
org.apache.flink.streaming.runtime.io.checkpointing.CheckpointedInputGate.pollNext(CheckpointedInputGate.java:158)
~[flink-dist_2.11-1.13.1.jar:1.13.1]
at
org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:110)
~[flink-dist_2.11-1.13.1.jar:1.13.1]
at
org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:66)
~[flink-dist_2.11-1.13.1.jar:1.13.1]
at
org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:423)
~[flink-dist_2.11-1.13.1.jar:1.13.1]
at
org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:204)
~[flink-dist_2.11-1.13.1.jar:1.13.1]
at
org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:681)
~[flink-dist_2.11-1.13.1.jar:1.13.1]
at
org.apache.flink.streaming.runtime.tasks.StreamTask.executeInvoke(StreamTask.java:636)
~[flink-dist_2.11-1.13.1.jar:1.13.1]
at
org.apache.flink.streaming.runtime.tasks.StreamTask.runWithCleanUpOnFail(StreamTask.java:647)
~[flink-dist_2.11-1.13.1.jar:1.13.1]
at
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:620)
~[flink-dist_2.11-1.13.1.jar:1.13.1]
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:779)
~[flink-dist_2.11-1.13.1.jar:1.13.1]
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:566)
~[flink-dist_2.11-1.13.1.jar:1.13.1]
at java.lang.Thread.run(Thread.java:748) ~[?:1.8.0_181]
`create table if not exists kafka_source
(
uuid string ,
message_id string ,
vin string ,
ei string ,
en string ,
event_time string ,
n string ,
v string ,
send_time string ,
message_receive_time string ,
etlTime as PROCTIME()
) with (
'connector' = 'kafka',
'topic' = 'signal_detail_hudi_test',
'properties.bootstrap.servers' = '',
'properties.group.id' = 'bigdata_es_test0728COW',
'scan.startup.mode' = 'group-offsets',
'format' = 'json',
'json.fail-on-missing-field' = 'false',
'json.ignore-parse-errors' = 'true',
'json.map-null-key.mode' = 'DROP',
'json.encode.decimal-as-plain-number' = 'true'
)
;
CREATE TABLE ods_hudi_sink
(
uuid string ,
message_id string ,
vin string ,
ei string ,
en string ,
event_time string ,
n string ,
v string ,
send_time string ,
message_receive_time string ,
etl_update_ime string ,
dt string ,
hh string
)partitioned by (dt,hh)
WITH (
'connector' = 'hudi',
'path' =
'hdfs:///tmp/dev_zone_ods_es33_misc_signal_detail_hudi_test0728cow1',
'table.type' = 'COPY_ON_WRITE',
'changelog.enabled' = 'true',
'write.precombine.field' = 'message_receive_time',
'hoodie.datasource.write.recordkey.field' = 'uuid',
'write.tasks' = '4',
'compaction.tasks' = '4',
'compaction.async.enabled' = 'true',
'compaction.trigger.strategy' = 'num_commits',
'compaction.delta_commits'='5',
'compaction.delta_seconds'='180',
'compaction.max_memory' = '1024',
'changelog.enabled' = 'true',
'read.streaming.enabled' = 'true',
'read.streaming.check.interval' = '3',
'hive_sync.enable' = 'true',
'hive_sync.mode' = 'hms',
'hive_sync.metastore.uris' = '',
'hive_sync.db' = 'zone_test',
'hive_sync.table' = 'hudi_test0728cow1',
-- 'hive_sync.username'='hivetest',
'hive_sync.support_timestamp' = 'true',
'hoodie.cleaner.commits.retained' = '2',
'index.type'='BUCKET',
'hoodie.bucket.index.num.buckets'='3'
);`
The MOR table type works normally — what can I do next?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]