mynameborat commented on a change in pull request #912: SEP-19 : Refactoring 
sideInputs from SamzaContainer to ContainerStorageManager
URL: https://github.com/apache/samza/pull/912#discussion_r258147011
 
 

 ##########
 File path: 
samza-core/src/main/scala/org/apache/samza/storage/ContainerStorageManager.java
 ##########
 @@ -147,32 +213,60 @@ public ContainerStorageManager(ContainerModel 
containerModel, StreamMetadataCach
 
     this.maxChangeLogStreamPartitions = maxChangeLogStreamPartitions;
     this.streamMetadataCache = streamMetadataCache;
+    this.systemAdmins = systemAdmins;
 
     // create taskStores for all tasks in the containerModel and each store in 
storageEngineFactories
-    this.taskStores = createTaskStores(containerModel, jobContext, 
containerContext, storageEngineFactories, changelogSystemStreams,
-        serdes, taskInstanceMetrics, taskInstanceCollectors, 
StorageEngineFactory.StoreMode.BulkLoad);
+    this.taskStores = createTaskStores(containerModel, jobContext, 
containerContext, storageEngineFactories, serdes, taskInstanceMetrics, 
taskInstanceCollectors);
 
-    // create system consumers (1 per store system)
-    this.systemConsumers = createStoreConsumers(changelogSystemStreams, 
systemFactories, config, this.samzaContainerMetrics.registry());
+    // create system consumers (1 per store system in changelogSystemStreams), 
and index it by storeName
+    Map<String, SystemConsumer> storeSystemConsumers = 
createConsumers(this.changelogSystemStreams.entrySet().stream().collect(Collectors.toMap(Map.Entry::getKey,
+        e -> Arrays.asList(e.getValue()))), systemFactories, config, 
this.samzaContainerMetrics.registry());
+    this.storeConsumers = createStoreIndexedMap(this.changelogSystemStreams, 
storeSystemConsumers);
 
     // creating task restore managers
     this.taskRestoreManagers = createTaskRestoreManagers(systemAdmins, clock);
+
+    // creating side input store processors, one per store per task
+    sideInputStoresToProcessor = createSideInputProcessors(new 
StorageConfig(config), this.containerModel, this.sideInputSystemStreams, 
this.taskInstanceMetrics);
+
+    // create side input storage managers
+    sideInputStorageManagers = createSideInputStorageManagers(clock);
+
+    // create side Input consumers indexed by systemName
+    this.sideInputConsumers = createConsumers(this.sideInputSystemStreams, 
systemFactories, config, this.samzaContainerMetrics.registry());
+
+    // create SystemConsumers for consuming from sideInputSSPs, if sideInputs 
are being used
+    if (!this.sideInputConsumers.isEmpty()) {
+
+      scala.collection.immutable.Map<SystemStream, SystemStreamMetadata> 
inputStreamMetadata = 
streamMetadataCache.getStreamMetadata(JavaConversions.asScalaSet(
+          
this.sideInputSystemStreams.values().stream().flatMap(List::stream).collect(Collectors.toSet())).toSet(),
 false);
+
+      SystemConsumersMetrics systemConsumersMetrics = new 
SystemConsumersMetrics(
+          new MetricsRegistryMap("samza-container-" + containerModel.getId() + 
"-sideinputs"));
+
+      MessageChooser chooser = DefaultChooser.apply(inputStreamMetadata, new 
RoundRobinChooserFactory(), config,
+          systemConsumersMetrics.registry(), systemAdmins);
+
+      sideInputSystemConsumers =
+          new SystemConsumers(chooser, 
ScalaJavaUtil.toScalaMap(this.sideInputConsumers), serdeManager,
+              systemConsumersMetrics, 
SystemConsumers.DEFAULT_NO_NEW_MESSAGES_TIMEOUT(), 
SystemConsumers.DEFAULT_DROP_SERIALIZATION_ERROR(),
+              SystemConsumers.DEFAULT_POLL_INTERVAL_MS(), 
ScalaJavaUtil.toScalaFunction(() -> System.nanoTime()));
+    }
+
   }
 
   /**
    *  Creates SystemConsumer objects for store restoration, creating one 
consumer per system.
    */
-  private static Map<String, SystemConsumer> createStoreConsumers(Map<String, 
SystemStream> changelogSystemStreams,
+  private static Map<String, SystemConsumer> createConsumers(Map<String, 
List<SystemStream>> systemStreams,
 
 Review comment:
  Why does this need to take a `Map<String, List<SystemStream>> systemStreams`?
  Since it creates one consumer per system, a `Set<SystemStream>` seems sufficient.
  
  Taking a `Set<SystemStream>` would also remove the need to up-convert
  `changelogSystemStreams` into a map of lists at the call site that creates the store consumers.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

Reply via email to