I tried to use Cassandra and multiprocessing to insert rows (dummy data) concurrently, based on the example in
http://www.datastax.com/dev/blog/datastax-python-driver-multiprocessing-example-for-improved-bulk-data-throughput This is my code class QueryManager(object): concurrency = 100 # chosen to match the default in execute_concurrent_with_args def __init__(self, session, process_count=None): self.pool = Pool(processes=process_count, initializer=self._setup, initargs=(session,)) @classmethoddef _setup(cls, session): cls.session = session cls.prepared = cls.session.prepare(""" INSERT INTO test_table (key1, key2, key3, key4, key5) VALUES (?, ?, ?, ?, ?) """) def close_pool(self): self.pool.close() self.pool.join() def get_results(self, params): results = self.pool.map(_multiprocess_write, (params[n:n+self.concurrency] for n in range(0, len(params), self.concurrency))) return list(itertools.chain(*results)) @classmethoddef _results_from_concurrent(cls, params): return [results[1] for results in execute_concurrent_with_args(cls.session, cls.prepared, params)] def _multiprocess_write(params): return QueryManager._results_from_concurrent(params) if __name__ == '__main__': processes = 2 # connect cluster cluster = Cluster(contact_points=['127.0.0.1'], port=9042) session = cluster.connect() # database name is a concatenation of client_id and system_id keyspace_name = 'unit_test_0' # drop keyspace if it already exists in a cluster try: session.execute("DROP KEYSPACE IF EXISTS " + keyspace_name) except: pass create_keyspace_query = "CREATE KEYSPACE " + keyspace_name \ + " WITH replication = {'class': 'SimpleStrategy', 'replication_factor': '1'};" session.execute(create_keyspace_query) # use a session's keyspace session.set_keyspace(keyspace_name) # drop table if it already exists in the keyspace try: session.execute("DROP TABLE IF EXISTS " + "test_table") except: pass # create a table for invoices in the keyspace create_test_table = "CREATE TABLE test_table(" keys = "key1 text,\n" \ "key2 text,\n" \ "key3 text,\n" \ "key4 text,\n" \ "key5 text,\n" create_invoice_table_query += keys 
create_invoice_table_query += "PRIMARY KEY (key1))" session.execute(create_test_table) qm = QueryManager(session, processes) params = list() for row in range(100000): key = 'test' + str(row) params.append([key, 'test', 'test', 'test', 'test']) start = time.time() rows = qm.get_results(params) delta = time.time() - start log.info(fm('Cassandra inserts 100k dummy rows for ', delta, ' secs')) when I executed the code, I got the following error TypeError: can't pickle _thread.lock objects which pointed at self.pool = Pool(processes=process_count, initializer=self._setup, initargs=(session,)) I am wondering how to resolve the issue. cheers -- https://mail.python.org/mailman/listinfo/python-list