[ 
https://issues.apache.org/jira/browse/FLINK-10915?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

fengge updated FLINK-10915:
---------------------------
    Description: 
{code:java}
;; FlatMapFunction implementation that emits one Tuple3 of
;; (shopid, shopname, 1) for every incoming record.
(deftype Cflatmapfunction [] FlatMapFunction
  (flatMap [this value collector]
    ;; Log the runtime type and the contents of each incoming record.
    (log/info "value:" (type value) value)
    ;; Copy the record into a Clojure map, parse its "body" entry with
    ;; ym/readstring, and destructure :shopid / :shopname from the result.
    ;; NOTE(review): presumably `value` is map-like and carries a "body"
    ;; key -- confirm against the source's deserialization schema.
    (let [tomap (into {} value)
          {:keys [shopid shopname]} (ym/readstring (get tomap "body"))]
      ;; Earlier Tuple5 variant, kept for reference. It must stay on ONE line:
      ;; a `;` comment ends at the newline, so wrapping it turns the tail into code.
      ;(.collect collector (Tuple5. msgid "orgidid" "toporgid" "(ym/readstring body)" 1))
      (.collect collector (Tuple3. shopid shopname (int 1))))))

;;; The problem is here: the FlatMapFunction implemented in Clojure blocks
;;; when the job runs on a cluster, but it runs fine in a local JVM.

{code}
{code:java}
;; Entry point: wires a RocketMQ source into Cflatmapfunction, prints the
;; resulting stream, and submits the job under the name "rocketmq-flink-feng2".
;; `args` is accepted but not used.
(defn -main [& args]
  (let [flink-env (StreamExecutionEnvironment/getExecutionEnvironment)
        ;; Checkpoint interval of 13000 (milliseconds, per the Flink API).
        _ (.enableCheckpointing flink-env 13000)
        ;; Source whose records are deserialized with the field names
        ;; "msgid" and "body".
        sources (.addSource flink-env
                            (RocketMQSource.
                              (SimpleKeyValueDeserializationSchema. "msgid" "body")
                              (gen-consumer-properties)))
        _ (.name sources "ririri")
        _ (.setParallelism sources 1)
        ;; Apply the Clojure FlatMapFunction and declare its output type.
        ednds (.returns (.flatMap sources (Cflatmapfunction.)) CloudTuple3)
        _ (.name ednds "ccc")
        _ (.setParallelism ednds 1)
        _ (.print ednds)
        ;; Disabled windowed aggregations. Each must stay on ONE line: a `;`
        ;; comment ends at the newline, so wrapping it turns the tail into code.
        ;counts (.sum (.timeWindow(.keyBy ednds (int-array [0])) (Time/seconds 10))  2)
        ;secondcounts (.sum (.timeWindow(.keyBy counts (int-array [0])) (Time/minutes 2))  2)
        ]
    ;; The printed message reads "Starting stateful stream computation 1".
    (prn "开始有状态的流式计算1" flink-env)
    ;(.setParallelism ds 1)
    ;(.setParallelism ednds 1)
    ;(.print counts)
    ;(.print secondcounts)
    (.execute flink-env "rocketmq-flink-feng2")))

{code}
 

  was:
{code:java}
;; Entry point: wires a RocketMQ source into Cflatmapfunction, prints the
;; resulting stream, and submits the job under the name "rocketmq-flink-feng2".
;; `args` is accepted but not used.
(defn -main [& args]
  (let [flink-env (StreamExecutionEnvironment/getExecutionEnvironment)
        ;; Checkpoint interval of 13000 (milliseconds, per the Flink API).
        _ (.enableCheckpointing flink-env 13000)
        ;; Source whose records are deserialized with the field names
        ;; "msgid" and "body".
        sources (.addSource flink-env
                            (RocketMQSource.
                              (SimpleKeyValueDeserializationSchema. "msgid" "body")
                              (gen-consumer-properties)))
        _ (.name sources "ririri")
        _ (.setParallelism sources 1)
        ;; Apply the Clojure FlatMapFunction and declare its output type.
        ednds (.returns (.flatMap sources (Cflatmapfunction.)) CloudTuple3)
        _ (.name ednds "ccc")
        _ (.setParallelism ednds 1)
        _ (.print ednds)
        ;; Disabled windowed aggregations. Each must stay on ONE line: a `;`
        ;; comment ends at the newline, so wrapping it turns the tail into code.
        ;counts (.sum (.timeWindow(.keyBy ednds (int-array [0])) (Time/seconds 10))  2)
        ;secondcounts (.sum (.timeWindow(.keyBy counts (int-array [0])) (Time/minutes 2))  2)
        ]
    ;; The printed message reads "Starting stateful stream computation 1".
    (prn "开始有状态的流式计算1" flink-env)
    ;(.setParallelism ds 1)
    ;(.setParallelism ednds 1)
    ;(.print counts)
    ;(.print secondcounts)
    (.execute flink-env "rocketmq-flink-feng2")))

{code}
 


> clojure   context.collectWithTimestamp  Will be blocked.
> --------------------------------------------------------
>
>                 Key: FLINK-10915
>                 URL: https://issues.apache.org/jira/browse/FLINK-10915
>             Project: Flink
>          Issue Type: Improvement
>          Components: Client
>    Affects Versions: 1.6.2
>            Reporter: fengge
>            Priority: Minor
>
> {code:java}
> (deftype Cflatmapfunction [] FlatMapFunction
>   (flatMap [this value collector]
>     (log/info "value:" (type value) value )
>     (let [tomap (into {} value)
>           {:keys [shopid  shopname]} (ym/readstring (get tomap "body"))]
>       ;(.collect collector (Tuple5. msgid "orgidid" "toporgid" "(ym/readstring body)" 1))
>       (.collect collector (Tuple3. shopid shopname (int 1)))
>       )
>     ))
> ;;; The problem is here: the FlatMapFunction implemented in Clojure blocks
> ;;; when the job runs on a cluster, but it runs fine in a local JVM.
> {code}
> {code:java}
> (defn -main [& args]
>   (let [flink-env (StreamExecutionEnvironment/getExecutionEnvironment)
>         _ (.enableCheckpointing flink-env 13000)
>         sources (.addSource flink-env
>                             (RocketMQSource. 
> (SimpleKeyValueDeserializationSchema. "msgid" "body") 
>                                              (gen-consumer-properties)))
>         _ (.name sources "ririri")
>         _ (.setParallelism sources 1)
>         ednds (.returns (.flatMap sources (Cflatmapfunction.)) CloudTuple3)
>         _ (.name ednds "ccc")
>         _ (.setParallelism ednds 1)
>         _   (.print ednds)
>         ;counts (.sum (.timeWindow(.keyBy ednds (int-array [0])) (Time/seconds 10))  2)
>         ;secondcounts (.sum (.timeWindow(.keyBy counts (int-array [0])) (Time/minutes 2))  2)
>         ]
>     (prn "开始有状态的流式计算1" flink-env)
>     ;(.setParallelism ds 1)
>     ;(.setParallelism ednds 1)
>     ;(.print counts)
>     ;(.print secondcounts)
>     (.execute flink-env"rocketmq-flink-feng2")
>     )
>   )
> {code}
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to