Thunder, I think setting "job.coordinator.system" or "job.default.system" to be the name of your default system (in configs) solves the problem without code changes.
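
For illustration, here is a rough sketch of what I mean, using plain Java maps rather than Samza classes. The system name "kafka", the class name, and the resolveCoordinatorSystem helper are all made up for this example; the helper only mirrors the lookup order I am assuming (explicit "job.coordinator.system" first, then "job.default.system") and is not Samza's actual code.

```java
import java.util.HashMap;
import java.util.Map;

class CoordinatorSystemConfigSketch {

  // Builds a config map that sets only the default system.
  // The system name passed in is a placeholder for your own system.
  static Map<String, String> buildConfig(String defaultSystemName) {
    Map<String, String> config = new HashMap<>();
    config.put("job.default.system", defaultSystemName);
    return config;
  }

  // Hypothetical helper (not a Samza API): prefer the explicit coordinator
  // system, otherwise fall back to the default system. Returns null if
  // neither config is set.
  static String resolveCoordinatorSystem(Map<String, String> config) {
    String explicit = config.get("job.coordinator.system");
    if (explicit != null && !explicit.trim().isEmpty()) {
      return explicit;
    }
    return config.get("job.default.system");
  }

  public static void main(String[] args) {
    Map<String, String> config = buildConfig("kafka");
    System.out.println(resolveCoordinatorSystem(config));
  }
}
```

With either key present in the config you pass to the runner, the coordinator system name should no longer be blank by the time the constructor checks it.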
Then again, I think it's reasonable to expect the default system (set with "withDefaultSystem(systemDescriptor)") to get picked up when accessing the coordinator stream when a system is not explicitly set via the above configs. I created this ticket to track this feature:
https://issues.apache.org/jira/browse/SAMZA-2436

Thanks,
Abhishek

On Thu, Jan 9, 2020 at 8:02 AM Thunder Stumpges <thunder.stump...@gmail.com> wrote:

> In case someone else runs into this, we had to change the constructor we
> used for the LocalApplicationRunner, and explicitly pass in the "new
> CoordinatorStreamMetadataStoreFactory()".
>
> On Mon, Jan 6, 2020 at 3:18 PM Thunder Stumpges <tstump...@ntent.com>
> wrote:
>
> > Hey dev team. Just upgrading our stand alone low-level tasks to Samza 1.3.
> > We use the LocalApplicationRunner and initialize most of our application
> > within SamzaApplication.describe() including setting up
> > "withDefaultSystem(systemDescriptor)"
> >
> > However it seems that earlier on in the process, the
> > LocalApplicationRunner constructor is calling
> > "getDefaultCoordinatorStreamStoreFactory" to pick the MetadataStoreFactory,
> > and the "coordinatorSystemName" is not set yet (nor is it ever set
> > explicitly now that we use the "default system").
> >
> > The condition that is failing is in LocalApplicationRunner line 138:
> >
> > // TODO: Remove restriction to only ZkJobCoordinator after next phase of
> > // metadata store abstraction.
> > if (StringUtils.isNotBlank(coordinatorSystemName) &&
> >     ZkJobCoordinatorFactory.class.getName().equals(jobCoordinatorFactoryClassName)) {
> >   return new CoordinatorStreamMetadataStoreFactory();
> > }
> >
> > coordinatorSystemName is null above. (jobCoordinatorFactoryClassName IS
> > set to ZkJobCoordinatorFactory)
> >
> > Please advise!
> > Thanks,
> > Thunder