The inconsistency is resolved; I can see rules getting fired consistently
and reliably across a File based source, and a steam (of file data), and a
JMS stream. I am running more tests till 50M facts/events, but looks like
it is working now.

Regards,
Ajit

On Mon, Jul 6, 2015 at 11:59 AM, Ajit Bhingarkar <a...@capiot.com> wrote:

> Jorn,
>
> Thanks for your response.
>
> I am pasting below a snippet of code which shows drools integration when
> facts/events are picked up after reading through a File
> (FileReader->readLine()), it works as expected and I have tested it for
> wide range of record data in a File.
>
> Same code doesn't work when I try to do same thing on a streaming incoming
> data generated out of same File. I have used several batch durations, from
> 1 to 50 seconds. Every execution shows that rules did not fire on some
> valid facts/events.
>
> I also thought of it being an issue with multi-threading; but that is not
> the case as well. I have verified.
>
> Hope this provides with with all the relevant information.
>
> Regards,
> Ajit
>
>
> /*
>  * Copyright (c) 2015. Capiot Software India Pvt Ltd.
>  * Author: a...@capiot.com
>  */
>
> package com.capiot.analytics.spark.file;
>
> import com.capiot.analytics.spark.Person;
> import com.capiot.analytics.spark.util.KnowledgeBaseHelperUtil;
> import com.capiot.analytics.spark.util.TrackingAgendaEventListener;
> import org.apache.spark.api.java.function.VoidFunction;
> import org.drools.runtime.StatefulKnowledgeSession;
>
> import java.io.BufferedWriter;
> import java.io.PrintWriter;
> import java.util.ArrayList;
> import java.util.Collections;
> import java.util.List;
> import java.util.concurrent.atomic.AtomicInteger;
>
> public class RuleExceutionFunction implements VoidFunction <Person>
> {
>     static StatefulKnowledgeSession knowledgeSession;
>     static List<Person> customersWithOffers =
> Collections.synchronizedList(new ArrayList());
>     //static Map<Integer, String> map = Collections.synchronizedMap(new
> HashMap());
>     static TrackingAgendaEventListener agendaEventListener = new
> TrackingAgendaEventListener();
>
>     static AtomicInteger count = new AtomicInteger(0);
>
>     //private static final File f = new
> File("C:\\Users\\bajit\\Documents\\customerOffers_5k_file.csv");
>     private static PrintWriter pw = null;
>     private static PrintWriter pwp = null;
>
>     private static final long serialVersionUID = 2370;
>
>     public RuleExceutionFunction()  throws Exception
>     {
>         if (knowledgeSession == null)
>         {
>             knowledgeSession =
> KnowledgeBaseHelperUtil.getStatefulKnowledgeSession("offers.drl");
>             knowledgeSession.addEventListener(agendaEventListener);
>
>             {
>                 pw = new PrintWriter(new BufferedWriter(new
> java.io.FileWriter
>
> ("C:\\Users\\bajit\\Documents\\customerOffers_file_5k.csv")
>                 ), true);
>
>                 pwp = new PrintWriter(new BufferedWriter(new
> java.io.FileWriter
>
>  ("C:\\Users\\bajit\\Documents\\processed_customers_file_5k" +
>                                                           ".csv")
>                 ), true);
>             }
>         }
>     }
>
>
>     @Override
>     public void call(Person person) throws Exception
>     {
>         //List<Person> facts = rdd.collect();
>         //Apply rules on facts here
>         //synchronized (this)
>         {
>             knowledgeSession.insert(person);
>             int i = knowledgeSession.fireAllRules();
>         }
>
>         //System.out.println("++++++ '"+
> agendaEventListener.activationsToString());
>
>         if (person.hasOffer())
>         {
>             customersWithOffers.add(person);
>             //Files.append(person.toString() +
> System.getProperty("line.separator"), f, Charset.defaultCharset());
>             pw.println(person.toString());
>         }
>
>         pwp.println(person.toString());
>
>         count.getAndIncrement();
>     }
>
>     public StatefulKnowledgeSession getSession()
>     {
>         return knowledgeSession;
>     }
>
>     public List<Person> getCustomersWithOffers()
>     {
>         return customersWithOffers;
>     }
> }
>
>
> On Mon, Jul 6, 2015 at 10:21 AM, Jörn Franke <jornfra...@gmail.com> wrote:
>
>> Can you provide the result set you are using and specify how you
>> integrated the drools engine?
>> Drools basically is based on a large shared memory. Hence, if you have
>> several tasks in Shark they end up using different shared memory areas.
>> A full integration of drools requires some sophisticated design and
>> probably rewriting of the rules evaluation algorithm, so you probably have
>> to rewrite that engine from scratch.
>>
>> Le dim. 5 juil. 2015 à 17:42, Ajit Bhingarkar <a...@capiot.com> a écrit :
>>
>>> Hi,
>>>
>>> I am trying to integrate Drools rules API with Spark so that the
>>> solution could solve few CEP centric use cases.
>>>
>>> When I read data from a local file (simple FileWriter -> readLine()), I
>>> see that all my rules are reliably fired and everytime I get the results as
>>> expected. I have tested with file record sizes from 5K to 5M; results are
>>> as expected, every time.
>>>
>>> However when I try to receive same data through a stream (I created a
>>> simple ServerSocket, and am reading the file and writing to the socket line
>>> by line) using a custom socket receiver; even though I see that data is
>>> received at the custom receiver's end; perhaps store() API has an issue,
>>> and data is not reliably persisted, (I am
>>> using StorageLevel.MEMORY_AND_DISK_SER_2() as recommended).
>>>
>>> Result is that my rules don't get fired reliably, and everytime I get a
>>> different result. It also could be internal data loss within Spark engine.
>>>
>>> I am using a a single Windows based server, and latest 1.4.0.
>>>
>>> I have attached code for custom receiver, and my socket server which
>>> streams file data as text.
>>>
>>> Can someone pls shed more light on this issue? I have read in the
>>> documentation that a reliable receiver needs to implement
>>> *store(multi-records)*, but couldn't find any example.
>>>
>>> Many thanks in advance for any inputs or suggestions for trying out.
>>>
>>> Regards,
>>> Ajit
>>>
>>>
>>> ---------------------------------------------------------------------
>>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>>> For additional commands, e-mail: user-h...@spark.apache.org
>>
>>
>

Reply via email to