mynameborat commented on a change in pull request #912: SEP-19 : Refactoring sideInputs from SamzaContainer to ContainerStorageManager URL: https://github.com/apache/samza/pull/912#discussion_r260978075
########## File path: samza-core/src/main/scala/org/apache/samza/storage/ContainerStorageManager.java ########## @@ -148,32 +205,114 @@ public ContainerStorageManager(ContainerModel containerModel, StreamMetadataCach this.maxChangeLogStreamPartitions = maxChangeLogStreamPartitions; this.streamMetadataCache = streamMetadataCache; + this.systemAdmins = systemAdmins; // create taskStores for all tasks in the containerModel and each store in storageEngineFactories - this.taskStores = createTaskStores(containerModel, jobContext, containerContext, storageEngineFactories, changelogSystemStreams, - serdes, taskInstanceMetrics, taskInstanceCollectors, StorageEngineFactory.StoreMode.BulkLoad); + this.taskStores = createTaskStores(containerModel, jobContext, containerContext, storageEngineFactories, serdes, taskInstanceMetrics, taskInstanceCollectors); - // create system consumers (1 per store system) - this.systemConsumers = createStoreConsumers(changelogSystemStreams, systemFactories, config, this.samzaContainerMetrics.registry()); + // create system consumers (1 per store system in changelogSystemStreams), and index it by storeName + Map<String, SystemConsumer> storeSystemConsumers = createConsumers(this.changelogSystemStreams.entrySet().stream().collect(Collectors.toMap(Map.Entry::getKey, + e -> Collections.singleton(e.getValue()))), systemFactories, config, this.samzaContainerMetrics.registry()); + this.storeConsumers = createStoreIndexedMap(this.changelogSystemStreams, storeSystemConsumers); // creating task restore managers this.taskRestoreManagers = createTaskRestoreManagers(systemAdmins, clock); + + // creating side input store processors, one per store per task + taskSideInputProcessors = createSideInputProcessors(new StorageConfig(config), this.containerModel, this.sideInputSystemStreams, this.taskInstanceMetrics); + + // create side input storage managers + sideInputStorageManagers = createSideInputStorageManagers(clock); + + // create side Input consumers indexed 
by systemName + this.sideInputConsumers = createConsumers(this.sideInputSystemStreams, systemFactories, config, this.samzaContainerMetrics.registry()); + + // create SystemConsumers for consuming from taskSideInputSSPs, if sideInputs are being used + if (!this.sideInputConsumers.isEmpty()) { Review comment: Please refer to the comment below about the checks for the presence of side inputs. ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services