ibudanaev-crunchyroll opened a new issue #3707:
URL: https://github.com/apache/hudi/issues/3707
Hi,
we are trying to use Hudi in AWS Kinesis Data Analytics Studio along with Flink.
The following Python code produces data and inserts it into a Kinesis stream:
```
import datetime
import json
import random
import time
import uuid

import boto3

# Kinesis client (not shown in the original snippet; region assumed to match the stream)
kinesis = boto3.client("kinesis", region_name="us-west-2")

def getReferrer():
    data = {}
    now = datetime.datetime.now()
    str_now = now.isoformat()
    data['uuid'] = str(uuid.uuid4())
    data['event_time'] = str_now
    data['ticker'] = random.choice(['AAPL', 'AMZN', 'MSFT', 'INTC', 'TBV'])
    price = random.random() * 100
    data['price'] = round(price, 2)
    return data

while True:
    data = json.dumps(getReferrer())
    print(data)
    res = kinesis.put_record(
        StreamName="stream-name",
        Data=data,
        PartitionKey="partitionkey")
    time.sleep(1)
```
In Zeppelin, we create a table as a source, ingesting the data generated above:
```
%flink.ssql(type=update)

drop table stock_table;

CREATE TABLE stock_table (
  uuid VARCHAR(36),
  ticker VARCHAR(6),
  price DOUBLE,
  event_time TIMESTAMP(3),
  WATERMARK FOR event_time AS event_time - INTERVAL '5' SECOND
)
PARTITIONED BY (ticker)
WITH (
  'connector' = 'kinesis',
  'stream' = 'stream-name',
  'aws.region' = 'us-west-2',
  'scan.stream.initpos' = 'LATEST',
  'format' = 'json',
  'json.timestamp-format.standard' = 'ISO-8601'
);
```
Next, we create a Hudi table on S3 to write into:
```
%flink.ssql(type=update)

drop table t1;

CREATE TABLE t1 (
  uuid VARCHAR(36),
  ticker VARCHAR(6),
  price DOUBLE,
  ts TIMESTAMP(3)
)
PARTITIONED BY (ticker)
WITH (
  'connector' = 'hudi',
  'path' = 's3://bucket-name/hudi/',
  'table.type' = 'MERGE_ON_READ'  -- this creates a MERGE_ON_READ table, by default it is COPY_ON_WRITE
);
```
And finally, we insert data from the stream into S3 using Hudi:
```
%flink.ssql(type=update)
-- insert data from the Kinesis source table
INSERT INTO t1 SELECT uuid, ticker, price, event_time AS ts FROM stock_table;
```
We get the following error when trying to run the above code:
```
java.lang.NoClassDefFoundError: org/apache/flink/table/connector/sink/DataStreamSinkProvider
    at org.apache.hudi.table.HoodieTableSink.getSinkRuntimeProvider(HoodieTableSink.java:81)
    at org.apache.flink.table.planner.plan.nodes.common.CommonPhysicalSink.createSinkTransformation(CommonPhysicalSink.scala:69)
    at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.scala:108)
    at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.scala:43)
    at org.apache.flink.table.planner.plan.nodes.exec.ExecNode.translateToPlan(ExecNode.scala:58)
    at org.apache.flink.table.planner.plan.nodes.exec.ExecNode.translateToPlan$(ExecNode.scala:56)
    at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlan(StreamExecSink.scala:43)
    at org.apache.flink.table.planner.delegation.StreamPlanner.$anonfun$translateToPlan$1(StreamPlanner.scala:67)
    at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:233)
    at scala.collection.Iterator.foreach(Iterator.scala:937)
    at scala.collection.Iterator.foreach$(Iterator.scala:937)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1425)
    at scala.collection.IterableLike.foreach(IterableLike.scala:70)
    at scala.collection.IterableLike.foreach$(IterableLike.scala:69)
    at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
    at scala.collection.TraversableLike.map(TraversableLike.scala:233)
    at scala.collection.TraversableLike.map$(TraversableLike.scala:226)
    at scala.collection.AbstractTraversable.map(Traversable.scala:104)
    at org.apache.flink.table.planner.delegation.StreamPlanner.translateToPlan(StreamPlanner.scala:66)
    at org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:166)
    at org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:1264)
    at org.apache.flink.table.api.internal.TableEnvironmentImpl.translateAndClearBuffer(TableEnvironmentImpl.java:1256)
    at org.apache.flink.table.api.internal.TableEnvironmentImpl.execute(TableEnvironmentImpl.java:1213)
    at org.apache.zeppelin.flink.FlinkSqlInterrpeter.callInsertInto(FlinkSqlInterrpeter.java:532)
    at org.apache.zeppelin.flink.FlinkStreamSqlInterpreter.callInsertInto(FlinkStreamSqlInterpreter.java:97)
    at org.apache.zeppelin.flink.FlinkSqlInterrpeter.callCommand(FlinkSqlInterrpeter.java:273)
    at org.apache.zeppelin.flink.FlinkSqlInterrpeter.runSqlList(FlinkSqlInterrpeter.java:160)
    at org.apache.zeppelin.flink.FlinkSqlInterrpeter.internalInterpret(FlinkSqlInterrpeter.java:112)
    at org.apache.zeppelin.interpreter.AbstractInterpreter.interpret(AbstractInterpreter.java:47)
    at org.apache.zeppelin.interpreter.LazyOpenInterpreter.interpret(LazyOpenInterpreter.java:110)
    at org.apache.zeppelin.interpreter.remote.RemoteInterpreterServer$InterpretJob.jobRun(RemoteInterpreterServer.java:852)
    at org.apache.zeppelin.interpreter.remote.RemoteInterpreterServer$InterpretJob.jobRun(RemoteInterpreterServer.java:744)
    at org.apache.zeppelin.scheduler.Job.run(Job.java:172)
    at org.apache.zeppelin.scheduler.AbstractScheduler.runJob(AbstractScheduler.java:132)
    at org.apache.zeppelin.scheduler.ParallelScheduler.lambda$runJobInScheduler$0(ParallelScheduler.java:46)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
    at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: java.lang.ClassNotFoundException: org.apache.flink.table.connector.sink.DataStreamSinkProvider
    at java.base/java.net.URLClassLoader.findClass(URLClassLoader.java:471)
    at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:589)
    at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:522)
    ... 38 more
```
We tried all of the following jar files as custom connectors: `hudi-flink-bundle_2.11-0.9.0.jar`, `hudi-flink-bundle_2.11-0.8.0.jar`, and `hudi-flink-bundle_2.11-0.7.0.jar`, downloaded from [here](https://repo1.maven.org/maven2/org/apache/hudi/hudi-flink-bundle_2.11/).
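In case it helps with diagnosis, here is a rough sketch of a pyflink paragraph we can run in the same Studio notebook to print the Flink version of the runtime and check whether the class from the stack trace is on the classpath. This is only our own check, not anything from the Hudi or AWS docs, and it assumes the `%flink.pyflink` interpreter is available in the notebook:

```
%flink.pyflink
# Diagnostic sketch (assumes the pyflink interpreter is available in this notebook):
# print the Flink version the runtime reports and try to load the class the stack
# trace says is missing. If the lookup fails, the hudi-flink-bundle was presumably
# compiled against a newer Flink than the runtime provides.
from pyflink.java_gateway import get_gateway

jvm = get_gateway().jvm
print("Flink runtime version:",
      jvm.org.apache.flink.runtime.util.EnvironmentInformation.getVersion())

try:
    jvm.java.lang.Class.forName(
        "org.apache.flink.table.connector.sink.DataStreamSinkProvider")
    print("DataStreamSinkProvider found on the classpath")
except Exception as err:  # py4j surfaces ClassNotFoundException as Py4JJavaError
    print("DataStreamSinkProvider NOT found:", err)
```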
Your advice/assistance in this matter is much appreciated.