mynameborat commented on a change in pull request #912: SEP-19 : Refactoring sideInputs from SamzaContainer to ContainerStorageManager URL: https://github.com/apache/samza/pull/912#discussion_r258147011
########## File path: samza-core/src/main/scala/org/apache/samza/storage/ContainerStorageManager.java ########## @@ -147,32 +213,60 @@ public ContainerStorageManager(ContainerModel containerModel, StreamMetadataCach this.maxChangeLogStreamPartitions = maxChangeLogStreamPartitions; this.streamMetadataCache = streamMetadataCache; + this.systemAdmins = systemAdmins; // create taskStores for all tasks in the containerModel and each store in storageEngineFactories - this.taskStores = createTaskStores(containerModel, jobContext, containerContext, storageEngineFactories, changelogSystemStreams, - serdes, taskInstanceMetrics, taskInstanceCollectors, StorageEngineFactory.StoreMode.BulkLoad); + this.taskStores = createTaskStores(containerModel, jobContext, containerContext, storageEngineFactories, serdes, taskInstanceMetrics, taskInstanceCollectors); - // create system consumers (1 per store system) - this.systemConsumers = createStoreConsumers(changelogSystemStreams, systemFactories, config, this.samzaContainerMetrics.registry()); + // create system consumers (1 per store system in changelogSystemStreams), and index it by storeName + Map<String, SystemConsumer> storeSystemConsumers = createConsumers(this.changelogSystemStreams.entrySet().stream().collect(Collectors.toMap(Map.Entry::getKey, + e -> Arrays.asList(e.getValue()))), systemFactories, config, this.samzaContainerMetrics.registry()); + this.storeConsumers = createStoreIndexedMap(this.changelogSystemStreams, storeSystemConsumers); // creating task restore managers this.taskRestoreManagers = createTaskRestoreManagers(systemAdmins, clock); + + // creating side input store processors, one per store per task + sideInputStoresToProcessor = createSideInputProcessors(new StorageConfig(config), this.containerModel, this.sideInputSystemStreams, this.taskInstanceMetrics); + + // create side input storage managers + sideInputStorageManagers = createSideInputStorageManagers(clock); + + // create side Input consumers indexed by 
systemName + this.sideInputConsumers = createConsumers(this.sideInputSystemStreams, systemFactories, config, this.samzaContainerMetrics.registry()); + + // create SystemConsumers for consuming from sideInputSSPs, if sideInputs are being used + if (!this.sideInputConsumers.isEmpty()) { + + scala.collection.immutable.Map<SystemStream, SystemStreamMetadata> inputStreamMetadata = streamMetadataCache.getStreamMetadata(JavaConversions.asScalaSet( + this.sideInputSystemStreams.values().stream().flatMap(List::stream).collect(Collectors.toSet())).toSet(), false); + + SystemConsumersMetrics systemConsumersMetrics = new SystemConsumersMetrics( + new MetricsRegistryMap("samza-container-" + containerModel.getId() + "-sideinputs")); + + MessageChooser chooser = DefaultChooser.apply(inputStreamMetadata, new RoundRobinChooserFactory(), config, + systemConsumersMetrics.registry(), systemAdmins); + + sideInputSystemConsumers = + new SystemConsumers(chooser, ScalaJavaUtil.toScalaMap(this.sideInputConsumers), serdeManager, + systemConsumersMetrics, SystemConsumers.DEFAULT_NO_NEW_MESSAGES_TIMEOUT(), SystemConsumers.DEFAULT_DROP_SERIALIZATION_ERROR(), + SystemConsumers.DEFAULT_POLL_INTERVAL_MS(), ScalaJavaUtil.toScalaFunction(() -> System.nanoTime())); + } + } /** * Creates SystemConsumer objects for store restoration, creating one consumer per system. */ - private static Map<String, SystemConsumer> createStoreConsumers(Map<String, SystemStream> changelogSystemStreams, + private static Map<String, SystemConsumer> createConsumers(Map<String, List<SystemStream>> systemStreams, Review comment: Why does this have to take a `Map<String, List<SystemStream>> systemStreams`? It seems sufficient to just take in a `Set<SystemStream>`. You can also get rid of the up-conversion at the call site for creating store consumers. ---------------------------------------------------------------- This is an automated message from the Apache Git Service. 
To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services