aboutsummaryrefslogblamecommitdiffstats
path: root/erts/example/pg_async2.c
blob: 368f9d32d03cabad82a3fadf59c039207fcd7641 (plain) (tree)


















































































































































































































































                                                                              
/*
 * %CopyrightBegin%
 * 
 * Copyright Ericsson AB 2006-2009. All Rights Reserved.
 * 
 * The contents of this file are subject to the Erlang Public License,
 * Version 1.1, (the "License"); you may not use this file except in
 * compliance with the License. You should have received a copy of the
 * Erlang Public License along with this software. If not, it can be
 * retrieved online at http://www.erlang.org/.
 * 
 * Software distributed under the License is distributed on an "AS IS"
 * basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
 * the License for the specific language governing rights and limitations
 * under the License.
 * 
 * %CopyrightEnd%
 */
/*
 * Purpose: A driver using libpq to connect to Postgres
 * from erlang, a sample for the driver documentation
 */

#include <erl_driver.h>

#include <libpq-fe.h>

#include <ei.h>

#include <stdlib.h>
#include <stdio.h>
#include <string.h>

#include "pg_encode2.h"

#define L     fprintf(stderr, "%d\r\n", __LINE__)

/* Driver interface declarations */
static ErlDrvData start(ErlDrvPort port, char *command);
static void stop(ErlDrvData drv_data);
static int control(ErlDrvData drv_data, unsigned int command, char *buf, 
		   int len, char **rbuf, int rlen); 
static void ready_input(ErlDrvData drv_data, ErlDrvEvent event);

static ErlDrvEntry pq_driver_entry = {
    NULL,			/* init */
    start, 
    stop, 
    NULL,			/* output */
    ready_input,		/* ready_input */
    NULL,			/* ready_output */ 
    "pg_async2", 
    NULL,			/* finish */
    NULL,			/* handle */
    control, 
    NULL,			/* timeout */
    NULL,			/* outputv */
    NULL,
    NULL,
    NULL,
    NULL
};

typedef struct our_data_t {
    PGconn* conn;
    ErlDrvPort port;
    int socket;
    char* s;
} our_data_t;

our_data_t our_data;

/* Keep the following definitions in alignment with the FUNC_LIST
 * in erl_pq_sync.erl
 */

#define DRV_CONNECT             1
#define DRV_DISCONNECT          2
#define DRV_SELECT              3

/* INITIALIZATION AFTER LOADING */

/* 
 * This is the init function called after this driver has been loaded.
 * It must *not* be declared static. Must return the address to 
 * the driver entry.
 */

#ifdef __cplusplus
extern "C" {			/* this should be in the DRIVER_INIT macro! */
#endif
DRIVER_INIT(pq_drv)
{
    return &pq_driver_entry;
}
#ifdef __cplusplus
}
#endif

/* DRIVER INTERFACE */
static ErlDrvData start(ErlDrvPort port, char *command)
{ 
    our_data_t* data = driver_alloc(sizeof(our_data_t));
    data->port = port;
    data->conn = NULL;
    return (ErlDrvData)data;
}


static char* get_s(const char* buf, int len);
static void free_s(char* s);

static int do_connect(const char *s, our_data_t* data);
static int do_disconnect(our_data_t* data);
static int do_select(const char* s, our_data_t* data);

static void stop(ErlDrvData drv_data)
{
    do_disconnect((our_data_t*)drv_data);
}


/* Since we are operating in binary mode, the return value from control
 * is irrelevant, as long as it is not negative.
 */
static int control(ErlDrvData drv_data, unsigned int command, char *buf, 
		   int len, char **rbuf, int rlen)
{
    int r;
    char* s;

    s = get_s(buf, len);
    switch (command) {
    case DRV_CONNECT:
        r = do_connect(s, (our_data_t*)drv_data);
        break;
    case DRV_DISCONNECT:
	r = do_disconnect((our_data_t*)drv_data);
	break;
    case DRV_SELECT:
	r = do_select(s, (our_data_t*)drv_data);
	break;
    default:
	r = -1;
	break;
    }
    free_s(s);
    return r;
}

static int do_connect(const char *s, our_data_t* data)
{
    ei_x_buff x;
    PGconn* conn = PQconnectdb(s);

    ei_x_new_with_version(&x);
    if (PQstatus(conn) != CONNECTION_OK) {
        encode_error(&x, conn);
	PQfinish(conn);
	conn = NULL;
    } else {
        encode_ok(&x);
	data->socket = PQsocket(conn);
	driver_select(data->port, (ErlDrvEvent)data->socket, DO_READ, 1);
    }
    driver_output(data->port, x.buff, x.index);
    ei_x_free(&x);
    data->conn = conn;
    return 0;
}

static int do_disconnect(our_data_t* data)
{
    ei_x_buff x;

    if (data->socket == 0)
	return 0;
    driver_select(data->port, (ErlDrvEvent)data->socket, DO_READ, 0);
    data->socket = 0;
    PQfinish(data->conn);
    data->conn = NULL;
    ei_x_new_with_version(&x);
    encode_ok(&x);
    driver_output(data->port, x.buff, x.index);
    ei_x_free(&x);
    return 0;
}

static int do_select(const char* s, our_data_t* data)
{
    PGconn* conn = data->conn;

    /* if there's an error return it now */
    if (PQsendQueryParams(conn, s, 0, NULL, NULL, NULL, NULL, 1) == 0) {
	ei_x_buff x;
	ei_x_new_with_version(&x);
	encode_error(&x, conn);
	driver_output(data->port, x.buff, x.index);
	ei_x_free(&x);
    }
    /* else wait for ready_output to get results */
    return 0;
}

static void ready_input(ErlDrvData drv_data, ErlDrvEvent event)
{
    our_data_t* data = (our_data_t*)drv_data;
    PGconn* conn = data->conn;
    ei_x_buff x;
    PGresult* res;

    PQconsumeInput(conn);
    if (PQisBusy(conn)) 
	return;
    ei_x_new_with_version(&x);
    res = PQgetResult(conn);
    encode_result(&x, res, conn);
    driver_output(data->port, x.buff, x.index);
    ei_x_free(&x);
    PQclear(res);
    for (;;) {
	res = PQgetResult(conn);
	if (res == NULL)
	    break;
	PQclear(res);
    }
}

/* utilities */

static char* get_s(const char* buf, int len)
{
    char* result;
    if (len < 1 || len > 1000) return NULL;
    result = driver_alloc(len+1);
    memcpy(result, buf, len);
    result[len] = '\0';
    return result;
}

static void free_s(char* s)
{
    driver_free(s);
}