ldmud-3.4.1/doc/
ldmud-3.4.1/doc/efun.de/
ldmud-3.4.1/doc/efun/
ldmud-3.4.1/doc/man/
ldmud-3.4.1/doc/other/
ldmud-3.4.1/mud/
ldmud-3.4.1/mud/heaven7/
ldmud-3.4.1/mud/lp-245/
ldmud-3.4.1/mud/lp-245/banish/
ldmud-3.4.1/mud/lp-245/doc/
ldmud-3.4.1/mud/lp-245/doc/examples/
ldmud-3.4.1/mud/lp-245/doc/sefun/
ldmud-3.4.1/mud/lp-245/log/
ldmud-3.4.1/mud/lp-245/obj/Go/
ldmud-3.4.1/mud/lp-245/players/lars/
ldmud-3.4.1/mud/lp-245/room/death/
ldmud-3.4.1/mud/lp-245/room/maze1/
ldmud-3.4.1/mud/lp-245/room/sub/
ldmud-3.4.1/mud/lp-245/secure/
ldmud-3.4.1/mud/morgengrauen/
ldmud-3.4.1/mud/morgengrauen/lib/
ldmud-3.4.1/mud/sticklib/
ldmud-3.4.1/mud/sticklib/src/
ldmud-3.4.1/mudlib/uni-crasher/
ldmud-3.4.1/pkg/
ldmud-3.4.1/pkg/debugger/
ldmud-3.4.1/pkg/diff/
ldmud-3.4.1/pkg/misc/
ldmud-3.4.1/src/autoconf/
ldmud-3.4.1/src/hosts/
ldmud-3.4.1/src/hosts/GnuWin32/
ldmud-3.4.1/src/hosts/amiga/
ldmud-3.4.1/src/hosts/win32/
ldmud-3.4.1/src/ptmalloc/
ldmud-3.4.1/src/util/
ldmud-3.4.1/src/util/erq/
ldmud-3.4.1/src/util/indent/hosts/next/
ldmud-3.4.1/src/util/xerq/
ldmud-3.4.1/src/util/xerq/lpc/
ldmud-3.4.1/src/util/xerq/lpc/www/
ldmud-3.4.1/test/t-030925/
ldmud-3.4.1/test/t-040413/
ldmud-3.4.1/test/t-041124/
/*---------------------------------------------------------------------------
 * PostgreSQL Database package.
 *
 * Based on code written and donated 2001 by Florian Heinz.
 *---------------------------------------------------------------------------
 * The interface to the PostgreSQL database is implemented through the
 * concept of a controlling object: when opening a database connection,
 * the LPC code has to provide a callback function. The object this function
 * is bound to is the controlling object: all queries to the database
 * will be issued by this object, and the responses will be sent to the
 * callback function.
 *
 * The interface is also asynchronous: the pg_query() efun just queues
 * the query with the database connection, and returns immediately. When
 * the database has finished working the query, the callback function is
 * called with the results.
 *
 * The callback function can be defined by name or by closure, and can
 * be defined with extra parameters:
 *
 *   void <callback>(int type, mixed ret, int id [, mixed extra...])
 *
 *     <type> is the type of the call, <id> identifies the query for which
 *     this call is executed:
 *
 *       PGRES_TUPLES_OK: <ret> is the result from a query.
 *                        It is either a mapping (field name as key, indexing
 *                        <n> values for n returned tuples), or an array
 *                        of arrays (one per row).
 *
 *       PGRES_COMMAND_OK: <ret> is a string which contains the
 *                         server response (e.g. on INSERT or DELETE) 
 *
 *       PGRES_BAD_RESPONSE,
 *       PGRES_NONFATAL_ERROR,
 *       PGRES_FATAL_ERROR: ret is the error-string
 *
 *
 *   void <callback>(int type, mixed ret [, mixed extra...])
 *
 *     <type> is the type of the call, which is not related to a specific query:
 *
 *       PGCONN_SUCCESS: The database-connection was established, <ret> is
 *                       a dummy string.
 *       PGCONN_FAILED:  The database-connection failed, <ret> is the error
 *                       message.
 *         The first message to the callback after a call to pg_connect()
 *         is always one of these two.
 *
 *       PGRES_NOTICE: <ret> is an informational text.
 *
 *       PGCONN_ABORTED: If the connection to the backend fails
 *                       we try to re-establish (reset) it. If the
 *                       reset fails, the connection is closed and
 *                       this value is returned. Consider the
 *                       connection gone and don't try to close or 
 *                       otherwise operate further on it.
 *                       <ret> is a dummy string.
 *---------------------------------------------------------------------------
 */

#include "driver.h"

#ifdef USE_PGSQL

#include "typedefs.h"

#include "my-alloca.h"
#include <errno.h>
#include <stddef.h>
#include <time.h>
#include <unistd.h>

#ifdef HAVE_POLL

#include <sys/poll.h>

#else

#    if defined(_AIX) || defined(__EMX__) || defined(OS2)
#        include <sys/select.h>
#    endif

#endif /* HAVE_POLL */

#include <libpq-fe.h>

#include "pkg-pgsql.h"

#include "actions.h"
#include "array.h"
#include "gcollect.h"
#include "instrs.h"
#include "interpret.h"
#include "main.h"
#include "mapping.h"
#include "mstrings.h"
#include "simulate.h"
#include "stdstrings.h"
#include "xalloc.h"

#include "../mudlib/sys/pgsql.h"

/*-------------------------------------------------------------------------*/

#define MAX_RESETS 5   /* Number of reset tries, at max. one per second */

/*-------------------------------------------------------------------------*/
/* Types */

/* --- struct query_queue_s: one entry in the query queue
 * The queue is organized as a singly linked list, one for each database
 * connection. queue() appends new entries at the tail, dequeue() removes
 * the entry at the head.
 */
struct query_queue_s
{
   long                   id;     /* Unique id of the query (from query_id) */
   char                 * str;    /* Allocated copy of the query text */
   int                    flags;  /* Result format, e.g. PG_RESULT_MAP */
   struct query_queue_s * next;   /* Next queued query, or NULL */
};

/* ---  struct dbconn_s: one active database connection.
 * The connections are held in a singly linked list.
 * There can be only one connection per object.
 */

struct dbconn_s
{
   callback_t                  callback;  /* The callback; its object is the
                                           * controlling object. */
   
   PGconn                    * conn;      /* libpq handle, NULL after pgclose() */
   PostgresPollingStatusType   pgstate;   /* Polling status of a running
                                           * connect/reset */
   
   int                         state;     /* One of the PG_* states below */
   int                         fd;        /* Socket of the connection, or -1 */
   int                         resets;    /* Number of failed reset attempts
                                           * since the last success */

   time_t                      lastreset; /* Time of the last failed reset */
   time_t                      lastreply; /* Time a query was last sent or the
                                           * socket was last seen readable */

   struct query_queue_s      * queue;     /* Pending queries, head is the
                                           * current one; NULL if none */
   
   struct dbconn_s           * next;      /* Next connection in the list */
};


/* Possible struct dbconn_s.states
 *
 * The numeric order matters: pg_process_one() treats every state
 * >= PG_IDLE as 'connection established'.
 */

#define PG_UNCONNECTED 0  /* No open connection; structure can be deallocated */
#define PG_CONNECTING  1  /* Connect started, polled with PQconnectPoll() */
#define PG_RESETTING   2  /* Reset started, polled with PQresetPoll() */
#define PG_RESET_NEXT  3  /* Reset failed; retry once time() has advanced */
#define PG_IDLE        4  /* Connected, no query in flight */
#define PG_SENDQUERY   5  /* Connected, a queued query waits to be sent */
#define PG_WAITREPLY   6  /* Query sent, waiting for the reply */
#define PG_REPLYREADY  7  /* Reply received, results can be fetched */


typedef struct dbconn_s      dbconn_t;
typedef struct query_queue_s query_queue_t;

/*-------------------------------------------------------------------------*/

static dbconn_t *head = NULL;
  /* The list of database connections. alloc_dbconn() inserts new
   * connections at the front, dealloc_dbconn() unlinks them.
   */

static long query_id = 1;
  /* The query ID counter, used to generate unique IDs. It is incremented
   * by queue() for every new query.
   */

/*-------------------------------------------------------------------------*/
/* Forward Declarations */

static void pgnotice (dbconn_t *pgconn, const char *msg);

/*-------------------------------------------------------------------------*/
void
pg_setfds (fd_set *readfds, fd_set *writefds, int *nfds)

/* Called from the get_message() loop in comm.c: register the sockets of
 * all database connections in the given fd sets.
 *
 * Every connection with a valid socket is added to <readfds>; it is also
 * added to <writefds> if a connect/reset wants to write or a query waits
 * to be sent. <*nfds> is raised to cover the highest socket added.
 */

{
    dbconn_t *db = head;

    while (db != NULL)
    {
        int sock = db->fd;

        if (sock >= 0)
        {
            /* Incoming data is always of interest */
            FD_SET(sock, readfds);

            if (db->state == PG_SENDQUERY
             || db->pgstate == PGRES_POLLING_WRITING
               )
                FD_SET(sock, writefds);

            if (sock >= *nfds)
                *nfds = sock + 1;
        }

        db = db->next;
    }
} /* pg_setfds() */

/*-------------------------------------------------------------------------*/
static dbconn_t *
find_current_connection (object_t * obj)

/* Search the connection list for the entry whose callback is bound to
 * object <obj> and return it. Return NULL if there is none.
 */

{
    dbconn_t *db;

    for (db = head; db != NULL; db = db->next)
    {
        if (callback_object(&db->callback) == obj)
            return db;
    }

    return NULL;
} /* find_current_connection() */

/*=========================================================================*/

/*                             Queue management                            */

/*-------------------------------------------------------------------------*/
static query_queue_t *
queue (dbconn_t *db, const char *query)

/* Create a new query_queue entry for <query>, append it to the list
 * for database connection <db>, and return it.
 * The query string is duplicated; the entry gets a fresh id from
 * query_id and flags 0.
 *
 * Throw an error when out of memory. The entry is linked into the queue
 * only after it has been set up completely, so a failed allocation never
 * leaves a half-initialised entry (e.g. one without query text) behind.
 */

{
    query_queue_t  *entry;
    query_queue_t **tail;

    entry = xalloc(sizeof(*entry));
    if (!entry)
    {
        outofmemory("new Postgres query");
        /* NOTREACHED */
        return NULL;
    }
    memset(entry, 0, sizeof(*entry));

    /* NOTE(review): string_copy() is assumed to return NULL on failure
     * (dequeue() guards against a NULL .str as well) - confirm.
     */
    entry->str = string_copy(query);
    if (!entry->str)
    {
        xfree(entry);
        outofmemory("new Postgres query");
        /* NOTREACHED */
        return NULL;
    }

    entry->id = query_id++;
    entry->flags = 0;

    /* Find the end of the list and append the finished entry */
    tail = &db->queue;
    while (*tail != NULL)
        tail = &(*tail)->next;
    *tail = entry;

    return entry;
} /* queue() */

/*-------------------------------------------------------------------------*/
static void
dequeue (dbconn_t *db)

/* Remove the first query from the queue of connection <db> and free
 * both the entry and its query text.
 * The queue must not be empty.
 */

{
    query_queue_t *first = db->queue;

    db->queue = first->next;

    if (first->str != NULL)
        xfree(first->str);
    xfree(first);
} /* dequeue() */

/*-------------------------------------------------------------------------*/
static dbconn_t *
alloc_dbconn (void)

/* Create a new, zeroed database connection structure in state
 * PG_UNCONNECTED and without socket, insert it at the front of the
 * global connection list and return it.
 *
 * Throw an error when out of memory.
 */

{
    dbconn_t *db;

    memsafe(db = xalloc(sizeof(*db)), sizeof(*db), "new DB connection");

    memset(db, 0, sizeof(*db));
    db->state = PG_UNCONNECTED;
    db->fd = -1;

    /* Make it the new list head */
    db->next = head;
    head = db;

    return db;
} /* alloc_dbconn() */

/*-------------------------------------------------------------------------*/
static void
dealloc_dbconn (dbconn_t *del)

/* Release the database connection structure <del>: free its callback,
 * unlink it from the global list, drop all queued queries and free the
 * structure itself. A NULL <del> is ignored.
 * The libpq connection is not touched here.
 */

{
    dbconn_t **link;

    if (del == NULL)
        return;

    free_callback(&del->callback);

    /* Walk the links until the one pointing to <del> is found */
    for (link = &head; *link != NULL; link = &(*link)->next)
    {
        if (*link == del)
        {
            *link = del->next;
            break;
        }
    }

    while (del->queue != NULL)
        dequeue(del);

    xfree(del);
} /* dealloc_dbconn() */

/*=========================================================================*/

/*                          Connection management                          */

/*-------------------------------------------------------------------------*/
static int
pgconnect (dbconn_t *db, char *connstr)

/* Connect <db> to a database, using <connstr> for the connection parameters.
 * Return 0 on success, and -1 on failure.
 */

{
    db->conn = PQconnectStart(connstr);
    if (!db->conn)
        return -1;

    if (PQstatus(db->conn) == CONNECTION_BAD)
        return -1;
    
    PQsetNoticeProcessor(db->conn, (void*) pgnotice, db);
    db->fd = PQsocket(db->conn);
    db->state = PG_CONNECTING;
    db->pgstate = PGRES_POLLING_WRITING;
    return 0;
} /* pgconnect() */

/*-------------------------------------------------------------------------*/
static void
pgclose (dbconn_t *pgconn)

/* Shut down the database connection of <pgconn>: the libpq handle (if
 * any) is finished, and the structure is marked as PG_UNCONNECTED
 * without socket. The structure itself stays allocated.
 */

{
    pgconn->state = PG_UNCONNECTED;
    pgconn->fd = -1;

    if (pgconn->conn != NULL)
    {
        PQfinish(pgconn->conn);
        pgconn->conn = NULL;
    }
} /* pgclose() */

/*-------------------------------------------------------------------------*/
static void
pgreset (dbconn_t *pgconn)

/* Start a non-blocking reset of the connection <pgconn>.
 *
 * If the reset can be started, the connection enters state PG_RESETTING
 * and is completed by pg_process_connect_reset(). The socket descriptor
 * is re-read from libpq, because a reset closes the old socket and
 * opens a new one.
 *
 * If the reset can't be started, the connection is closed and the
 * controlling object (if it still exists) is notified with
 * PGCONN_ABORTED.
 */

{
    if (!PQresetStart(pgconn->conn))
    {
        pgclose(pgconn);
        
        if (callback_object(&pgconn->callback))
        {
            push_number(inter_sp, PGCONN_ABORTED);
            push_ref_string(inter_sp, STR_PG_RESET_FAILED);
            (void)apply_callback(&pgconn->callback, 2);
        }
        return;
    }
    
    /* The old descriptor is stale now */
    pgconn->fd = PQsocket(pgconn->conn);
    pgconn->state = PG_RESETTING;
    pgconn->pgstate = PGRES_POLLING_WRITING;
} /* pgreset() */


/*=========================================================================*/

/*                     Database Result Handling                            */

/*-------------------------------------------------------------------------*/
static void
pgnotice (dbconn_t *pgconn, const char *msg)

/* The database connection <pgconn> issued the notice <msg>: pass it on to
 * the controlling object as callback of type PGRES_NOTICE.
 * If the controlling object is gone, log that and reset the connection
 * instead.
 */

{
    current_object = callback_object(&pgconn->callback);
    command_giver = 0;
    current_interactive = 0;

    if (current_object == NULL)
    {
        debug_message("%s PG connection object destructed.\n", time_stamp());
        pgreset(pgconn);
        return;
    }

    push_number(inter_sp, PGRES_NOTICE);
    push_c_string(inter_sp, msg);
    (void)apply_callback(&pgconn->callback, 2);
} /* pgnotice() */

/*-------------------------------------------------------------------------*/
static void
pgresult (dbconn_t *pgconn, PGresult *res)

/* The most recent query on <pgconn> returned the result <res>. Encode it into
 * a nice LPC data package and send it to the controlling object as
 * callback(type, data, id).
 * The query is removed from <pgconn>, and <res> is cleared.
 *
 * Depending on the result type, the data is:
 *   PGRES_TUPLES_OK:  a mapping (field name -> values per tuple) if the
 *                     query was queued with PG_RESULT_MAP, else an array
 *                     of arrays with the field names as first row.
 *   PGRES_COMMAND_OK: the command status string.
 *   PGRES_BAD_RESPONSE, PGRES_FATAL_ERROR, PGRES_NONFATAL_ERROR:
 *                     the error message of the connection.
 * Results of any other type are dropped without calling the callback.
 *
 * An error is thrown if the result exceeds the mapping/array limits.
 *
 * NOTE(review): every result dequeues one query, so a query string which
 * yields several results is accounted to the following queue entries.
 */

{
    int type;
    
    if (!pgconn->queue)
    {
        /* There is no query this result could belong to. */
        debug_message("%s PG result without pending query dropped.\n"
                     , time_stamp());
        PQclear(res);
        return;
    }

    current_object = callback_object(&pgconn->callback);
    command_giver = 0;
    current_interactive = 0;
    
    type = PQresultStatus(res);
    
    push_number(inter_sp, type);
    
    switch (type)
    {
    case PGRES_TUPLES_OK:
      {
        int nfields, ntuples, i, j;

        nfields = PQnfields(res);
        ntuples = PQntuples(res);
        
        if (pgconn->queue->flags & PG_RESULT_MAP)
        {
            /* Return the result as mapping */

            mapping_t *map;

            /* The product is computed in p_int so that it can't
             * overflow int where p_int is the wider type.
             */
            if (max_mapping_size
             && ((p_int)nfields * ((p_int)ntuples + 1))
                > (p_int)max_mapping_size)
            {
                PQclear(res);
                dequeue(pgconn);
                errorf("Query result exceeded mappingsize limit.\n");
            }
            
            if (max_mapping_keys && nfields > (p_int)max_mapping_keys)
            {
                PQclear(res);
                dequeue(pgconn);
                errorf("Query result exceeded mappingsize limit.\n");
            }
            
            map = allocate_mapping(nfields, ntuples);
            if (!map)
            {
                push_number(inter_sp, 0);
                break;
            }
            
            /* One key per field, holding the field's value of each tuple */
            for (i = 0; i < nfields; i++)
            {
                svalue_t * entry, fname;

                put_c_string(&fname, PQfname(res, i));
                entry = get_map_lvalue(map, &fname);
                free_svalue(&fname);
                if (!entry)
                    break;
                for (j = 0; j < ntuples; j++)
                    put_c_string(&entry[j], PQgetvalue(res, j, i));
            }
            
            push_mapping(inter_sp, map);
        }
        else
        {
            /* Return the result as array of arrays */

            vector_t * array;
            svalue_t * entry;

            if (max_array_size
             && (   (ntuples >= (p_int)max_array_size)
                 || (nfields >= (p_int)max_array_size))
               )
            {
                PQclear(res);
                dequeue(pgconn);
                errorf("Query result exceeded array limit.\n");
            }

            /* One extra row for the field names */
            array = allocate_array(ntuples+1);
            if (!array)
            {
                push_number(inter_sp, 0);
                break;
            }
            
            entry = &array->item[0];
            put_array(entry, allocate_array(nfields));
            for (j = 0; j < nfields; j++)
                put_c_string(&entry->u.vec->item[j], PQfname(res, j));
            
            for (i = 0; i < ntuples; i++)
            {
                entry = &array->item[i+1];
                put_array(entry, allocate_array(nfields));
                for (j = 0; j < nfields; j++)
                    put_c_string(&entry->u.vec->item[j], PQgetvalue(res, i, j));
            }
            push_array(inter_sp, array);
        }
        break;
      }
      
    case PGRES_COMMAND_OK:
        push_c_string(inter_sp, PQcmdStatus(res));
        break;
       
     case PGRES_BAD_RESPONSE:
        push_c_string(inter_sp, PQerrorMessage(pgconn->conn));
        break;
       
     case PGRES_FATAL_ERROR:
     case PGRES_NONFATAL_ERROR:
        push_c_string(inter_sp, PQerrorMessage(pgconn->conn));
        break;
       
     default:
        /* Nothing to report for this result type: take the already
         * pushed <type> off the stack again, and remove the query so
         * that it is not sent to the database over and over.
         */
        free_svalue(inter_sp);
        inter_sp--;
        dequeue(pgconn);
        PQclear(res);
        return;
    }
    
    if (callback_object(&pgconn->callback))
    {
        push_number(inter_sp, pgconn->queue->id);
        (void)apply_callback(&pgconn->callback, 3);
    }
    else
    {
        /* Nobody consumes the two values pushed above (type and data),
         * so they have to be removed here.
         */
        free_svalue(inter_sp);
        inter_sp--;
        free_svalue(inter_sp);
        inter_sp--;

        debug_message("%s PG connection object destructed.\n", time_stamp());
        pgreset(pgconn);
    }

    dequeue(pgconn);
    PQclear(res);
} /* pgresult() */


/*=========================================================================*/

/*                   Database Connection Handling                          */

/*-------------------------------------------------------------------------*/
static void
pg_process_connect_reset (dbconn_t *pgconn)

/* Drive the pending non-blocking connect (state PG_CONNECTING) or reset
 * (state PG_RESETTING) of <pgconn> one step further.
 *
 * If libpq is waiting for the socket, the socket is checked without
 * blocking; once it is ready, PQconnectPoll() resp. PQresetPoll() is
 * called and the outcome evaluated:
 *   - failed reset with tries left: schedule another try (PG_RESET_NEXT).
 *   - other failure: notify the controlling object with PGCONN_FAILED
 *     (connect) or PGCONN_ABORTED (reset) and close the connection.
 *   - success: notify the controlling object with PGCONN_SUCCESS (only
 *     for a connect) and go to PG_SENDQUERY or PG_IDLE, depending on
 *     whether queries are queued.
 */

{
    int reset;
    PostgresPollingStatusType (*pqpollhandler)(PGconn *);
    
    reset = (pgconn->state == PG_RESETTING);
    
    if (reset)
        debug_message("%s PGSQL Connection resetting.\n", time_stamp());
    
    pqpollhandler = reset ? PQresetPoll : PQconnectPoll;
    
    if (pgconn->pgstate != PGRES_POLLING_ACTIVE)
    {
        int rc;

#ifdef HAVE_POLL
        struct pollfd ufd;

        ufd.fd = pgconn->fd;
        ufd.events = 0;   /* In case pgstate is neither READING nor WRITING */
        ufd.revents = 0;
#else
        fd_set readfds, writefds;

        FD_ZERO(&readfds);
        FD_ZERO(&writefds);
#endif /* HAVE_POLL */

        switch (pgconn->pgstate)
        {
        case PGRES_POLLING_READING:
#ifdef HAVE_POLL
           ufd.events = POLLIN;
#else
           FD_SET(pgconn->fd, &readfds);
#endif /* HAVE_POLL */
           break;

        case PGRES_POLLING_WRITING:
#ifdef HAVE_POLL
           ufd.events = POLLOUT;
#else
           FD_SET(pgconn->fd, &writefds);
#endif /* HAVE_POLL */
           break;

        default:
           /* Shouldn't happen */
           break;
        }

        /* Check the socket without waiting (timeout 0) */
#ifdef HAVE_POLL
        do {
            rc = poll(&ufd, 1, 0);
        } while (rc < 0 && errno == EINTR);

        if (rc > 0)
        {
           pgconn->pgstate = PGRES_POLLING_ACTIVE;
           if (ufd.revents & POLLIN)
               pgconn->lastreply = time(NULL);
        }
#else
        do {
            struct timeval timeout;

            timeout.tv_sec = 0;
            timeout.tv_usec = 0;
            rc = select(pgconn->fd+1, &readfds, &writefds, NULL, &timeout);
        } while (rc < 0 && errno == EINTR);

        if (rc > 0)
        {
           pgconn->pgstate = PGRES_POLLING_ACTIVE;
           if (FD_ISSET(pgconn->fd, &readfds))
               pgconn->lastreply = time(NULL);
        }
#endif /* HAVE_POLL */
    }
    
    if (pgconn->pgstate == PGRES_POLLING_ACTIVE)
    {
        pgconn->pgstate = pqpollhandler(pgconn->conn);

        /* libpq may use a different socket after each polling step */
        pgconn->fd = PQsocket(pgconn->conn);
    }
    
    if (pgconn->pgstate == PGRES_POLLING_FAILED)
    {
        if (reset && (pgconn->resets < MAX_RESETS))
        {
            /* Try again later, see PG_RESET_NEXT in pg_process_one() */
            pgconn->resets++;
            pgconn->state = PG_RESET_NEXT;
            pgconn->lastreset = time(NULL);
        }
        else
        {
            if (callback_object(&pgconn->callback))
            {
                push_number(inter_sp, reset ? PGCONN_ABORTED : PGCONN_FAILED);
                push_c_string(inter_sp, PQerrorMessage(pgconn->conn));
                (void)apply_callback(&pgconn->callback, 2);
            }
            else
            {
                debug_message("%s PG connection object destructed.\n", time_stamp());
            }
            pgclose(pgconn);
        }
    }
    else if (pgconn->pgstate == PGRES_POLLING_OK)
    {
        if (!reset)
        {
           /* The program should not notice a successful reset */
            if (callback_object(&pgconn->callback))
            {
                push_number(inter_sp, PGCONN_SUCCESS);
                push_ref_string(inter_sp, STR_SUCCESS);
                (void)apply_callback(&pgconn->callback, 2);
            }
            else
            {
                debug_message("%s PG connection object destructed.\n", time_stamp());
                pgreset(pgconn);

                /* pgreset() has set the new state (resetting or
                 * closed), which must not be overwritten below.
                 */
                return;
            }
        }
        pgconn->resets = 0;
        if (pgconn->queue)
            pgconn->state = PG_SENDQUERY;
        else
            pgconn->state = PG_IDLE;
    }
} /* pg_process_connect_reset() */

/*-------------------------------------------------------------------------*/
static void
pg_process_query (dbconn_t *pgconn)

/* A query was sent on connection <pgconn>: flush pending output, and check
 * without blocking whether reply data has arrived.
 * If so, the data is consumed, and once libpq is no longer busy the
 * connection changes to state PG_REPLYREADY.
 */

{
    int rc;

#ifdef HAVE_POLL
    struct pollfd ufd;
    
    ufd.fd = pgconn->fd;
    ufd.events = POLLIN;
    ufd.revents = 0;
#else
    /* fd_set is a typedef, not a struct tag */
    fd_set readfds;

    FD_ZERO(&readfds);
    FD_SET(pgconn->fd, &readfds);

#endif /* HAVE_POLL */
    
    PQflush(pgconn->conn);

    /* Check the socket without waiting (timeout 0) */
#ifdef HAVE_POLL

    do {
        rc = poll(&ufd, 1, 0);
    } while (rc < 0 && errno == EINTR);

#else

    do {
        struct timeval timeout;

        timeout.tv_sec = 0;
        timeout.tv_usec = 0;
        rc = select(pgconn->fd+1, &readfds, NULL, NULL, &timeout);
    } while (rc < 0 && errno == EINTR);

#endif /* HAVE_POLL */

    if (rc > 0)
    {
        pgconn->lastreply = time(NULL);
        PQconsumeInput(pgconn->conn);
        if (!PQisBusy(pgconn->conn))
            pgconn->state = PG_REPLYREADY;
    }
} /* pg_process_query() */

/*-------------------------------------------------------------------------*/
static void
pg_process_one (dbconn_t *pgconn)

/* Check the state of <pgconn> and take appropriate action:
 *   - continue a running connect or reset,
 *   - send the next queued query,
 *   - look for the reply to a sent query,
 *   - retry a failed reset once the time has advanced,
 *   - deallocate an unconnected connection.
 * Afterwards an established connection which is no longer ok is reset,
 * and available results are passed to the controlling object.
 *
 * If <pgconn> was in state PG_UNCONNECTED, it has been deallocated when
 * this function returns.
 */

{
    PGresult *res;
    
    switch (pgconn->state)
    {
    case PG_CONNECTING:
    case PG_RESETTING:
        pg_process_connect_reset(pgconn);
        break;

    case PG_SENDQUERY:
    case PG_IDLE:
        if (pgconn->queue)
        {
            pgconn->lastreply = time(NULL);
            PQsendQuery(pgconn->conn, pgconn->queue->str);
            pgconn->state = PG_WAITREPLY;
        }
        break;

    case PG_WAITREPLY:
        pg_process_query(pgconn);
        break;

    case PG_UNCONNECTED:
        /* The structure is freed by this call, so it must not be
         * looked at anymore: leave right away.
         */
        dealloc_dbconn(pgconn);
        return;

    case PG_RESET_NEXT:
        /* Resets are tried at most once per second */
        if (pgconn->lastreset != time(NULL))
            pgreset(pgconn);
        break;
    } /* switch() */
    
    /* Validate the connection */
    if ((PQstatus(pgconn->conn) != CONNECTION_OK)
     && (pgconn->state >= PG_IDLE)
       )
        pgreset(pgconn);
    
    /* If there is a result waiting, get it and forward
     * it to the controlling object.
     */
    if (pgconn->state == PG_REPLYREADY)
    {
        do
        {
            res = PQgetResult(pgconn->conn);
            if (!res)
            {
                /* All results fetched: send the next query, if any */
                pgconn->state = PG_IDLE;
                pg_process_one(pgconn);
                break;
            } else
                pgresult(pgconn, res);
        }
        while (!PQisBusy(pgconn->conn));
    }
} /* pg_process_one() */

/*-------------------------------------------------------------------------*/
void
pg_process_all (void)

/* Called from the get_message() loop in comm.c, this function checks
 * all known database connections for their status and takes appropriate
 * actions.
 *
 * Connections whose controlling object has been destructed are closed,
 * and purged from the list once all connections have been looked at.
 */

{
    dbconn_t *ptr = head;
    Bool got_dead = MY_FALSE;
    
    while (ptr)
    {
        /* pg_process_one() deallocates unconnected entries, so the
         * successor has to be determined before the call.
         */
        dbconn_t *next = ptr->next;

        if (!callback_object(&ptr->callback)
         && ptr->state != PG_UNCONNECTED
           )
        {
            debug_message("%s PG connection object destructed.\n", time_stamp());
            pgclose(ptr);
            got_dead = MY_TRUE;
        }
        else
           pg_process_one(ptr);
        ptr = next;
    }

    if (got_dead)
        pg_purge_connections();
} /* pg_process_all() */


/*-------------------------------------------------------------------------*/
void
pg_purge_connections (void)

/* Check the list of database connections and purge all UNCONNECTED
 * connections and those with destructed callback objects.
 */

{
    dbconn_t *prev;
    
    while (head)
    {
        if (head->state != PG_UNCONNECTED
         && !callback_object(&head->callback)
           )
        {
            debug_message("%s PG connection object destructed.\n", time_stamp());
            pgclose(head);
        }
        if (head->state == PG_UNCONNECTED)
            dealloc_dbconn(head);
        else
            break;
    }

    if (head)
    {
        prev = head;
        while (prev->next)
        {
            if (prev->next->state != PG_UNCONNECTED
             && !callback_object(&prev->next->callback)
               )
            {
                debug_message("%s PG connection object destructed.\n", time_stamp());
                pgclose(prev->next);
            }
            if (prev->next->state == PG_UNCONNECTED)
                dealloc_dbconn(prev->next);
            else
                prev = prev->next;
        }
    }
} /* pg_purge_connections() */

/*-------------------------------------------------------------------------*/
static
Bool check_privilege (const char * efun_name, Bool raise_error, svalue_t * sp)

/* Ask the master whether efun <efun_name> may be executed, by calling
 * privilege_violation() with STR_PGSQL as operation and the efun name
 * as argument. <sp> is the current top of the stack; the efun name is
 * pushed above it temporarily.
 *
 * Return TRUE if the master allows the call. If it doesn't, raise an
 * error if <raise_error> is TRUE, else return FALSE.
 */

{
    Bool allowed;

    /* Put the efun name onto the stack for the master call */
    inter_sp = sp+1;
    put_c_string(inter_sp, efun_name);
    allowed = privilege_violation(STR_PGSQL, inter_sp, inter_sp);
    free_svalue(inter_sp);
    inter_sp--;

    if (!allowed && raise_error)
    {
        errorf("%s(): Privilege violation.\n", efun_name);
        /* NOTREACHED */
    }

    return allowed ? MY_TRUE : MY_FALSE;
} /* check_privilege() */



/*=========================================================================*/

/*                           EFUNS                                         */

/*-------------------------------------------------------------------------*/
svalue_t *
v_pg_connect (svalue_t *sp, int num_arg)

/* EFUN pg_connect()
 *
 *   int pg_connect (string conn, string fun)
 *   int pg_connect (string conn, string fun, string|object obj, mixed extra, ...)
 *   int pg_connect (string conn, closure cl, mixed extra, ...)
 *
 * Open a database connection as directed by <conn>, and assign the
 * callback function <fun>/<cl> with the optional <extra> parameters to
 * it.
 *
 * The object holding the callback function becomes the controlling object;
 * obviously it is an error to assign more than one connection to the same
 * controlling object.
 *
 * The <conn> string is in the format accepted by Postgres' PQconnectStart()
 * API functions. Pass an empty string to use the default options, or
 * a string holding the '<key>=<value>' options separated by whitespace.
 * The most useful options are:
 *   dbname:   The database name
 *   user:     The user name to connect as.
 *   password: Password to be used.
 *
 * Return 0 on success, and -1 on failure. 'Success' means that the
 * connect could be started; the final outcome is reported to the callback
 * as PGCONN_SUCCESS or PGCONN_FAILED.
 *
 * An error is raised if the privilege check fails, if the callback
 * arguments are invalid, if the callback object is destructed, or if the
 * callback object already has an open connection.
 */

{
    dbconn_t   *db;
    int         st;
    int         error_index;
    callback_t  cb;
    object_t   *cb_object;
    svalue_t   *arg = sp - num_arg + 1;  /* First argument: <conn> */

    check_privilege(instrs[F_PG_CONNECT].name, MY_TRUE, sp);

    /* Get the callback information */

    error_index = setup_efun_callback(&cb, arg+1, num_arg-1);

    if (error_index >= 0)
    {
        /* presumably <error_index> counts from arg+1, hence the +2 for
         * the 1-based efun argument number - confirm against
         * setup_efun_callback().
         */
        vefun_bad_arg(error_index+2, arg);
        /* NOTREACHED */
        return arg;
    }

    /* Store the callback in the stack slot of the second argument.
     * NOTE(review): presumably this lets the error handling clean up
     * <cb> if an error is thrown below - confirm.
     */
    inter_sp = sp = arg+1;
    put_callback(sp, &cb);

    cb_object = callback_object(&cb);
    if (!cb_object)
    {
        free_callback(&cb);
        errorf("pgconnect(): Callback object is destructed.\n");
        /* NOTREACHED */
        return arg;
    }

    /* Check the callback object if it has a connection already */

    db = find_current_connection(cb_object);
    if (db)
    {
        /* A closed connection is just a leftover and can be replaced */
        if (db->state == PG_UNCONNECTED)
            dealloc_dbconn(db);
        else
        {
            free_callback(&cb);
            errorf("pgconnect(): Already connected\n");
            /* NOTREACHED */
            return arg;
        }
    }

    /* Connect to the database */

    db = alloc_dbconn();
    db->callback = cb;  /* The connection structure holds the callback now */
    
    st = pgconnect(db, get_txt(arg[0].u.str));
    if (st < 0)
        pgclose(db);  /* Leaves <db> in state PG_UNCONNECTED in the list */

    free_svalue(arg); /* the callback entries are gone already */
    put_number(arg, st);
    return arg;
} /* v_pg_connect() */

/*-------------------------------------------------------------------------*/
svalue_t *
f_pg_pending (svalue_t *sp)

/* EFUN pg_pending()
 *
 *   int pg_pending ()
 *   int pg_pending (object obj)
 *
 * Return the number of pending queries for the connection on the given
 * object <obj> (default is the current object). If the object has no
 * database connection, return -1.
 */

{
    dbconn_t *db;
    int       pending = -1;

    check_privilege(instrs[F_PG_PENDING].name, MY_TRUE, sp);

    db = find_current_connection(sp->u.ob);
    if (db != NULL)
    {
        query_queue_t *qu = db->queue;

        /* Count the entries of the query queue */
        pending = 0;
        while (qu != NULL)
        {
            pending++;
            qu = qu->next;
        }
    }

    free_svalue(sp);
    put_number(sp, pending);
    return sp;
} /* f_pg_pending() */

/*-------------------------------------------------------------------------*/
svalue_t *
v_pg_query (svalue_t *sp, int numarg)

/* EFUN pg_query()
 *
 *  int pg_query (string query)
 *  int pg_query (string query, int flags)
 *
 * Append the query <query> to the queue of the database connection of
 * the current object, and return the unique id assigned to it. The
 * result of the query is delivered later through the callback function.
 *
 * <flags> selects the format of the result:
 *   PG_RESULT_ARRAY: Pass the query result as array (the default).
 *   PG_RESULT_MAP:   Pass the query result as mapping.
 *
 * An error is raised if the current object has no database connection.
 */

{
    dbconn_t      *db;
    query_queue_t *entry;
    int            flags;

    check_privilege(instrs[F_PG_QUERY].name, MY_TRUE, sp);

    /* Take the optional flags argument off the stack */
    if (numarg != 2)
        flags = PG_RESULT_ARRAY;
    else
    {
        flags = sp->u.number;
        sp--;
    }

    db = find_current_connection(current_object);
    if (db == NULL)
        errorf("pgquery(): not connected\n");

    entry = queue(db, get_txt(sp->u.str));
    entry->flags = flags;

    /* Wake up an idle connection */
    if (db->state == PG_IDLE)
        db->state = PG_SENDQUERY;

    free_svalue(sp);
    put_number(sp, entry->id);
    return sp;
} /* v_pg_query() */

/*-------------------------------------------------------------------------*/
svalue_t *
f_pg_close (svalue_t *sp)

/* EFUN pg_close()
 *
 *   void pg_close()
 *
 * Close the database connection of the current object. If the object
 * has no connection, nothing happens.
 */

{
    dbconn_t *own;

    check_privilege(instrs[F_PG_CLOSE].name, MY_TRUE, sp);

    own = find_current_connection(current_object);
    if (own != NULL)
        pgclose(own);

    return sp;
} /* f_pg_close() */

/*-------------------------------------------------------------------------*/
svalue_t *
f_pg_conv_string (svalue_t *sp)

/* EFUN pg_conv_string()
 *
 *   string pg_conv_string(string input)
 *
 * Escape a string for use within an SQL command, and return the escaped
 * string. The result has exactly the length of the escaped text.
 *
 * Throw an error when out of memory.
 */
{
    string_t *escaped;
    string_t *result;
    size_t    size = mstrsize(sp->u.str);
    size_t    len;

    /* PQescapeString() needs room for two bytes per input byte plus
     * the terminating zero byte.
     */
    memsafe(escaped = alloc_mstring(2 * size + 1), 2 * size + 1
                                                 , "escaped sql string");

    /* The return value is the number of bytes written, not counting
     * the terminator.
     */
    len = PQescapeString(get_txt(escaped), get_txt(sp->u.str), size);

    /* Cut the buffer down to the escaped text.
     * NOTE(review): resize_mstring() is assumed to consume <escaped> on
     * success and to leave it untouched when returning NULL - confirm
     * against mstrings.h.
     */
    result = resize_mstring(escaped, len);
    if (!result)
    {
        free_mstring(escaped);
        outofmemory("escaped sql string");
        /* NOTREACHED */
        return sp;
    }

    free_string_svalue(sp);
    put_string(sp, result);
    return sp;
} /* f_pg_conv_string() */

/*=========================================================================*/

/*                          GC SUPPORT                                     */

#ifdef GC_SUPPORT

/*-------------------------------------------------------------------------*/
void
pg_clear_refs (void)

/* GC Support: Clear the references held by the callbacks of all
 * database connections.
 */

{
    dbconn_t *db = head;

    while (db != NULL)
    {
        clear_ref_in_callback(&(db->callback));
        db = db->next;
    }
} /* pg_clear_refs() */

/*-------------------------------------------------------------------------*/
void
pg_count_refs (void)

/* GC Support: Count the references held by the database connections:
 * the connection structures themselves, their callbacks, and the queued
 * queries with their query strings.
 */

{
    dbconn_t *db = head;

    while (db != NULL)
    {
        query_queue_t *entry;

        note_malloced_block_ref(db);
        count_ref_in_callback(&(db->callback));

        entry = db->queue;
        while (entry != NULL)
        {
            note_malloced_block_ref(entry);
            note_malloced_block_ref(entry->str);
            entry = entry->next;
        }

        db = db->next;
    }
} /* pg_count_refs() */

#endif /* GC_SUPPORT */

/*-------------------------------------------------------------------------*/

#endif /* USE_PGSQL */

/*************************************************************************/