Hello all,

We had the same problem with unhandled DLRs in one of our installations; see 
this thread (the second issue): 
http://www.mail-archive.com/[email protected]/msg20441.html

We use the latest stable version of Kannel (1.4.3) and the CIMD2 protocol to 
communicate with the SMSC.

After investigating the code, we found that for every MT SMS with a dlr-url 
specified, Kannel uses the SMSC id and the send time (a timestamp with 
one-second precision) of the original MT SMS to correlate incoming DLRs with 
the dlr-url to call back. Thus, if you send more than one MT SMS through the 
same SMSC within the same second, Kannel won't be able to tell which of the 
stored dlr-urls to invoke when a DLR arrives. What we experienced was the 
wrong dlr-url being called back, or no dlr-url callback at all.
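
To make the collision concrete, here is a minimal stand-alone sketch in plain 
C. It does not use Kannel's types: `pending_dlr`, `find_by_smsc_ts`, the SMSC 
id, the numbers and the URLs are all hypothetical and only mimic a lookup 
keyed on SMSC id and a second-precision timestamp.

```c
#include <stddef.h>
#include <string.h>

/* Hypothetical stand-in for a stored DLR entry (not Kannel's struct dlr_entry). */
struct pending_dlr {
    const char *smsc;   /* SMSC id */
    const char *ts;     /* send time, second precision */
    const char *dst;    /* recipient address */
    const char *url;    /* dlr-url to call back */
};

/* Look up by (smsc, ts) only: the first matching entry wins. */
static const struct pending_dlr *
find_by_smsc_ts(const struct pending_dlr *list, size_t n,
                const char *smsc, const char *ts)
{
    size_t i;

    for (i = 0; i < n; i++) {
        if (strcmp(list[i].smsc, smsc) == 0 && strcmp(list[i].ts, ts) == 0)
            return &list[i];
    }
    return NULL;
}

/* Two MT SMS sent through the same SMSC within the same second (made-up values). */
static const struct pending_dlr sample_dlrs[] = {
    { "cimd2-a", "1262304000", "306900000001", "http://example.invalid/dlr?id=1" },
    { "cimd2-a", "1262304000", "306900000002", "http://example.invalid/dlr?id=2" },
};
```

With this key, a DLR for the second message is looked up with the same 
(smsc, ts) pair as the first one, so the lookup can only ever return the 
first entry and the second dlr-url is never reached.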

Interestingly, this behavior does not occur when an Oracle DB is used for 
storing DLRs, because there the correlation is based on the triplet 
SMSC id - timestamp - MT SMS recipient address. (To be absolutely honest, 
even in this case the problem still exists if you send more than one MT SMS 
to the same recipient within the same second through the same SMSC.)

Furthermore, this behavior does not occur with the SMPP protocol, because 
there the correlation between the MT SMS and the DLR is based on the SMSC id 
and a unique number (an auto-increment number for every MT SMS sent).

To correct the problem, we altered all the source files that handle DLR 
storage so that the MT SMS recipient address is part of the correlation. As a 
result, you should only experience dlr-url callback issues with the CIMD2 
protocol if you send more than one SMS to the same recipient within the same 
second through the same SMSC.
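
The effect of the extended key, including the remaining corner case, can be 
sketched in plain C. This is only an illustration under assumptions: 
`dlr_rec`, `find_by_triplet` and the sample values are hypothetical and are 
not taken from the attached files, which work on Kannel's Octstr type.

```c
#include <stddef.h>
#include <string.h>

/* Hypothetical stand-in for a stored DLR entry (not Kannel's struct dlr_entry). */
struct dlr_rec {
    const char *smsc;   /* SMSC id */
    const char *ts;     /* send time, second precision */
    const char *dst;    /* recipient address */
    const char *url;    /* dlr-url to call back */
};

/* Match on the triplet (smsc, ts, dst); returns the first match or NULL. */
static const struct dlr_rec *
find_by_triplet(const struct dlr_rec *list, size_t n,
                const char *smsc, const char *ts, const char *dst)
{
    size_t i;

    for (i = 0; i < n; i++) {
        if (strcmp(list[i].smsc, smsc) == 0 &&
            strcmp(list[i].ts, ts) == 0 &&
            strcmp(list[i].dst, dst) == 0)
            return &list[i];
    }
    return NULL;
}

/*
 * Three MT SMS in the same second through the same SMSC (made-up values).
 * The last two go to the same recipient and therefore share one key.
 */
static const struct dlr_rec triplet_samples[] = {
    { "cimd2-a", "1262304000", "306900000001", "http://example.invalid/dlr?id=1" },
    { "cimd2-a", "1262304000", "306900000002", "http://example.invalid/dlr?id=2" },
    { "cimd2-a", "1262304000", "306900000002", "http://example.invalid/dlr?id=3" },
};
```

Different recipients now resolve to their own entries, while the two messages 
to the same recipient still share one key, so a lookup for them returns 
whichever entry was stored first.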

All altered files are attached.


BRs

-- 
Byron I. Kiourtzoglou

Electrical & Computer Engineer
Msc Business Administration

Senior Software Engineer
Business Support Systems Department, Engineering Unit

INTRACOM TELECOM
19.7 km Markopoulou Ave.
19002 Peania, Athens, Greece
/* ==================================================================== 
 * The Kannel Software License, Version 1.0 
 * 
 * Copyright (c) 2001-2009 Kannel Group  
 * Copyright (c) 1998-2001 WapIT Ltd.   
 * All rights reserved. 
 * 
 * Redistribution and use in source and binary forms, with or without 
 * modification, are permitted provided that the following conditions 
 * are met: 
 * 
 * 1. Redistributions of source code must retain the above copyright 
 *    notice, this list of conditions and the following disclaimer. 
 * 
 * 2. Redistributions in binary form must reproduce the above copyright 
 *    notice, this list of conditions and the following disclaimer in 
 *    the documentation and/or other materials provided with the 
 *    distribution. 
 * 
 * 3. The end-user documentation included with the redistribution, 
 *    if any, must include the following acknowledgment: 
 *       "This product includes software developed by the 
 *        Kannel Group (http://www.kannel.org/)." 
 *    Alternately, this acknowledgment may appear in the software itself, 
 *    if and wherever such third-party acknowledgments normally appear. 
 * 
 * 4. The names "Kannel" and "Kannel Group" must not be used to 
 *    endorse or promote products derived from this software without 
 *    prior written permission. For written permission, please  
 *    contact [email protected]. 
 * 
 * 5. Products derived from this software may not be called "Kannel", 
 *    nor may "Kannel" appear in their name, without prior written 
 *    permission of the Kannel Group. 
 * 
 * THIS SOFTWARE IS PROVIDED ``AS IS'' AND ANY EXPRESSED OR IMPLIED 
 * WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES 
 * OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE 
 * DISCLAIMED.  IN NO EVENT SHALL THE KANNEL GROUP OR ITS CONTRIBUTORS 
 * BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY,  
 * OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT  
 * OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR  
 * BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY,  
 * WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE  
 * OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE,  
 * EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 
 * ==================================================================== 
 * 
 * This software consists of voluntary contributions made by many 
 * individuals on behalf of the Kannel Group.  For more information on  
 * the Kannel Group, please see <http://www.kannel.org/>. 
 * 
 * Portions of this software are based upon software originally written at  
 * WapIT Ltd., Helsinki, Finland for the Kannel project.  
 */ 

/*
 * gw/dlr_mem.c
 *
 * Implementation of handling delivery reports (DLRs)
 * in memory
 *
 * Andreas Fink <[email protected]>, 18.08.2001
 * Stipe Tolj <[email protected]>, 22.03.2002
 * Alexander Malysh <[email protected]> 2003
 */

#include "gwlib/gwlib.h"
#include "dlr_p.h"

/*
 * This is the global list where all messages being sent out are kept track
 * of. This list is looked up once a delivery report comes in.
 */
static List *dlr_waiting_list;
static RWLock rwlock;

/*
 * Destroy dlr_waiting_list.
 */
static void dlr_mem_shutdown()
{
    gw_rwlock_wrlock(&rwlock);
    gwlist_destroy(dlr_waiting_list, (gwlist_item_destructor_t *)dlr_entry_destroy);
    gw_rwlock_unlock(&rwlock);
    gw_rwlock_destroy(&rwlock);
}

/*
 * Get count of dlr messages waiting.
 */
static long dlr_mem_messages(void)
{
    return gwlist_len(dlr_waiting_list);
}

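/*
 * Remove and destroy all waiting DLR entries.
 */
static void dlr_mem_flush(void)
{
    struct dlr_entry *dlr;

    gw_rwlock_wrlock(&rwlock);
    /*
     * Always take the head of the list. Deleting by an increasing index
     * would skip entries and run past the end as the list shrinks.
     */
    while ((dlr = gwlist_extract_first(dlr_waiting_list)) != NULL)
        dlr_entry_destroy(dlr);
    gw_rwlock_unlock(&rwlock);
}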

/*
 * add struct dlr_entry to list
 */
static void dlr_mem_add(struct dlr_entry *dlr)
{
    gw_rwlock_wrlock(&rwlock);
    gwlist_append(dlr_waiting_list,dlr);
    gw_rwlock_unlock(&rwlock);
}

/*
 * Private compare function.
 * Returns 0 if the entry matches and 1 if not.
 */
static int dlr_mem_entry_match(struct dlr_entry *dlr, const Octstr *smsc, const Octstr *ts, const Octstr *dst)
{
    debug("dlr.dlr", 0, "DLR[%s]: Comparing DLR smsc=%s, ts=%s, dst=%s",
              dlr_type(), octstr_get_cstr(smsc), octstr_get_cstr(ts),
              octstr_get_cstr(dst));
    /*
     * Check the destination address too: smsc and timestamp alone are not
     * enough (e.g. for UCP or CIMD2), because the timestamp has no
     * sub-second precision.
     */
    if (octstr_compare(dlr->smsc, smsc) == 0 &&
        octstr_compare(dlr->timestamp, ts) == 0 &&
        octstr_compare(dlr->destination, dst) == 0)
        return 0;

    return 1;
}

/*
 * Find matching entry and return copy of it, otherwise NULL
 */
static struct dlr_entry *dlr_mem_get(const Octstr *smsc, const Octstr *ts, const Octstr *dst)
{
    long i;
    long len;
    struct dlr_entry *dlr = NULL, *ret = NULL;

    gw_rwlock_rdlock(&rwlock);
    len = gwlist_len(dlr_waiting_list);
    for (i=0; i < len; i++) {
        dlr = gwlist_get(dlr_waiting_list, i);

        if (dlr_mem_entry_match(dlr, smsc, ts, dst) == 0) {
            ret = dlr_entry_duplicate(dlr);
            break;
        }
    }
    gw_rwlock_unlock(&rwlock);

    /* ret is still NULL if no matching entry was found */
    return ret;
}

/*
 * Remove matching entry
 */
static void dlr_mem_remove(const Octstr *smsc, const Octstr *ts, const Octstr *dst)
{
    long i;
    long len;
    struct dlr_entry *dlr = NULL;

    gw_rwlock_wrlock(&rwlock);
    len = gwlist_len(dlr_waiting_list);
    for (i=0; i < len; i++) {
        dlr = gwlist_get(dlr_waiting_list, i);

        if (dlr_mem_entry_match(dlr, smsc, ts, dst) == 0) {
            gwlist_delete(dlr_waiting_list, i, 1);
            dlr_entry_destroy(dlr);
            break;
        }
    }
    gw_rwlock_unlock(&rwlock);
}

static struct dlr_storage  handles = {
    .type = "internal",
    .dlr_add = dlr_mem_add,
    .dlr_get = dlr_mem_get,
    .dlr_remove = dlr_mem_remove,
    .dlr_shutdown = dlr_mem_shutdown,
    .dlr_messages = dlr_mem_messages,
    .dlr_flush = dlr_mem_flush
};

/*
 * Initialize dlr_waiting_list and return our storage handles.
 */
struct dlr_storage *dlr_init_mem(Cfg *cfg)
{
    dlr_waiting_list = gwlist_create();
    gw_rwlock_init_static(&rwlock);

    return &handles;
}

/*
 * dlr_sdb.c
 *
 * Implementation of handling delivery reports (DLRs)
 * for LibSDB.
 *
 * Andreas Fink <[email protected]>, 18.08.2001
 * Stipe Tolj <[email protected]>, 22.03.2002
 * Alexander Malysh <[email protected]> 2003
 * Guillaume Cottenceau 2004 (dbpool support)
*/

#include "gwlib/gwlib.h"
#include "gwlib/dbpool.h"
#include "dlr_p.h"

#ifdef HAVE_SDB
#include <sdb.h>

/*
 * Our connection pool to sdb.
 */
static DBPool *pool = NULL;

/*
 * Database fields, which we use.
 */
static struct dlr_db_fields *fields = NULL;

enum {
    SDB_ORACLE,
    SDB_MYSQL,
    SDB_POSTGRES,
    SDB_OTHER
};

static long sdb_conn_type = SDB_OTHER;


static const char* sdb_get_limit_str()
{
    switch (sdb_conn_type) {
        case SDB_ORACLE:
            return "AND ROWNUM < 2";
        case SDB_MYSQL:
        case SDB_POSTGRES:
            return "LIMIT 1";
        case SDB_OTHER:
        default:
            return "";
    }
}

static void dlr_sdb_shutdown()
{
    dbpool_destroy(pool);
    dlr_db_fields_destroy(fields);
}

static int gw_sdb_query(char *query,
                        int (*callback)(int, char **, void *), void *closure)
{
    DBPoolConn *pc;
    int rows;

    pc = dbpool_conn_consume(pool);
    if (pc == NULL) {
        error(0, "SDB: Database pool got no connection!");
        return -1;
    }

    rows = sdb_query(pc->conn, query, callback, closure);

    dbpool_conn_produce(pc);

    return rows;
}

static void dlr_sdb_add(struct dlr_entry *dlr)
{
    Octstr *sql;
    int	state;

    sql = octstr_format("INSERT INTO %s (%s, %s, %s, %s, %s, %s, %s, %s, %s) VALUES "
                        "('%s', '%s', '%s', '%s', '%s', '%s', '%d', '%s', '%d')",
                        octstr_get_cstr(fields->table), octstr_get_cstr(fields->field_smsc),
                        octstr_get_cstr(fields->field_ts),
                        octstr_get_cstr(fields->field_src), octstr_get_cstr(fields->field_dst),
                        octstr_get_cstr(fields->field_serv), octstr_get_cstr(fields->field_url),
                        octstr_get_cstr(fields->field_mask), octstr_get_cstr(fields->field_boxc),
                        octstr_get_cstr(fields->field_status),
                        octstr_get_cstr(dlr->smsc), octstr_get_cstr(dlr->timestamp),
                        octstr_get_cstr(dlr->source), octstr_get_cstr(dlr->destination),
                        octstr_get_cstr(dlr->service), octstr_get_cstr(dlr->url), dlr->mask,
                        octstr_get_cstr(dlr->boxc_id), 0);

#if defined(DLR_TRACE)
     debug("dlr.sdb", 0, "SDB: sql: %s", octstr_get_cstr(sql));
#endif

    state = gw_sdb_query(octstr_get_cstr(sql), NULL, NULL);
    if (state == -1)
        error(0, "SDB: error in inserting DLR for DST <%s>", octstr_get_cstr(dlr->destination));

    octstr_destroy(sql);
    dlr_entry_destroy(dlr);
}

static int sdb_callback_add(int n, char **p, void *data)
{
    struct dlr_entry *res = (struct dlr_entry *) data;

    if (n != 6) {
        debug("dlr.sdb", 0, "SDB: Result has incorrect number of columns: %d", n);
        return 0;
    }

#if defined(DLR_TRACE)
    debug("dlr.sdb", 0, "row=%s,%s,%s,%s,%s,%s",p[0],p[1],p[2],p[3],p[4],p[5]);
#endif

    if (res->destination != NULL) {
        debug("dlr.sdb", 0, "SDB: Row already stored.");
        return 0;
    }

    res->mask = atoi(p[0]);
    res->service = octstr_create(p[1]);
    res->url = octstr_create(p[2]);
    res->source = octstr_create(p[3]);
    res->destination = octstr_create(p[4]);
    res->boxc_id = octstr_create(p[5]);

    return 0;
}

static int sdb_callback_msgs(int n, char **p, void *data)
{
    long *count = (long *) data;

    if (n != 1) {
        debug("dlr.sdb", 0, "SDB: Result has incorrect number of columns: %d", n);
        return 0;
    }

#if defined(DLR_TRACE)
    debug("dlr.sdb", 0, "SDB: messages=%s",p[0]);
#endif

    *count = atol(p[0]);

    return 0;
}

static struct dlr_entry*  dlr_sdb_get(const Octstr *smsc, const Octstr *ts, const Octstr *dst)
{
    Octstr *sql;
    int	state;
    struct dlr_entry *res = dlr_entry_create();

    gw_assert(res != NULL);

    debug("dlr.sdb", 0, "DLR[%s]: Comparing DLR smsc=%s, ts=%s, dst=%s",
              dlr_type(), octstr_get_cstr(smsc), octstr_get_cstr(ts),
              octstr_get_cstr(dst));

    sql = octstr_format("SELECT %s, %s, %s, %s, %s, %s FROM %s WHERE %s='%s' AND %s='%s' AND %s='%s' %s",
                        octstr_get_cstr(fields->field_mask), octstr_get_cstr(fields->field_serv),
                        octstr_get_cstr(fields->field_url), octstr_get_cstr(fields->field_src),
                        octstr_get_cstr(fields->field_dst), octstr_get_cstr(fields->field_boxc),
                        octstr_get_cstr(fields->table),
                        octstr_get_cstr(fields->field_smsc), octstr_get_cstr(smsc),
                        octstr_get_cstr(fields->field_ts), octstr_get_cstr(ts),
                        octstr_get_cstr(fields->field_dst), octstr_get_cstr(dst), sdb_get_limit_str());


#if defined(DLR_TRACE)
     debug("dlr.sdb", 0, "SDB: sql: %s", octstr_get_cstr(sql));
#endif

    state = gw_sdb_query(octstr_get_cstr(sql), sdb_callback_add, res);
    octstr_destroy(sql);
    if (state == -1) {
        error(0, "SDB: error in finding DLR");
        goto notfound;
    }
    else if (state == 0) {
        debug("dlr.sdb", 0, "SDB: no entry found for DST <%s>.", octstr_get_cstr(dst));
        goto notfound;
    }

    res->smsc = octstr_duplicate(smsc);

    return res;

notfound:
    dlr_entry_destroy(res);
    return NULL;
}

static void  dlr_sdb_update(const Octstr *smsc, const Octstr *ts, const Octstr *dst, int status)
{
    Octstr *sql;
    int	state;

    debug("dlr.sdb", 0, "SDB: updating DLR status in database");

    sql = octstr_format("UPDATE %s SET %s=%d WHERE %s='%s' AND %s='%s' AND %s='%s' %s",
                        octstr_get_cstr(fields->table),
                        octstr_get_cstr(fields->field_status), status,
                        octstr_get_cstr(fields->field_smsc), octstr_get_cstr(smsc),
                        octstr_get_cstr(fields->field_ts), octstr_get_cstr(ts),
                        octstr_get_cstr(fields->field_dst), octstr_get_cstr(dst), sdb_get_limit_str());


#if defined(DLR_TRACE)
     debug("dlr.sdb", 0, "SDB: sql: %s", octstr_get_cstr(sql));
#endif

    state = gw_sdb_query(octstr_get_cstr(sql), NULL, NULL);
    octstr_destroy(sql);
    if (state == -1) {
        error(0, "SDB: error in updating DLR");
    }
}

static void  dlr_sdb_remove(const Octstr *smsc, const Octstr *ts, const Octstr *dst)
{
    Octstr *sql;
    int	state;

    debug("dlr.sdb", 0, "removing DLR from database");
    if (sdb_conn_type == SDB_POSTGRES) {
        /*
         * Postgres doesn't support limiting delete/update queries,
         * thus we need to use a select subquery.
         * - notice that for uniqueness use of `oid', postgres suggests
         * to do vacuum regularly, even if it's virtually impossible
         * to hit duplicates since oid's are given in a row
         */
        sql = octstr_format("DELETE FROM %s WHERE oid = \
                            (SELECT oid FROM %s WHERE %s='%s' AND %s='%s' AND %s='%s' LIMIT 1)",
                            octstr_get_cstr(fields->table),
                            octstr_get_cstr(fields->table),
                            octstr_get_cstr(fields->field_smsc), octstr_get_cstr(smsc),
                            octstr_get_cstr(fields->field_ts), octstr_get_cstr(ts),
                            octstr_get_cstr(fields->field_dst), octstr_get_cstr(dst));
    } else {
        sql = octstr_format("DELETE FROM %s WHERE %s='%s' AND %s='%s' AND %s='%s' %s",
                            octstr_get_cstr(fields->table),
                            octstr_get_cstr(fields->field_smsc), octstr_get_cstr(smsc),
                            octstr_get_cstr(fields->field_ts), octstr_get_cstr(ts),
                            octstr_get_cstr(fields->field_dst), octstr_get_cstr(dst), sdb_get_limit_str());
    }

#if defined(DLR_TRACE)
     debug("dlr.sdb", 0, "SDB: sql: %s", octstr_get_cstr(sql));
#endif

    state = gw_sdb_query(octstr_get_cstr(sql), NULL, NULL);
    octstr_destroy(sql);
    if (state == -1)
        error(0, "SDB: error in deleting DLR");
}

static long dlr_sdb_messages(void)
{
    Octstr *sql;
    int	state;
    long res = 0;

    sql = octstr_format("SELECT count(*) FROM %s", octstr_get_cstr(fields->table));

#if defined(DLR_TRACE)
    debug("dlr.sdb", 0, "sql: %s", octstr_get_cstr(sql));
#endif

    state = gw_sdb_query(octstr_get_cstr(sql), sdb_callback_msgs, &res);
    octstr_destroy(sql);
    if (state == -1) {
        error(0, "SDB: error in selecting amount of waiting DLRs");
        return -1;
    }

    return res;
}

static void dlr_sdb_flush(void)
{
    Octstr *sql;
    int	state;

    sql = octstr_format("DELETE FROM %s", octstr_get_cstr(fields->table));

#if defined(DLR_TRACE)
     debug("dlr.sdb", 0, "sql: %s", octstr_get_cstr(sql));
#endif

    state = gw_sdb_query(octstr_get_cstr(sql), NULL, NULL);
    octstr_destroy(sql);
    if (state == -1) {
        error(0, "SDB: error in flushing DLR table");
    }
}


static struct dlr_storage  handles = {
    .type = "sdb",
    .dlr_add = dlr_sdb_add,
    .dlr_get = dlr_sdb_get,
    .dlr_update = dlr_sdb_update,
    .dlr_remove = dlr_sdb_remove,
    .dlr_shutdown = dlr_sdb_shutdown,
    .dlr_messages = dlr_sdb_messages,
    .dlr_flush = dlr_sdb_flush
};

struct dlr_storage *dlr_init_sdb(Cfg* cfg)
{
    CfgGroup *grp;
    List *grplist;
    Octstr *sdb_url, *sdb_id;
    Octstr *p = NULL;
    long pool_size;
    DBConf *db_conf = NULL;

    /*
     * check for all mandatory directives that specify the field names
     * of the used table
     */
    if (!(grp = cfg_get_single_group(cfg, octstr_imm("dlr-db"))))
        panic(0, "DLR: SDB: group 'dlr-db' is not specified!");

    if (!(sdb_id = cfg_get(grp, octstr_imm("id"))))
   	    panic(0, "DLR: SDB: directive 'id' is not specified!");

    fields = dlr_db_fields_create(grp);
    gw_assert(fields != NULL);

    /*
     * now grab the required information from the 'sdb-connection' group
     * with the sdb-id we just obtained
     *
     * we have to loop through all available SDB connection definitions
     * and search for the one we are looking for
     */

    grplist = cfg_get_multi_group(cfg, octstr_imm("sdb-connection"));
    while (grplist && (grp = gwlist_extract_first(grplist)) != NULL) {
        p = cfg_get(grp, octstr_imm("id"));
        if (p != NULL && octstr_compare(p, sdb_id) == 0) {
            goto found;
        }
        if (p != NULL) octstr_destroy(p);
    }
    panic(0, "DLR: SDB: connection settings for id '%s' are not specified!",
          octstr_get_cstr(sdb_id));

found:
    octstr_destroy(p);
    gwlist_destroy(grplist, NULL);

    if (cfg_get_integer(&pool_size, grp, octstr_imm("max-connections")) == -1 || pool_size == 0)
        pool_size = 1;

    if (!(sdb_url = cfg_get(grp, octstr_imm("url"))))
   	    panic(0, "DLR: SDB: directive 'url' is not specified!");

    if (octstr_search(sdb_url, octstr_imm("oracle:"), 0) == 0)
        sdb_conn_type = SDB_ORACLE;
    else if (octstr_search(sdb_url, octstr_imm("mysql:"), 0) == 0) {
        warning(0, "DLR[sdb]: Please use native MySQL support, instead of libsdb.");
        sdb_conn_type = SDB_MYSQL;
    }
    else if (octstr_search(sdb_url, octstr_imm("postgres:"), 0) == 0) {
        sdb_conn_type = SDB_POSTGRES;
    }
    else
        sdb_conn_type = SDB_OTHER;

    /*
     * ok, ready to connect
     */
    info(0,"Connecting to sdb resource <%s>.", octstr_get_cstr(sdb_url));

    db_conf = gw_malloc(sizeof(DBConf));
    gw_assert(db_conf != NULL);

    db_conf->sdb = gw_malloc(sizeof(SDBConf));
    gw_assert(db_conf->sdb != NULL);

    db_conf->sdb->url = sdb_url;

    pool = dbpool_create(DBPOOL_SDB, db_conf, pool_size);
    gw_assert(pool != NULL);

    /*
     * XXX should a failing connect throw panic?!
     */
    if (dbpool_conn_count(pool) == 0)
        panic(0,"DLR: SDB: database pool has no connections!");

    return &handles;
}
#else
/*
 * Return NULL to tell dlr-core that SDB support was
 * not compiled in.
 */
struct dlr_storage *dlr_init_sdb(Cfg* cfg)
{
    return NULL;
}
#endif /* HAVE_SDB */

/*
 * dlr_mysql.c
 *
 * Implementation of handling delivery reports (DLRs)
 * for MySql database
 *
 * Andreas Fink <[email protected]>, 18.08.2001
 * Stipe Tolj <[email protected]>, 22.03.2002
 * Alexander Malysh <[email protected]> 2003
*/

#include "gwlib/gwlib.h"
#include "gwlib/dbpool.h"
#include "dlr_p.h"


#ifdef HAVE_MYSQL
#include <mysql/mysql.h>

/*
 * Our connection pool to mysql.
 */
static DBPool *pool = NULL;

/*
 * Database fields, which we use.
 */
static struct dlr_db_fields *fields = NULL;


static void mysql_update(const Octstr *sql)
{
    int	state;
    DBPoolConn *pc;

#if defined(DLR_TRACE)
     debug("dlr.mysql", 0, "sql: %s", octstr_get_cstr(sql));
#endif

    pc = dbpool_conn_consume(pool);
    if (pc == NULL) {
        error(0, "MYSQL: Database pool got no connection! DB update failed!");
        return;
    }

    state = mysql_query(pc->conn, octstr_get_cstr(sql));
    if (state != 0)
        error(0, "MYSQL: %s", mysql_error(pc->conn));

    dbpool_conn_produce(pc);
}

static MYSQL_RES* mysql_select(const Octstr *sql)
{
    int	state;
    MYSQL_RES *result = NULL;
    DBPoolConn *pc;

#if defined(DLR_TRACE)
    debug("dlr.mysql", 0, "sql: %s", octstr_get_cstr(sql));
#endif

    pc = dbpool_conn_consume(pool);
    if (pc == NULL) {
        error(0, "MYSQL: Database pool got no connection! DB select failed!");
        return NULL;
    }

    state = mysql_query(pc->conn, octstr_get_cstr(sql));
    if (state != 0) {
        error(0, "MYSQL: %s", mysql_error(pc->conn));
    } else {
        result = mysql_store_result(pc->conn);
    }

    dbpool_conn_produce(pc);

    return result;
}

static void dlr_mysql_shutdown()
{
    dbpool_destroy(pool);
    dlr_db_fields_destroy(fields);
}

static void dlr_mysql_add(struct dlr_entry *entry)
{
    Octstr *sql;

    sql = octstr_format("INSERT INTO %s (%s, %s, %s, %s, %s, %s, %s, %s, %s) VALUES "
                        "('%s', '%s', '%s', '%s', '%s', '%s', '%d', '%s', '%d');",
                        octstr_get_cstr(fields->table), octstr_get_cstr(fields->field_smsc),
                        octstr_get_cstr(fields->field_ts),
                        octstr_get_cstr(fields->field_src), octstr_get_cstr(fields->field_dst),
                        octstr_get_cstr(fields->field_serv), octstr_get_cstr(fields->field_url),
                        octstr_get_cstr(fields->field_mask), octstr_get_cstr(fields->field_boxc),
                        octstr_get_cstr(fields->field_status),
                        octstr_get_cstr(entry->smsc), octstr_get_cstr(entry->timestamp), octstr_get_cstr(entry->source),
                        octstr_get_cstr(entry->destination), octstr_get_cstr(entry->service), octstr_get_cstr(entry->url),
                        entry->mask, octstr_get_cstr(entry->boxc_id), 0);


    mysql_update(sql);

    octstr_destroy(sql);
    dlr_entry_destroy(entry);
}

static struct dlr_entry* dlr_mysql_get(const Octstr *smsc, const Octstr *ts, const Octstr *dst)
{
    struct dlr_entry *res = NULL;
    Octstr *sql;
    MYSQL_RES *result;
    MYSQL_ROW row;

    debug("dlr.mysql", 0, "DLR[%s]: Comparing DLR smsc=%s, ts=%s, dst=%s",
              dlr_type(), octstr_get_cstr(smsc), octstr_get_cstr(ts),
              octstr_get_cstr(dst));

    sql = octstr_format("SELECT %s, %s, %s, %s, %s, %s FROM %s WHERE %s='%s' AND %s='%s' AND %s='%s';",
                        octstr_get_cstr(fields->field_mask), octstr_get_cstr(fields->field_serv),
                        octstr_get_cstr(fields->field_url), octstr_get_cstr(fields->field_src),
                        octstr_get_cstr(fields->field_dst), octstr_get_cstr(fields->field_boxc),
                        octstr_get_cstr(fields->table), octstr_get_cstr(fields->field_smsc),
                        octstr_get_cstr(smsc), octstr_get_cstr(fields->field_ts), octstr_get_cstr(ts),
                        octstr_get_cstr(fields->field_dst), octstr_get_cstr(dst));

    result = mysql_select(sql);
    octstr_destroy(sql);

    if (result == NULL) {
        return NULL;
    }
    if (mysql_num_rows(result) < 1) {
        debug("dlr.mysql", 0, "no rows found");
        mysql_free_result(result);
        return NULL;
    }
    row = mysql_fetch_row(result);
    if (!row) {
        debug("dlr.mysql", 0, "rows found but could not load them");
        mysql_free_result(result);
        return NULL;
    }

    debug("dlr.mysql", 0, "Found entry, row[0]=%s, row[1]=%s, row[2]=%s, row[3]=%s, row[4]=%s row[5]=%s",
          row[0], row[1], row[2], (row[3] ? row[3] : "NULL"), (row[4] ? row[4] : "NULL"), (row[5] ? row[5] : "NULL"));

    res = dlr_entry_create();
    gw_assert(res != NULL);
    res->mask = atoi(row[0]);
    res->service = octstr_create(row[1]);
    res->url = octstr_create(row[2]);
    res->source = row[3] ? octstr_create(row[3]) : octstr_create("");
    res->destination = row[4] ? octstr_create(row[4]) : octstr_create("");
    res->boxc_id = row[5] ? octstr_create(row[5]) : octstr_create("");
    res->smsc = octstr_duplicate(smsc);

    mysql_free_result(result);

    return res;
}

static void dlr_mysql_remove(const Octstr *smsc, const Octstr *ts, const Octstr *dst)
{
    Octstr *sql;

    debug("dlr.mysql", 0, "removing DLR from database");
    sql = octstr_format("DELETE FROM %s WHERE %s='%s' AND %s='%s' AND %s='%s' LIMIT 1;",
                        octstr_get_cstr(fields->table), octstr_get_cstr(fields->field_smsc),
                        octstr_get_cstr(smsc), octstr_get_cstr(fields->field_ts), octstr_get_cstr(ts), 
                        octstr_get_cstr(fields->field_dst), octstr_get_cstr(dst));


    mysql_update(sql);

    octstr_destroy(sql);
}

static void dlr_mysql_update(const Octstr *smsc, const Octstr *ts, const Octstr *dst, int status)
{
    Octstr *sql;

    debug("dlr.mysql", 0, "updating DLR status in database");
    sql = octstr_format("UPDATE %s SET %s=%d WHERE %s='%s' AND %s='%s' AND %s='%s' LIMIT 1;",
                        octstr_get_cstr(fields->table),
                        octstr_get_cstr(fields->field_status), status,
                        octstr_get_cstr(fields->field_smsc), octstr_get_cstr(smsc),
                        octstr_get_cstr(fields->field_ts), octstr_get_cstr(ts),
                        octstr_get_cstr(fields->field_dst), octstr_get_cstr(dst));

    mysql_update(sql);

    octstr_destroy(sql);
}


static long dlr_mysql_messages(void)
{
    Octstr *sql;
    long res;
    MYSQL_RES *result;
    MYSQL_ROW row;

    sql = octstr_format("SELECT count(*) FROM %s;", octstr_get_cstr(fields->table));

    result = mysql_select(sql);
    octstr_destroy(sql);

    if (result == NULL) {
        return -1;
    }
    if (mysql_num_rows(result) < 1) {
        debug("dlr.mysql", 0, "Could not get count of DLR table");
        mysql_free_result(result);
        return 0;
    }
    row = mysql_fetch_row(result);
    if (row == NULL) {
        debug("dlr.mysql", 0, "rows found but could not load them");
        mysql_free_result(result);
        return 0;
    }
    res = atol(row[0]);
    mysql_free_result(result);

    return res;
}

static void dlr_mysql_flush(void)
{
    Octstr *sql;

    sql = octstr_format("DELETE FROM %s;", octstr_get_cstr(fields->table));

    mysql_update(sql);
    octstr_destroy(sql);
}

static struct dlr_storage handles = {
    .type = "mysql",
    .dlr_add = dlr_mysql_add,
    .dlr_get = dlr_mysql_get,
    .dlr_update = dlr_mysql_update,
    .dlr_remove = dlr_mysql_remove,
    .dlr_shutdown = dlr_mysql_shutdown,
    .dlr_messages = dlr_mysql_messages,
    .dlr_flush = dlr_mysql_flush
};

struct dlr_storage *dlr_init_mysql(Cfg *cfg)
{
    CfgGroup *grp;
    List *grplist;
    Octstr *mysql_host, *mysql_user, *mysql_pass, *mysql_db, *mysql_id;
    long mysql_port = 0;
    Octstr *p = NULL;
    long pool_size;
    DBConf *db_conf = NULL;

    /*
     * check for all mandatory directives that specify the field names
     * of the used MySQL table
     */
    if (!(grp = cfg_get_single_group(cfg, octstr_imm("dlr-db"))))
        panic(0, "DLR: MySQL: group 'dlr-db' is not specified!");

    if (!(mysql_id = cfg_get(grp, octstr_imm("id"))))
        panic(0, "DLR: MySQL: directive 'id' is not specified!");

    fields = dlr_db_fields_create(grp);
    gw_assert(fields != NULL);

    /*
     * now grab the required information from the 'mysql-connection' group
     * with the mysql-id we just obtained
     *
     * we have to loop through all available MySQL connection definitions
     * and search for the one we are looking for
     */

    grplist = cfg_get_multi_group(cfg, octstr_imm("mysql-connection"));
    while (grplist && (grp = gwlist_extract_first(grplist)) != NULL) {
        p = cfg_get(grp, octstr_imm("id"));
        if (p != NULL && octstr_compare(p, mysql_id) == 0) {
            goto found;
        }
        if (p != NULL) octstr_destroy(p);
    }
    panic(0, "DLR: MySQL: connection settings for id '%s' are not specified!",
          octstr_get_cstr(mysql_id));

found:
    octstr_destroy(p);
    gwlist_destroy(grplist, NULL);

    if (cfg_get_integer(&pool_size, grp, octstr_imm("max-connections")) == -1 || pool_size == 0)
        pool_size = 1;

    if (!(mysql_host = cfg_get(grp, octstr_imm("host"))))
        panic(0, "DLR: MySQL: directive 'host' is not specified!");
    if (!(mysql_user = cfg_get(grp, octstr_imm("username"))))
        panic(0, "DLR: MySQL: directive 'username' is not specified!");
    if (!(mysql_pass = cfg_get(grp, octstr_imm("password"))))
        panic(0, "DLR: MySQL: directive 'password' is not specified!");
    if (!(mysql_db = cfg_get(grp, octstr_imm("database"))))
        panic(0, "DLR: MySQL: directive 'database' is not specified!");
    cfg_get_integer(&mysql_port, grp, octstr_imm("port"));  /* optional */

    /*
     * ok, ready to connect to MySQL
     */
    db_conf = gw_malloc(sizeof(DBConf));
    gw_assert(db_conf != NULL);

    db_conf->mysql = gw_malloc(sizeof(MySQLConf));
    gw_assert(db_conf->mysql != NULL);

    db_conf->mysql->host = mysql_host;
    db_conf->mysql->port = mysql_port;
    db_conf->mysql->username = mysql_user;
    db_conf->mysql->password = mysql_pass;
    db_conf->mysql->database = mysql_db;

    pool = dbpool_create(DBPOOL_MYSQL, db_conf, pool_size);
    gw_assert(pool != NULL);

    /*
     * XXX should a failing connect throw panic?!
     */
    if (dbpool_conn_count(pool) == 0)
        panic(0,"DLR: MySQL: database pool has no connections!");

    octstr_destroy(mysql_id);

    return &handles;
}
#else
/*
 * Return NULL to signal to dlr-core that MySQL support
 * was not compiled in.
 */
struct dlr_storage *dlr_init_mysql(Cfg* cfg)
{
    return NULL;
}
#endif /* HAVE_MYSQL */
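
To make the altered MySQL statements easier to picture, here is a minimal
standalone sketch that builds the same DELETE text as dlr_mysql_remove(),
using plain snprintf() instead of octstr_format(). The function name
demo_format_remove and the table/column names used in the example call are
assumptions for illustration only; in a real installation they come from the
'dlr-db' group of the configuration.

```c
#include <assert.h>
#include <stdio.h>
#include <string.h>

/*
 * Hypothetical helper (not part of Kannel): formats the DELETE statement
 * with the same three-part WHERE clause (smsc, timestamp, destination)
 * that the patched dlr_mysql_remove() uses.
 * Returns the snprintf() result, i.e. the length the full statement needs.
 */
int demo_format_remove(char *buf, size_t len, const char *table,
                       const char *f_smsc, const char *smsc,
                       const char *f_ts, const char *ts,
                       const char *f_dst, const char *dst)
{
    assert(buf != NULL && len > 0);
    return snprintf(buf, len,
                    "DELETE FROM %s WHERE %s='%s' AND %s='%s' AND %s='%s' LIMIT 1;",
                    table, f_smsc, smsc, f_ts, ts, f_dst, dst);
}
```

Called with the assumed names "dlr", "smsc", "ts" and "destination", the
SMSC id "cimd", the timestamp "1250000000" and the recipient
"+306900000001", the buffer holds a statement whose WHERE clause names all
three values, so only the row for that recipient is removed.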

/* ==================================================================== 
 * The Kannel Software License, Version 1.0 
 * 
 * Copyright (c) 2001-2009 Kannel Group  
 * Copyright (c) 1998-2001 WapIT Ltd.   
 * All rights reserved. 
 * 
 * Redistribution and use in source and binary forms, with or without 
 * modification, are permitted provided that the following conditions 
 * are met: 
 * 
 * 1. Redistributions of source code must retain the above copyright 
 *    notice, this list of conditions and the following disclaimer. 
 * 
 * 2. Redistributions in binary form must reproduce the above copyright 
 *    notice, this list of conditions and the following disclaimer in 
 *    the documentation and/or other materials provided with the 
 *    distribution. 
 * 
 * 3. The end-user documentation included with the redistribution, 
 *    if any, must include the following acknowledgment: 
 *       "This product includes software developed by the 
 *        Kannel Group (http://www.kannel.org/)." 
 *    Alternately, this acknowledgment may appear in the software itself, 
 *    if and wherever such third-party acknowledgments normally appear. 
 * 
 * 4. The names "Kannel" and "Kannel Group" must not be used to 
 *    endorse or promote products derived from this software without 
 *    prior written permission. For written permission, please  
 *    contact [email protected]. 
 * 
 * 5. Products derived from this software may not be called "Kannel", 
 *    nor may "Kannel" appear in their name, without prior written 
 *    permission of the Kannel Group. 
 * 
 * THIS SOFTWARE IS PROVIDED ``AS IS'' AND ANY EXPRESSED OR IMPLIED 
 * WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES 
 * OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE 
 * DISCLAIMED.  IN NO EVENT SHALL THE KANNEL GROUP OR ITS CONTRIBUTORS 
 * BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY,  
 * OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT  
 * OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR  
 * BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY,  
 * WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE  
 * OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE,  
 * EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 
 * ==================================================================== 
 * 
 * This software consists of voluntary contributions made by many 
 * individuals on behalf of the Kannel Group.  For more information on  
 * the Kannel Group, please see <http://www.kannel.org/>. 
 * 
 * Portions of this software are based upon software originally written at  
 * WapIT Ltd., Helsinki, Finland for the Kannel project.  
 */ 

/*
 * dlr_pgsql.c
 *
 * Implementation of handling delivery reports (DLRs)
 * for PostgreSQL database
 *
 * modeled after dlr_mysql.c
 *
 * Alexander Malysh <[email protected]>, cleanup 2004
 */

#include "gwlib/gwlib.h"
#include "gwlib/dbpool.h"
#include "dlr_p.h"


#ifdef HAVE_PGSQL
#include <libpq-fe.h>

/*
 * Our connection pool to pgsql.
 */
static DBPool *pool = NULL;

/*
 * Database fields that we use.
 */
static struct dlr_db_fields *fields = NULL;


static inline int pgsql_update(const Octstr *sql)
{
    DBPoolConn *pc;
    int ret = 0;

#if defined(DLR_TRACE)
    debug("dlr.pgsql", 0, "sql: %s", octstr_get_cstr(sql));
#endif

    pc = dbpool_conn_consume(pool);
    if (pc == NULL) {
        error(0, "PGSQL: Database pool got no connection! DB update failed!");
        return -1;
    }

    if ((ret = dbpool_conn_update(pc, sql, NULL)) == -1)
        error(0, "PGSQL: DB update failed!");
    
    dbpool_conn_produce(pc);
    return ret;
}


static inline List *pgsql_select(const Octstr *sql)
{
    DBPoolConn *pc;
    List *ret = NULL;

#if defined(DLR_TRACE)
    debug("dlr.pgsql", 0, "sql: %s", octstr_get_cstr(sql));
#endif

    pc = dbpool_conn_consume(pool);
    if (pc == NULL) {
        error(0, "PGSQL: Database pool got no connection! DB operation failed!");
        return NULL;
    }

    if (dbpool_conn_select(pc, sql, NULL, &ret) == -1)
        error(0, "PGSQL: Select failed!");
    
    dbpool_conn_produce(pc);
    return ret;
}


static void dlr_pgsql_shutdown(void)
{
    dbpool_destroy(pool);
    dlr_db_fields_destroy(fields);
}


static void dlr_pgsql_add(struct dlr_entry *entry)
{
    Octstr *sql;

    sql = octstr_format("INSERT INTO %s (%s, %s, %s, %s, %s, %s, %s, %s, %s) VALUES "
                        "('%s', '%s', '%s', '%s', '%s', '%s', '%d', '%s', '%d');",
                        octstr_get_cstr(fields->table), octstr_get_cstr(fields->field_smsc),
                        octstr_get_cstr(fields->field_ts),
                        octstr_get_cstr(fields->field_src), octstr_get_cstr(fields->field_dst),
                        octstr_get_cstr(fields->field_serv), octstr_get_cstr(fields->field_url),
                        octstr_get_cstr(fields->field_mask), octstr_get_cstr(fields->field_boxc),
                        octstr_get_cstr(fields->field_status),
                        octstr_get_cstr(entry->smsc), octstr_get_cstr(entry->timestamp), octstr_get_cstr(entry->source),
                        octstr_get_cstr(entry->destination), octstr_get_cstr(entry->service), octstr_get_cstr(entry->url),
                        entry->mask, octstr_get_cstr(entry->boxc_id), 0);


    pgsql_update(sql);
    
    octstr_destroy(sql);
    dlr_entry_destroy(entry);
}


static struct dlr_entry *dlr_pgsql_get(const Octstr *smsc, const Octstr *ts, const Octstr *dst)
{
    struct dlr_entry *res = NULL;
    Octstr *sql;
    List *result, *row;

    debug("dlr.pgsql", 0, "DLR[%s]: Comparing DLR smsc=%s, ts=%s, dst=%s",
              dlr_type(), octstr_get_cstr(smsc), octstr_get_cstr(ts),
              octstr_get_cstr(dst));

    sql = octstr_format("SELECT %s, %s, %s, %s, %s, %s FROM %s WHERE %s='%s' AND %s='%s' AND %s='%s' LIMIT 1;",
                        octstr_get_cstr(fields->field_mask), octstr_get_cstr(fields->field_serv),
                        octstr_get_cstr(fields->field_url), octstr_get_cstr(fields->field_src),
                        octstr_get_cstr(fields->field_dst), octstr_get_cstr(fields->field_boxc),
                        octstr_get_cstr(fields->table), octstr_get_cstr(fields->field_smsc),
                        octstr_get_cstr(smsc), octstr_get_cstr(fields->field_ts), octstr_get_cstr(ts),
                        octstr_get_cstr(fields->field_dst), octstr_get_cstr(dst));


    result = pgsql_select(sql);
    octstr_destroy(sql);

    if (result == NULL || gwlist_len(result) < 1) {
        debug("dlr.pgsql", 0, "no rows found");
        /* result may be NULL if the select failed; only drain a real list */
        while (result != NULL && (row = gwlist_extract_first(result)) != NULL)
            gwlist_destroy(row, octstr_destroy_item);
        gwlist_destroy(result, NULL);
        return NULL;
    }
    
    row = gwlist_get(result, 0);

    debug("dlr.pgsql", 0, "Found entry, col1=%s, col2=%s, col3=%s, col4=%s, col5=%s col6=%s",
          octstr_get_cstr(gwlist_get(row, 0)),
          octstr_get_cstr(gwlist_get(row, 1)),
          octstr_get_cstr(gwlist_get(row, 2)),
          octstr_get_cstr(gwlist_get(row, 3)),
          octstr_get_cstr(gwlist_get(row, 4)),
          octstr_get_cstr(gwlist_get(row, 5)));

    res = dlr_entry_create();
    gw_assert(res != NULL);
    res->mask        = atoi(octstr_get_cstr(gwlist_get(row, 0)));
    res->service     = octstr_duplicate(gwlist_get(row, 1));
    res->url         = octstr_duplicate(gwlist_get(row, 2));
    res->source      = octstr_duplicate(gwlist_get(row, 3));
    res->destination = octstr_duplicate(gwlist_get(row, 4));
    res->boxc_id     = octstr_duplicate(gwlist_get(row, 5));
    res->smsc        = octstr_duplicate(smsc);

    while((row = gwlist_extract_first(result)))
        gwlist_destroy(row, octstr_destroy_item);
    gwlist_destroy(result, NULL);
    
    return res;
}


static void dlr_pgsql_remove(const Octstr *smsc, const Octstr *ts, const Octstr *dst)
{
    Octstr *sql;

    debug("dlr.pgsql", 0, "removing DLR from database");

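    /*
     * PostgreSQL has no "DELETE ... LIMIT", so a single matching row is
     * picked through its oid; this requires a table that has an oid column.
     */
    sql = octstr_format("DELETE FROM %s WHERE oid = (SELECT oid FROM %s WHERE %s='%s' AND %s='%s' AND %s='%s' LIMIT 1);",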
                        octstr_get_cstr(fields->table), octstr_get_cstr(fields->table),
                        octstr_get_cstr(fields->field_smsc),
                        octstr_get_cstr(smsc), octstr_get_cstr(fields->field_ts), octstr_get_cstr(ts),
                        octstr_get_cstr(fields->field_dst), octstr_get_cstr(dst));

    pgsql_update(sql);
    octstr_destroy(sql);
}


static void dlr_pgsql_update(const Octstr *smsc, const Octstr *ts, const Octstr *dst, int status)
{
    Octstr *sql;

    debug("dlr.pgsql", 0, "updating DLR status in database");

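    /*
     * PostgreSQL has no "UPDATE ... LIMIT", so a single matching row is
     * picked through its oid; this requires a table that has an oid column.
     */
    sql = octstr_format("UPDATE %s SET %s=%d WHERE oid = (SELECT oid FROM %s WHERE %s='%s' AND %s='%s' AND %s='%s' LIMIT 1);",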
                        octstr_get_cstr(fields->table),
                        octstr_get_cstr(fields->field_status), status,
                        octstr_get_cstr(fields->table),
                        octstr_get_cstr(fields->field_smsc), octstr_get_cstr(smsc),
                        octstr_get_cstr(fields->field_ts), octstr_get_cstr(ts),
                        octstr_get_cstr(fields->field_dst), octstr_get_cstr(dst));

    pgsql_update(sql);
    octstr_destroy(sql);
}


static long dlr_pgsql_messages(void)
{
    Octstr *sql;
    long ret;
    List *res;

    sql = octstr_format("SELECT count(*) FROM %s;", octstr_get_cstr(fields->table));

    res = pgsql_select(sql);
    octstr_destroy(sql);

    if (res == NULL || gwlist_len(res) < 1) {
        error(0, "PGSQL: Could not get count of DLR table");
        ret = -1;
    } else {
        ret = atol(octstr_get_cstr(gwlist_get(gwlist_get(res, 0), 0)));
    }

    /* res may be NULL if the select failed; do not extract from it then */
    if (res != NULL)
        gwlist_destroy(gwlist_extract_first(res), octstr_destroy_item);
    gwlist_destroy(res, NULL);
        
    return ret;
}


static void dlr_pgsql_flush(void)
{
    Octstr *sql;

    sql = octstr_format("DELETE FROM %s;", octstr_get_cstr(fields->table));

    pgsql_update(sql);
    octstr_destroy(sql);
}


static struct dlr_storage handles = {
    .type = "pgsql",
    .dlr_add = dlr_pgsql_add,
    .dlr_get = dlr_pgsql_get,
    .dlr_update = dlr_pgsql_update,
    .dlr_remove = dlr_pgsql_remove,
    .dlr_shutdown = dlr_pgsql_shutdown,
    .dlr_messages = dlr_pgsql_messages,
    .dlr_flush = dlr_pgsql_flush
};


struct dlr_storage *dlr_init_pgsql(Cfg *cfg)
{
    CfgGroup *grp;
    List *grplist;
    Octstr *pgsql_host, *pgsql_user, *pgsql_pass, *pgsql_db, *pgsql_id;
    long pgsql_port = 0;
    Octstr *p = NULL;
    long pool_size;
    DBConf *db_conf = NULL;

    /*
     * check for all mandatory directives that specify the field names
     * of the table used
     */
    if (!(grp = cfg_get_single_group(cfg, octstr_imm("dlr-db"))))
        panic(0, "DLR: PgSQL: group 'dlr-db' is not specified!");

    if (!(pgsql_id = cfg_get(grp, octstr_imm("id"))))
        panic(0, "DLR: PgSQL: directive 'id' is not specified!");

    fields = dlr_db_fields_create(grp);
    gw_assert(fields != NULL);

    /*
     * now grab the required information from the 'pgsql-connection' group
     * with the pgsql-id we just obtained
     *
     * we have to loop through all available PostgreSQL connection definitions
     * and search for the one we are looking for
     */

    grplist = cfg_get_multi_group(cfg, octstr_imm("pgsql-connection"));
    while (grplist && (grp = gwlist_extract_first(grplist)) != NULL) {
        p = cfg_get(grp, octstr_imm("id"));
        if (p != NULL && octstr_compare(p, pgsql_id) == 0) {
            goto found;
        }
        if (p != NULL) 
            octstr_destroy(p);
    }
    panic(0, "DLR: PgSQL: connection settings for id '%s' are not specified!",
          octstr_get_cstr(pgsql_id));

found:
    octstr_destroy(p);
    gwlist_destroy(grplist, NULL);

    if (cfg_get_integer(&pool_size, grp, octstr_imm("max-connections")) == -1 || pool_size == 0)
        pool_size = 1;

    if (!(pgsql_host = cfg_get(grp, octstr_imm("host"))))
        panic(0, "DLR: PgSQL: directive 'host' is not specified!");
    if (!(pgsql_user = cfg_get(grp, octstr_imm("username"))))
        panic(0, "DLR: PgSQL: directive 'username' is not specified!");
    if (!(pgsql_pass = cfg_get(grp, octstr_imm("password"))))
        panic(0, "DLR: PgSQL: directive 'password' is not specified!");
    if (!(pgsql_db = cfg_get(grp, octstr_imm("database"))))
        panic(0, "DLR: PgSQL: directive 'database' is not specified!");
    cfg_get_integer(&pgsql_port, grp, octstr_imm("port"));  /* optional */

    /*
     * ok, ready to connect to the database
     */
    db_conf = gw_malloc(sizeof(DBConf));
    gw_assert(db_conf != NULL);

    db_conf->pgsql = gw_malloc(sizeof(PgSQLConf));
    gw_assert(db_conf->pgsql != NULL);

    db_conf->pgsql->host = pgsql_host;
    db_conf->pgsql->port = pgsql_port;
    db_conf->pgsql->username = pgsql_user;
    db_conf->pgsql->password = pgsql_pass;
    db_conf->pgsql->database = pgsql_db;

    pool = dbpool_create(DBPOOL_PGSQL, db_conf, pool_size);
    gw_assert(pool != NULL);

    /*
     * XXX should a failing connect throw panic?!
     */
    if (dbpool_conn_count(pool) == 0)
        panic(0,"DLR: PgSQL: database pool has no connections!");

    octstr_destroy(pgsql_id);

    return &handles;
}
#else
/*
 * Return NULL to signal to dlr-core that PostgreSQL support
 * was not compiled in.
 */
struct dlr_storage *dlr_init_pgsql(Cfg* cfg)
{
    return NULL;
}
#endif /* HAVE_PGSQL */
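
As a rough illustration of the change in correlation described in the mail
above, the following standalone sketch compares a lookup on SMSC id plus
timestamp with a lookup on SMSC id, timestamp and recipient. It is only a
simplified in-memory model and not Kannel code: the struct demo_dlr and the
functions demo_find_pair and demo_find_triplet are hypothetical names
introduced for this example.

```c
#include <assert.h>
#include <stddef.h>
#include <string.h>

/* Hypothetical, simplified stand-in for one stored DLR row. */
struct demo_dlr {
    const char *smsc;  /* SMSC id */
    const char *ts;    /* send time, seconds precision */
    const char *dst;   /* MT SMS recipient address */
    const char *url;   /* dlr-url to call back */
};

/* Lookup on smsc + ts only: returns the first row sent in that second. */
const char *demo_find_pair(const struct demo_dlr *rows, size_t n,
                           const char *smsc, const char *ts)
{
    size_t i;

    assert(rows != NULL && smsc != NULL && ts != NULL);
    for (i = 0; i < n; i++) {
        if (strcmp(rows[i].smsc, smsc) == 0 && strcmp(rows[i].ts, ts) == 0)
            return rows[i].url;
    }
    return NULL;
}

/* Lookup on smsc + ts + dst, mirroring the WHERE clause of the patch. */
const char *demo_find_triplet(const struct demo_dlr *rows, size_t n,
                              const char *smsc, const char *ts,
                              const char *dst)
{
    size_t i;

    assert(rows != NULL && smsc != NULL && ts != NULL && dst != NULL);
    for (i = 0; i < n; i++) {
        if (strcmp(rows[i].smsc, smsc) == 0 &&
            strcmp(rows[i].ts, ts) == 0 &&
            strcmp(rows[i].dst, dst) == 0)
            return rows[i].url;
    }
    return NULL;
}
```

With two rows that share the SMSC id and the timestamp but have different
recipients, demo_find_pair always yields the dlr-url of the first row,
whichever DLR arrived, while demo_find_triplet yields the dlr-url that
belongs to the reported recipient. Two messages to the same recipient in the
same second remain indistinguishable in both variants.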
