[ 
https://issues.apache.org/jira/browse/FLINK-32273?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Tan Kim updated FLINK-32273:
----------------------------
    Description: 
I'm trying to relay a topic from one Kafka cluster to another.

This is the original record in the source topic.
{code:java}
"json": {
        "eventName": "event-ABC",
        ...
    } {code}
The source uses the JSON format and the sink uses the Avro format with Confluent Schema Registry.

Here is my code.

{code:java}
tableEnv.executeSql("CREATE TABLE source_table (..) WITH (
'connector'='kafka', 
'format'='json',
)")

tableEnv.executeSql("CREATE TABLE sink_table WITH (
'connector'='kafka',
'format'='avro-confluent',
..
) AS SELECT * FROM source_table") {code}
If I run this code without the 'value.avro-confluent.subject' option, the
record in the sink topic looks something like this.
{code:java}
{
    "json": {
        "org.apache.flink.avro.generated.record_json": {
            "eventName": {
                "string": "event-ABC"
            },
            ..
        }
    }
} {code}
I don't understand why flink-avro inserts
"org.apache.flink.avro.generated.record_json" between `json` and `eventName`.

Also, the value of `eventName` is not just 'event-ABC' but `string: event-ABC`.

Is this a bug, or am I missing something?
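
One possible reading of this output, stated as an assumption rather than a confirmed diagnosis: in Avro's JSON encoding, a non-null value of a union type such as ["null", "string"] is written as a single-entry object keyed by the name of the chosen branch, and a nullable record would be keyed by its record name. The sketch below uses only the Java standard library (no Avro or Flink classes) to mimic that wrapping. `AvroUnionJsonSketch`, `wrapUnion`, `toJson`, and `example` are hypothetical names introduced only for illustration.

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class AvroUnionJsonSketch {

    // Hypothetical helper: mimics how a union value is rendered in Avro's JSON
    // encoding. null stays null; any other value is wrapped in a single-entry
    // object whose key is the name of the chosen union branch.
    public static Object wrapUnion(String branchName, Object value) {
        if (value == null) {
            return null;
        }
        Map<String, Object> wrapped = new LinkedHashMap<>();
        wrapped.put(branchName, value);
        return wrapped;
    }

    // Minimal JSON renderer for nested maps, strings, and null (illustration only,
    // no escaping of special characters).
    public static String toJson(Object value) {
        if (value == null) {
            return "null";
        }
        if (value instanceof Map) {
            StringBuilder sb = new StringBuilder("{");
            boolean first = true;
            for (Map.Entry<?, ?> e : ((Map<?, ?>) value).entrySet()) {
                if (!first) {
                    sb.append(",");
                }
                sb.append("\"").append(e.getKey()).append("\":").append(toJson(e.getValue()));
                first = false;
            }
            return sb.append("}").toString();
        }
        return "\"" + value + "\"";
    }

    // Builds the record from this report, treating both `json` and `eventName`
    // as nullable (an assumption), so each one is wrapped as a union value.
    public static String example() {
        Map<String, Object> inner = new LinkedHashMap<>();
        inner.put("eventName", wrapUnion("string", "event-ABC"));
        Map<String, Object> outer = new LinkedHashMap<>();
        outer.put("json", wrapUnion("org.apache.flink.avro.generated.record_json", inner));
        return toJson(outer);
    }

    public static void main(String[] args) {
        // prints {"json":{"org.apache.flink.avro.generated.record_json":{"eventName":{"string":"event-ABC"}}}}
        System.out.println(example());
    }
}
```

If that reading applies here, the extra levels would come from how nullable fields are rendered in the JSON view of the Avro record, not from additional data in the record itself.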

 


> What is org.apache.flink.avro.generated.record_json?
> ----------------------------------------------------
>
>                 Key: FLINK-32273
>                 URL: https://issues.apache.org/jira/browse/FLINK-32273
>             Project: Flink
>          Issue Type: Bug
>          Components: Formats (JSON, Avro, Parquet, ORC, SequenceFile), Table 
> SQL / API
>            Reporter: Tan Kim
>            Priority: Major



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
