Hi everybody, My requirements was: + Made a table charge to be partitioned by carrier and month + summarize by charges + summarize by users, + each summarization must be by month and several others columns.
Here is the database CREATE TABLE charges.charge ( id serial NOT NULL, transaction_id uuid NOT NULL, carrier_id integer NOT NULL, msisdn character varying(60) NOT NULL, partner_id integer NOT NULL, product_id integer NOT NULL, parent_id integer, retry_count integer NOT NULL, created_at timestamp with time zone NOT NULL, CONSTRAINT charge_pkey PRIMARY KEY (id) ); CREATE TABLE charges.charge_summarized ( id serial NOT NULL, created_at timestamp with time zone NOT NULL, carrier_id integer NOT NULL, partner_id integer NOT NULL, product_id integer NOT NULL, retry_count integer NOT NULL, amount integer NOT NULL, CONSTRAINT charge_summarized_pkey PRIMARY KEY (id), CONSTRAINT client_charge_client_id_key UNIQUE (carrier_id, partner_id, product_id, retry_count) ); CREATE TABLE charges.client ( id serial NOT NULL, carrier_id integer NOT NULL, msisdn character varying(60) NOT NULL, collectibility numeric(5,2) NOT NULL, CONSTRAINT client_pkey PRIMARY KEY (id), ); CREATE TABLE charges.client_charge ( id serial NOT NULL, client_id integer NOT NULL, date date NOT NULL, amount integer NOT NULL, CONSTRAINT client_charge_pkey PRIMARY KEY (id), CONSTRAINT client_charge_client_id_fkey FOREIGN KEY (client_id) REFERENCES charges.client (id) MATCH SIMPLE ON UPDATE NO ACTION ON DELETE NO ACTION DEFERRABLE INITIALLY DEFERRED, CONSTRAINT client_charge_client_id_key UNIQUE (client_id, date), ); OK, now the functions I made to complete all this, Inserts are made only in charges.charge create or replace function charges.insert_into_charges() returns trigger as $body1$ args = TD['new'] event = TD['event'] table_name = TD['table_name'] from datetime import datetime, timedelta if event == 'INSERT': carrier = args['carrier_id'] created_at = datetime.strptime(args['created_at'].split(" ")[0], "%Y-%m-%d") month, year = created_at.month, created_at.year next_month, next_year = (month + 1, year) if month < 12 else (1, year+1) target_table_name = "charge_%s_%04d%02d" % (carrier, year, month) while True: exist_table = len(plpy.execute("select relname from pg_stat_user_tables where relname = '%s';" % target_table_name)) if not exist_table: sql = """create table charges.%(target_table_name)s (CONSTRAINT charge_%(carrier_id)s_carrier_id_check CHECK ( carrier_id = '%(carrier_id)s' AND created_at >= '%(from_year)s-%(from_month)s-01' AND created_at < '%(to_year)s-%(to_month)s-01' ) ) INHERITS (charges.charge) WITH ( OIDS=FALSE ); create trigger summarize_%(target_table_name)s AFTER insert on charges.%(target_table_name)s for each row execute procedure charges.summarize(); create trigger client_charge_sum_%(target_table_name)s AFTER insert on charges.%(target_table_name)s for each row execute procedure charges.client_charge_sum(); """ % {"carrier_id": carrier, "from_year": year, "from_month": month, "to_year": next_year, "to_month": next_month, "target_table_name": target_table_name} try: # multithreading could have a race condition here. Better to ask for forgiveness than permission. plpy.execute(sql) except: continue break keys, values = zip(*tuple([(x,y) for x,y in args.items() if y is not None])) sql = "insert into charges.%(target_table_name)s " \ "(%(keys)s) VALUES (%(values)s);" % \ {"carrier_id": carrier, "keys" : ",".join(keys), "values" : ",".join(["'%s'" % x for x in values]), "target_table_name": target_table_name } plpy.execute(sql) return "SKIP" $body1$ language plpythonu; create or replace function charges.client_charge_sum() returns trigger as $body3$ args = TD['new'] event = TD['event'] table_name = TD['table_name'] if event != 'INSERT': return while True: # to populate clients if is needed sql = "select id from charges.client where msisdn='%s';" % args["msisdn"] clients = plpy.execute(sql) if len(clients): client_id = clients[0]['id'] else: sql= """INSERT INTO charges.client ( carrier_id, msisdn,collectibility ) VALUES ( %s, '%s',0) RETURNING CURRVAL('charges.client_id_seq') as id;""" % (args['carrier_id'], args['msisdn']) try: client_id = plpy.execute(sql)[0]['id'] except: continue break group_by_data ={ "date" : args['created_at'].split(" ")[0], "client_id" : str(client_id), } filter_string = " and ".join(["%s='%s'" % (key, value) for key, value in group_by_data.items()]) while True: sql = "select 1 from charges.client_charge where %s;" % (filter_string,) if len(plpy.execute(sql)): sql = "update charges.client_charge set amount=amount + 1 where %s" % filter_string plpy.info("update") else: field_names, field_datas = zip(*tuple(group_by_data.items())) field_data_string = ", ".join(["'%s'" % x for x in field_datas]) sql = "insert into charges.client_charge (amount, %s) values (1, %s)" % (", ".join(field_names), field_data_string) plpy.info("insert") try: plpy.execute(sql) except: continue break $body3$ language plpythonu; create or replace function charges.summarize() returns trigger as $body2$ args = TD['new'] event = TD['event'] table_name = TD['table_name'] if event != 'INSERT': return group_by_data ={ "carrier_id" : args['carrier_id'], "charged_at" : args['created_at'].split(" ")[0], "partner_id" : args['partner_id'], "product_id" : args['product_id'], "retry_count" : args['retry_count'], } filter_string = " and ".join(["%s='%s'" % (key, value) for key, value in group_by_data.items()]) while True: sql = "select 1 from charges.charge_summarized where %s;" % (filter_string,) if len(plpy.execute(sql)): sql = "update charges.charge_summarized set amount=amount + 1 where %s" % filter_string plpy.info("update") else: field_names, field_datas = zip(*tuple(group_by_data.items())) field_data_string = ", ".join(["'%s'" % x for x in field_datas]) sql = "insert into charges.charge_summarized (amount, %s) values (1, %s)" % (", ".join(field_names), field_data_string) plpy.info("insert") try: plpy.execute(sql) except: continue break $body2$ language plpythonu; And finally the trigger: CREATE TRIGGER inserta_tg BEFORE INSERT ON charges.charge FOR EACH ROW EXECUTE PROCEDURE charges.insert_into_charges(); Doesn't sound like too much? As I say, im new and I didn't found any better. But an insert takes around 135ms in the worst case (create tables and insert rows) and about 85 ms in best case (only updates). There are something better? Thanks in advance, Sabrina