消息体里的data是实际每行数据,columnName是字段名,rawData是值,因此我想能定义table 如下:

CREATE TABLE MyUserTable(
APPLY_PERSON_ID VARCHAR,
UPDATE_SALARY DECIMAL,
UP_AMOUNT DECIMAL,
CURRENCY VARCHAR,
EXCHANGE_RATE DECIMAL
) with (
'connector.type' = 'kafka',
'connector.version' = '0.11',
'connector.topic' = 'topic_name',
'connector.properties.zookeeper.connect' = 'localhost:2181',
'connector.properties.bootstrap.servers' = 'localhost:9092',
'connector.properties.group.id' = 'testGroup',
'format.type' = '?'

接下来直接查询每个字段的值:
Table result = tEnv.sqlQuery("select APPLY_PERSON_ID, UPDATE_SALARY, UP_AMOUNT, 
CURRENCY, EXCHANGE_RATE from MyUserTable ");

请教下这个该如何定义DDL.



发件人: 17610775726 [mailto:17610775...@163.com]
发送时间: 2021年7月9日 9:26
收件人: Chenzhiyuan(HR) <zhiyuan.c...@huawei.com>
主题: 回复:如何从复杂的kafka消息体定义 table

hi

用 json 就行 可以参考这篇文章:https://mp.weixin.qq.com/s/KHVUlOsLSHPzCprRWSJYcA

Best
JasonLee
---- 回复的原邮件 ----
发件人

Chenzhiyuan(HR)<zhiyuan.c...@huawei.com><mailto:zhiyuan.c...@huawei.com>

发送日期

2021年07月09日 08:59

收件人

user-zh@flink.apache.org<user-zh@flink.apache.org><mailto:user-zh@flink.apache.org>

主题

如何从复杂的kafka消息体定义 table

大家好:
我定义了一个table, 从kafka读取数据,不知道要怎么解析,用哪个format.type.
如果json, avro不能满足的话,是不是得自己自定义一个。
自定义的话不知道如何写,请各位帮忙指教下。

  定义的表如下:
  CREATE TABLE MyUserTable(
uuid VARCHAR,
orgId VARCHAR
) with (
'connector.type' = 'kafka',
'connector.version' = '0.11',
'connector.topic' = 'topic_name',
'connector.properties.zookeeper.connect' = 'localhost:2181',
'connector.properties.bootstrap.servers' = 'localhost:9092',
'connector.properties.group.id' = 'testGroup',
'format.type' = '?'
)


Kafka的消息体如下, 好像不符合avro之类的标准格式:

{
  "beforeData": [],
   "byteSize": 272,
   "columnNumber": 32,
   "data": [{
       "byteSize": 8,
       "columnName": "APPLY_PERSON_ID",
       "rawData": 10017,
       "type": "LONG"
   }, {
       "byteSize": 12,
       "columnName": "UPDATE_SALARY",
       "rawData": "11000.000000",
       "type": "DOUBLE"
   }, {
       "byteSize": 11,
       "columnName": "UP_AMOUNT",
       "rawData": "1000.000000",
       "type": "DOUBLE"
   }, {
       "byteSize": 3,
       "columnName": "CURRENCY",
       "rawData": "CNY",
       "type": "STRING"
   }, {
       "byteSize": 32,
       "columnName": "EXCHANGE_RATE",
       "rawData": "1.000000000000000000000000000000",
       "type": "DOUBLE"
   },  {
       "byteSize": 11,
       "columnName": "DEDUCTED_ACCOUNT",
       "rawData": "1000.000000",
       "type": "DOUBLE"
   }, {
       "byteSize": 1,
       "columnName": "ENTER_AT_PROCESS",
       "rawData": "Y",
       "type": "STRING"
   }],
   "dataCount": 0,
   "dataMetaData": {
       "connector": "mysql",
       "pos": 1000368076,
       "row": 0,
       "ts_ms": 1625565737000,
       "snapshot": "false",
       "db": "testdb",
       "table": "flow_person_t"
   },
   "key": "APPLY_PERSON_ID",
   "memorySize": 1120,
   "operation": "insert",
   "rowIndex": -1,
   "timestamp": "1970-01-01 00:00:00"
}

回复