diff options
Diffstat (limited to 'erts/example/pg_async.c')
-rw-r--r-- | erts/example/pg_async.c | 224 |
1 files changed, 224 insertions, 0 deletions
diff --git a/erts/example/pg_async.c b/erts/example/pg_async.c new file mode 100644 index 0000000000..7ffb4bb1f3 --- /dev/null +++ b/erts/example/pg_async.c @@ -0,0 +1,224 @@ +/* + * %CopyrightBegin% + * + * Copyright Ericsson AB 2006-2009. All Rights Reserved. + * + * The contents of this file are subject to the Erlang Public License, + * Version 1.1, (the "License"); you may not use this file except in + * compliance with the License. You should have received a copy of the + * Erlang Public License along with this software. If not, it can be + * retrieved online at http://www.erlang.org/. + * + * Software distributed under the License is distributed on an "AS IS" + * basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See + * the License for the specific language governing rights and limitations + * under the License. + * + * %CopyrightEnd% + */ +/* + * Purpose: A driver using libpq to connect to Postgres + * from erlang, a sample for the driver documentation + */ + +#include <erl_driver.h> + +#include <libpq-fe.h> + +#include <ei.h> + +#include <stdlib.h> +#include <stdio.h> +#include <string.h> + +#include "pg_encode.h" + +/* Driver interface declarations */ +static ErlDrvData start(ErlDrvPort port, char *command); +static void stop(ErlDrvData drv_data); +static int control(ErlDrvData drv_data, unsigned int command, char *buf, + int len, char **rbuf, int rlen); +static void ready_io(ErlDrvData drv_data, ErlDrvEvent event); + +static ErlDrvEntry pq_driver_entry = { + NULL, /* init */ + start, + stop, + NULL, /* output */ + ready_io, /* ready_input */ + ready_io, /* ready_output */ + "pg_async", /* the name of the driver */ + NULL, /* finish */ + NULL, /* handle */ + control, + NULL, /* timeout */ + NULL, /* outputv */ + NULL, /* ready_async */ + NULL, /* flush */ + NULL, /* call */ + NULL /* event */ +}; + +typedef struct our_data_t { + PGconn* conn; + ErlDrvPort port; + int socket; + int connecting; +} our_data_t; + +/* Keep the following definitions in alignment with the FUNC_LIST + * in erl_pq_sync.erl + */ + +#define DRV_CONNECT 'C' +#define DRV_DISCONNECT 'D' +#define DRV_SELECT 'S' + +/* #define L fprintf(stderr, "%d\r\n", __LINE__) */ + +/* INITIALIZATION AFTER LOADING */ + +/* + * This is the init function called after this driver has been loaded. + * It must *not* be declared static. Must return the address to + * the driver entry. + */ +DRIVER_INIT(pq_drv) +{ + return &pq_driver_entry; +} + +static char* get_s(const char* buf, int len); +static int do_connect(const char *s, our_data_t* data); +static int do_disconnect(our_data_t* data); +static int do_select(const char* s, our_data_t* data); + +/* DRIVER INTERFACE */ +static ErlDrvData start(ErlDrvPort port, char *command) +{ + our_data_t* data = driver_alloc(sizeof(our_data_t)); + data->port = port; + data->conn = NULL; + return (ErlDrvData)data; +} + +static void stop(ErlDrvData drv_data) +{ + do_disconnect((our_data_t*)drv_data); +} + +static int control(ErlDrvData drv_data, unsigned int command, char *buf, + int len, char **rbuf, int rlen) +{ + int r; + char* s = get_s(buf, len); + our_data_t* data = (our_data_t*)drv_data; + switch (command) { + case DRV_CONNECT: r = do_connect(s, data); break; + case DRV_DISCONNECT: r = do_disconnect(data); break; + case DRV_SELECT: r = do_select(s, data); break; + default: r = -1; break; + } + driver_free(s); + return r; +} + +static int do_connect(const char *s, our_data_t* data) +{ + PGconn* conn = PQconnectStart(s); + if (PQstatus(conn) == CONNECTION_BAD) { + ei_x_buff x; + ei_x_new_with_version(&x); + encode_error(&x, conn); + PQfinish(conn); + conn = NULL; + driver_output(data->port, x.buff, x.index); + ei_x_free(&x); + } + PQconnectPoll(conn); + int socket = PQsocket(conn); + data->socket = socket; + driver_select(data->port, (ErlDrvEvent)socket, DO_READ, 1); + driver_select(data->port, (ErlDrvEvent)socket, DO_WRITE, 1); + data->conn = conn; + data->connecting = 1; + return 0; +} + +static int do_disconnect(our_data_t* data) +{ + ei_x_buff x; + driver_select(data->port, (ErlDrvEvent)data->socket, DO_READ, 0); + driver_select(data->port, (ErlDrvEvent)data->socket, DO_WRITE, 0); + PQfinish(data->conn); + data->conn = NULL; + ei_x_new_with_version(&x); + encode_ok(&x); + driver_output(data->port, x.buff, x.index); + ei_x_free(&x); + return 0; +} + +static int do_select(const char* s, our_data_t* data) +{ + data->connecting = 0; + PGconn* conn = data->conn; + /* if there's an error return it now */ + if (PQsendQuery(conn, s) == 0) { + ei_x_buff x; + ei_x_new_with_version(&x); + encode_error(&x, conn); + driver_output(data->port, x.buff, x.index); + ei_x_free(&x); + } + /* else wait for ready_output to get results */ + return 0; +} + +static void ready_io(ErlDrvData drv_data, ErlDrvEvent event) +{ + PGresult* res = NULL; + our_data_t* data = (our_data_t*)drv_data; + PGconn* conn = data->conn; + ei_x_buff x; + ei_x_new_with_version(&x); + if (data->connecting) { + ConnStatusType status; + PQconnectPoll(conn); + status = PQstatus(conn); + if (status == CONNECTION_OK) + encode_ok(&x); + else if (status == CONNECTION_BAD) + encode_error(&x, conn); + } else { + PQconsumeInput(conn); + if (PQisBusy(conn)) + return; + res = PQgetResult(conn); + encode_result(&x, res, conn); + PQclear(res); + for (;;) { + res = PQgetResult(conn); + if (res == NULL) + break; + PQclear(res); + } + } + if (x.index > 1) { + driver_output(data->port, x.buff, x.index); + if (data->connecting) + driver_select(data->port, (ErlDrvEvent)data->socket, DO_WRITE, 0); + } + ei_x_free(&x); +} + +/* utilities */ +static char* get_s(const char* buf, int len) +{ + char* result; + if (len < 1 || len > 1000) return NULL; + result = driver_alloc(len+1); + memcpy(result, buf, len); + result[len] = '\0'; + return result; +} |