davidzollo commented on issue #8388:
URL: https://github.com/apache/seatunnel/issues/8388#issuecomment-2673812869

   Streaming task from Kafka to Doris
   
   ```
   env {
     # Job-level settings for the streaming Kafka -> Doris pipeline.

     # It is recommended to adjust this according to the number of Kafka
     # partitions, keeping it consistent with the partition count.
     execution.parallelism = 4
     job.mode = "STREAMING"
     checkpoint.interval = 30000
     checkpoint.timeout = 600000
     # Your current rate limits seem high but reasonable, ~700MB/s
     read_limit.bytes_per_second=700000000
     read_limit.rows_per_second=40000
   }
   
   source {

       # Reads JSON records from Kafka and exposes them as table "kafka_log".
       Kafka {
         result_table_name = "kafka_log"
         #Kafka server address
         bootstrap.servers = "xxxxx"
         topic = "xxxx"
         consumer.group = "kafka2table"
         start_mode = "earliest"
         # Consumer tuning. Each comment sits on its own line so that
         # line-wrapping cannot spill comment text into the config body.
         kafka.config = {
               # 1MB, increase the minimum batch fetch size
               "fetch.min.bytes" = "1048576"
               # Wait time when data is insufficient
               "fetch.max.wait.ms" = "500"
               # 5MB, maximum data fetch per partition
               "max.partition.fetch.bytes" = "5242880"
               # Maximum number of records per poll
               "max.poll.records" = "5000"
               # Ensure data consistency
               "isolation.level" = "read_committed"
         }
         format = json
         # Field names are the keys of the incoming JSON records; the Sql
         # transform refers to them by these exact names.
         schema={
           fields={
               ev=STRING
               pg=STRING
               uuid=STRING
               userId=bigint
               fromDevice=STRING
               ip=STRING
               source=STRING
               np=STRING
               lp=STRING
               tg=STRING
               ch=STRING
               v=STRING
               nt=STRING
               wifi=STRING
               dbd=STRING
               dmd=STRING
               bs=STRING
               browser_version=STRING
               ext=STRING
               sid=STRING
               timestamp=bigint
               reporttime=bigint
           }
         }
       }
   }
   
   transform {
     # Renames the short Kafka field names to descriptive column names and
     # adds a "dt" column from CURRENT_DATE.
     Sql {
       source_table_name = "kafka_log"
       result_table_name = "log"
       # The query must stay on ONE line: a double-quoted HOCON string cannot
       # contain raw line breaks.
       query = "select ev as event,pg as page,uuid,userId as userid,fromDevice as platform,ip,source,np as nextpage,lp as lastpage,tg as target,ch as channel,v as version,nt as network,wifi,dbd as device_brand,dmd as device_model,bs as browser,browser_version,ext as extra,sid as sessionid,timestamp,reporttime,CURRENT_DATE as dt from kafka_log"
     }
   }
   
   sink {
     # Writes the table named "log" into Doris.
     Doris {
       source_table_name = "log"

       # Connection and target table (placeholder values; replace before use).
       fenodes = "dxxx"
       username = xxx
       password = "xxxx"
       table.identifier = "ods.ods_log"

       # Write behaviour: 2PC off, 104857600-byte (100 MiB) buffer, 5 retries.
       sink.label-prefix = "log"
       sink.enable-2pc = "false"
       doris.batch.size = 500000
       sink.buffer-size = 104857600
       sink.max-retries = 5

       # NOTE(review): presumably passed through as Doris load properties;
       # the values select JSON input read line by line. Confirm against the
       # connector documentation.
       doris.config = {
         format = "json"
         read_json_by_line = "true"
       }
     }
   }
   
   
   ```


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@seatunnel.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to