/*
* 1999-2008
* Ericsson AB, All Rights Reserved
*
*
* The contents of this file are subject to the Erlang Public License,
* Version 1.1, (the "License"); you may not use this file except in
* compliance with the License. You should have received a copy of the
* Erlang Public License along with this software. If not, it can be
* retrieved online at http://www.erlang.org/.
*
* Software distributed under the License is distributed on an "AS IS"
* basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
* the License for the specific language governing rights and limitations
* under the License.
*
* The Initial Developer of the Original Code is Ericsson AB.
*
*/
/*
* Purpose: Implementation of Secure Socket Layer (SSL).
*
* This is an "SSL proxy" for Erlang in the form of a port
* program.
*
* The implementation has borrowed somewhat from the original
* implementation of `socket' by Claes Wikström, and the former
* implementation of `ssl_socket' by Helen Ariyan.
*
* All I/O is now non-blocking.
*
* When a connection (cp) is in the state JOINED we have the following
* picture:
*
* proxy->fd fd
* | |
* proxy->eof | --------> wq -----------> | bp
* | |
* Erlang | | SSL
* | |
* proxy->bp | <------ proxy->wq --------- | eof
* | |
*
* We read from Erlang (proxy->fd) and write to SSL (fd); and read from
* SSL (fd) and write to Erlang (proxy->fd).
*
* The variables bp (broken pipe) and eof (end of file) take the
* values 0 and 1.
*
* What has been read and cannot be immediately written is put in a
* write queue (wq). A wq is emptied before reads are continued, which
* means that at most one chunk that is read can be in a wq.
*
* The proxy-to-ssl part of a cp is valid iff
*
* !bp && (wq.len > 0 || !proxy->eof).
*
* The ssl-to-proxy part of a cp is valid iff
*
* !proxy->bp && (proxy->wq.len > 0 || !eof).
*
* The connection is valid if any of the above parts are valid, i.e.
* invalid if both parts are invalid.
*
* Every SELECT_TIMEOUT second we try to write to those file
* descriptors that have non-empty wq's (the only way to detect that a
* far end has gone away is to write to it).
*
* STATE TRANSITIONS
*
* Below (*) means that the corresponding file descriptor is published
* (i.e. kwown outside this port program) when the state is entered,
* and thus cannot be closed without synchronization with the
* ssl_server.
*
* Listen:
*
* STATE_NONE ---> (*) PASSIVE_LISTENING <---> ACTIVE_LISTENING
*
* Accept:
*
* STATE_NONE ---> SSL_ACCEPT ---> (*) CONNECTED ---> JOINED --->
* ---> SSL_SHUTDOWN ---> DEFUNCT
*
* Connect:
*
* STATE_NONE ---> (*) WAIT_CONNECT ---> SSL_CONNECT ---> CONNECTED --->
* ---> JOINED ---> SSL_SHUTDOWN ---> DEFUNCT
*
* In states where file descriptors has been published, and where
* something goes wrong, the state of the connection is set to
* DEFUNCT. A connection in such a state can only be closed by a CLOSE
* message from Erlang (a reception of such a message is registered in
* cp->closed). The possible states are: WAIT_CONNECT, SSL_CONNECT,
* CONNECTED, JOINED, and SSL_SHUTDOWN.
*
* A connection in state SSL_ACCEPT can be closed and removed without
* synchronization.
*
*/
#ifdef HAVE_CONFIG_H
#include "config.h"
#endif
#ifdef __WIN32__
#include "esock_winsock.h"
#endif
#include
#include
#include
#include
#include
#include
#include
#include
#ifdef __WIN32__
#include
#else
#include
#include
#include
#include
#include
#include
#include
#include
#endif
#ifndef INADDR_NONE
#define INADDR_NONE 0xffffffff /* Should be in . */
#endif
#include "esock.h"
#include "debuglog.h"
#include "esock_utils.h"
#include "esock_ssl.h"
#include "esock_osio.h"
#include "esock_posix_str.h"
#include "esock_poll.h"
#define MAJOR_VERSION 2
#define MINOR_VERSION 0
#define MAXREPLYBUF 256
#define RWBUFLEN (32*1024)
#define IS_CLIENT 0
#define IS_SERVER 1
#define SELECT_TIMEOUT 2 /* seconds */
#define psx_errstr() esock_posix_str(sock_errno())
#define ssl_errstr() esock_ssl_errstr
#define PROXY_TO_SSL_VALID(cp) (!(cp)->bp && \
((cp)->wq.len > 0 || !(cp)->proxy->eof))
#define SSL_TO_PROXY_VALID(cp) (!(cp)->proxy->bp && \
((cp)->proxy->wq.len > 0 || !(cp)->eof))
#define JOINED_STATE_INVALID(cp) (!(PROXY_TO_SSL_VALID(cp)) && \
!(SSL_TO_PROXY_VALID(cp)))
static int loop(void);
static int set_poll_conns(Connection *cp, EsockPoll *ep, int verbose);
static Connection *next_polled_conn(Connection *cp, Connection **cpnext,
EsockPoll *ep, int set_wq_fds);
static void leave_joined_state(Connection *cp);
static void do_shutdown(Connection *cp);
static void close_and_remove_connection(Connection *cp);
static int reply(int cmd, char *fmt, ...);
static int input(char *fmt, ...);
static int put_pars(unsigned char *buf, char *fmt, va_list args);
static int get_pars(unsigned char *buf, char *fmt, va_list args);
static FD do_connect(char *lipstring, int lport, char *fipstring, int fport);
static FD do_listen(char *ipstring, int lport, int backlog, int *aport);
static FD do_accept(FD listensock, struct sockaddr *saddr, int *len);
static void print_connections(void);
static void dump_connections(void);
static int check_num_sock_fds(FD fd);
static void safe_close(FD fd);
static Connection *new_connection(int state, FD fd);
static Connection *get_connection(FD fd);
static void remove_connection(Connection *conn);
static Proxy *get_proxy_by_peerport(int port);
static Proxy *new_proxy(FD fd);
static void remove_proxy(Proxy *proxy);
static void ensure_write_queue(WriteQueue *wq, int size);
static void clean_up(void);
static Connection *connections = NULL;
static int num_sock_fds; /* On UNIX all file descriptors */
static Proxy *proxies = NULL;
static int proxy_listensock = INVALID_FD;
static int proxy_listenport = 0;
static int proxy_backlog = 128;
static int proxysock_last_err = 0;
static int proxysock_err_cnt = 0;
static char rwbuf[RWBUFLEN];
static unsigned char *ebuf = NULL; /* Set by read_ctrl() */
static char *connstr[] = {
"STATE_NONE",
"ACTIVE_LISTENING",
"PASSIVE_LISTENING",
"CONNECTED",
"WAIT_CONNECT",
"SSL_CONNECT",
"SSL_ACCEPT",
"TRANSPORT_ACCEPT",
"JOINED",
"SSL_SHUTDOWN",
"DEFUNCT"
};
static char *originstr[] = {
"listen",
"accept",
"connect"
};
int main(int argc, char **argv)
{
char *logfile = NULL;
int i;
esock_version *vsn;
char *ciphers;
#ifdef __WIN32__
int pid;
WORD version;
WSADATA wsa_data;
set_binary_mode();
setvbuf(stderr, NULL, _IONBF, 0);
/* Two sockets for the stdin socket pipe (local thread). */
num_sock_fds = 2;
#else
pid_t pid;
num_sock_fds = 3; /* 0, 1, 2 */
#endif
pid = getpid();
i = 1;
while (i < argc) {
if (strcmp(argv[i], "-d") == 0) {
debug = 1;
i++;
} else if (strcmp(argv[i], "-dm") == 0) {
debugmsg = 1;
i++;
} else if (strcmp(argv[i], "-pp") == 0) {
i++;
proxy_listenport = atoi(argv[i]);
i++;
} else if (strcmp(argv[i], "-pb") == 0) {
i++;
proxy_backlog = atoi(argv[i]);
i++;
} else if (strcmp(argv[i], "-pv") == 0) {
i++;
protocol_version = atoi(argv[i]);
i++;
} else if (strcmp(argv[i], "-dd") == 0) {
i++;
logfile = esock_malloc(strlen(argv[i]) + 64);
sprintf(logfile, "%s/ssl_esock.%d.log", argv[i], (int)pid);
i++;
} else if (strcmp(argv[i], "-ersa") == 0) {
ephemeral_rsa = 1;
i++;
} else if (strcmp(argv[i], "-edh") == 0) {
ephemeral_dh = 1;
i++;
}
}
if (debug || debugmsg) {
DEBUGF(("Starting ssl_esock\n"));
if (logfile) {
open_ssllog(logfile);
#ifndef __WIN32__
num_sock_fds++;
#endif
}
atexit(close_ssllog);
DEBUGF(("pid = %d\n", getpid()));
}
if (esock_ssl_init() < 0) {
fprintf(stderr, "esock: Could not do esock_ssl_init\n");
exit(EXIT_FAILURE);
}
atexit(esock_ssl_finish);
#ifdef __WIN32__
/* Start Windows' sockets */
version = MAKEWORD(MAJOR_VERSION, MINOR_VERSION);
if (WSAStartup(version, &wsa_data) != 0) {
fprintf(stderr, "esock: Could not start up Windows' sockets\n");
exit(EXIT_FAILURE);
}
atexit((void (*)(void))WSACleanup);
if (LOBYTE(wsa_data.wVersion) < MAJOR_VERSION ||
(LOBYTE(wsa_data.wVersion) == MAJOR_VERSION &&
HIBYTE(wsa_data.wVersion) < MINOR_VERSION)) {
fprintf(stderr, "esock: Windows socket version error. "
"Requested version:"
"%d.%d, version found: %d.%d\n", MAJOR_VERSION,
MINOR_VERSION, LOBYTE(wsa_data.wVersion),
HIBYTE(wsa_data.wVersion));
exit(EXIT_FAILURE);
}
DEBUGF(("Using Windows socket version: %d.%d\n",
LOBYTE(wsa_data.wVersion), HIBYTE(wsa_data.wVersion)));
DEBUGF(("Maximum number of sockets available: %d\n",
wsa_data.iMaxSockets));
if (esock_osio_init() < 0) {
fprintf(stderr, "esock: Could not init osio\n");
exit(EXIT_FAILURE);
}
atexit(esock_osio_finish);
#endif
/* Create the local proxy listen socket and set it to non-blocking */
proxy_listensock = do_listen("127.0.0.1", proxy_listenport,
proxy_backlog, &proxy_listenport);
if (proxy_listensock == INVALID_FD) {
fprintf(stderr, "esock: Cannot create local listen socket\n");
exit(EXIT_FAILURE);
}
SET_NONBLOCKING(proxy_listensock);
DEBUGF(("Local proxy listen socket: fd = %d, port = %d\n",
proxy_listensock, proxy_listenport));
vsn = esock_ssl_version();
ciphers = esock_ssl_ciphers();
/* Report: port number of the local proxy listen socket, the native
* os pid, the compile and lib versions of the ssl library, and
* the list of available ciphers. */
reply(ESOCK_PROXY_PORT_REP, "24sss", proxy_listenport, (int)pid,
vsn->compile_version, vsn->lib_version, ciphers);
atexit(clean_up);
loop();
if (logfile)
esock_free(logfile);
exit(EXIT_SUCCESS);
}
/*
* Local functions
*
*/
static int loop(void)
{
EsockPoll pollfd;
FD fd, msgsock, listensock, connectsock, proxysock;
int cc, wc, fport, lport, pport, length, backlog, intref, op;
int value;
char *lipstring, *fipstring;
char *flags;
char *protocol_vsn, *cipher;
unsigned char *cert, *bin;
int certlen, binlen;
struct sockaddr_in iserv_addr;
int sret = 1;
Connection *cp, *cpnext, *newcp;
Proxy *pp;
time_t last_time = 0, now = 0;
int set_wq_fds;
esock_poll_init(&pollfd);
while(1) {
esock_poll_zero(&pollfd);
esock_poll_fd_set_read(&pollfd, proxy_listensock);
esock_poll_fd_set_read(&pollfd, local_read_fd);
set_wq_fds = 0;
if (sret) /* sret == 1 the first time. */
DEBUGF(("==========LOOP=============\n"));
cc = set_poll_conns(connections, &pollfd, sret) + 1;
if (sret) {
print_connections();
DEBUGF(("Before poll/select: %d descriptor%s (total %d)\n",
cc, (cc == 1) ? "" : "s", num_sock_fds));
}
sret = esock_poll(&pollfd, SELECT_TIMEOUT);
if (sret < 0) {
DEBUGF(("select/poll error: %s\n", psx_errstr()));
continue;
}
time(&now);
if (now >= last_time + SELECT_TIMEOUT) {
set_wq_fds = 1;
last_time = now;
}
/*
* First accept as many connections as possible on the
* proxy listen socket. We record the peer port, which
* is later used as a reference for joining a proxy
* connection with a network connection.
*/
if (esock_poll_fd_isset_read(&pollfd, proxy_listensock)) {
while (1) {
length = sizeof(iserv_addr);
proxysock = do_accept(proxy_listensock,
(struct sockaddr *)&iserv_addr,
(int*)&length);
if(proxysock == INVALID_FD) {
if (sock_errno() != ERRNO_BLOCK) {
/* We can here for example get the error
* EMFILE, i.e. no more file descriptors
* available, but we do not have any specific
* connection to report the error to. We
* increment the error counter and saves the
* last err.
*/
proxysock_err_cnt++;
proxysock_last_err = sock_errno();
DEBUGF(("accept error (proxy_listensock): %s\n",
psx_errstr()));
}
break;
} else {
/* Get peer port number */
/* length = sizeof(iserv_addr); */
/* if (getpeername(proxysock, (struct sockaddr *)&iserv_addr, */
/* &length) < 0) { */
/* DEBUGF(("Can't get peername of proxy socket")); */
/* safe_close(proxysock); */
/* } else { */
/* Add to pending proxy connections */
SET_NONBLOCKING(proxysock);
pp = new_proxy(proxysock);
pp->peer_port = ntohs(iserv_addr.sin_port);
DEBUGF(("-----------------------------------\n"));
DEBUGF(("[PROXY_LISTEN_SOCK] conn accepted: "
"proxyfd = %d, "
"peer port = %d\n", proxysock, pp->peer_port));
/* } */
}
}
}
/*
* Read control messages from Erlang
*/
if (esock_poll_fd_isset_read(&pollfd, local_read_fd)) {
cc = read_ctrl(&ebuf);
if ( cc < 0 ) {
DEBUGF(("Read loop -1 or 0\n"));
return -1;
} else if (cc == 0) { /* not eof */
DEBUGF(("GOT empty string \n"));
} else {
switch((int)*ebuf) {
case ESOCK_SET_SEED_CMD:
/*
* ebuf = {cmd(1), binary(N) }
*/
input("b", &binlen, &bin);
DEBUGF(("[SET_SEED_CMD]\n"));
esock_ssl_seed(bin, binlen);
/* no reply */
break;
case ESOCK_GETPEERNAME_CMD:
/*
* ebuf = {cmd(1), fd(4)}
*/
input("4", &fd);
DEBUGF(("[GETPEERNAME_CMD] fd = %d\n", fd));
cp = get_connection(fd);
length = sizeof(iserv_addr);
if (!cp) {
sock_set_errno(ERRNO_NOTSOCK);
reply(ESOCK_GETPEERNAME_ERR, "4s", fd, psx_errstr());
} else if (getpeername(fd,
(struct sockaddr *) &iserv_addr,
&length) < 0) {
reply(ESOCK_GETPEERNAME_ERR, "4s", fd, psx_errstr());
} else {
/*
* reply = {cmd(1), fd(4), port(2),
* ipstring(N), 0(1)}
*/
reply(ESOCK_GETPEERNAME_REP, "42s", fd,
ntohs(iserv_addr.sin_port),
inet_ntoa(iserv_addr.sin_addr));
}
break;
case ESOCK_GETSOCKNAME_CMD:
/*
* ebuf = {cmd(1), fd(4)}
*/
input("4", &fd);
DEBUGF(("[GETSOCKNAME_CMD] fd = %d\n", fd));
cp = get_connection(fd);
length = sizeof(iserv_addr);
if (!cp) {
sock_set_errno(ERRNO_NOTSOCK);
reply(ESOCK_GETSOCKNAME_ERR, "4s", fd, psx_errstr());
} else if (getsockname(fd,
(struct sockaddr *)&iserv_addr,
&length) < 0) {
reply(ESOCK_GETSOCKNAME_ERR, "4s", fd, psx_errstr());
} else {
/*
* reply = {cmd(1), fd(4), port(2),
* ipstring(N), 0(1)}
*/
reply(ESOCK_GETSOCKNAME_REP, "42s", fd,
ntohs(iserv_addr.sin_port),
inet_ntoa(iserv_addr.sin_addr));
}
break;
case ESOCK_GETCONNINFO_CMD:
/*
* ebuf = {cmd(1), fd(4)}
*/
input("4", &fd);
DEBUGF(("[GETCONNINFO_CMD] fd = %d\n", fd));
cp = get_connection(fd);
if (!cp) {
sock_set_errno(ERRNO_NOTSOCK);
reply(ESOCK_GETCONNINFO_ERR, "4s", fd, psx_errstr());
} else {
if (esock_ssl_getprotocol_version(cp,
&protocol_vsn) < 0)
reply(ESOCK_GETCONNINFO_ERR, "4s", fd, psx_errstr());
else if (esock_ssl_getcipher(cp, &cipher) < 0)
reply(ESOCK_GETCONNINFO_ERR, "4s", fd, psx_errstr());
else
/*
* reply = {cmd(1), fd(4), protocol(N), 0(1),
* cipher(N), 0(1)}
*/
reply(ESOCK_GETCONNINFO_REP, "4ss", fd,
protocol_vsn, cipher);
}
break;
case ESOCK_GETPEERCERT_CMD:
/*
* ebuf = {cmd(1), fd(4)}
*/
input("4", &fd);
DEBUGF(("[GETPEERCERT_CMD] fd = %d\n", fd));
cp = get_connection(fd);
if (!cp) {
sock_set_errno(ERRNO_NOTSOCK);
reply(ESOCK_GETPEERCERT_ERR, "4s", fd, psx_errstr());
} else {
if ((certlen = esock_ssl_getpeercert(cp, &cert)) < 0)
reply(ESOCK_GETPEERCERT_ERR, "4s", fd, psx_errstr());
else {
/*
* reply = {cmd(1), fd(4), certlen(4), cert(N)}
*/
reply(ESOCK_GETPEERCERT_REP, "4b", fd,
certlen, cert);
esock_free(cert);
}
}
break;
case ESOCK_CONNECT_CMD:
/*
* ebuf = {cmd(1), intref(4),
* lport(2), lipstring(N), 0(1), -- local
* fport(2), fipstring(N), 0(1), -- foreign
* flags(N), 0(1)}
*/
input("42s2ss", &intref, &lport, &lipstring,
&fport, &fipstring, &flags);
DEBUGF(("[CONNECT_CMD] intref = %d, "
"lipstring = %s lport = %d, "
"fipstring = %s fport = %d, "
"flags = %s\n", intref, lipstring, lport,
fipstring, fport, flags));
connectsock = do_connect(lipstring, lport,
fipstring, fport);
if(connectsock == INVALID_FD) {
reply(ESOCK_CONNECT_SYNC_ERR, "4s", intref, psx_errstr());
break;
}
DEBUGF((" fd = %d\n", connectsock));
cp = new_connection(ESOCK_WAIT_CONNECT, connectsock);
cp->origin = ORIG_CONNECT;
length = strlen(flags);
cp->flags = esock_malloc(length + 1);
strcpy(cp->flags, flags);
DEBUGF(("-> WAIT_CONNECT fd = %d\n", connectsock));
/* Publish connectsock */
reply(ESOCK_CONNECT_WAIT_REP, "44", intref, connectsock);
break;
case ESOCK_TERMINATE_CMD:
/*
* ebuf = {cmd(1)}
*/
exit(EXIT_SUCCESS);
break;
case ESOCK_CLOSE_CMD:
/*
* ebuf = {cmd(1), fd(4)}
*/
input("4", &fd);
if ((cp = get_connection(fd))) {
DEBUGF(("%s[CLOSE_CMD]: fd = %d\n",
connstr[cp->state], fd));
if (cp->proxy)
cp->proxy->bp = 1;
switch (cp->state) {
case ESOCK_JOINED:
cp->close = 1;
if (JOINED_STATE_INVALID(cp))
leave_joined_state(cp);
break;
case ESOCK_SSL_SHUTDOWN:
cp->close = 1;
DEBUGF((" close flag set\n"));
break;
default:
DEBUGF(("-> (removal)\n"));
close_and_remove_connection(cp);
}
} else
DEBUGF(("[CLOSE_CMD]: ERROR: fd = %d not found\n", fd));
break;
case ESOCK_SET_SOCKOPT_CMD:
/*
* ebuf = {cmd(1), fd(4), op(1), on(1)}
*/
input("411", &fd, &op, &value);
switch(op) {
case ESOCK_SET_TCP_NODELAY:
if(setsockopt(fd, IPPROTO_TCP, TCP_NODELAY,
(void *)&value, sizeof(value)) < 0) {
DEBUGF(("Error: setsockopt TCP_NODELAY\n"));
reply(ESOCK_IOCTL_ERR, "4s", fd, psx_errstr());
} else {
reply(ESOCK_IOCTL_OK, "4", fd);
}
break;
default:
DEBUGF(("Error: set_sock_opt - Not implemented\n"));
sock_set_errno(ERRNO_OPNOTSUPP);
reply(ESOCK_IOCTL_ERR, "4", fd, psx_errstr());
break;
}
break;
case ESOCK_LISTEN_CMD:
/*
* ebuf = {cmd(1), intref(4), lport(2), ipstring(N), 0(1),
* backlog(2), flags(N), 0(1)}
*/
input("42s2s", &intref, &lport, &lipstring, &backlog,
&flags);
DEBUGF(("[LISTEN_CMD] intref = %d, port = %d, "
"ipstring = %s, backlog = %d, flags = %s\n",
intref, lport, lipstring, backlog, flags));
listensock = do_listen(lipstring, lport, backlog, &lport);
if(listensock == INVALID_FD) {
reply(ESOCK_LISTEN_SYNC_ERR, "4s", intref, psx_errstr());
break;
}
cp = new_connection(ESOCK_PASSIVE_LISTENING, listensock);
/* Flags may be an empty string */
length = strlen(flags);
cp->flags = esock_malloc(length + 1);
strcpy(cp->flags, flags);
cp->origin = ORIG_LISTEN;
if (esock_ssl_listen_init(cp) < 0) {
DEBUGF(("esock_ssl_listen_init() failed.\n"));
reply(ESOCK_LISTEN_SYNC_ERR, "4s", intref,
ssl_errstr());
close_and_remove_connection(cp);
break;
}
DEBUGF(("-> PASSIVE_LISTENING (fd = %d)\n", listensock));
/* Publish listensock */
reply(ESOCK_LISTEN_REP, "442", intref, listensock,
ntohs(iserv_addr.sin_port));
break;
case ESOCK_TRANSPORT_ACCEPT_CMD:
/*
* ebuf = { op(1), fd(4), flags(N), 0(1)}
*/
input("4s", &fd, &flags);
DEBUGF(("[TRANSPORT_ACCEPT_CMD] listenfd = %d, flags = %s\n", fd,
flags));
cp = get_connection(fd);
if (cp) {
/* We store the flags in the listen socket's
* connection, and overwrite previous flags.
*/
if ((length = strlen(flags)) > 0) {
if (cp->flags)
cp->flags = esock_realloc(cp->flags,
length + 1);
else
cp->flags = esock_malloc(length + 1);
strcpy(cp->flags, flags);
}
if (cp->flags && cp->flags[0] != '\0') {
cp->acceptors++;
cp->state = ESOCK_ACTIVE_LISTENING;
DEBUGF(("-> ACTIVE_LISTENING\n"));
break;
}
DEBUGF(("ERROR: flags empty\n"));
}
reply(ESOCK_TRANSPORT_ACCEPT_ERR, "4s", fd, "ebadf");
break;
case ESOCK_SSL_ACCEPT_CMD:
input("4s", &fd, &flags);
DEBUGF(("[SSL_ACCEPT_CMD] fd = %d, flags = %s\n", fd, flags));
cp = get_connection(fd);
if (cp)
cp->state = ESOCK_SSL_ACCEPT;
//reply(ESOCK_SSL_ACCEPT_REP, "4", fd);
break;
case ESOCK_NOACCEPT_CMD:
/*
* ebuf = {cmd(1), fd(4)}
*/
input("4", &fd);
DEBUGF(("[NOACCEPT_CMD] listenfd = %d\n", fd));
cp = get_connection(fd);
if (cp && (--cp->acceptors <= 0)) {
cp->acceptors = 0;
cp->state = ESOCK_PASSIVE_LISTENING;
esock_poll_clear_event(&pollfd, fd);
DEBUGF(("-> PASSIVE_LISTENING\n"));
}
break;
case ESOCK_PROXY_JOIN_CMD:
/*
* ebuf = {cmd(1), fd(4), portnum(2)}
*
* fd - file descriptor of a connection in state
* CONNECTED
* portnum - port number of the Erlang proxy peer
*/
input("42", &fd, &pport);
cp = get_connection(fd);
pp = get_proxy_by_peerport(pport);
if (cp && cp->state == ESOCK_CONNECTED && pp) {
DEBUGF(("CONNECTED[PROXY_JOIN_CMD] fd = %d "
"portnum = %d\n", fd, pport));
cp->proxy = pp;
pp->conn = cp;
reply(ESOCK_PROXY_JOIN_REP, "4", fd);
cp->state = ESOCK_JOINED;
DEBUGF(("-> JOINED\n"));
break;
}
if (!cp) {
DEBUGF(("[PROXY_JOIN_CMD] ERROR: No connection "
"having fd = %d\n", fd));
reply(ESOCK_PROXY_JOIN_ERR, "4s", fd, "ebadsocket");
} else if (cp->state != ESOCK_CONNECTED) {
DEBUGF(("%s[PROXY_JOIN_CMD] ERROR: Bad state: "
"fd = %d\n", connstr[cp->state], cp->fd));
reply(ESOCK_PROXY_JOIN_ERR, "4s", fd, "ebadstate");
} else {
DEBUGF(("ERROR: No proxy: fd = %d, pport = %d\n",
fd, pport));
if (proxysock_err_cnt > 0) {
proxysock_err_cnt--;
reply(ESOCK_PROXY_JOIN_ERR, "4s", fd,
esock_posix_str(proxysock_last_err));
} else {
reply(ESOCK_PROXY_JOIN_ERR, "4s", fd,
"enoproxysocket");
}
cp->state = ESOCK_DEFUNCT;
}
break;
case ESOCK_DUMP_STATE_CMD:
dump_connections();
break;
case ESOCK_SET_DEBUG_CMD:
/*
* ebuf = {cmd(1), debug(1)}
*/
input("1", &debug);
break;
case ESOCK_SET_DEBUGMSG_CMD:
/*
* ebuf = {cmd(1), debugmsg(1)}
*/
input("1", &debugmsg);
break;
default:
fprintf(stderr, "esock: default value in loop %c\n",
*ebuf);
exit(EXIT_FAILURE);
break;
}
}
}
/* Go through all connections that have their file descriptors
set. */
/* Note: We may remove the current connection (cp). Thus we
* must be careful not to read cp->next after cp has been
* removed. */
for (cp = next_polled_conn(connections, &cpnext, &pollfd, set_wq_fds);
cp != NULL;
cp = next_polled_conn(cpnext, &cpnext, &pollfd, set_wq_fds)
) {
switch(cp->state) {
case ESOCK_PASSIVE_LISTENING:
DEBUGF(("-----------------------------------\n"));
fprintf(stderr, "esock: Got connect request while PASSIVE\n");
exit(EXIT_FAILURE);
break;
case ESOCK_ACTIVE_LISTENING:
/* new connect from network */
DEBUGF(("-----------------------------------\n"));
DEBUGF(("ACTIVE_LISTENING - trying to accept on %d\n",
cp->fd));
length = sizeof(iserv_addr);
msgsock = do_accept(cp->fd, (struct sockaddr*)&iserv_addr,
(int*)&length);
if(msgsock == INVALID_FD) {
DEBUGF(("accept error: %s\n", psx_errstr()));
reply(ESOCK_TRANSPORT_ACCEPT_ERR, "4s", cp->fd, psx_errstr());
break;
}
SET_NONBLOCKING(msgsock);
if (--cp->acceptors <= 0) {
cp->acceptors = 0;
cp->state = ESOCK_PASSIVE_LISTENING;
DEBUGF(("-> PASSIVE_LISTENING\n"));
}
DEBUGF(("server accepted connection on fd %d\n", msgsock));
newcp = new_connection(ESOCK_TRANSPORT_ACCEPT, msgsock);
newcp->origin = ORIG_ACCEPT;
reply(ESOCK_TRANSPORT_ACCEPT_REP, "44", cp->fd, msgsock);
newcp->listen_fd = cp->fd; /* Needed for ESOCK_ACCEPT_ERR */
length = strlen(cp->flags);
/* XXX new flags are not needed */
newcp->flags = esock_malloc(length + 1);
strcpy(newcp->flags, cp->flags); /* XXX Why? */
if (esock_ssl_accept_init(newcp, cp->opaque) < 0) {
cp->errstr = ssl_errstr();
break;
}
newcp->ssl_want = ESOCK_SSL_WANT_READ;
break;
case ESOCK_SSL_ACCEPT:
/* SSL accept handshake. msgsock is *not* published yet. */
msgsock = cp->fd;
DEBUGF(("-----------------------------------\n"));
DEBUGF(("SSL_ACCEPT fd = %d\n", msgsock));
if (cp->errstr != NULL) { /* this means we got an error in ssl_accept_init */
/* N.B.: The *listen fd* is reported. */
reply(ESOCK_SSL_ACCEPT_ERR, "4s", msgsock, cp->errstr);
close_and_remove_connection(cp);
break;
}
if (esock_ssl_accept(cp) < 0) {
if (sock_errno() != ERRNO_BLOCK) {
/* Handshake failed. */
reply(ESOCK_SSL_ACCEPT_ERR, "4s", msgsock,
ssl_errstr());
DEBUGF(("ERROR: handshake: %s\n", ssl_errstr()));
close_and_remove_connection(cp);
}
} else {
/* SSL handshake successful: publish */
reply(ESOCK_SSL_ACCEPT_REP, "4", msgsock);
DEBUGF(("-> CONNECTED\n"));
DEBUGF((" Session was %sreused.\n",
(esock_ssl_session_reused(cp)) ? "" : "NOT "));
cp->state = ESOCK_CONNECTED;
}
break;
case ESOCK_CONNECTED:
/* Should not happen. We do not read or write until
the connection is in state JOINED. */
DEBUGF(("-----------------------------------\n"));
DEBUGF(("CONNECTED: Error: should not happen. fd = %d\n",
cp->fd));
break;
case ESOCK_JOINED:
/*
* Reading from Proxy, writing to SSL
*/
if (esock_poll_fd_isset_write(&pollfd, cp->fd)) {
/* If there is a write queue, write to ssl only */
if (cp->wq.len > 0) {
/* The write retry semantics of SSL_write in
* the OpenSSL package is strange. Partial
* writes never occur, only complete writes or
* failures. A failure, however, still
* consumes all data written, although not all
* encrypted data could be written to the
* underlying socket. To retry a write we have
* to provide the same buf and length as in
* the original call, in our case rwbuf and
* the original buffer length. Hence the
* strange memcpy(). Note that wq.offset will
* always be zero when we use OpenSSL.
*/
DEBUGF(("-----------------------------------\n"));
DEBUGF(("JOINED: writing to ssl "
"fd = %d, from write queue only, wc = %d\n",
cp->fd, cp->wq.len - cp->wq.offset));
memcpy(rwbuf, cp->wq.buf, cp->wq.len - cp->wq.offset);
/* esock_ssl_write sets cp->eof, cp->bp when return
* value is zero */
wc = esock_ssl_write(cp, rwbuf,
cp->wq.len - cp->wq.offset);
if (wc < 0) {
if (sock_errno() != ERRNO_BLOCK) {
/* Assume broken SSL pipe */
DEBUGF(("broken SSL pipe\n"));
cp->bp = 1;
shutdown(cp->proxy->fd, SHUTDOWN_READ);
cp->proxy->eof = 1;
if (JOINED_STATE_INVALID(cp)) {
leave_joined_state(cp);
break;
}
}
} else if (wc == 0) {
/* SSL broken pipe */
DEBUGF(("broken SSL pipe\n"));
cp->bp = 1;
shutdown(cp->proxy->fd, SHUTDOWN_READ);
cp->proxy->eof = 1;
if (JOINED_STATE_INVALID(cp)) {
leave_joined_state(cp);
break;
}
} else {
cp->wq.offset += wc;
if (cp->wq.offset == cp->wq.len)
cp->wq.len = 0;
}
}
} else if (esock_poll_fd_isset_read(&pollfd, cp->proxy->fd)) {
/* Read from proxy and write to SSL */
DEBUGF(("-----------------------------------\n"));
DEBUGF(("JOINED: reading from proxy, "
"proxyfd = %d\n", cp->proxy->fd));
cc = sock_read(cp->proxy->fd, rwbuf, RWBUFLEN);
DEBUGF(("read from proxyfd = %d, cc = %d\n",
cp->proxy->fd, cc));
if (cc > 0) {
/* esock_ssl_write sets cp->eof, cp->bp when return
* value is zero */
wc = esock_ssl_write(cp, rwbuf, cc);
if (wc < 0) {
if (sock_errno() != ERRNO_BLOCK) {
/* Assume broken pipe */
DEBUGF(("broken SSL pipe\n"));
cp->bp = 1;
shutdown(cp->proxy->fd, SHUTDOWN_READ);
cp->proxy->eof = 1;
if (JOINED_STATE_INVALID(cp)) {
leave_joined_state(cp);
break;
}
} else {
/* add to write queue */
DEBUGF(("adding all to write queue "
"%d bytes\n", cc));
ensure_write_queue(&cp->wq, cc);
memcpy(cp->wq.buf, rwbuf, cc);
cp->wq.len = cc;
cp->wq.offset = 0;
}
} else if (wc == 0) {
/* Broken SSL pipe */
DEBUGF(("broken SSL pipe\n"));
cp->bp = 1;
shutdown(cp->proxy->fd, SHUTDOWN_READ);
cp->proxy->eof = 1;
if (JOINED_STATE_INVALID(cp)) {
leave_joined_state(cp);
break;
}
} else if (wc < cc) {
/* add remainder to write queue */
DEBUGF(("adding remainder to write queue "
"%d bytes\n", cc - wc));
ensure_write_queue(&cp->wq, cc - wc);
memcpy(cp->wq.buf, rwbuf + wc, cc - wc);
cp->wq.len = cc - wc;
cp->wq.offset = 0;
}
} else {
/* EOF proxy or error */
DEBUGF(("proxy eof or error %d\n", errno));
cp->proxy->eof = 1;
if (cp->wq.len == 0) {
esock_ssl_shutdown(cp);
cp->bp = 1;
}
if (JOINED_STATE_INVALID(cp)) {
leave_joined_state(cp);
break;
}
}
}
/*
* Reading from SSL, writing to proxy
*/
if (esock_poll_fd_isset_write(&pollfd, cp->proxy->fd)) {
/* If there is a write queue, write to proxy only */
if (cp->proxy->wq.len > 0) {
DEBUGF(("-----------------------------------\n"));
DEBUGF(("JOINED: writing to proxyfd = %d, "
"from write queue only, wc = %d\n",
cp->proxy->fd, cp->proxy->wq.len -
cp->proxy->wq.offset));
wc = sock_write(cp->proxy->fd, cp->proxy->wq.buf +
cp->proxy->wq.offset,
cp->proxy->wq.len -
cp->proxy->wq.offset);
if (wc < 0) {
if (sock_errno() != ERRNO_BLOCK) {
/* Assume broken pipe */
DEBUGF(("broken proxy pipe\n"));
cp->proxy->bp = 1;
/* There is no SSL shutdown for read */
cp->eof = 1;
if (JOINED_STATE_INVALID(cp)) {
leave_joined_state(cp);
break;
}
}
} else {
cp->proxy->wq.offset += wc;
if (cp->proxy->wq.offset == cp->proxy->wq.len)
cp->proxy->wq.len = 0;
}
}
} else if (esock_poll_fd_isset_read(&pollfd, cp->fd)) {
/* Read from SSL and write to proxy */
DEBUGF(("-----------------------------------\n"));
DEBUGF(("JOINED: read from ssl fd = %d\n",
cp->fd));
cc = esock_ssl_read(cp, rwbuf, RWBUFLEN);
DEBUGF(("read from fd = %d, cc = %d\n", cp->fd, cc));
if (cc > 0) {
wc = sock_write(cp->proxy->fd, rwbuf, cc);
if (wc < 0) {
if (sock_errno() != ERRNO_BLOCK) {
DEBUGF(("broken proxy pipe\n"));
/* Assume broken pipe */
cp->proxy->bp = 1;
/* There is no SSL shutdown for read */
cp->eof = 1;
if (JOINED_STATE_INVALID(cp)) {
leave_joined_state(cp);
break;
}
} else {
/* add all to write queue */
DEBUGF(("adding to write queue %d bytes\n",
cc));
ensure_write_queue(&cp->proxy->wq, cc);
memcpy(cp->proxy->wq.buf, rwbuf, cc);
cp->proxy->wq.len = cc;
cp->proxy->wq.offset = 0;
}
} else if (wc < cc) {
/* add to write queue */
DEBUGF(("adding to write queue %d bytes\n",
cc - wc));
ensure_write_queue(&cp->proxy->wq, cc - wc);
memcpy(cp->proxy->wq.buf, rwbuf + wc, cc - wc);
cp->proxy->wq.len = cc - wc;
cp->proxy->wq.offset = 0;
}
} else if (cc == 0) {
/* SSL eof */
DEBUGF(("SSL eof\n"));
cp->eof = 1;
if (cp->proxy->wq.len == 0) {
shutdown(cp->proxy->fd, SHUTDOWN_WRITE);
cp->proxy->bp = 1;
}
if (JOINED_STATE_INVALID(cp)) {
leave_joined_state(cp);
break;
}
} else {
/* This may very well happen when reading from SSL. */
DEBUGF(("NOTE: readmask set, cc < 0, fd = %d, "
"is ok\n", cp->fd));
}
}
break;
case ESOCK_SSL_SHUTDOWN:
DEBUGF(("-----------------------------------\n"));
DEBUGF(("SSL_SHUTDOWN: fd = %d\n", cp->fd));
do_shutdown(cp);
break;
case ESOCK_DEFUNCT:
DEBUGF(("-----------------------------------\n"));
DEBUGF(("DEFUNCT: ERROR: should not happen. fd = %d\n",
cp->fd));
break;
case ESOCK_WAIT_CONNECT:
/* New connection shows up */
connectsock = cp->fd;/* Is published */
DEBUGF(("-----------------------------------\n"));
DEBUGF(("WAIT_CONNECT fd = %d\n", connectsock));
/* If the connection did succeed it's possible to
* fetch the peer name (UNIX); or failure shows in
* exceptmask (WIN32). Sorry for the mess below, but
* we have to have balanced paren's in #ifdefs in
* order not to confuse Emacs' indentation. */
length = sizeof(iserv_addr);
if (
#ifdef __WIN32__
esock_poll_fd_isset_exception(&pollfd, connectsock)
#else
getpeername(connectsock, (struct sockaddr *)&iserv_addr,
&length) < 0
#endif
) {
sock_set_errno(ERRNO_CONNREFUSED);
DEBUGF(("connect error: %s\n", psx_errstr()));
reply(ESOCK_CONNECT_ERR, "4s", connectsock, psx_errstr());
cp->state = ESOCK_DEFUNCT;
break;
}
if (esock_ssl_connect_init(cp) < 0) {
DEBUGF(("esock_ssl_connect_init() failed\n"));
reply(ESOCK_CONNECT_ERR, "4s", connectsock, ssl_errstr());
cp->state = ESOCK_DEFUNCT;
break;
}
DEBUGF(("-> SSL_CONNECT\n"));
cp->state = ESOCK_SSL_CONNECT;
cp->ssl_want = ESOCK_SSL_WANT_WRITE;
break;
case ESOCK_SSL_CONNECT:
/* SSL connect handshake. connectsock is published. */
connectsock = cp->fd;
DEBUGF(("-----------------------------------\n"));
DEBUGF(("SSL_CONNECT fd = %d\n", connectsock));
if (esock_ssl_connect(cp) < 0) {
if (sock_errno() != ERRNO_BLOCK) {
/* Handshake failed */
DEBUGF(("ERROR: handshake: %s\n", ssl_errstr()));
reply(ESOCK_CONNECT_ERR, "4s", connectsock,
ssl_errstr());
cp->state = ESOCK_DEFUNCT;
}
} else {
/* SSL connect handshake successful */
DEBUGF(("-> CONNECTED\n"));
reply(ESOCK_CONNECT_REP, "4", connectsock);
cp->state = ESOCK_CONNECTED;
}
break;
default:
DEBUGF(("ERROR: Connection in unknown state.\n"));
}
}
}
}
static int set_poll_conns(Connection *cp, EsockPoll *ep, int verbose)
{
int i = 0;
if (verbose)
DEBUGF(("MASKS SET FOR FD: "));
while (cp) {
switch (cp->state) {
case ESOCK_ACTIVE_LISTENING:
if (verbose)
DEBUGF(("%d (read) ", cp->fd));
esock_poll_fd_set_read(ep, cp->fd);
break;
case ESOCK_WAIT_CONNECT:
if (verbose)
DEBUGF(("%d (write) ", cp->fd));
esock_poll_fd_set_write(ep, cp->fd);
#ifdef __WIN32__
esock_poll_fd_set_exception(ep, cp->fd); /* Failure shows in exceptions */
#endif
break;
case ESOCK_SSL_CONNECT:
case ESOCK_SSL_ACCEPT:
if (cp->ssl_want == ESOCK_SSL_WANT_READ) {
if (verbose)
DEBUGF(("%d (read) ", cp->fd));
esock_poll_fd_set_read(ep, cp->fd);
} else if (cp->ssl_want == ESOCK_SSL_WANT_WRITE) {
if (verbose)
DEBUGF(("%d (write) ", cp->fd));
esock_poll_fd_set_write(ep, cp->fd);
}
break;
case ESOCK_JOINED:
if (!cp->bp) {
if (cp->wq.len) {
if (verbose)
DEBUGF(("%d (write) ", cp->fd));
esock_poll_fd_set_write(ep, cp->fd);
} else if (!cp->proxy->eof) {
if (verbose)
DEBUGF(("%d (read) ", cp->proxy->fd));
esock_poll_fd_set_read(ep, cp->proxy->fd);
}
}
if (!cp->proxy->bp) {
if (cp->proxy->wq.len) {
if (verbose)
DEBUGF(("%d (write) ", cp->proxy->fd));
esock_poll_fd_set_write(ep, cp->proxy->fd);
} else if (!cp->eof) {
if (verbose)
DEBUGF(("%d (read) ", cp->fd));
esock_poll_fd_set_read(ep, cp->fd);
}
}
break;
case ESOCK_SSL_SHUTDOWN:
if (cp->ssl_want == ESOCK_SSL_WANT_READ) {
if (verbose)
DEBUGF(("%d (read) ", cp->fd));
esock_poll_fd_set_read(ep, cp->fd);
} else if (cp->ssl_want == ESOCK_SSL_WANT_WRITE) {
if (verbose)
DEBUGF(("%d (write) ", cp->fd));
esock_poll_fd_set_write(ep, cp->fd);
}
break;
default:
break;
}
i++;
cp = cp->next;
}
if (verbose)
DEBUGF(("\n"));
return i;
}
static Connection *next_polled_conn(Connection *cp, Connection **cpnext,
EsockPoll *ep, int set_wq_fds)
{
while(cp) {
if (esock_poll_fd_isset_read(ep, cp->fd) ||
(cp->proxy && esock_poll_fd_isset_read(ep, cp->proxy->fd)) ||
(esock_poll_fd_isset_write(ep, cp->fd)) ||
(cp->proxy && esock_poll_fd_isset_write(ep, cp->proxy->fd))
#ifdef __WIN32__
|| esock_poll_fd_isset_exception(ep, cp->fd) /* Connect failure in WIN32 */
#endif
|| (set_wq_fds && (cp->wq.len ||
(cp->proxy && cp->proxy->wq.len)))
|| cp->errstr != NULL) {
*cpnext = cp->next;
return cp;
}
cp = cp->next;
}
*cpnext = NULL;
return NULL;
}
static void leave_joined_state(Connection *cp)
{
shutdown(cp->proxy->fd, SHUTDOWN_ALL);
if (((cp->bp || cp->eof) && cp->clean) ||
(!cp->bp && !cp->eof)) {
DEBUGF(("-> SSL_SHUTDOWN\n"));
cp->state = ESOCK_SSL_SHUTDOWN;
cp->ssl_want = ESOCK_SSL_WANT_WRITE;
do_shutdown(cp);
} else if (cp->close) {
DEBUGF(("-> (removal)\n"));
close_and_remove_connection(cp);
} else {
DEBUGF(("-> DEFUNCT\n"));
cp->state = ESOCK_DEFUNCT;
}
}
/* We are always in state SHUTDOWN here */
static void do_shutdown(Connection *cp)
{
int ret;
ret = esock_ssl_shutdown(cp);
if (ret < 0) {
if (sock_errno() == ERRNO_BLOCK) {
return;
} else {
/* Something is wrong -- close and remove or move to DEFUNCT */
DEBUGF(("Error in SSL shutdown\n"));
if (cp->close) {
DEBUGF(("-> (removal)\n"));
close_and_remove_connection(cp);
} else {
DEBUGF(("-> DEFUNCT\n"));
cp->state = ESOCK_DEFUNCT;
}
}
} else if (ret == 0) {
/* `close_notify' has been sent. Wait for reception of
same. */
return;
} else if (ret == 1) {
/* `close_notify' has been sent, and received. */
if (cp->close) {
DEBUGF(("-> (removal)\n"));
close_and_remove_connection(cp);
} else {
DEBUGF(("-> DEFUNCT\n"));
cp->state = ESOCK_DEFUNCT;
}
}
}
static void close_and_remove_connection(Connection *cp)
{
safe_close(cp->fd);
remove_connection(cp);
}
static int reply(int cmd, char *fmt, ...)
{
static unsigned char replybuf[MAXREPLYBUF];
unsigned char *buf = replybuf;
va_list args;
int len;
va_start(args, fmt);
len = put_pars(NULL, fmt, args);
va_end(args);
len++;
if (len > sizeof(replybuf))
buf = esock_malloc(len);
PUT_INT8(cmd, buf);
va_start(args, fmt);
(void) put_pars(buf + 1, fmt, args);
va_end(args);
write_ctrl(buf, len);
if (buf != replybuf)
esock_free(buf);
return len;
}
static int input(char *fmt, ...)
{
va_list args;
int len;
va_start(args, fmt);
len = get_pars(ebuf + 1, fmt, args);
va_end(args);
return len + 1;
}
static int put_pars(unsigned char *buf, char *fmt, va_list args)
{
char *s, *str, *bin;
int val, len, pos = 0;
s = fmt;
while (*s) {
switch (*s) {
case '1':
val = va_arg(args, int);
if (buf)
PUT_INT8(val, buf + pos);
pos++;
break;
case '2':
val = va_arg(args, int);
if (buf)
PUT_INT16(val, buf + pos);
pos += 2;
break;
case '4':
val = va_arg(args, int);
if (buf)
PUT_INT32(val, buf + pos);
pos += 4;
break;
case 's': /* string */
str = va_arg(args, char *);
if (buf)
strcpy((char *)(buf + pos), str);
pos += strlen(str) + 1;
break;
case 'b': /* binary */
len = va_arg(args, int);
if (buf)
PUT_INT32(len, buf + pos);
pos += 4;
bin = va_arg(args, char *);
if (buf)
memcpy(buf + pos, bin, len);
pos += len;
break;
default:
fprintf(stderr, "esock: Invalid format character: %c\n", *s);
exit(EXIT_FAILURE);
break;
}
s++;
}
return pos;
}
static int get_pars(unsigned char *buf, char *fmt, va_list args)
{
int *ip;
char *s, **strp, **bin;
int pos = 0;
s = fmt;
while (*s) {
switch (*s) {
case '1':
ip = va_arg(args, int *);
*ip = GET_INT8(buf + pos);
pos++;
break;
case '2':
ip = va_arg(args, int *);
*ip = GET_INT16(buf + pos);
pos += 2;
break;
case '4':
ip = va_arg(args, int *);
*ip = GET_INT32(buf + pos);
pos += 4;
break;
case 's':
strp = va_arg(args, char **);
*strp = (char *)(buf + pos);
pos += strlen(*strp) + 1;
break;
case 'b':
ip = va_arg(args, int *);
*ip = GET_INT32(buf + pos);
pos += 4;
bin = va_arg(args, char **);
*bin = (char *)(buf + pos);
pos += *ip;
break;
default:
fprintf(stderr, "esock: Invalid format character: %c\n", *s);
exit(EXIT_FAILURE);
break;
}
s++;
}
return pos;
}
static FD do_connect(char *lipstring, int lport, char *fipstring, int fport)
{
struct sockaddr_in sock_addr;
long inaddr;
FD fd;
if ((fd = socket(AF_INET, SOCK_STREAM, 0)) == INVALID_FD) {
DEBUGF(("Error calling socket()\n"));
return fd;
}
if (check_num_sock_fds(fd) < 0)
return INVALID_FD;
DEBUGF((" fd = %d\n", fd));
/* local */
if ((inaddr = inet_addr(lipstring)) == INADDR_NONE) {
DEBUGF(("Error in inet_addr(): lipstring = %s\n", lipstring));
safe_close(fd);
sock_set_errno(ERRNO_ADDRNOTAVAIL);
return INVALID_FD;
}
memset(&sock_addr, 0, sizeof(sock_addr));
sock_addr.sin_family = AF_INET;
sock_addr.sin_addr.s_addr = inaddr;
sock_addr.sin_port = htons(lport);
if(bind(fd, (struct sockaddr*) &sock_addr, sizeof(sock_addr)) < 0) {
DEBUGF(("Error in bind()\n"));
safe_close(fd);
/* XXX Set error code for bind error */
return INVALID_FD;
}
/* foreign */
if ((inaddr = inet_addr(fipstring)) == INADDR_NONE) {
DEBUGF(("Error in inet_addr(): fipstring = %s\n", fipstring));
safe_close(fd);
sock_set_errno(ERRNO_ADDRNOTAVAIL);
return INVALID_FD;
}
memset(&sock_addr, 0, sizeof(sock_addr));
sock_addr.sin_family = AF_INET;
sock_addr.sin_addr.s_addr = inaddr;
sock_addr.sin_port = htons(fport);
SET_NONBLOCKING(fd);
if(connect(fd, (struct sockaddr*)&sock_addr, sizeof(sock_addr)) < 0) {
if (sock_errno() != ERRNO_PROGRESS && /* UNIX */
sock_errno() != ERRNO_BLOCK) { /* WIN32 */
DEBUGF(("Error in connect()\n"));
safe_close(fd);
return INVALID_FD;
}
}
return fd;
}
static FD do_listen(char *ipstring, int lport, int backlog, int *aport)
{
static int one = 1; /* Type must be int, not long */
struct sockaddr_in sock_addr;
long inaddr;
int length;
FD fd;
if ((fd = socket(AF_INET, SOCK_STREAM, 0)) == INVALID_FD) {
DEBUGF(("Error calling socket()\n"));
return fd;
}
if (check_num_sock_fds(fd) < 0)
return INVALID_FD;
DEBUGF((" fd = %d\n", fd));
if ((inaddr = inet_addr(ipstring)) == INADDR_NONE) {
DEBUGF(("Error in inet_addr(): ipstring = %s\n", ipstring));
safe_close(fd);
sock_set_errno(ERRNO_ADDRNOTAVAIL);
return INVALID_FD;
}
memset(&sock_addr, 0, sizeof(sock_addr));
sock_addr.sin_family = AF_INET;
sock_addr.sin_addr.s_addr = inaddr;
sock_addr.sin_port = htons(lport);
setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, (void *)&one, sizeof(one));
if(bind(fd, (struct sockaddr*) &sock_addr, sizeof(sock_addr)) < 0) {
DEBUGF(("Error in bind()\n"));
safe_close(fd);
return INVALID_FD;
}
if (listen(fd, backlog) < 0) {
DEBUGF(("Error in listen()\n"));
safe_close(fd);
return INVALID_FD;
}
/* find out assigned local port number */
length = sizeof(sock_addr);
if (getsockname(fd, (struct sockaddr *)&sock_addr, &length) < 0) {
DEBUGF(("Error in getsockname()\n"));
safe_close(fd);
return INVALID_FD;
}
if (aport)
*aport = ntohs(sock_addr.sin_port);
return fd;
}
static FD do_accept(FD listensock, struct sockaddr *saddr, int *len)
{
FD fd;
if ((fd = accept(listensock, saddr, len)) == INVALID_FD) {
DEBUGF(("Error calling accept()\n"));
return fd;
}
if (check_num_sock_fds(fd) < 0)
return INVALID_FD;
return fd;
}
static Connection *new_connection(int state, FD fd)
{
Connection *cp;
if (!(cp = esock_malloc(sizeof(Connection))))
return NULL;
cp->state = state;
cp->acceptors = 0;
cp->fd = fd;
cp->listen_fd = INVALID_FD;
cp->proxy = NULL;
cp->opaque = NULL;
cp->ssl_want = 0;
cp->eof = 0;
cp->bp = 0;
cp->clean = 0; /* XXX Used? */
cp->close = 0;
cp->origin = -1;
cp->flags = NULL;
cp->logfp = NULL;
cp->wq.size = 0;
cp->wq.buf = NULL;
cp->wq.len = 0;
cp->wq.offset = 0;
cp->next = connections;
cp->errstr = NULL;
connections = cp;
return cp;
}
static void print_connections(void)
{
if (debug) {
Connection *cp = connections;
DEBUGF(("CONNECTIONS:\n"));
while (cp) {
if (cp->state == ESOCK_JOINED) {
DEBUGF((" - %s [%8p] (origin = %s)\n"
" (fd = %d, eof = %d, wq = %d, bp = %d)\n"
" (proxyfd = %d, eof = %d, wq = %d, bp = %d)\n",
connstr[cp->state], cp, originstr[cp->origin],
cp->fd, cp->eof, cp->wq.len, cp->bp,
cp->proxy->fd, cp->proxy->eof, cp->proxy->wq.len,
cp->proxy->bp));
} else if (cp->state == ESOCK_ACTIVE_LISTENING) {
DEBUGF((" - %s [%8p] (fd = %d, acceptors = %d)\n",
connstr[cp->state], cp, cp->fd, cp->acceptors));
} else {
DEBUGF((" - %s [%8p] (fd = %d)\n", connstr[cp->state], cp,
cp->fd));
}
cp= cp->next;
}
}
}
static void dump_connections(void)
{
Connection *cp = connections;
Proxy *pp = proxies;
time_t t = time(NULL);
int length = 0;
struct sockaddr_in iserv_addr;
__debugprintf("CONNECTIONS %s", ctime(&t));
while (cp) {
if (cp->state == ESOCK_JOINED) {
__debugprintf(" - %s [%8p] (origin = %s)\n"
" (fd = %d, eof = %d, wq = %d, bp = %d), close = %d\n"
" (proxyfd = %d, eof = %d, wq = %d, bp = %d)\n",
connstr[cp->state], cp, originstr[cp->origin],
cp->fd, cp->eof, cp->wq.len, cp->bp, cp->close,
cp->proxy->fd, cp->proxy->eof, cp->proxy->wq.len,
cp->proxy->bp);
} else if (cp->state == ESOCK_ACTIVE_LISTENING) {
__debugprintf(" - %s [%8p] (fd = %d, acceptors = %d)\n",
connstr[cp->state], cp, cp->fd, cp->acceptors);
} else {
__debugprintf(" - %s [%8p] (fd = %d)\n", connstr[cp->state], cp,
cp->fd);
}
length = sizeof(iserv_addr);
if ((cp->state == ESOCK_ACTIVE_LISTENING) ||
(cp->state == ESOCK_PASSIVE_LISTENING)) {
getsockname(cp->fd, (struct sockaddr *) &iserv_addr, &length);
__debugprintf(" (ip = %s, port = %d)\n",
inet_ntoa(iserv_addr.sin_addr),
ntohs(iserv_addr.sin_port));
}
else {
getsockname(cp->fd, (struct sockaddr *) &iserv_addr, &length);
__debugprintf(" (local_ip = %s, local_port = %d)\n",
inet_ntoa(iserv_addr.sin_addr),
ntohs(iserv_addr.sin_port));
length = sizeof(iserv_addr);
getpeername(cp->fd, (struct sockaddr *) &iserv_addr, &length);
__debugprintf(" (remote_ip = %s, remote_port = %d)\n",
inet_ntoa(iserv_addr.sin_addr),
ntohs(iserv_addr.sin_port));
}
cp=cp->next;
}
__debugprintf("PROXIES\n");
while (pp) {
__debugprintf(" - fd = %d [%8p] (external_fd = %d, peer_port = %d,"
" eof = %d)\n", pp->fd, pp, pp->conn->fd, pp->peer_port,
pp->eof);
pp= pp->next;
}
}
static Connection *get_connection(FD fd)
{
Connection *cp = connections;
while(cp) {
if(cp->fd == fd)
return cp;
cp = cp->next;
}
return NULL;
}
/*
* Remove a connection from the list of connection, close the proxy
* socket and free all resources. The main socket (fd) is *not*
* closed here, because the closing of that socket has to be synchronized
* with the Erlang process controlling this port program.
*/
static void remove_connection(Connection *conn)
{
Connection **prev = &connections;
Connection *cp = connections;
while (cp) {
if(cp == conn) {
DEBUGF(("remove_connection: fd = %d\n", cp->fd));
esock_ssl_free(cp); /* frees cp->opaque only */
esock_free(cp->flags);
closelog(cp->logfp); /* XXX num_sock_fds */
esock_free(cp->wq.buf);
if (cp->proxy) {
safe_close(cp->proxy->fd);
remove_proxy(cp->proxy);
}
*prev = cp->next;
esock_free(cp);
return;
}
prev = &cp->next;
cp = cp->next;
}
}
static Proxy *get_proxy_by_peerport(int port)
{
Proxy *p = proxies;
while(p) {
if (p->peer_port == port)
return p;
p = p->next;
}
return NULL;
}
static Proxy *new_proxy(FD fd)
{
Proxy *p;
if (!(p = esock_malloc(sizeof(Proxy))))
return NULL;
p->fd = fd;
p->peer_port = -1;
p->eof = 0;
p->bp = 0;
p->conn = NULL;
p->wq.size = 0;
p->wq.buf = NULL;
p->wq.len = 0;
p->wq.offset = 0;
p->next = proxies;
proxies = p;
return p;
}
static void remove_proxy(Proxy *proxy)
{
Proxy *p = proxies, **pp = &proxies;
while(p) {
if (p == proxy) {
DEBUGF(("remove_proxyfd = %d\n", p->fd));
esock_free(p->wq.buf);
*pp = p->next;
esock_free(p);
return;
}
pp = &p->next;
p = p->next;
}
}
static int check_num_sock_fds(FD fd)
{
num_sock_fds++; /* fd is valid */
#ifdef USE_SELECT
if (num_sock_fds > FD_SETSIZE) {
num_sock_fds--;
sock_set_errno(ERRNO_MFILE);
safe_close(fd);
return -1;
}
#endif
return 0;
}
static void safe_close(FD fd)
{
int err;
err = sock_errno();
DEBUGF(("safe_close fd = %d\n", fd));
if (sock_close(fd) < 0) {
DEBUGF(("safe_close failed\n"));
} else {
num_sock_fds--;
}
sock_set_errno(err);
}
static void clean_up(void)
{
Connection *cp, *cpnext;
Proxy *pp, *ppnext;
cp = connections;
while (cp) {
safe_close(cp->fd);
cpnext = cp->next;
remove_connection(cp);
cp = cpnext;
}
pp = proxies;
while (pp) {
safe_close(pp->fd);
ppnext = pp->next;
remove_proxy(pp);
pp = ppnext;
}
}
static void ensure_write_queue(WriteQueue *wq, int size)
{
if (wq->size < size) {
wq->buf = esock_realloc(wq->buf, size);
wq->size = size;
}
}