Hi there,
I asked this question before, but under a different, already resolved
topic, so I am posting it again under a more suitable title. I hope
that's OK.
We have tried to configure two compute server nodes, one of which runs
on a weaker machine. The node on the more powerful machine always
finishes its tasks long before the weaker node and then sits idle.
The idle node does not even send a steal request, so I must have
configured something wrong.
I have attached the code for both nodes. If you could kindly point out
what I am missing, I would really appreciate it!
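import org.apache.ignite.{Ignite, Ignition}
import org.apache.ignite.cache.CacheMode
import org.apache.ignite.configuration.{CacheConfiguration, IgniteConfiguration}
import org.apache.ignite.spi.collision.jobstealing.JobStealingCollisionSpi
import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi
import org.apache.ignite.spi.discovery.tcp.ipfinder.multicast.TcpDiscoveryMulticastIpFinder
import org.apache.ignite.spi.failover.jobstealing.JobStealingFailoverSpi

object ComputeNodeStartup extends App {
  // converts Scala maps to the java.util.Map that the Ignite setters expect
  import scala.collection.JavaConverters._

  val cfg = new IgniteConfiguration()

  val jobStealer = new JobStealingCollisionSpi()
  jobStealer.setStealingEnabled(false)
  /* set the number of parallel jobs allowed */
  jobStealer.setActiveJobsThreshold(2)
  jobStealer.setMessageExpireTime(1000)
  val failoverSpi = new JobStealingFailoverSpi()

  val cacheConfig = new CacheConfiguration("myCache")
  cacheConfig.setCacheMode(CacheMode.PARTITIONED)
  cfg.setCacheConfiguration(cacheConfig)
  cfg.setPeerClassLoadingEnabled(true)
  cfg.setFailoverSpi(failoverSpi)
  cfg.setCollisionSpi(jobStealer)
  cfg.setIncludeEventTypes(1600, 1601, 1602, 1603, 1604)
  cfg.setUserAttributes(Map("compute.node" -> true).asJava)

  val spi = new TcpDiscoverySpi()
  // create a new instance of the TCP discovery multicast IP finder
  val tcMp = new TcpDiscoveryMulticastIpFinder()
  // change your IP address here
  tcMp.setAddresses(java.util.Arrays.asList("localhost"))
  // set the multicast IP finder for the discovery SPI
  spi.setIpFinder(tcMp)
  cfg.setDiscoverySpi(spi)

  val ignite: Ignite = Ignition.start(cfg)
}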
object ComputeNodeJobStealerNodeStartUp extends App {
  // converts Scala maps to the java.util.Map that the Ignite setters expect
  import scala.collection.JavaConverters._

  val jobStealer = new JobStealingCollisionSpi()
  jobStealer.setStealingEnabled(true)
  jobStealer.setStealingAttributes(Map("compute.node" -> "true").asJava)
  /* set the number of parallel jobs allowed */
  jobStealer.setActiveJobsThreshold(4)
  val failoverSpi = new JobStealingFailoverSpi()

  val cfg = new IgniteConfiguration()
  val cacheConfig = new CacheConfiguration("myCache")
  /* partition the cache over the entire cluster */
  cacheConfig.setCacheMode(CacheMode.PARTITIONED)
  cfg.setCacheConfiguration(cacheConfig)
  cfg.setFailoverSpi(failoverSpi)
  cfg.setPeerClassLoadingEnabled(true)
  cfg.setCollisionSpi(jobStealer)
  /* enable the following events */
  cfg.setIncludeEventTypes(
    1600, /* job started */
    1601, /* job finished */
    1602, /* job message */
    1603, /* job error */
    1604  /* job warning */
  )
  /* jobs are sent to this node for computation */
  cfg.setUserAttributes(Map("compute.node" -> true).asJava)

  val spi = new TcpDiscoverySpi()
  // create a new instance of the TCP discovery multicast IP finder
  val tcMp = new TcpDiscoveryMulticastIpFinder()
  // change your IP address here
  tcMp.setAddresses(java.util.Arrays.asList("localhost"))
  // set the multicast IP finder for the discovery SPI
  spi.setIpFinder(tcMp)
  cfg.setDiscoverySpi(spi)

  val ignite: Ignite = Ignition.start(cfg)
}
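
One detail in the configuration above that may be worth checking, offered as an assumption rather than a confirmed cause: the stealing node sets the stealing attribute "compute.node" to the string "true", while both nodes publish the user attribute "compute.node" as the Boolean true. If the collision SPI matches stealing attributes against node attributes by value equality (worth verifying against the Ignite Javadoc), a String and a Boolean would never match. The fragment below is a minimal sketch of a type-consistent setup. The object name AttributeSketch is hypothetical, and only the attribute-related setters are shown.

```scala
import java.io.Serializable
import java.util.{HashMap => JHashMap}

import org.apache.ignite.configuration.IgniteConfiguration
import org.apache.ignite.spi.collision.jobstealing.JobStealingCollisionSpi

// Hypothetical helper object, for illustration only.
object AttributeSketch {
  val cfg = new IgniteConfiguration()

  // Attribute published by the node: key "compute.node", Boolean value.
  val userAttrs = new JHashMap[String, java.lang.Boolean]()
  userAttrs.put("compute.node", java.lang.Boolean.TRUE)
  cfg.setUserAttributes(userAttrs)

  // Attribute the stealing node looks for: same key, same value type.
  val stealAttrs = new JHashMap[String, Serializable]()
  stealAttrs.put("compute.node", java.lang.Boolean.TRUE)

  val jobStealer = new JobStealingCollisionSpi()
  jobStealer.setStealingEnabled(true)
  jobStealer.setStealingAttributes(stealAttrs)
  cfg.setCollisionSpi(jobStealer)
}
```

Using explicit java.util.HashMap instances here simply makes the value types visible; whether this change alone makes the idle node start stealing has not been verified.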