Bump.

On Wed, Nov 15, 2017 at 12:34 AM, Shailesh Jain <shailesh.j...@stellapps.com> wrote:
> 1. Single data source, because I have one Kafka topic where all events get
> published. But I am creating multiple data streams by applying a series of
> filter operations on the single input stream to generate device-specific
> streams, and then assigning the watermarks on each of those streams. Will
> this not result in downstream operators (for a particular device-specific
> stream) getting correct device-specific watermarks?
>
> Job code:
>
> // eventStream initially contains all events from all devices
> for (int i = 0; i < TOTAL_DEVICES; i++) {
>     DataStream<Event> deviceOnlyEvents = eventStream
>         .filter(new DeviceFilter(i))
>         .assignTimestampsAndWatermarks(new EventTimeStampExtractor(Time.milliseconds(1)))
>         .setParallelism(1);
>     // apply CEP operators, and generate derived events
>     DataStream<Event> derivedEvents =
>         PatternCreator.addPatternsOnStream(deviceOnlyEvents, appId);
>     // also pass the stream through a process function (this gets chained
>     // with the source operator, as you had mentioned above)
>     DataStream<Event> stateTransitionEvents = deviceOnlyEvents
>         .process(new StateMachineOperator(appId))
>         .setParallelism(1);
>     // add a sink to the new event streams
>     derivedEvents.union(stateTransitionEvents).addSink(kafkaSink);
> }
>
> Comments?
>
> Thanks,
> Shailesh
>
> On Tue, Nov 14, 2017 at 6:57 PM, Piotr Nowojski <pi...@data-artisans.com> wrote:
>
>> 1. It seems like you have one single data source, not one per device.
>> That might make a difference. A single data source followed by comap
>> might create one single operator chain. If you want to go this way,
>> please use my suggested solution c), since you will have trouble
>> handling watermarks with a single data source anyway.
>>
>> 3. Nico, can you take a look at this one? Isn't this a blob server issue?
>>
>> Piotrek
>>
>> On 14 Nov 2017, at 11:35, Shailesh Jain <shailesh.j...@stellapps.com> wrote:
>>
>> 1. Okay, I understand. My code is similar to what you demonstrated.
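As an illustrative aside for readers of this thread: the per-device watermark behaviour that the loop above is after, with an extractor like `EventTimeStampExtractor(Time.milliseconds(1))` (presumably bounded out-of-orderness), can be sketched without any Flink dependency. The class and method names below are mine, not from the original job:

```java
import java.util.HashMap;
import java.util.Map;

// Plain-Java sketch of bounded-out-of-orderness watermarking, tracked per device.
// The watermark for each device trails the maximum event timestamp seen so far
// on that device's stream by the allowed out-of-orderness.
public class PerDeviceWatermarks {
    private final long maxOutOfOrdernessMs;
    private final Map<String, Long> maxTimestampMs = new HashMap<>();

    public PerDeviceWatermarks(long maxOutOfOrdernessMs) {
        this.maxOutOfOrdernessMs = maxOutOfOrdernessMs;
    }

    // Record an event's timestamp for a device; out-of-order (older) events
    // do not move the maximum backwards.
    public void onEvent(String deviceId, long timestampMs) {
        maxTimestampMs.merge(deviceId, timestampMs, Math::max);
    }

    // Current watermark for a device: max seen timestamp minus the allowed
    // out-of-orderness, or Long.MIN_VALUE if no event has been seen yet.
    public long currentWatermark(String deviceId) {
        Long max = maxTimestampMs.get(deviceId);
        return max == null ? Long.MIN_VALUE : max - maxOutOfOrdernessMs;
    }
}
```

With 1 ms allowed lateness, as in the job code, an event at t=1000 for one device moves that device's watermark to 999 without touching any other device's watermark, which is exactly the per-stream behaviour being discussed.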
>> I have attached a snapshot of my job plan visualization.
>>
>> 3. I have attached the logs and the exception raised (after 15 min, the
>> configured akka timeout) on submitting the job.
>>
>> Thanks,
>> Shailesh
>>
>> On Tue, Nov 14, 2017 at 2:46 PM, Piotr Nowojski <pi...@data-artisans.com> wrote:
>>
>>> Hi,
>>>
>>> 1. I'm not sure what your code looks like. However, I have tested this,
>>> and here is an example with multiple streams in one job:
>>> https://gist.github.com/pnowojski/63fb1c56f2938091769d8de6f513567f
>>> As expected, it created 5 source threads (checked in the debugger) and is
>>> printing 5 values to the output every second, so clearly those 5 sources
>>> are executed simultaneously.
>>>
>>> The number of operators is not related to the number of threads; the
>>> number of operator chains is. Simple pipelines like
>>> source -> map -> filter -> sink will be chained and executed in one
>>> thread; please refer to the documentation link in one of my earlier
>>> responses.
>>>
>>> Can you share your job code?
>>>
>>> 2. Good point, I forgot to mention that. The job in my example will have
>>> 5 operator chains, but because of task slot sharing, they will all share
>>> one single task slot. In order to distribute such a job with parallelism
>>> 1 across the cluster, you have to define a different slot sharing group
>>> for each chain:
>>> https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/datastream_api.html#task-chaining-and-resource-groups
>>> Just set it on the sources.
>>>
>>> 3. Can you show the logs from the job manager and task manager?
>>>
>>> 4. As long as you have enough heap memory to run your application/tasks,
>>> there is no upper limit on the number of task slots.
>>>
>>> Piotrek
>>>
>>> On 14 Nov 2017, at 07:26, Shailesh Jain <shailesh.j...@stellapps.com> wrote:
>>>
>>> Hi Piotrek,
>>>
>>> I tried out option 'a' mentioned above, but instead of separate jobs,
>>> I'm creating separate streams per device.
>>> Following is the test deployment configuration, as a local cluster
>>> (8 GB RAM, 2.5 GHz i5, Ubuntu machine):
>>>
>>> akka.client.timeout 15 min
>>> jobmanager.heap.mb 1024
>>> jobmanager.rpc.address localhost
>>> jobmanager.rpc.port 6123
>>> jobmanager.web.port 8081
>>> metrics.reporter.jmx.class org.apache.flink.metrics.jmx.JMXReporter
>>> metrics.reporter.jmx.port 8789
>>> metrics.reporters jmx
>>> parallelism.default 1
>>> taskmanager.heap.mb 1024
>>> taskmanager.memory.preallocate false
>>> taskmanager.numberOfTaskSlots 4
>>>
>>> The number of operators per device stream is 4 (one sink function,
>>> 3 CEP operators).
>>>
>>> Observations (and questions):
>>>
>>> 1. The number of threads (captured through JMX) is almost the same as
>>> the total number of operators being created. This clears up my original
>>> question in this thread.
>>>
>>> 2. Even though the number of task slots is 4, the web UI shows 3 slots
>>> as free. Is this expected? Why are the subtasks not being distributed
>>> across slots?
>>>
>>> 3. Job deployment hangs (never switches to RUNNING) when the number of
>>> devices is greater than 5. Even increasing the akka client timeout does
>>> not help. Would deploying separate jobs per device, instead of separate
>>> streams, help here?
>>>
>>> 4. Is there an upper limit on the number of task slots which can be
>>> configured? I know that my operator state size at any given point in
>>> time would not be very high, so it looks OK to deploy independent jobs
>>> on the same task manager across slots.
>>>
>>> Thanks,
>>> Shailesh
>>>
>>> On Mon, Nov 13, 2017 at 7:21 PM, Piotr Nowojski <pi...@data-artisans.com> wrote:
>>>
>>>> Sure, let us know if you have other questions or encounter some issues.
>>>>
>>>> Thanks, Piotrek
>>>>
>>>> On 13 Nov 2017, at 14:49, Shailesh Jain <shailesh.j...@stellapps.com> wrote:
>>>>
>>>> Thanks, Piotr. I'll try it out and will get back in case of any further
>>>> questions.
>>>> Shailesh
>>>>
>>>> On Fri, Nov 10, 2017 at 5:52 PM, Piotr Nowojski <pi...@data-artisans.com> wrote:
>>>>
>>>>> 1. It's a little bit more complicated than that. Each operator
>>>>> chain/task will be executed in a separate thread (parallelism
>>>>> multiplies that). You can check in the web UI how your job was split
>>>>> into tasks.
>>>>>
>>>>> 3. Yes, that's true, this is an issue. To preserve the individual
>>>>> watermarks/latencies (assuming that you have some way to calculate
>>>>> them individually per device), you could either:
>>>>>
>>>>> a) have a separate job per device, each with parallelism 1. Pros:
>>>>> independent failures/checkpoints. Cons: resource usage (the number of
>>>>> threads increases with the number of devices, and there are other
>>>>> resources consumed by each job), efficiency.
>>>>> b) have one job with multiple data streams. Cons: resource usage
>>>>> (threads).
>>>>> c) ignore Flink's watermarks and implement your own logic in their
>>>>> place. You could read all of your data in a single data stream, keyBy
>>>>> partition/device, and handle the watermark logic manually. You could
>>>>> either try to wrap the CEP/window operators or copy/paste and modify
>>>>> them to suit your needs.
>>>>>
>>>>> I would start by trying out a). If it works for your cluster/scale,
>>>>> then that's fine. If not, try b) (it would share most of the code with
>>>>> a), and as a last resort try c).
>>>>>
>>>>> Kostas, would you like to add something?
>>>>>
>>>>> Piotrek
>>>>>
>>>>> On 9 Nov 2017, at 19:16, Shailesh Jain <shailesh.j...@stellapps.com> wrote:
>>>>>
>>>>> On 1. - is it tied specifically to the number of source operators, or
>>>>> to the number of DataStream objects created? I mean, does the answer
>>>>> change if I read all the data from a single Kafka topic, get one
>>>>> DataStream of all events, and then apply N filters to create N
>>>>> individual streams?
>>>>>
>>>>> On 3.
>>>>> - the problem with partitions is that watermarks cannot be different
>>>>> per partition, and since in this use case each stream is from a
>>>>> device, the latency could be different (though the order will be
>>>>> correct almost always), and there is a high chance of losing events in
>>>>> operators like Patterns, which work with windows. Any ideas for
>>>>> workarounds here?
>>>>>
>>>>> Thanks,
>>>>> Shailesh
>>>>>
>>>>> On 09-Nov-2017 8:48 PM, "Piotr Nowojski" <pi...@data-artisans.com> wrote:
>>>>>
>>>>> Hi,
>>>>>
>>>>> 1.
>>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/parallel.html
>>>>>
>>>>> The number of threads executing would be, roughly speaking, equal to
>>>>> the number of input data streams multiplied by the parallelism.
>>>>>
>>>>> 2.
>>>>> Yes, you could dynamically create more data streams at job startup.
>>>>>
>>>>> 3.
>>>>> Running 10000 independent data streams on a small cluster (a couple of
>>>>> nodes) will definitely be an issue, since even with parallelism set to
>>>>> 1 there would be quite a lot of unnecessary threads.
>>>>>
>>>>> It would be much better to treat your data as a single input stream
>>>>> with multiple partitions. You could assign partitions between source
>>>>> instances based on parallelism. For example, with parallelism 6:
>>>>> - source 0 could get partitions 0, 6, 12, 18
>>>>> - source 1 could get partitions 1, 7, ...
>>>>> ...
>>>>> - source 5 could get partitions 5, 11, ...
>>>>>
>>>>> Piotrek
>>>>>
>>>>> On 9 Nov 2017, at 10:18, Shailesh Jain <shailesh.j...@stellapps.com> wrote:
>>>>>
>>>>> Hi,
>>>>>
>>>>> I'm trying to understand the runtime aspects of Flink when dealing
>>>>> with multiple data streams and multiple operators per data stream.
>>>>> Use case: N data streams in a single Flink job (each data stream
>>>>> representing one device, with different time latencies), where each of
>>>>> these streams gets split into two streams: one goes into a bunch of
>>>>> CEP operators, and the other into a process function.
>>>>>
>>>>> Questions:
>>>>> 1. At runtime, will the engine create one thread per data stream? Or
>>>>> one thread per operator?
>>>>> 2. Is it possible to dynamically create a data stream at runtime when
>>>>> the job starts? (i.e. if N is read from a file when the job starts and
>>>>> the corresponding N streams need to be created)
>>>>> 3. Are there any specific performance impacts when a large number of
>>>>> streams (N ~ 10000) are created, as opposed to N partitions within a
>>>>> single stream?
>>>>>
>>>>> Are there any internal (design) documents which can help in
>>>>> understanding the implementation details? Any references to the source
>>>>> will also be really helpful.
>>>>>
>>>>> Thanks in advance.
>>>>>
>>>>> Shailesh
>>
>> <Screenshot-2017-11-14 Flink Plan Visualizer.png><flink-shailesh-jobmanager-0-shailesh.log><Exception>
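To make the static partition assignment Piotr describes earlier in the thread concrete: with round-robin assignment, source subtask i takes partitions i, i + parallelism, i + 2 * parallelism, and so on. A plain-Java sketch (the class and helper names, and the assumption of 24 total partitions, are mine):

```java
import java.util.ArrayList;
import java.util.List;

// Round-robin assignment of Kafka partitions to parallel source subtasks:
// subtask i owns every partition p with p % parallelism == i.
public class PartitionAssignment {
    public static List<Integer> partitionsFor(int subtaskIndex, int parallelism, int numPartitions) {
        List<Integer> assigned = new ArrayList<>();
        for (int p = subtaskIndex; p < numPartitions; p += parallelism) {
            assigned.add(p);
        }
        return assigned;
    }
}
```

With parallelism 6 and 24 partitions, subtask 0 gets 0, 6, 12, 18 and subtask 5 gets 5, 11, 17, 23, matching the example in the thread.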