The inconsistency is resolved; I can see rules firing consistently and reliably across a file-based source, a stream (of file data), and a JMS stream. I am running more tests up to 50M facts/events, but it looks like it is working now.
Regards,
Ajit

On Mon, Jul 6, 2015 at 11:59 AM, Ajit Bhingarkar <a...@capiot.com> wrote:
> Jorn,
>
> Thanks for your response.
>
> I am pasting below a snippet of code that shows the Drools integration.
> When facts/events are picked up after reading through a file
> (FileReader -> readLine()), it works as expected; I have tested it for a
> wide range of record data in a file.
>
> The same code doesn't work when I run it on streaming data generated from
> the same file. I have tried several batch durations, from 1 to 50 seconds.
> Every execution shows that rules did not fire on some valid facts/events.
>
> I also suspected a multi-threading issue, but I have verified that is not
> the case.
>
> Hope this provides all the relevant information.
>
> Regards,
> Ajit
>
>
> /*
>  * Copyright (c) 2015. Capiot Software India Pvt Ltd.
>  * Author: a...@capiot.com
>  */
>
> package com.capiot.analytics.spark.file;
>
> import com.capiot.analytics.spark.Person;
> import com.capiot.analytics.spark.util.KnowledgeBaseHelperUtil;
> import com.capiot.analytics.spark.util.TrackingAgendaEventListener;
> import org.apache.spark.api.java.function.VoidFunction;
> import org.drools.runtime.StatefulKnowledgeSession;
>
> import java.io.BufferedWriter;
> import java.io.FileWriter;
> import java.io.PrintWriter;
> import java.util.ArrayList;
> import java.util.Collections;
> import java.util.List;
> import java.util.concurrent.atomic.AtomicInteger;
>
> public class RuleExecutionFunction implements VoidFunction<Person>
> {
>     // Shared by every call() in this JVM: all tasks on an executor
>     // insert into the same session.
>     static StatefulKnowledgeSession knowledgeSession;
>     static List<Person> customersWithOffers =
>         Collections.synchronizedList(new ArrayList<Person>());
>     static TrackingAgendaEventListener agendaEventListener =
>         new TrackingAgendaEventListener();
>
>     static AtomicInteger count = new AtomicInteger(0);
>
>     private static PrintWriter pw = null;
>     private static PrintWriter pwp = null;
>
>     private static final long serialVersionUID = 2370;
>
>     public RuleExecutionFunction() throws Exception
>     {
>         if (knowledgeSession == null)
>         {
>             knowledgeSession =
>                 KnowledgeBaseHelperUtil.getStatefulKnowledgeSession("offers.drl");
>             knowledgeSession.addEventListener(agendaEventListener);
>
>             pw = new PrintWriter(new BufferedWriter(new FileWriter(
>                 "C:\\Users\\bajit\\Documents\\customerOffers_file_5k.csv")), true);
>
>             pwp = new PrintWriter(new BufferedWriter(new FileWriter(
>                 "C:\\Users\\bajit\\Documents\\processed_customers_file_5k.csv")), true);
>         }
>     }
>
>     @Override
>     public void call(Person person) throws Exception
>     {
>         // Apply rules on the fact here. (Wrapping this in a synchronized
>         // block was also tried and made no difference.)
>         knowledgeSession.insert(person);
>         knowledgeSession.fireAllRules();
>
>         if (person.hasOffer())
>         {
>             customersWithOffers.add(person);
>             pw.println(person.toString());
>         }
>
>         pwp.println(person.toString());
>
>         count.getAndIncrement();
>     }
>
>     public StatefulKnowledgeSession getSession()
>     {
>         return knowledgeSession;
>     }
>
>     public List<Person> getCustomersWithOffers()
>     {
>         return customersWithOffers;
>     }
> }
>
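The static knowledgeSession above is shared by every call() on an executor
JVM, which is exactly the shared-memory coupling Jörn describes below. For
comparison, a minimal sketch of a per-partition alternative: it reuses
KnowledgeBaseHelperUtil, Person, and offers.drl from the post, while the
class name PartitionRuleFunction and the driver-side usage in the trailing
comment are illustrative assumptions, not code from the thread.

    import java.util.Iterator;

    import org.apache.spark.api.java.function.VoidFunction;
    import org.drools.runtime.StatefulKnowledgeSession;

    import com.capiot.analytics.spark.Person;
    import com.capiot.analytics.spark.util.KnowledgeBaseHelperUtil;

    public class PartitionRuleFunction implements VoidFunction<Iterator<Person>>
    {
        private static final long serialVersionUID = 1L;

        @Override
        public void call(Iterator<Person> people) throws Exception
        {
            // One session per partition: no static state is shared across
            // tasks, so results do not depend on which executor or thread
            // a record lands on.
            StatefulKnowledgeSession session =
                KnowledgeBaseHelperUtil.getStatefulKnowledgeSession("offers.drl");
            try
            {
                while (people.hasNext())
                {
                    session.insert(people.next());
                }
                session.fireAllRules();
            }
            finally
            {
                session.dispose();
            }
        }
    }

    // Driver side, e.g. inside foreachRDD on the Person stream:
    //     rdd.foreachPartition(new PartitionRuleFunction());

The trade-off is that all session state is discarded with the partition, so
rules that correlate events across batches would still need a single
long-lived session fed from one place.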
> On Mon, Jul 6, 2015 at 10:21 AM, Jörn Franke <jornfra...@gmail.com> wrote:
>
>> Can you provide the result set you are using and specify how you
>> integrated the Drools engine?
>> Drools is essentially based on one large shared memory. Hence, if you
>> have several tasks in Spark, they end up using different shared-memory
>> areas. A full integration of Drools requires some sophisticated design
>> and probably a rewrite of the rule-evaluation algorithm, so you would
>> probably have to rewrite that engine from scratch.
>>
>> On Sun, Jul 5, 2015 at 5:42 PM, Ajit Bhingarkar <a...@capiot.com> wrote:
>>
>>> Hi,
>>>
>>> I am trying to integrate the Drools rules API with Spark so that the
>>> solution can address a few CEP-centric use cases.
>>>
>>> When I read data from a local file (simple FileReader -> readLine()), I
>>> see that all my rules fire reliably, and every time I get the results I
>>> expect. I have tested with files of 5K to 5M records; the results are
>>> as expected, every time.
>>>
>>> However, when I receive the same data through a stream (I created a
>>> simple ServerSocket and am reading the file and writing to the socket
>>> line by line) using a custom socket receiver, the rules do not fire
>>> reliably even though I can see the data arriving at the receiver.
>>> Perhaps the store() API has an issue and data is not reliably persisted
>>> (I am using StorageLevel.MEMORY_AND_DISK_SER_2() as recommended).
>>>
>>> The result is that every run gives a different outcome. It could also
>>> be internal data loss within the Spark engine.
>>>
>>> I am using a single Windows-based server and the latest Spark 1.4.0.
>>>
>>> I have attached the code for the custom receiver and for my socket
>>> server, which streams the file data as text.
>>>
>>> Can someone please shed more light on this issue? I have read in the
>>> documentation that a reliable receiver needs to implement
>>> store(multi-records), but I couldn't find any example.
>>>
>>> Many thanks in advance for any inputs or suggestions.
>>>
>>> Regards,
>>> Ajit
>>>
>>> ---------------------------------------------------------------------
>>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>>> For additional commands, e-mail: user-h...@spark.apache.org
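On the store(multi-records) question at the end of the thread: a minimal
sketch of a custom receiver that buffers lines and hands them to Spark in
blocks through the java.util.Iterator overload of Receiver.store(), which
returns only after the block has been written. The class name, batch size,
and host/port are illustrative; note that batching alone does not make a
receiver fully reliable, since the source must also be able to acknowledge
or replay data after store() returns.

    import java.io.BufferedReader;
    import java.io.InputStreamReader;
    import java.net.Socket;
    import java.util.ArrayList;
    import java.util.List;

    import org.apache.spark.storage.StorageLevel;
    import org.apache.spark.streaming.receiver.Receiver;

    public class BatchingSocketReceiver extends Receiver<String>
    {
        private static final long serialVersionUID = 1L;
        private static final int BATCH_SIZE = 1000; // illustrative

        private final String host;
        private final int port;

        public BatchingSocketReceiver(String host, int port)
        {
            super(StorageLevel.MEMORY_AND_DISK_SER_2());
            this.host = host;
            this.port = port;
        }

        @Override
        public void onStart()
        {
            // Receive on a separate thread; onStart() must not block.
            new Thread("Batching Socket Receiver")
            {
                @Override
                public void run()
                {
                    receive();
                }
            }.start();
        }

        @Override
        public void onStop()
        {
            // The receive thread exits on its own once isStopped() is true
            // or the socket closes.
        }

        private void receive()
        {
            try (Socket socket = new Socket(host, port);
                 BufferedReader reader = new BufferedReader(
                     new InputStreamReader(socket.getInputStream())))
            {
                List<String> batch = new ArrayList<String>(BATCH_SIZE);
                String line;
                while (!isStopped() && (line = reader.readLine()) != null)
                {
                    batch.add(line);
                    if (batch.size() >= BATCH_SIZE)
                    {
                        // Stores the whole batch as one block; returns only
                        // after the data has been persisted.
                        store(batch.iterator());
                        batch = new ArrayList<String>(BATCH_SIZE);
                    }
                }
                if (!batch.isEmpty())
                {
                    store(batch.iterator());
                }
                restart("Trying to reconnect");
            }
            catch (Throwable t)
            {
                restart("Error receiving data", t);
            }
        }
    }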