[ https://issues.apache.org/jira/browse/FLINK-10915?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
fengge updated FLINK-10915:
---------------------------
    Description: 
{code:java}
(deftype Cflatmapfunction []
  FlatMapFunction
  (flatMap [this value collector]
    (log/info "value:" (type value) value)
    (let [tomap (into {} value)
          {:keys [shopid shopname]} (ym/readstring (get tomap "body"))]
      ;(.collect collector (Tuple5. msgid "orgidid" "toporgid" "(ym/readstring body)" 1))
      (.collect collector (Tuple3. shopid shopname (int 1))))))
;;; The problem is here: a FlatMapFunction implemented in Clojure blocks
;;; when run on a cluster, but runs fine in a local JVM.
{code}
{code:java}
(defn -main [& args]
  (let [flink-env (StreamExecutionEnvironment/getExecutionEnvironment)
        _ (.enableCheckpointing flink-env 13000)
        sources (.addSource flink-env
                            (RocketMQSource.
                              (SimpleKeyValueDeserializationSchema. "msgid" "body")
                              (gen-consumer-properties)))
        _ (.name sources "ririri")
        _ (.setParallelism sources 1)
        ednds (.returns (.flatMap sources (Cflatmapfunction.)) CloudTuple3)
        _ (.name ednds "ccc")
        _ (.setParallelism ednds 1)
        _ (.print ednds)
        ;counts (.sum (.timeWindow (.keyBy ednds (int-array [0])) (Time/seconds 10)) 2)
        ;secondcounts (.sum (.timeWindow (.keyBy counts (int-array [0])) (Time/minutes 2)) 2)
        ]
    ; The message below reads "Starting stateful stream computation 1".
    (prn "开始有状态的流式计算1" flink-env)
    ;(.setParallelism ds 1)
    ;(.setParallelism ednds 1)
    ;(.print counts)
    ;(.print secondcounts)
    (.execute flink-env "rocketmq-flink-feng2")))
{code}

  was:
{code:java}
(defn -main [& args]
  (let [flink-env (StreamExecutionEnvironment/getExecutionEnvironment)
        _ (.enableCheckpointing flink-env 13000)
        sources (.addSource flink-env
                            (RocketMQSource.
                              (SimpleKeyValueDeserializationSchema. "msgid" "body")
                              (gen-consumer-properties)))
        _ (.name sources "ririri")
        _ (.setParallelism sources 1)
        ednds (.returns (.flatMap sources (Cflatmapfunction.)) CloudTuple3)
        _ (.name ednds "ccc")
        _ (.setParallelism ednds 1)
        _ (.print ednds)
        ;counts (.sum (.timeWindow (.keyBy ednds (int-array [0])) (Time/seconds 10)) 2)
        ;secondcounts (.sum (.timeWindow (.keyBy counts (int-array [0])) (Time/minutes 2)) 2)
        ]
    ; The message below reads "Starting stateful stream computation 1".
    (prn "开始有状态的流式计算1" flink-env)
    ;(.setParallelism ds 1)
    ;(.setParallelism ednds 1)
    ;(.print counts)
    ;(.print secondcounts)
    (.execute flink-env "rocketmq-flink-feng2")))
{code}


> clojure context.collectWithTimestamp Will be blocked.
> --------------------------------------------------------
>
>                 Key: FLINK-10915
>                 URL: https://issues.apache.org/jira/browse/FLINK-10915
>             Project: Flink
>          Issue Type: Improvement
>          Components: Client
>    Affects Versions: 1.6.2
>            Reporter: fengge
>            Priority: Minor
>
> {code:java}
> (deftype Cflatmapfunction []
>   FlatMapFunction
>   (flatMap [this value collector]
>     (log/info "value:" (type value) value)
>     (let [tomap (into {} value)
>           {:keys [shopid shopname]} (ym/readstring (get tomap "body"))]
>       ;(.collect collector (Tuple5. msgid "orgidid" "toporgid" "(ym/readstring body)" 1))
>       (.collect collector (Tuple3. shopid shopname (int 1))))))
> ;;; The problem is here: a FlatMapFunction implemented in Clojure blocks
> ;;; when run on a cluster, but runs fine in a local JVM.
> {code}
> {code:java}
> (defn -main [& args]
>   (let [flink-env (StreamExecutionEnvironment/getExecutionEnvironment)
>         _ (.enableCheckpointing flink-env 13000)
>         sources (.addSource flink-env
>                             (RocketMQSource.
>                               (SimpleKeyValueDeserializationSchema. "msgid" "body")
>                               (gen-consumer-properties)))
>         _ (.name sources "ririri")
>         _ (.setParallelism sources 1)
>         ednds (.returns (.flatMap sources (Cflatmapfunction.)) CloudTuple3)
>         _ (.name ednds "ccc")
>         _ (.setParallelism ednds 1)
>         _ (.print ednds)
>         ;counts (.sum (.timeWindow (.keyBy ednds (int-array [0])) (Time/seconds 10)) 2)
>         ;secondcounts (.sum (.timeWindow (.keyBy counts (int-array [0])) (Time/minutes 2)) 2)
>         ]
>     ; The message below reads "Starting stateful stream computation 1".
>     (prn "开始有状态的流式计算1" flink-env)
>     ;(.setParallelism ds 1)
>     ;(.setParallelism ednds 1)
>     ;(.print counts)
>     ;(.print secondcounts)
>     (.execute flink-env "rocketmq-flink-feng2")))
> {code}
>

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)