Hey Roger,

Try sending an email to this address:
dev-unsubscr...@samza.apache.org

The links for Samza mailing lists are provided on the mailing lists page:
http://samza.apache.org/community/mailing-lists.html

Hope this helps.

-Jake

On Fri, Mar 16, 2018 at 8:51 PM, Roger Sill <recruiter.ro...@gmail.com>
wrote:

> please unsubscribe me from this samza list -thx
>
> Also - your advice - please.  I am using the instructions:
> REQUEST ADDRESSES FOR [UN]SUBSCRIBING
> To get off a list, send a message to:
> list-unsubscr...@apache.org
>
> so, I send it to samza-unsubscr...@apache.org
> and I get a bounce back that it does not recognize that
> mailbox.  What am I doing wrong???
> I need to do this with some other lists too and the same
> thing happens.
>
> Roger Sill
> Field Recruiter
> recruiter.ro...@gmail.com
> 408-926-6212
>
> -----Original Message-----
> From: Thunder Stumpges <tstump...@ntent.com>
> Sent: Friday, March 16, 2018 2:43 PM
> To: dev@samza.apache.org; Jagadish Venkatraman
> <jagadish1...@gmail.com>
> Cc: t...@recursivedream.com; yi...@linkedin.com; Yi Pan
> <nickpa...@gmail.com>
> Subject: RE: Old style "low level" Tasks with alternative
> deployment model(s)
>
> Well I have my stand-alone application in docker and running
> in kubernetes. I think something isn't wired up all the way
> though, because my task never actually gets invoked. I see
> no errors, however I'm not getting the usual startup logs
> (checking existing offsets, "entering run loop"...) My logs
> look like this:
>
> 2018-03-16 21:05:55 logback 50797 [debounce-thread-0] INFO
> kafka.utils.VerifiableProperties - Verifying properties
> 2018-03-16 21:05:55 logback 50797 [debounce-thread-0] INFO
> kafka.utils.VerifiableProperties - Property client.id is
> overridden to samza_admin-test_stream_task-1
> 2018-03-16 21:05:55 logback 50798 [debounce-thread-0] INFO
> kafka.utils.VerifiableProperties - Property
> metadata.broker.list is overridden to
> test-kafka-kafka.test-svc:9092
> 2018-03-16 21:05:55 logback 50798 [debounce-thread-0] INFO
> kafka.utils.VerifiableProperties - Property
> request.timeout.ms is overridden to 30000
> 2018-03-16 21:05:55 logback 50799 [debounce-thread-0] INFO
> kafka.client.ClientUtils$ - Fetching metadata from broker
> BrokerEndPoint(0,test-kafka-kafka.test-svc,9092) with
> correlation id 0 for 1 topic(s)
> Set(dev_k8s.samza.test.topic)
> 2018-03-16 21:05:55 logback 50800 [debounce-thread-0] DEBUG
> kafka.network.BlockingChannel - Created socket with
> SO_TIMEOUT = 30000 (requested 30000), SO_RCVBUF = 179680
> (requested -1), SO_SNDBUF = 102400 (requested 102400),
> connectTimeoutMs = 30000.
> 2018-03-16 21:05:55 logback 50800 [debounce-thread-0] INFO
> kafka.producer.SyncProducer - Connected to
> test-kafka-kafka.test-svc:9092 for producing
> 2018-03-16 21:05:55 logback 50804 [debounce-thread-0] INFO
> kafka.producer.SyncProducer - Disconnecting from
> test-kafka-kafka.test-svc:9092
> 2018-03-16 21:05:55 logback 50804 [debounce-thread-0] DEBUG
> kafka.client.ClientUtils$ - Successfully fetched metadata
> for 1 topic(s) Set(dev_k8s.samza.test.topic)
> 2018-03-16 21:05:55 logback 50813 [debounce-thread-0] INFO
> o.a.s.coordinator.JobModelManager$ -
> SystemStreamPartitionGrouper
> org.apache.samza.container.grouper.stream.GroupByPartition@1
> a7158cc has grouped the SystemStreamPartitions into 10 tasks
> with the following taskNames: [Partition 1, Partition 0,
> Partition 3, Partition 2, Partition 5, Partition 4,
> Partition 7, Partition 6, Partition 9, Partition 8]
> 2018-03-16 21:05:55 logback 50818 [debounce-thread-0] INFO
> o.a.s.coordinator.JobModelManager$ - New task Partition 0 is
> being assigned changelog partition 0.
> 2018-03-16 21:05:55 logback 50819 [debounce-thread-0] INFO
> o.a.s.coordinator.JobModelManager$ - New task Partition 1 is
> being assigned changelog partition 1.
> 2018-03-16 21:05:55 logback 50820 [debounce-thread-0] INFO
> o.a.s.coordinator.JobModelManager$ - New task Partition 2 is
> being assigned changelog partition 2.
> 2018-03-16 21:05:55 logback 50820 [debounce-thread-0] INFO
> o.a.s.coordinator.JobModelManager$ - New task Partition 3 is
> being assigned changelog partition 3.
> 2018-03-16 21:05:55 logback 50820 [debounce-thread-0] INFO
> o.a.s.coordinator.JobModelManager$ - New task Partition 4 is
> being assigned changelog partition 4.
> 2018-03-16 21:05:55 logback 50820 [debounce-thread-0] INFO
> o.a.s.coordinator.JobModelManager$ - New task Partition 5 is
> being assigned changelog partition 5.
> 2018-03-16 21:05:55 logback 50820 [debounce-thread-0] INFO
> o.a.s.coordinator.JobModelManager$ - New task Partition 6 is
> being assigned changelog partition 6.
> 2018-03-16 21:05:55 logback 50820 [debounce-thread-0] INFO
> o.a.s.coordinator.JobModelManager$ - New task Partition 7 is
> being assigned changelog partition 7.
> 2018-03-16 21:05:55 logback 50820 [debounce-thread-0] INFO
> o.a.s.coordinator.JobModelManager$ - New task Partition 8 is
> being assigned changelog partition 8.
> 2018-03-16 21:05:55 logback 50820 [debounce-thread-0] INFO
> o.a.s.coordinator.JobModelManager$ - New task Partition 9 is
> being assigned changelog partition 9.
> 2018-03-16 21:05:55 logback 50838
> [main-SendThread(10.0.127.114:2181)] DEBUG
> org.apache.zookeeper.ClientCnxn - Reading reply
> sessionid:0x1622c8b5fc01ac7, packet:: clientPath:null
> serverPath:null finished:false header:: 23,4  replyHeader::
> 23,14024,0  request::
> '/app-test_stream_task-1/dev_test_stream_task-1-coordination
> Data/JobModelGeneration/jobModelVersion,T  response::
> ,s{13878,13878,1521234010089,1521234010089,0,0,0,0,0,0,13878
> }
> 2018-03-16 21:05:55 logback 50838 [debounce-thread-0] INFO
> o.apache.samza.zk.ZkJobCoordinator -
> pid=a14a0434-a238-4ff6-935b-c78d906fe80dGenerated new Job
> Model. Version = 1
> 2018-03-16 21:06:05 logback 60848
> [main-SendThread(10.0.127.114:2181)] DEBUG
> org.apache.zookeeper.ClientCnxn - Got ping response for
> sessionid: 0x1622c8b5fc01ac7 after 2ms
> 2018-03-16 21:06:15 logback 70856
> [main-SendThread(10.0.127.114:2181)] DEBUG
> org.apache.zookeeper.ClientCnxn - Got ping response for
> sessionid: 0x1622c8b5fc01ac7 after 1ms
> 2018-03-16 21:06:25 logback 80865
> [main-SendThread(10.0.127.114:2181)] DEBUG
> org.apache.zookeeper.ClientCnxn - Got ping response for
> sessionid: 0x1622c8b5fc01ac7 after 2ms ...
>
> The zk ping responses continue every 10 seconds, but no
> other activity or messages occur.
> It looks like it gets as far as confirming the JobModel and
> grouping the partitions, but nothing actually starts up.
>
> Any ideas?
> Thanks in advance!
> Thunder
>
>
> -----Original Message-----
> From: Thunder Stumpges [mailto:tstump...@ntent.com]
> Sent: Thursday, March 15, 2018 16:35
> To: Jagadish Venkatraman <jagadish1...@gmail.com>
> Cc: dev@samza.apache.org; t...@recursivedream.com;
> yi...@linkedin.com; Yi Pan <nickpa...@gmail.com>
> Subject: RE: Old style "low level" Tasks with alternative
> deployment model(s)
>
> Thanks a lot for the info. I have something basically
> working at this point! I have not integrated it with Docker
> nor Kubernetes yet, but it does run from my local machine.
>
> I have determined that LocalApplicationRunner does NOT do
> config rewriting. I had to write my own little
> "StandAloneApplicationRunner" that handles the "main"
> entrypoint. It does command parsing using CommandLine, load
> config from ConfigFactory, and perform rewriting before
> creating the new instance of LocalApplicationRunner. This is
> all my StandAloneApplicationRunner contains:
>
>
> object StandAloneSamzaRunner extends App with LazyLogging {
>
>   // parse command line args just like JobRunner.
>   val cmdline = new ApplicationRunnerCommandLine
>   val options = cmdline.parser.parse(args: _*)
>   val config = cmdline.loadConfig(options)
>
>   val runner = new
> LocalApplicationRunner(Util.rewriteConfig(config))
>   runner.runTask()
>   runner.waitForFinish()
> }
>
> The only config settings I needed to make to use this runner
> were (easily configured due to our central Consul config
> system and our rewriter) :
>
> # use the ZK based job coordinator
> job.coordinator.factory=org.apache.samza.zk.ZkJobCoordinator
> Factory
> # need to use GroupByContainerIds instead of
> GroupByContainerCount
> task.name.grouper.factory=org.apache.samza.container.grouper
> .task.GroupByContainerIdsFactory
> # ZKJC config
> job.coordinator.zk.connect=<our_zk_connection>
>
> I did run into one potential problem; as you see above, I
> have started the task using runTask() and then to prevent my
> main method from returning, I have called waitForFinish().
> The first time I ran it, the job itself failed because I had
> forgotten to override the task grouper, and container count
> was pulled from our staging environment. There are some
> failures logged and it appears the JobCoordinator fails, but
> it never returns from waitForFinish. Stack trace and
> continuation of log is below:
>
> 2018-03-15 22:34:32 logback 77786 [debounce-thread-0] ERROR
> o.a.s.zk.ScheduleAfterDebounceTime - Execution of action:
> OnProcessorChange failed.
> java.lang.IllegalArgumentException: Your container count (4)
> is larger than your task count (2). Can't have containers
> with nothing to do, so aborting.
>        at
> org.apache.samza.container.grouper.task.GroupByContainerCoun
> t.validateTasks(GroupByContainerCount.java:212)
>        at
> org.apache.samza.container.grouper.task.GroupByContainerCoun
> t.group(GroupByContainerCount.java:62)
>        at
> org.apache.samza.container.grouper.task.TaskNameGrouper.grou
> p(TaskNameGrouper.java:56)
>        at
> org.apache.samza.coordinator.JobModelManager$.readJobModel(J
> obModelManager.scala:266)
>        at
> org.apache.samza.coordinator.JobModelManager.readJobModel(Jo
> bModelManager.scala)
>        at
> org.apache.samza.zk.ZkJobCoordinator.generateNewJobModel(ZkJ
> obCoordinator.java:306)
>        at
> org.apache.samza.zk.ZkJobCoordinator.doOnProcessorChange(ZkJ
> obCoordinator.java:197)
>        at
> org.apache.samza.zk.ZkJobCoordinator$LeaderElectorListenerIm
> pl.lambda$onBecomingLeader$0(ZkJobCoordinator.java:318)
>        at
> org.apache.samza.zk.ScheduleAfterDebounceTime.lambda$getSche
> duleableAction$0(ScheduleAfterDebounceTime.java:134)
>        at
> java.util.concurrent.Executors$RunnableAdapter.call$$$captur
> e(Executors.java:511)
>        at
> java.util.concurrent.Executors$RunnableAdapter.call(Executor
> s.java)
>        at
> java.util.concurrent.FutureTask.run$$$capture(FutureTask.jav
> a:266)
>        at
> java.util.concurrent.FutureTask.run(FutureTask.java)
>        at
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFu
> tureTask.access$201(ScheduledThreadPoolExecutor.java:180)
>        at
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFu
> tureTask.run(ScheduledThreadPoolExecutor.java:293)
>        at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPool
> Executor.java:1142)
>        at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoo
> lExecutor.java:617)
>        at java.lang.Thread.run(Thread.java:745)
> 2018-03-15 22:34:32 logback 77787 [debounce-thread-0] DEBUG
> o.a.samza.processor.StreamProcessor - Container is not
> instantiated yet.
> 2018-03-15 22:34:32 logback 77787 [debounce-thread-0] DEBUG
> org.I0Itec.zkclient.ZkClient - Closing ZkClient...
> 2018-03-15 22:34:32 logback 77789
> [ZkClient-EventThread-15-10.0.127.114:2181] INFO
> org.I0Itec.zkclient.ZkEventThread - Terminate ZkClient event
> thread.
>
> And then the application continues on with metric reporters,
> and other debug logging (not actually running the task
> though)
>
> Thanks in advance for the guidance, this has been easier
> than I imagined! I'll report back when I get more of the
> Dockerization/Kubernetes running and test it a bit more.
> Cheers,
> Thunder
>
>
> From: Jagadish Venkatraman [mailto:jagadish1...@gmail.com]
> Sent: Thursday, March 15, 2018 14:46
> To: Thunder Stumpges <tstump...@ntent.com>
> Cc: dev@samza.apache.org; t...@recursivedream.com;
> yi...@linkedin.com; Yi Pan <nickpa...@gmail.com>
> Subject: Re: Old style "low level" Tasks with alternative
> deployment model(s)
>
> >>  Thanks for the info on the tradeoffs. That makes a lot
> of sense. I am on-board with using ZkJobCoordinator, sounds
> like some good benefits over just the Kafka high-level
> consumer.
>
> This certainly looks like the simplest alternative.
>
> For your other questions, please find my answers inline.
>
> >> Q1: If I use LocalApplicationRunner, It does not use
> "ProcessJobFactory" (or any StreamJob or *Job classes)
> correct?
>
> Your understanding is correct. It directly instantiates the
> StreamProcessor, which in-turn creates and runs the
> SamzaContainer.
>
> >> Q2: If I use LocalApplicationRunner, I will need to code
> myself the loading and rewriting of the Config that is
> currently handled by JobRunner, correct?
>
> I don't think you'll need to do this. IIUC, the
> LocalApplicationRunner should automatically invoke rewriters
> and do the right thing.
>
> >>  Q3: Do I need to also handle coordinator stream(s) and
> storing of config that is done in JobRunner (I don't think
> so as the ?
>
> I don't think this is necessary either. The creation of
> coordinator stream and persisting configuration happens in
> the LocalApplicationRunner (more specifically in
> StreamManager#createStreams).
>
> >> Q4: Where/How do I specify the Container ID for each
> instance? Is there a config setting that I can pass, (or
> pull from an env variable and add to the config) ? I am
> assuming it is my responsibility to ensure that each
> instance is started with a unique container ID..?
>
> Nope, If you are using the ZkJobCoordinator, you need not
> have to worry about assigning IDs for each instance. The
> framework will automatically take care of generating IDs and
> reaching consensus by electing a leader. If you are curious
> please take a look at implementations of the
> ProcessorIdGenerator interface.
>
> Please let us know should you have further questions!
>
> Best,
> Jagdish
>
> On Thu, Mar 15, 2018 at 11:48 AM, Thunder Stumpges
> <tstump...@ntent.com<mailto:tstump...@ntent.com>> wrote:
>
> Thanks for the info on the tradeoffs. That makes a lot of
> sense. I am on-board with using ZkJobCoordinator, sounds
> like some good benefits over just the Kafka high-level
> consumer.
>
>
>
> To that end, I have made some notes on possible approaches
> based on the previous thread, and from my look into the
> code. I'd love to get feedback.
>
>
>
> Approach 1. Configure jobs to use "ProcessJobFactory" and
> run instances of the job using run-job.sh or using JobRunner
> directly.
>
> I don't think this makes sense from what I can see for a few
> reasons:
>
>   *   JobRunner is concerned with stuff I don't *think* we
> need:
>
>      *   coordinatorSystemProducer|Consumer,
>      *   writing/reading the configuration to the
> coordinator streams
>
>   *   ProcessJobFactory hard-codes the ID to "0" so I don't
> think that will work for multiple instances.
>
>
>
> Approach 2. Configure ZkJobCoordinator, GroupByContainerIds,
> and invoke LocalApplicationRunner.runTask()
>
>
>
>     Q1: If I use LocalApplicationRunner, It does not use
> "ProcessJobFactory" (or any StreamJob or *Job classes)
> correct?
>
>     Q2: If I use LocalApplicationRunner, I will need to code
> myself the loading and rewriting of the Config that is
> currently handled by JobRunner, correct?
>
>     Q3: Do I need to also handle coordinator stream(s) and
> storing of config that is done in JobRunner (I don't think
> so as the ?
>
>     Q4: Where/How do I specify the Container ID for each
> instance? Is there a config setting that I can pass, (or
> pull from an env variable and add to the config) ? I am
> assuming it is my responsibility to ensure that each
> instance is started with a unique container ID..?
>
> I am getting started on the above (Approach 2.), and looking
> closer at the code so I may have my own answers to my
> questions, but figured I should go ahead and ask now anyway.
> Thanks!
>
> -Thunder
>
>
> From: Jagadish Venkatraman
> [mailto:jagadish1...@gmail.com<mailto:jagadish1...@gmail.com
> >]
> Sent: Thursday, March 15, 2018 1:41
> To: dev@samza.apache.org<mailto:dev@samza.apache.org>;
> Thunder Stumpges
> <tstump...@ntent.com<mailto:tstump...@ntent.com>>;
> t...@recursivedream.com<mailto:t...@recursivedream.com>
> Cc: yi...@linkedin.com<mailto:yi...@linkedin.com>; Yi Pan
> <nickpa...@gmail.com<mailto:nickpa...@gmail.com>>
>
> Subject: Re: Old style "low level" Tasks with alternative
> deployment model(s)
>
> >> You are correct that this is focused on the higher-level
> API but
> >> doesn't
> preclude using the lower-level API. I was at the same point
> you were not long ago, in fact, and had a very productive
> conversation on the list
>
> Thanks Tom for linking the thread, and I'm glad that you
> were able to get Kubernetes integration working with Samza.
>
> >> If it is helpful for everyone, once I get the low-level
> API +
> >> ZkJobCoordinator + Docker +
> K8s working, I'd be glad to formulate an additional sample
> for hello-samza.
>
> @Thunder Stumpges:
> We'd be thrilled to receive your contribution. Examples,
> demos, tutorials etc.
> contribute a great deal to improving the ease of use of
> Apache Samza. I'm happy to shepherd design
> discussions/code-reviews in the open-source including
> answering any questions you may have.
>
>
> >> One thing I'm still curious about, is what are the
> drawbacks or
> >> complexities of leveraging the Kafka High-level consumer
> +
> >> PassthroughJobCoordinator in a stand-alone setup like
> this? We do
> >> have Zookeeper (because of kafka) so I think either would
> work. The
> >> Kafka High-level consumer comes with other nice tools for
> monitoring
> >> offsets, lag, etc
>
>
> @Thunder Stumpges:
>
> Samza uses a "Job-Coordinator" to assign your
> input-partitions among the different instances of your
> application s.t. they don't overlap. A typical way to solve
> this "partition distribution"
> problem is to have a single instance elected as a "leader"
> and have the leader assign partitions to the group.
> The ZkJobCoordinator uses Zk primitives to achieve this,
> while the YarnJC relies on Yarn's guarantee that there will
> be a singleton-AppMaster to achieve this.
>
> A key difference that separates the PassthroughJC from the
> Yarn/Zk variants is that it does _not_ attempt to solve the
> "partition distribution" problem. As a result, there's no
> leader-election involved. Instead, it pushes the problem of
> "partition distribution" to the underlying consumer.
>
> The PassThroughJc supports these 2 scenarios:
>
> 1. Consumer-managed partition distribution: When using the
> Kafka high-level consumer (or an AWS KinesisClientLibrary
> consumer) with Samza, the consumer manages partitions
> internally.
>
> 2. Static partition distribution: Alternately, partitions
> can be managed statically using configuration. You can
> achieve static partition assignment by implementing a custom
> SystemStreamPartitionGrouper<https://samza.apache.org/learn/
> documentation/0.8/api/javadocs/org/apache/samza/container/gr
> ouper/stream/SystemStreamPartitionGrouper.html
> <http://s.bl-1.com/h/ccJRHVJT?url=https://samza.apache.org/l
> earn/documentation/0.8/api/javadocs/org/apache/samza/contain
> er/grouper/stream/SystemStreamPartitionGrouper.html> > and
> TaskNameGrouper<https://github.com/apache/samza/blob/master/
> samza-core/src/main/java/org/apache/samza/container/grouper/
> task/TaskNameGrouper.java
> <http://s.bl-1.com/h/ccJRJbjW?url=https://github.com/apache/
> samza/blob/master/samza-core/src/main/java/org/apache/samza/
> container/grouper/task/TaskNameGrouper.java> >. Solutions in
> this category will typically require you to distinguish the
> various processors in the group by providing an "id" for
> each.
> Once the "id"s are decided, you can then statically compute
> assignments using a function (eg: modulo N).
> You can rely on the following mechanisms to provide this id:
>  - Configure each instance differently to have its own id
>  - Obtain the id from the cluster-manager. For instance,
> Kubernetes will provide each POD an unique id in the range
> [0,N). AWS ECS should expose similar capabilities via a REST
> end-point.
>
> >> One thing I'm still curious about, is what are the
> drawbacks or complexities of leveraging the Kafka High-level
> consumer + PassthroughJobCoordinator in a stand-alone setup
> like this?
>
> Leveraging the Kafka High-level consumer:
>
> The Kafka high-level consumer is not integrated into Samza
> just yet. Instead, Samza's integration with Kafka uses the
> low-level consumer because
> i) It allows for greater control in fetching data from
> individual brokers. It is simple and performant in-terms of
> the threading model to have one-thread pull from each
> broker.
> ii) It is efficient in memory utilization since it does not
> do internal-buffering of messages.
> iii) There's no overhead like Kafka-controller heart-beats
> that are driven by consumer.poll
>
> Since there's no built-in integration, you will have to
> build a new SystemConsumer if you need to integrate with the
> Kafka High-level consumer. Further, there's more a fair bit
> of complexity to manage in checkpointing.
>
> >> The Kafka High-level consumer comes with other nice tools
> for
> >> monitoring offsets, lag, etc
>
> Samza
> exposes<https://github.com/apache/samza/blob/master/samza-ka
> fka/src/main/scala/org/apache/samza/system/kafka/KafkaSystem
> ConsumerMetrics.scala
> <http://s.bl-1.com/h/ccJRJg5Y?url=https://github.com/apache/
> samza/blob/master/samza-kafka/src/main/scala/org/apache/samz
> a/system/kafka/KafkaSystemConsumerMetrics.scala> > the below
> metrics for lag-monitoring:
> - The current log-end offset for each partition
> - The last check-pointed offset for each partition
> - The number of messages behind the highwatermark of the
> partition
>
> Please let us know if you need help discovering these or
> integrating these with other systems/tools.
>
>
> Leveraging the Passthrough JobCoordinator:
>
> It's helpful to split this discussion on tradeoffs with
> PassthroughJC into 2 parts:
>
> 1. PassthroughJC + consumer managed partitions:
>
> - In this model, Samza has no control over
> partition-assignment since it's managed by the consumer.
> This means that stateful operations like joins that rely on
> partitions being co-located on the same task will not work.
> Simple stateless operations (eg: map, filter, remote
> lookups) are fine.
>
> - A key differentiator between Samza and other frameworks is
> our support for "host
> affinity<https://samza.apache.org/learn/documentation/0.14/y
> arn/yarn-host-affinity.html
> <http://s.bl-1.com/h/ccJRJlWb?url=https://samza.apache.org/l
> earn/documentation/0.14/yarn/yarn-host-affinity.html> >".
> Samza achieves this by assigning partitions to hosts taking
> data-locality into account. If the consumer can arbitrarily
> shuffle partitions, it'd be hard to support this
> affinity/locality. Often this is a key optimization when
> dealing with large stateful jobs.
>
> 2. PassthroughJC + static partitions:
>
> - In this model, it is possible to make stateful processing
> (including host affinity) work by carefully choosing how
> "id"s are assigned and computed.
>
> Recommendation:
>
> - Owing to the above subtleties, I would recommend that we
> give the ZkJobCoordinator + the built-in low-level Kafka
> integration a try.
> - If we hit snags down this path, we can certainly explore
> the approach with PassthroughJC + static partitions.
> - Using the PassthroughJC + consumer-managed distribution
> would be least preferable owing to the subtleties I outlined
> above.
>
> Please let us know should you have more questions.
>
> Best,
> Jagdish
>
> On Wed, Mar 14, 2018 at 9:24 PM, Thunder Stumpges
> <tstump...@ntent.com<mailto:tstump...@ntent.com>> wrote:
> Wow, what great timing, and what a great thread! I
> definitely have some good starters to go off of here.
>
> If it is helpful for everyone, once I get the low-level API
> + ZkJobCoordinator + Docker + K8s working, I'd be glad to
> formulate an additional sample for hello-samza.
>
> One thing I'm still curious about, is what are the drawbacks
> or complexities of leveraging the Kafka High-level consumer
> + PassthroughJobCoordinator in a stand-alone setup like
> this? We do have Zookeeper (because of kafka) so I think
> either would work. The Kafka High-level consumer comes with
> other nice tools for monitoring offsets, lag, etc....
>
> Thanks guys!
> -Thunder
>
> -----Original Message-----
> From: Tom Davis
> [mailto:t...@recursivedream.com<mailto:t...@recursivedream.com
> >]
> Sent: Wednesday, March 14, 2018 17:50
> To: dev@samza.apache.org<mailto:dev@samza.apache.org>
> Subject: Re: Old style "low level" Tasks with alternative
> deployment model(s)
>
> Hey there!
>
> You are correct that this is focused on the higher-level API
> but doesn't preclude using the lower-level API. I was at the
> same point you were not long ago, in fact, and had a very
> productive conversation on the list:
> you should look for "Question about custom
> StreamJob/Factory" in the list archive for the past couple
> months.
>
> I'll quote Jagadish Venkatraman from that thread:
>
> > For the section on the low-level API, can you use
> > LocalApplicationRunner#runTask()? It basically creates a
> new
> > StreamProcessor and runs it. Remember to provide
> task.class and set it
> > to your implementation of StreamTask or AsyncStreamTask.
> Please note
> > that this is an evolving API and hence, subject to change.
>
> I ended up just switching to the high-level API because I
> don't have any existing Tasks and the Kubernetes story is a
> little more straight forward there (there's only one
> container/configuration to deploy).
>
> Best,
>
> Tom
>
> Thunder Stumpges
> <tstump...@ntent.com<mailto:tstump...@ntent.com>> writes:
>
> > Hi all,
> >
> > We are using Samza (0.12.0) in about 2 dozen jobs
> implementing several
> > processing pipelines. We have also begun a significant
> move of other
> > services within our company to Docker/Kubernetes. Right
> now our
> > Hadoop/Yarn cluster has a mix of stream and batch "Map
> Reduce" jobs (many reporting and other batch processing
> jobs). We would really like to move our stream processing
> off of Hadoop/Yarn and onto Kubernetes.
> >
> > When I just read about some of the new progress in .13 and
> .14 I got
> > really excited! We would love to have our jobs run as
> simple libraries
> > in our own JVM, and use the Kafka High-Level-Consumer for
> partition distribution and such. This would let us
> "dockerfy" our application and run/scale in kubernetes.
> >
> > However as I read it, this new deployment model is ONLY
> for the
> > new(er) High Level API, correct? Is there a plan and/or
> resources for
> > adapting this back to existing low-level tasks ? How
> complicated of a task is that? Do I have any other options
> to make this transition easier?
> >
> > Thanks in advance.
> > Thunder
>
>
>
> --
> Jagadish V,
> Graduate Student,
> Department of Computer Science,
> Stanford University
>
>
>
> --
> Jagadish V,
> Graduate Student,
> Department of Computer Science,
> Stanford University
>
>
>

Reply via email to