package datagrid.poc.ingest.bulk.cass.receiver;

import java.util.Collection;
import java.util.Map;
import java.util.Map.Entry;
import org.apache.ignite.IgniteCache;
import org.apache.ignite.IgniteException;
import org.apache.ignite.binary.BinaryObject;
import org.apache.ignite.binary.BinaryObjectBuilder;
import org.apache.ignite.internal.util.lang.GridPeerDeployAware;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.stream.StreamReceiver;


/**
 * Stream receiver that merges incoming binary objects into the values already
 * held by the cache instead of blindly overwriting them.
 *
 * <p>For every streamed entry the receiver reads a map of field name to field
 * value from the incoming object (stored under
 * {@code FIELD_VALUE_MAP_OBJECT_KEY}) and copies each non-null value onto a
 * builder. The builder starts from the cached value when one exists, otherwise
 * from the incoming object with the map field removed. The built object is
 * then written back under the same key.
 *
 * <p>The class also implements {@link GridPeerDeployAware}, reporting itself as
 * the deploy class together with the class loader detected for it.
 *
 * <p>NOTE(review): {@code FIELD_VALUE_MAP_OBJECT_KEY} is not declared in the
 * visible part of this file; confirm where it is defined (a static import
 * looks to be missing).
 *
 * <p>NOTE(review): the {@code get} followed by {@code put} is not performed as
 * a single cache operation here; whether concurrent updates of the same key
 * can interleave depends on how the streamer is configured - confirm.
 */
public class CassStreamReceiverPeerLoading implements StreamReceiver<String, BinaryObject>
    , GridPeerDeployAware {

  /**
   * Lazily detected class loader for {@link #deployClass()}.
   *
   * <p>Marked {@code transient}: the receiver is serializable through
   * {@link StreamReceiver}, while {@link ClassLoader} is not, so a populated
   * cache field must not take part in serialization.
   */
  private transient ClassLoader ldr;

  /**
   * Merges each streamed entry into the cache.
   *
   * @param cache cache the merged values are read from and written to
   * @param entries streamed key/value pairs to merge
   * @throws IgniteException declared by the {@link StreamReceiver} contract
   */
  @Override
  public void receive(IgniteCache<String, BinaryObject> cache,
      Collection<Entry<String, BinaryObject>> entries) throws IgniteException {

    for (Entry<String, BinaryObject> entry : entries) {
      String key = entry.getKey();
      BinaryObject newValue = entry.getValue();

      // May be null when the incoming object does not carry the map field.
      Map<String, Object> newFieldValueMap = newValue.<Map<String, Object>>field(
          FIELD_VALUE_MAP_OBJECT_KEY);

      BinaryObject existingValue = cache.get(key);

      BinaryObjectBuilder finalValueBuilder;
      if (existingValue != null) {
        // Start from what is cached so fields absent from the update survive.
        finalValueBuilder = existingValue.toBuilder();
      } else {
        // First value for this key: keep the incoming object but drop the
        // map field, whose content is copied onto the builder below.
        finalValueBuilder = newValue.toBuilder();
        finalValueBuilder.removeField(FIELD_VALUE_MAP_OBJECT_KEY);
      }

      overwriteExistingValueBuilder(newFieldValueMap, finalValueBuilder);
      cache.put(key, finalValueBuilder.build());
    }
  }

  /**
   * Copies every non-null value of {@code fieldValueMap} onto the builder under
   * the corresponding field name. Entries with a null value are skipped, so
   * whatever the builder already holds for those fields is left untouched.
   *
   * @param fieldValueMap field name to new value; {@code null} is treated as
   *     "nothing to merge"
   * @param existingValueBuilder builder receiving the values
   */
  private void overwriteExistingValueBuilder(Map<String, Object> fieldValueMap,
      BinaryObjectBuilder existingValueBuilder) {

    if (fieldValueMap == null) {
      return;
    }

    for (Entry<String, Object> entry : fieldValueMap.entrySet()) {
      Object value = entry.getValue();
      if (value != null) {
        existingValueBuilder.setField(entry.getKey(), value);
      }
    }
  }

  /**
   * @return this receiver's own runtime class
   */
  @Override
  public Class<?> deployClass() {
    return this.getClass();
  }

  /**
   * @return the class loader detected for {@link #deployClass()}, resolved on
   *     first use and cached afterwards
   */
  @Override
  public ClassLoader classLoader() {
    if (ldr == null) {
      ldr = U.detectClassLoader(deployClass());
    }

    return ldr;
  }
}
