import org.apache.ignite.*;
import org.apache.ignite.binary.BinaryObject;
import org.apache.ignite.cache.*;
import org.apache.ignite.cache.query.*;
import org.apache.ignite.client.ClientCache;
import org.apache.ignite.client.ClientCacheConfiguration;
import org.apache.ignite.client.ClientTransaction;
import org.apache.ignite.client.IgniteClient;
import org.apache.ignite.configuration.ClientConfiguration;
import org.apache.ignite.lang.IgniteBiPredicate;

import javax.cache.Cache;
import java.util.*;

/**
 * Stand-alone smoke test for scan queries issued through the Ignite thin client.
 *
 * <p>The program connects to a node at {@code 127.0.0.1:10800}, drops and re-creates a
 * transactional cache named {@code Person}, loads a handful of sample {@link Person}
 * entries inside one transaction and then runs one filtered {@link ScanQuery} per
 * search name, printing every matching entry to standard output.
 */
public class ScanQueryTest {

    /** Name of the cache that is dropped, re-created and queried on every run. */
    private static final String CACHE_NAME = "Person";

    /** Address ({@code host:port}) the thin client connects to. */
    private static final String SERVER_ADDRESS = "127.0.0.1:10800";

    /** Process exit status reported when the test data could not be set up. */
    private static final int EXIT_FAILURE = -1;

    /**
     * Immutable value object stored in the cache; {@link #main} uses the SSN as the cache key.
     */
    static class Person {
        /** Social security number. */
        private final String ssn;

        /** Person's name; this is the field the scan-query filter compares against. */
        private final String name;

        /** City of residence. */
        private final String city;

        /**
         * Creates a person.
         *
         * @param ssn  social security number, must not be {@code null}
         * @param name person's name, must not be {@code null}
         * @param city city of residence, must not be {@code null}
         * @throws NullPointerException if any argument is {@code null}
         */
        public Person(String ssn, String name, String city) {
            // Strings are immutable, so defensive copies are pointless; only reject nulls.
            this.ssn = Objects.requireNonNull(ssn, "ssn");
            this.name = Objects.requireNonNull(name, "name");
            this.city = Objects.requireNonNull(city, "city");
        }

        /**
         * @return the SSN, name and city separated by single spaces
         */
        @Override
        public String toString() {
            return ssn + " " + name + " " + city;
        }

        /**
         * @return the social security number
         */
        public String getSsn() {
            return ssn;
        }

        /**
         * @return the person's name
         */
        public String getName() {
            return name;
        }

        /**
         * @return the city of residence
         */
        public String getCity() {
            return city;
        }
    }

    /**
     * Creates an instance. All functionality lives in static methods, so an instance
     * carries no state; the constructor is kept public for existing callers.
     */
    public ScanQueryTest() {
    }

    /**
     * Entry point: sets up the cache, loads the sample data and runs the scan queries.
     *
     * <p>Terminates the JVM with status {@code -1} when the client cannot be started,
     * the cache cannot be created or the sample data cannot be stored. A failing scan
     * query is only reported and does not change the exit status.
     *
     * @param args command-line arguments (ignored)
     */
    public static void main(String[] args) {
        ClientConfiguration config = new ClientConfiguration();
        config.setPartitionAwarenessEnabled(true);
        config.setAddresses(SERVER_ADDRESS);

        boolean setupOk;

        // try-with-resources guarantees the client connection is released on every path.
        try (IgniteClient client = Ignition.startClient(config)) {
            dropCacheIfPresent(client);

            ClientCacheConfiguration cacheCfg = new ClientCacheConfiguration();
            cacheCfg.setName(CACHE_NAME);
            cacheCfg.setAtomicityMode(CacheAtomicityMode.TRANSACTIONAL);

            ClientCache<String, Person> personCache = client.getOrCreateCache(cacheCfg);
            System.out.println("Created cache: " + CACHE_NAME);

            setupOk = storePersons(client, personCache, samplePersons());

            if (setupOk) {
                runScanQueries(personCache, "Mary", "Agatha", "Antony");
            }
        } catch (Exception e) {
            // Reached when connecting, creating the cache or closing the client fails.
            System.out.println("Failed to setup the test : " + e.getMessage());
            setupOk = false;
        }

        // Exit only after the try block so the client is closed before the JVM halts.
        if (!setupOk) {
            System.exit(EXIT_FAILURE);
        }
    }

    /**
     * Builds the fixed set of sample persons loaded into the cache.
     *
     * @return a new array of sample persons
     */
    private static Person[] samplePersons() {
        return new Person[] {
            new Person("1234-456-3456", "Mary", "Sunnyvale"),
            new Person("1235-432-8765", "Antony", "Paris"),
            new Person("1205-198-0015", "Aapar", "London"),
            new Person("1207-675-8610", "Lilly", "Venice"),
            new Person("2807-675-8610", "Antony", "Sunnyvale"),
            new Person("4857-095-0191", "Sharp", "London")
        };
    }

    /**
     * Removes the test cache left over from a previous run, if any.
     *
     * <p>This is best effort: a failure is reported but never aborts the test.
     *
     * @param client connected thin client
     */
    private static void dropCacheIfPresent(IgniteClient client) {
        try {
            client.destroyCache(CACHE_NAME);
            System.out.println("Removed cache: " + CACHE_NAME);
        } catch (Exception e) {
            // Deliberately non-fatal (e.g. first run, nothing to remove), but do not
            // hide the reason from whoever reads the output.
            System.out.println("Cache " + CACHE_NAME + " was not removed: " + e.getMessage());
        }
    }

    /**
     * Stores all given persons, keyed by SSN, inside a single transaction.
     *
     * @param client  connected thin client used to start the transaction
     * @param cache   cache that receives the entries
     * @param persons entries to store
     * @return {@code true} when the transaction committed, {@code false} when storing
     *         failed (the failure is reported on standard output)
     */
    private static boolean storePersons(IgniteClient client,
                                        ClientCache<String, Person> cache,
                                        Person[] persons) {
        try (ClientTransaction txn = client.transactions().txStart()) {
            for (Person person : persons) {
                cache.put(person.getSsn(), person);
            }

            txn.commit();
            System.out.println("Added " + persons.length + " entries");
            return true;
        } catch (Exception e) {
            System.out.println("Failed to add entries to cache " +
                e.getMessage());
            return false;
        }
    }

    /**
     * Runs one scan query per name and prints every entry whose person has that name.
     *
     * <p>A failing query is reported and the remaining names are still processed.
     *
     * @param cache cache to query
     * @param names person names to search for
     */
    private static void runScanQueries(ClientCache<String, Person> cache, String... names) {
        System.out.println("Scan Query Ignite Version 01");

        for (String name : names) {
            // NOTE(review): the filter is presumably serialized and evaluated on the server
            // nodes, which would require this class and Person on their classpath - confirm.
            IgniteBiPredicate<String, Person> filter =
                (key, person) -> person.getName().equals(name);

            try (QueryCursor<Cache.Entry<String, Person>> cursor =
                    cache.query(new ScanQuery<>(filter))) {
                cursor.forEach(
                    entry -> System.out.println(entry.getKey() +
                        " " + entry.getValue()));
            } catch (Exception e) {
                System.out.println("Scan query failed " + e.getMessage());
            }
        }
    }
}
