Hi Guys,
     I am facing another integration challenge with flink-siddhi which i am not 
able to crack yet , i am trying to debug the flow with flink but its too much 
and taking a lot of time , being new to flink. My problem is basically when i 
publish message through kafka and the flink aplication/job has CEP processing 
operation using SiddiCEP , taskmanager is not processing the task with the 
operator having siddhi CEP. It is not even throwing any error anywhere. So i 
started debugging the taskmanager but i am not able to reach anywhere yet have 
been doing that from last 4 days with no luck. I have created a stackoverflow 
query which has more details on the program and application. 
Flink version : 1.13Scala: 2.11Java: 1.8I am using flink-siddhi library: 
haoch/flink-siddhi 2.11-0.2.2-snapshot version

| 
| 
| 
|  |  |

 |

 |
| 
|  | 
haoch/flink-siddhi

A CEP library to run Siddhi within Apache Flink™ Streaming Application - 
haoch/flink-siddhi
 |

 |

 |




The link is :    Not able to process kafka json message with Flink siddhi 
library

| 
| 
| 
|  |  |

 |

 |
| 
|  | 
Not able to process kafka json message with Flink siddhi library

I am trying to create a simple application where the app will consume Kafka 
message do some cql transform and pu...
 |

 |

 |





If there is any pointer that can be given on the problem , it would save my day 
and life. So please kindly respond on the same.
Thanks in advance.
RegardsDipanjan    On Tuesday, May 25, 2021, 12:48:59 PM GMT+5:30, Dipanjan 
Mazumder <java...@yahoo.com> wrote:  
 
  Hi All,
    Found the solution , 
Problem: I was actually using an intermediate library to integrate siddhi with 
Flink (https://github.com/haoch/flink-siddhi) and i was creating a SiddhiCEP 
instance and then calling "define()" on that instance , while i was registering 
the extension on the created instance , but the define method was creating an 
internal SiddhiCEP instance and using that for processing. So i found that out 
by debugging the application. It is an implementation problem with the library 
itself.
Solution: i used from() method on the created SiddhiCEP instance instead of 
define() and it was using the CEP i created for doing the rest of processing 
and the registered extension were accounted similarly and so they were 
recognised at runtime.
RegardsDipanjan
    On Friday, May 21, 2021, 01:51:09 PM GMT+5:30, Salva Alcántara 
<salcantara...@gmail.com> wrote:  
 
 Hi Dipanjan,

I agree with Till. If the extensions are are included in the jar for your
job, it should work. I was having the same doubts some weeks a go and can
confirm that as long as the jar includes those extensions, it works.

One thing I needed to do is to register the different extensions. For
example:

```
siddhiManager.setExtension("map:create", classOf[CreateFunctionExtension])
```

Regards,

Salva



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
    

Reply via email to