Update for new NsFindVersion def.
/*
* The contents of this file are subject to the AOLserver Public License
* Version 1.1 (the "License"); you may not use this file except in
* compliance with the License. You may obtain a copy of the License at
* http://aolserver.com/.
*
* Software distributed under the License is distributed on an "AS IS"
* basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
* the License for the specific language governing rights and limitations
* under the License.
*
* The Original Code is AOLserver Code and related documentation
* distributed by AOL.
*
* The Initial Developer of the Original Code is America Online,
* Inc. Portions created by AOL are Copyright (C) 1999 America Online,
* Inc. All Rights Reserved.
*
* Alternatively, the contents of this file may be used under the terms
* of the GNU General Public License (the "GPL"), in which case the
* provisions of GPL are applicable instead of those above. If you wish
* to allow use of your version of this file only under the terms of the
* GPL and not to allow others to use your version of this file under the
* License, indicate your decision by deleting the provisions above and
* replace them with the notice and other provisions required by the GPL.
* If you do not delete the provisions above, a recipient may use your
* version of this file under either the License or the GPL.
*/
/*
* driver.c --
*
* Connection I/O for loadable socket drivers.
*
*/
static const char *RCSID = "@(#) $Header: /cvsroot/aolserver/aolserver/nsd/driver.c,v 1.46 2005/05/07 23:35:46 jgdavidson Exp $, compiled: " __DATE__ " " __TIME__;
#include "nsd.h"
/*
* The following are valid Sock states.
*/
enum {
SOCK_READWAIT, /* Waiting for request or content from client. */
SOCK_PREQUE, /* Ready to invoke pre-queue filters. */
SOCK_QUEWAIT, /* Running pre-queue filters and event callbacks. */
SOCK_RUNWAIT, /* Ready to run when allowed by connection limits. */
SOCK_RUNNING, /* Running in a connection queue. */
SOCK_CLOSEWAIT, /* Graceful close wait draining remaining bytes .*/
SOCK_DROPPED, /* Client dropped connection. */
SOCK_TIMEOUT, /* Request timeout waiting to be queued. */
SOCK_OVERFLOW, /* Request was denied by connection limits. */
SOCK_ERROR /* Sock read error or invalid request. */
};
char *states[] = {
"readwait",
"preque",
"quewait",
"runwait",
"running",
"closewait",
"dropped",
"timeout",
"overflow",
"error"
};
/*
* The following are error codes for all possible connection faults.
*/
typedef enum {
E_NOERROR = 0,
E_RECV,
E_FDAGAIN,
E_FDWRITE,
E_FDSEEK,
E_NOHOST,
E_NOSERV,
E_HINVAL,
E_RINVAL,
E_NINVAL,
E_LRANGE,
E_RRANGE,
E_CRANGE,
} DrvErr;
/*
* The following are valid driver state flags.
*/
#define DRIVER_STARTED 1
#define DRIVER_STOPPED 2
#define DRIVER_SHUTDOWN 4
#define DRIVER_FAILED 8
#define DRIVER_QUERY 16
/*
* The following structure manages polling. The PollIn macro is
* used for the common case of checking for readability.
*/
typedef struct PollData {
int nfds; /* Number of fd's being monitored. */
int maxfds; /* Max fd's (will grow as needed). */
struct pollfd *pfds; /* Dynamic array of Poll struct's. */
Ns_Time *timeoutPtr; /* Min timeout, if any, for next spin. */
} PollData;
#define PollIn(ppd,i) ((ppd)->pfds[(i)].revents & POLLIN)
/*
* The following structure defines a Host header to server mappings.
*/
typedef struct ServerMap {
NsServer *servPtr;
char location[8]; /* Location starting with http://. */
} ServerMap;
/*
* The following structure defines a pre-queue event wait callback.
*/
typedef struct QueWait {
struct QueWait *nextPtr;
SOCKET sock;
short events;
int pidx;
Ns_Time timeout;
Ns_QueueWaitProc *proc;
void *arg;
} QueWait;
/*
* Static functions defined in this file.
*/
static Ns_ThreadProc DriverThread;
static Ns_ThreadProc ReaderThread;
static void TriggerDriver(Driver *drvPtr);
static Sock *SockAccept(SOCKET lsock, Driver *drvPtr);
static void SockClose(Sock *sockPtr);
static void SockRead(Sock *sockPtr);
static int SockReadLine(Driver *drvPtr, Ns_Sock *sock, Conn *connPtr);
static int SockReadContent(Driver *drvPtr, Ns_Sock *sock, Conn *connPtr);
static int Poll(PollData *pdataPtr, SOCKET sock, int events, Ns_Time *timeoutPtr);
static Conn *AllocConn(Driver *drvPtr, Ns_Time *nowPtr, Sock *sockPtr);
static void FreeConn(Conn *connPtr);
static int RunQueWaits(PollData *pdataPtr, Ns_Time *nowPtr, Sock *sockPtr);
static void ThreadName(Driver *drvPtr, char *name);
static void SockWait(Sock *sockPtr, Ns_Time *nowPtr, int timeout,
Sock **listPtrPtr);
static void AppendConn(Driver *drvPtr, Conn *connPtr);
#define SockPush(s, sp) ((s)->nextPtr = *(sp), *(sp) = (s))
static void LogError(Sock *sockPtr, DrvErr err);
/*
* Static variables defined in this file.
*/
static Driver *firstDrvPtr; /* First in list of all drivers. */
static Conn *firstConnPtr; /* Conn free list. */
static Ns_Mutex connlock; /* Lock around Conn free list. */
static Tcl_HashTable hosts; /* Host header to server table. */
static ServerMap *defMapPtr; /* Default server when not found in table. */
/*
*----------------------------------------------------------------------
*
* NsInitDrivers --
*
* Init communication drivers data structures.
*
* Results:
* None.
*
* Side effects:
* None.
*
*----------------------------------------------------------------------
*/
void
NsInitDrivers(void)
{
Ns_MutexSetName(&connlock, "ns:conns");
Tcl_InitHashTable(&hosts, TCL_STRING_KEYS);
}
/*
*----------------------------------------------------------------------
*
* Ns_DriverInit --
*
* Initialize a driver.
*
* Results:
* NS_OK if initialized, NS_ERROR if config or other error.
*
* Side effects:
* Listen socket will be opened later in NsStartDrivers.
*
*----------------------------------------------------------------------
*/
int
Ns_DriverInit(char *server, char *module, Ns_DriverInitData *init)
{
char *path, *address, *host, *bindaddr, *defproto, *defserver;
int i, n, socktimeout, defport;
ServerMap *mapPtr;
Tcl_HashEntry *hPtr;
Ns_DString ds;
Ns_Set *set;
struct in_addr ia;
struct hostent *he;
Driver *drvPtr;
Sock *sockPtr;
NsServer *servPtr = NULL;
if (init->version != NS_DRIVER_VERSION_1) {
Ns_Log(Error, "%s: version field of init argument is invalid: %d",
module, init->version);
return NS_ERROR;
}
path = (init->path ? init->path : Ns_ConfigGetPath(server, module, NULL));
if (server != NULL && (servPtr = NsGetServer(server)) == NULL) {
Ns_Log(Error, "%s: no such server: %s", module, server);
return NS_ERROR;
}
defserver = Ns_ConfigGetValue(path, "defaultserver");
if (server == NULL && defserver == NULL) {
Ns_Log(Error, "%s: no defaultserver defined: %s", module, path);
return NS_ERROR;
}
/*
* Determine the hostname used for the local address to bind
* to and/or the HTTP location string.
*/
host = Ns_ConfigGetValue(path, "hostname");
bindaddr = address = Ns_ConfigGetValue(path, "address");
/*
* If the listen address was not specified, attempt to determine it
* through a DNS lookup of the specified hostname or the server's
* primary hostname.
*/
if (address == NULL) {
he = gethostbyname(host ? host : Ns_InfoHostname());
/*
* If the lookup suceeded but the resulting hostname does not
* appear to be fully qualified, attempt a reverse lookup on the
* address which often returns the fully qualified name.
*
* NB: This is a common but sloppy configuration for a Unix
* network.
*/
if (he != NULL && he->h_name != NULL &&
strchr(he->h_name, '.') == NULL) {
he = gethostbyaddr(he->h_addr, he->h_length, he->h_addrtype);
}
/*
* If the lookup suceeded, use the first address in host entry list.
*/
if (he == NULL || he->h_name == NULL) {
Ns_Log(Error, "%s: could not resolve %s: %s", module,
host ? host : Ns_InfoHostname(),
ns_sockstrerror(ns_sockerrno));
return NS_ERROR;
}
if (*(he->h_addr_list) == NULL) {
Ns_Log(Error, "%s: no addresses for %s", module, he->h_name);
return NS_ERROR;
}
memcpy(&ia.s_addr, *(he->h_addr_list), sizeof(ia.s_addr));
address = ns_inet_ntoa(ia);
/*
* Finally, if no hostname was specified, set it to the hostname
* derived from the lookup(s) above.
*/
if (host == NULL) {
host = he->h_name;
}
}
/*
* If the hostname was not specified and not determined by the lookup
* above, set it to the specified or derived IP address string.
*/
if (host == NULL) {
host = address;
}
/*
* Set the protocol and port defaults.
*/
if (init->opts & NS_DRIVER_SSL) {
defproto = "https";
defport = 443;
} else {
defproto = "http";
defport = 80;
}
/*
* Allocate a new driver instance and set configurable parameters.
*/
Ns_DStringInit(&ds);
drvPtr = ns_calloc(1, sizeof(Driver));
drvPtr->flags = DRIVER_STOPPED;
Ns_MutexSetName2(&drvPtr->lock, "ns:drv", module);
if (ns_sockpair(drvPtr->trigger) != 0) {
Ns_Fatal("ns_sockpair() failed: %s", ns_sockstrerror(ns_sockerrno));
}
Ns_DStringVarAppend(&ds, server, "/", module, NULL);
drvPtr->fullname = Ns_DStringExport(&ds);
drvPtr->server = server;
drvPtr->module = module;
drvPtr->name = init->name;
drvPtr->proc = init->proc;
drvPtr->arg = init->arg;
drvPtr->opts = init->opts;
drvPtr->servPtr = servPtr;
if (!Ns_ConfigGetInt(path, "bufsize", &n) || n < 1) {
n = 16000; /* ~16k */
}
drvPtr->bufsize = _MAX(n, 1024);
if (!Ns_ConfigGetInt(path, "rcvbuf", &n)) {
n = 0; /* Use OS default. */
}
drvPtr->rcvbuf = _MAX(n, 0);
if (!Ns_ConfigGetInt(path, "sndbuf", &n)) {
n = 0; /* Use OS default. */
}
drvPtr->sndbuf = _MAX(n, 0);
if (!Ns_ConfigGetInt(path, "socktimeout", &n) || n < 1) {
n = 30; /* 30 seconds. */
}
socktimeout = n;
if (!Ns_ConfigGetInt(path, "sendwait", &n) || n < 1) {
n = socktimeout; /* Use previous socktimeout option. */
}
drvPtr->sendwait = _MAX(n, 1);
if (!Ns_ConfigGetInt(path, "recvwait", &n) || n < 1) {
n = socktimeout; /* Use previous socktimeout option. */
}
drvPtr->recvwait = _MAX(n, 1);
if (!Ns_ConfigGetInt(path, "backlog", &n) || n < 1) {
n = 5; /* 5 pending connections. */
}
drvPtr->backlog = _MAX(n, 1);
if (!Ns_ConfigGetInt(path, "maxsock", &n) || n < 1) {
n = 100; /* 100 total open sockets. */
}
drvPtr->maxsock = _MAX(n, 1);
if (!Ns_ConfigGetInt(path, "maxline", &n) || n < 1) {
n = 4 * 1024; /* 4k per-line limit. */
}
drvPtr->maxline = _MAX(n, 256);
if (!Ns_ConfigGetInt(path, "maxheader", &n) || n < 1) {
n = 32 * 1024; /* 32k total header limit. */
}
drvPtr->maxheader = _MAX(n, 1024);
if (!Ns_ConfigGetInt(path, "maxinput", &n) || n < 1) {
n = 1000 * 1024;/* 1m in-memory limit including request & headers. */
}
drvPtr->maxinput = _MAX(n, 2024);
if (!Ns_ConfigGetInt(path, "closewait", &n) || n < 0) {
n = 2; /* 2 second wait for graceful client close. */
}
drvPtr->closewait = _MAX(n, 0); /* NB: 0 for no graceful close. */
if (!Ns_ConfigGetInt(path, "keepwait", &n) || n < 0) {
n = 30; /* 30 seconds wait for more data in keep-alive.*/
}
drvPtr->keepwait = _MAX(n, 0); /* NB: 0 for no keepalive. */
if (!Ns_ConfigGetInt(path, "maxreaders", &n) || n < 1) {
n = 10; /* Max of 10 threads for non-event driven I/O. */
}
n = _MAX(n, 1); /* Minimum of 1 reader thread. */
drvPtr->maxreaders = n;
drvPtr->readers = ns_calloc((size_t) n, sizeof(Ns_Thread));
/*
* Pre-allocate Sock structures.
*/
drvPtr->freeSockPtr = NULL;
sockPtr = ns_malloc(sizeof(Sock) * drvPtr->maxsock);
for (n = 0; n < drvPtr->maxsock; ++n) {
sockPtr->nextPtr = drvPtr->freeSockPtr;
drvPtr->freeSockPtr = sockPtr;
++sockPtr;
}
/*
* Determine the port and then set the HTTP location string either
* as specified in the config file or constructed from the
* hostname and port.
*/
drvPtr->bindaddr = bindaddr;
drvPtr->address = ns_strdup(address);
if (!Ns_ConfigGetInt(path, "port", &drvPtr->port)) {
drvPtr->port = defport;
}
drvPtr->location = Ns_ConfigGetValue(path, "location");
if (drvPtr->location != NULL) {
drvPtr->location = ns_strdup(drvPtr->location);
} else {
Ns_DStringVarAppend(&ds, defproto, "://", host, NULL);
if (drvPtr->port != defport) {
Ns_DStringPrintf(&ds, ":%d", drvPtr->port);
}
drvPtr->location = Ns_DStringExport(&ds);
}
drvPtr->nextPtr = firstDrvPtr;
firstDrvPtr = drvPtr;
/*
* Map Host headers for drivers not bound to servers.
*/
if (server == NULL) {
defMapPtr = NULL;
path = Ns_ConfigGetPath(NULL, module, "servers", NULL);
set = Ns_ConfigGetSection(path);
for (i = 0; set != NULL && i < Ns_SetSize(set); ++i) {
server = Ns_SetKey(set, i);
host = Ns_SetValue(set, i);
servPtr = NsGetServer(server);
if (servPtr == NULL) {
Ns_Log(Error, "%s: no such server: %s", module, server);
return NS_ERROR;
}
hPtr = Tcl_CreateHashEntry(&hosts, host, &n);
if (!n) {
Ns_Log(Error, "%s: duplicate host map: %s", module, host);
return NS_ERROR;
}
Ns_DStringVarAppend(&ds, defproto, "://", host, NULL);
mapPtr = ns_malloc(sizeof(ServerMap) + ds.length);
mapPtr->servPtr = servPtr;
strcpy(mapPtr->location, ds.string);
Ns_DStringTrunc(&ds, 0);
if (defMapPtr == NULL && STREQ(defserver, server)) {
defMapPtr = mapPtr;
}
Tcl_SetHashValue(hPtr, mapPtr);
}
if (defMapPtr == NULL) {
Ns_Fatal("%s: default server %s not defined in %s",
module, path);
}
}
Ns_DStringFree(&ds);
return NS_OK;
}
/*
*----------------------------------------------------------------------
*
* Ns_RegisterDriver --
*
* Register a set of communications driver procs (no longer
* supported).
*
* Results:
* NULL.
*
* Side effects:
* None.
*
*----------------------------------------------------------------------
*/
void *
Ns_RegisterDriver(char *server, char *label, void *procs, void *drvData)
{
Ns_Log(Error, "driver: loadable drivers no longer supported");
return NULL;
}
/*
*----------------------------------------------------------------------
*
* Ns_GetDriverContext --
*
* Return the driver's context (no longer supported)
*
* Results:
* NULL.
*
* Side effects:
* None
*
*----------------------------------------------------------------------
*/
void *
Ns_GetDriverContext(Ns_Driver drv)
{
return NULL;
}
/*
*----------------------------------------------------------------------
*
* Ns_QueueWait --
*
* Arrange for connection to wait for requested I/O on given
* socket.
*
* Results:
* None.
*
* Side effects:
* Proc will be called with given arg and sock as arguements
* when the requested I/O condition is met or the given timeout
* has expired. The connection will not be queued until all
* such callbacks are processed.
*
*----------------------------------------------------------------------
*/
void
Ns_QueueWait(Ns_Conn *conn, SOCKET sock, Ns_QueueWaitProc *proc,
void *arg, int when, Ns_Time *timePtr)
{
Conn *connPtr = (Conn *) conn;
QueWait *queWaitPtr;
queWaitPtr = ns_malloc(sizeof(QueWait));
queWaitPtr->proc = proc;
queWaitPtr->arg = arg;
queWaitPtr->sock = sock;
queWaitPtr->events = 0;
if (when & NS_SOCK_READ) {
queWaitPtr->events |= POLLIN;
}
if (when & NS_SOCK_WRITE) {
queWaitPtr->events |= POLLOUT;
}
queWaitPtr->nextPtr = connPtr->queWaitPtr;
connPtr->queWaitPtr = queWaitPtr;
queWaitPtr->timeout = *timePtr;
}
/*
*----------------------------------------------------------------------
*
* NsStartDrivers --
*
* Start all driver threads.
*
* Results:
* NS_OK if all drivers started, NS_ERROR otherwise.
*
* Side effects:
* See DriverThread.
*
*----------------------------------------------------------------------
*/
int
NsStartDrivers(void)
{
Driver *drvPtr;
int status = NS_OK;
/*
* Signal and wait for each driver to start.
*/
drvPtr = firstDrvPtr;
while (drvPtr != NULL) {
Ns_Log(Notice, "driver: starting: %s", drvPtr->module);
Ns_ThreadCreate(DriverThread, drvPtr, 0, &drvPtr->thread);
Ns_MutexLock(&drvPtr->lock);
while (!(drvPtr->flags & DRIVER_STARTED)) {
Ns_CondWait(&drvPtr->cond, &drvPtr->lock);
}
if ((drvPtr->flags & DRIVER_FAILED)) {
status = NS_ERROR;
}
Ns_MutexUnlock(&drvPtr->lock);
drvPtr = drvPtr->nextPtr;
}
return status;
}
/*
*----------------------------------------------------------------------
*
* NsStopDrivers --
*
* Trigger driver threads to begin shutdown.
*
* Results:
* None.
*
* Side effects:
* Driver threads will close listen sockets and exit after all
* outstanding connections are complete and closed.
*
*----------------------------------------------------------------------
*/
void
NsStopDrivers(void)
{
Driver *drvPtr = firstDrvPtr;
while (drvPtr != NULL) {
Ns_MutexLock(&drvPtr->lock);
Ns_Log(Notice, "driver: stopping: %s", drvPtr->module);
drvPtr->flags |= DRIVER_SHUTDOWN;
Ns_CondBroadcast(&drvPtr->cond);
Ns_MutexUnlock(&drvPtr->lock);
TriggerDriver(drvPtr);
drvPtr = drvPtr->nextPtr;
}
}
/*
*----------------------------------------------------------------------
*
* NsWaitDriversShutdown --
*
* Wait for exit of all driver threads.
*
* Results:
* None.
*
* Side effects:
* Driver threads are joined.
*
*----------------------------------------------------------------------
*/
void
NsWaitDriversShutdown(Ns_Time *toPtr)
{
Driver *drvPtr = firstDrvPtr;
int status = NS_OK;
while (drvPtr != NULL) {
Ns_MutexLock(&drvPtr->lock);
while (!(drvPtr->flags & DRIVER_STOPPED) && status == NS_OK) {
status = Ns_CondTimedWait(&drvPtr->cond, &drvPtr->lock, toPtr);
}
Ns_MutexUnlock(&drvPtr->lock);
if (status != NS_OK) {
Ns_Log(Warning, "driver: shutdown timeout: %s", drvPtr->module);
} else {
Ns_Log(Notice, "driver: stopped: %s", drvPtr->module);
Ns_ThreadJoin(&drvPtr->thread, NULL);
drvPtr->thread = NULL;
}
drvPtr = drvPtr->nextPtr;
}
}
/*
*----------------------------------------------------------------------
*
* NsTclDriverObjCmd --
*
* Implements ns_driver command.
*
* Results:
* Standard Tcl result..
*
* Side effects:
* None.
*
*----------------------------------------------------------------------
*/
int
NsTclDriverObjCmd(ClientData dummy, Tcl_Interp *interp, int objc, Tcl_Obj **objv)
{
Tcl_DString ds;
Driver *drvPtr;
char *fullname;
static CONST char *opts[] = {
"list", "query", NULL
};
enum {
DListIdx, DQueryIdx
} opt;
if (objc < 2) {
Tcl_WrongNumArgs(interp, 1, objv, "option ?args?");
return TCL_ERROR;
}
if (Tcl_GetIndexFromObj(interp, objv[1], opts, "option", 0,
(int *) &opt) != TCL_OK) {
return TCL_ERROR;
}
switch (opt) {
case DListIdx:
drvPtr = firstDrvPtr;
while (drvPtr != NULL) {
Tcl_AppendElement(interp, drvPtr->fullname);
drvPtr = drvPtr->nextPtr;
}
break;
case DQueryIdx:
if (objc != 3) {
Tcl_WrongNumArgs(interp, 2, objv, "driver");
return TCL_ERROR;
}
fullname = Tcl_GetString(objv[2]);
drvPtr = firstDrvPtr;
while (drvPtr != NULL) {
if (STREQ(fullname, drvPtr->fullname)) {
break;
}
drvPtr = drvPtr->nextPtr;
}
if (drvPtr == NULL) {
Tcl_AppendResult(interp, "no such driver: ", fullname, NULL);
return TCL_ERROR;
}
Tcl_DStringInit(&ds);
Ns_MutexLock(&drvPtr->lock);
while (drvPtr->flags & DRIVER_QUERY) {
Ns_CondWait(&drvPtr->cond, &drvPtr->lock);
}
drvPtr->queryPtr = &ds;
drvPtr->flags |= DRIVER_QUERY;
TriggerDriver(drvPtr);
while (drvPtr->flags & DRIVER_QUERY) {
Ns_CondWait(&drvPtr->cond, &drvPtr->lock);
}
Ns_MutexUnlock(&drvPtr->lock);
Tcl_DStringResult(interp, &ds);
break;
}
return TCL_OK;
}
/*
*----------------------------------------------------------------------
*
* NsSockSend --
*
* Send buffers via the socket's driver callback.
*
* Results:
* # of bytes sent or -1 on error.
*
* Side effects:
* Depends on driver proc.
*
*----------------------------------------------------------------------
*/
int
NsSockSend(Sock *sockPtr, struct iovec *bufs, int nbufs)
{
Ns_Sock *sock = (Ns_Sock *) sockPtr;
++sockPtr->nwrites;
return (*sockPtr->drvPtr->proc)(DriverSend, sock, bufs, nbufs);
}
/*
*----------------------------------------------------------------------
*
* NsSockClose --
*
* Return a Sock to its DriverThread for closing or keepalive.
* Note the connection may continue to run after releasing the
* Sock (traces, etc.).
*
* Results:
* None.
*
* Side effects:
* Socket may be reused by a keepalive connection.
*
*----------------------------------------------------------------------
*/
void
NsSockClose(Sock *sockPtr, int keep)
{
Driver *drvPtr = sockPtr->drvPtr;
Ns_Sock *sock = (Ns_Sock *) sockPtr;
/*
* If keepalive is requested and enable, set the read wait
* state. Otherwise, set close wait which simply drains any
* remaining bytes to read.
*/
if (keep && drvPtr->keepwait > 0
&& (*drvPtr->proc)(DriverKeep, sock, NULL, 0) == 0) {
sockPtr->state = SOCK_READWAIT;
} else {
sockPtr->state = SOCK_CLOSEWAIT;
}
Ns_MutexLock(&drvPtr->lock);
sockPtr->nextPtr = drvPtr->closeSockPtr;
drvPtr->closeSockPtr = sockPtr;
Ns_MutexUnlock(&drvPtr->lock);
TriggerDriver(drvPtr);
}
/*
*----------------------------------------------------------------------
*
* NsFreeConn --
*
* Return a Conn structure to the free list after processing.
*
* Results:
* None.
*
* Side effects:
* Will close Sock structure if still open.
*
*----------------------------------------------------------------------
*/
void
NsFreeConn(Conn *connPtr)
{
Driver *drvPtr = connPtr->drvPtr;
/*
* Return the Conn to the driver.
*/
Ns_MutexLock(&drvPtr->lock);
connPtr->nextPtr = drvPtr->freeConnPtr;
drvPtr->freeConnPtr = connPtr;
Ns_MutexUnlock(&drvPtr->lock);
TriggerDriver(drvPtr);
}
/*
*----------------------------------------------------------------------
*
* DriverThread --
*
* Main communication driver thread. A driver thread is created
* for each loaded module. The thread continuously loops
* handling I/O events, accepting new connections, and queuing
* connections for reader and execution threads. The driver will
* also processes any pre-queue callbacks and resulting I/O events,
* if any.
*
* Results:
* None.
*
* Side effects:
* Connections are accepted and processed.
*
*----------------------------------------------------------------------
*/
static void
DriverThread(void *arg)
{
SOCKET lsock;
Driver *drvPtr = (Driver *) arg;
int n, flags, stop, lidx, tidx;
Sock *sockPtr, *closePtr, *nextPtr;
QueWait *queWaitPtr;
Conn *connPtr, *nextConnPtr, *freeConnPtr;
PollData pdata;
Limits *limitsPtr;
char drain[1024];
Ns_Time now;
Sock *waitPtr = NULL; /* Sock's waiting for I/O events. */
Sock *readSockPtr = NULL; /* Sock's to send to reader threads. */
Sock *preqSockPtr = NULL; /* Sock's ready for pre-queue callbacks. */
Sock *queSockPtr = NULL; /* Sock's ready to queue. */
ThreadName(drvPtr, "driver");
/*
* Create the listen socket.
*/
flags = DRIVER_STARTED;
lsock = Ns_SockListenEx(drvPtr->bindaddr, drvPtr->port, drvPtr->backlog);
if (lsock != INVALID_SOCKET) {
Ns_Log(Notice, "%s: listening on %s:%d", drvPtr->name,
drvPtr->address, drvPtr->port);
Ns_SockSetNonBlocking(lsock);
} else {
Ns_Log(Error, "%s: failed to listen on %s:%d: %s", drvPtr->name,
drvPtr->address, drvPtr->port, ns_sockstrerror(ns_sockerrno));
flags |= (DRIVER_FAILED | DRIVER_SHUTDOWN);
}
/*
* Update and signal state of driver.
*/
Ns_MutexLock(&drvPtr->lock);
drvPtr->flags |= flags;
Ns_CondBroadcast(&drvPtr->cond);
Ns_MutexUnlock(&drvPtr->lock);
/*
* Loop forever until signalled to shutdown and all
* connections are complete and gracefully closed.
*/
pdata.nfds = pdata.maxfds = 0;
pdata.pfds = NULL;
pdata.timeoutPtr = NULL;
stop = (flags & DRIVER_SHUTDOWN);
while (!stop || drvPtr->nactive) {
/*
* Poll the trigger pipe and, if a Sock structure is available,
* the listen socket.
*/
pdata.nfds = 0;
pdata.timeoutPtr = NULL;
tidx = Poll(&pdata, drvPtr->trigger[0], POLLIN, NULL);
if (!stop && drvPtr->freeSockPtr != NULL) {
lidx = Poll(&pdata, lsock, POLLIN, NULL);
} else {
lidx = -1;
}
/*
* Poll waiting sockets, determining the minimum relative timeout.
*/
sockPtr = waitPtr;
while (sockPtr != NULL) {
if (sockPtr->state != SOCK_QUEWAIT) {
sockPtr->pidx = Poll(&pdata, sockPtr->sock, POLLIN,
&sockPtr->timeout);
} else {
/* NB: No client timeout with active queue wait events. */
sockPtr->pidx = Poll(&pdata, sockPtr->sock, POLLIN, NULL);
queWaitPtr = sockPtr->connPtr->queWaitPtr;
while (queWaitPtr != NULL) {
queWaitPtr->pidx = Poll(&pdata, queWaitPtr->sock,
queWaitPtr->events,
&queWaitPtr->timeout);
queWaitPtr = queWaitPtr->nextPtr;
}
}
sockPtr = sockPtr->nextPtr;
}
/*
* Poll, drain the trigger pipe if necessary, and get current time.
*/
++drvPtr->stats.spins;
n = NsPoll(pdata.pfds, pdata.nfds, pdata.timeoutPtr);
if (PollIn(&pdata, tidx)
&& recv(drvPtr->trigger[0], drain, sizeof(drain), 0) <= 0) {
Ns_Fatal("driver: trigger recv() failed: %s",
ns_sockstrerror(ns_sockerrno));
}
Ns_GetTime(&now);
/*
* Get current flags, free conns, closing socks, and socks returning
* from the reader threads.
*/
Ns_MutexLock(&drvPtr->lock);
flags = drvPtr->flags;
freeConnPtr = drvPtr->freeConnPtr;
drvPtr->freeConnPtr = NULL;
closePtr = drvPtr->closeSockPtr;
drvPtr->closeSockPtr = NULL;
while ((sockPtr = drvPtr->runSockPtr) != NULL) {
drvPtr->runSockPtr = sockPtr->nextPtr;
SockPush(sockPtr, &preqSockPtr);
}
Ns_MutexUnlock(&drvPtr->lock);
/*
* Free connections done executing.
*/
while ((connPtr = freeConnPtr) != NULL) {
freeConnPtr = connPtr->nextPtr;
limitsPtr = connPtr->limitsPtr;
if (limitsPtr != NULL) {
Ns_MutexLock(&limitsPtr->lock);
--limitsPtr->nrunning;
Ns_MutexUnlock(&limitsPtr->lock);
}
connPtr->times.done = now;
/*
* Add the Sock to the gracefull close list if still open.
*/
if ((sockPtr = connPtr->sockPtr) != NULL) {
connPtr->sockPtr = NULL;
sockPtr->state = SOCK_CLOSEWAIT;
SockPush(sockPtr, &closePtr);
}
FreeConn(connPtr);
}
/*
* Process ready sockets.
*/
stop = (flags & DRIVER_SHUTDOWN);
sockPtr = waitPtr;
waitPtr = NULL;
while (sockPtr != NULL) {
nextPtr = sockPtr->nextPtr;
switch (sockPtr->state) {
case SOCK_CLOSEWAIT:
/*
* Cleanup connections in graceful close.
*/
if (PollIn(&pdata, sockPtr->pidx)) {
n = recv(sockPtr->sock, drain, sizeof(drain), 0);
if (n <= 0) {
/* NB: Timeout Sock on end-of-file or error. */
sockPtr->timeout = now;
}
}
if (Ns_DiffTime(&sockPtr->timeout, &now, NULL) <= 0 || stop) {
/* Close wait complete or timeout. */
SockClose(sockPtr);
} else {
SockPush(sockPtr, &waitPtr);
}
break;
case SOCK_QUEWAIT:
/*
* Run connections with queue-wait callbacks.
*/
if (!RunQueWaits(&pdata, &now, sockPtr)) {
SockClose(sockPtr);
} else if (sockPtr->connPtr->queWaitPtr == NULL) {
SockPush(sockPtr, &queSockPtr); /* Ready to queue. */
} else {
SockPush(sockPtr, &waitPtr); /* Still pending. */
}
break;
case SOCK_READWAIT:
/*
* Read connection for more input.
*/
if (!PollIn(&pdata, sockPtr->pidx)) {
if (Ns_DiffTime(&sockPtr->timeout, &now, NULL) <= 0
|| stop) {
/* Timeout waiting for input. */
SockClose(sockPtr);
} else {
SockPush(sockPtr, &waitPtr);
}
} else {
/* Input now available */
if (sockPtr->connPtr->ibuf.length == 0) {
sockPtr->connPtr->times.read = now;
}
if (!(drvPtr->opts & NS_DRIVER_ASYNC)) {
/* Queue for read by reader threads. */
SockPush(sockPtr, &readSockPtr);
} else {
/* Read directly. */
SockRead(sockPtr);
if (sockPtr->state == SOCK_READWAIT) {
SockWait(sockPtr, &now, drvPtr->recvwait, &waitPtr);
} else {
SockPush(sockPtr, &preqSockPtr);
}
}
}
break;
case SOCK_RUNWAIT:
/* NB: Handled below when processing Conn queue. */
break;
default:
Ns_Fatal("impossible state");
break;
}
sockPtr = nextPtr;
}
/*
* Move Sock's to the reader threads if necessary.
*/
if (readSockPtr != NULL) {
n = 0;
Ns_MutexLock(&drvPtr->lock);
while ((sockPtr = readSockPtr) != NULL) {
readSockPtr = sockPtr->nextPtr;
sockPtr->nextPtr = drvPtr->readSockPtr;
drvPtr->readSockPtr = sockPtr;
++n;
}
while (n > drvPtr->idlereaders
&& drvPtr->nreaders < drvPtr->maxreaders) {
Ns_ThreadCreate(ReaderThread, drvPtr, 0,
&drvPtr->readers[drvPtr->nreaders]);
++drvPtr->nreaders;
++drvPtr->idlereaders;
--n;
}
Ns_MutexUnlock(&drvPtr->lock);
if (n > 0) {
Ns_CondSignal(&drvPtr->cond);
}
}
/*
* Process Sock's returned for keep-alive or close.
*/
while ((sockPtr = closePtr) != NULL) {
closePtr = sockPtr->nextPtr;
if (!stop && sockPtr->state == SOCK_READWAIT) {
sockPtr->connPtr = AllocConn(drvPtr, &now, sockPtr);
SockWait(sockPtr, &now, drvPtr->keepwait, &waitPtr);
} else if (!drvPtr->closewait || shutdown(sockPtr->sock, 1) != 0) {
/* Graceful close diabled or shutdown() failed. */
SockClose(sockPtr);
} else {
SockWait(sockPtr, &now, drvPtr->closewait, &waitPtr);
}
}
/*
* Process sockets ready to run, starting with pre-queue callbacks.
*/
while ((sockPtr = preqSockPtr) != NULL) {
preqSockPtr = sockPtr->nextPtr;
sockPtr->connPtr->times.ready = now;
if (sockPtr->state != SOCK_PREQUE ||
NsRunFilters((Ns_Conn *) sockPtr->connPtr,
NS_FILTER_PRE_QUEUE) != NS_OK) {
SockClose(sockPtr);
} else if (sockPtr->connPtr->queWaitPtr != NULL) {
/* NB: Sock timeout ignored during que wait. */
sockPtr->state = SOCK_QUEWAIT;
SockPush(sockPtr, &waitPtr);
} else {
SockPush(sockPtr, &queSockPtr);
}
}
/*
* Add Sock's now ready to the queue.
*/
while ((sockPtr = queSockPtr) != NULL) {
queSockPtr = sockPtr->nextPtr;
connPtr = sockPtr->connPtr;
sockPtr->timeout = connPtr->times.queue = now;
Ns_IncrTime(&sockPtr->timeout, connPtr->limitsPtr->timeout, 0);
AppendConn(drvPtr, connPtr);
}
/*
* Attempt to queue any waiting connections.
*/
connPtr = drvPtr->firstConnPtr;
drvPtr->firstConnPtr = drvPtr->lastConnPtr = NULL;
while (connPtr != NULL) {
nextConnPtr = connPtr->nextPtr;
sockPtr = connPtr->sockPtr;
limitsPtr = connPtr->limitsPtr;
Ns_MutexLock(&limitsPtr->lock);
if (sockPtr->state == SOCK_RUNWAIT) {
--limitsPtr->nwaiting;
if (PollIn(&pdata, sockPtr->pidx)) {
++drvPtr->stats.dropped;
sockPtr->state = SOCK_DROPPED;
goto dropped;
}
}
if (limitsPtr->nrunning < limitsPtr->maxrun) {
++limitsPtr->nrunning;
sockPtr->state = SOCK_RUNNING;
} else if (Ns_DiffTime(&sockPtr->timeout, &now, NULL) <= 0) {
++limitsPtr->ntimeout;
++drvPtr->stats.timeout;
sockPtr->state = SOCK_TIMEOUT;
} else if (limitsPtr->nwaiting < limitsPtr->maxwait) {
++limitsPtr->nwaiting;
sockPtr->state = SOCK_RUNWAIT;
} else {
++limitsPtr->noverflow;
++drvPtr->stats.overflow;
sockPtr->state = SOCK_OVERFLOW;
}
dropped:
Ns_MutexUnlock(&limitsPtr->lock);
switch (sockPtr->state) {
case SOCK_RUNWAIT:
AppendConn(drvPtr, connPtr);
SockPush(sockPtr, &waitPtr);
break;
case SOCK_DROPPED:
SockClose(sockPtr);
break;
case SOCK_TIMEOUT:
connPtr->flags |= NS_CONN_TIMEOUT;
/* FALLTHROUGH */
case SOCK_OVERFLOW:
connPtr->limitsPtr = NULL;
connPtr->flags |= NS_CONN_OVERFLOW;
/* FALLTHROUGH */
case SOCK_RUNNING:
/* NB: Sock no longer responsible for Conn. */
sockPtr->connPtr->times.run = now;
sockPtr->connPtr = NULL;
NsQueueConn(connPtr);
++drvPtr->stats.queued;
break;
default:
Ns_Fatal("impossible state");
break;
}
connPtr = nextConnPtr;
}
/*
* Attempt to accept new sockets.
*/
if (!stop && lidx >= 0 && PollIn(&pdata, lidx)
&& ((sockPtr = SockAccept(lsock, drvPtr)) != NULL)) {
sockPtr->acceptTime = now;
sockPtr->connPtr = AllocConn(drvPtr, &now, sockPtr);
SockWait(sockPtr, &now, drvPtr->recvwait, &waitPtr);
++drvPtr->stats.accepts;
}
/*
* Copy current driver details if requested.
*/
if (flags & DRIVER_QUERY) {
Ns_MutexLock(&drvPtr->lock);
Tcl_DStringAppendElement(drvPtr->queryPtr, "stats");
Tcl_DStringStartSublist(drvPtr->queryPtr);
Ns_DStringPrintf(drvPtr->queryPtr,
"time %ld:%ld "
"spins %ld accepts %u queued %u reads %u "
"dropped %u overflow %d timeout %d",
now.sec, now.usec,
drvPtr->stats.spins, drvPtr->stats.accepts,
drvPtr->stats.queued, drvPtr->stats.reads,
drvPtr->stats.dropped, drvPtr->stats.overflow,
drvPtr->stats.timeout);
Tcl_DStringEndSublist(drvPtr->queryPtr);
Tcl_DStringAppendElement(drvPtr->queryPtr, "socks");
sockPtr = waitPtr;
Tcl_DStringStartSublist(drvPtr->queryPtr);
while (sockPtr != NULL) {
Tcl_DStringStartSublist(drvPtr->queryPtr);
Ns_DStringPrintf(drvPtr->queryPtr,
"id %u sock %d state %s idx %d events %d revents %d "
"accept %ld:%ld timeout %ld:%ld",
sockPtr->id, sockPtr->sock, states[sockPtr->state], sockPtr->pidx,
pdata.pfds[sockPtr->pidx].events,
pdata.pfds[sockPtr->pidx].revents,
sockPtr->acceptTime.sec, sockPtr->acceptTime.usec,
sockPtr->timeout.sec, sockPtr->timeout.usec);
NsAppendConn(drvPtr->queryPtr, sockPtr->connPtr, "i/o");
Tcl_DStringEndSublist(drvPtr->queryPtr);
sockPtr = sockPtr->nextPtr;
}
Tcl_DStringEndSublist(drvPtr->queryPtr);
drvPtr->flags &= ~DRIVER_QUERY;
Ns_CondBroadcast(&drvPtr->cond);
Ns_MutexUnlock(&drvPtr->lock);
}
}
/*
* TODO: Handle waiting Sock's on shutdown.
*/
if (lsock != INVALID_SOCKET) {
ns_sockclose(lsock);
}
while (drvPtr->nreaders > 0) {
--drvPtr->nreaders;
Ns_ThreadJoin(&drvPtr->readers[drvPtr->nreaders], NULL);
}
Ns_MutexLock(&drvPtr->lock);
drvPtr->flags |= DRIVER_STOPPED;
Ns_CondBroadcast(&drvPtr->cond);
Ns_MutexUnlock(&drvPtr->lock);
Ns_Log(Notice, "exiting");
}
/*
*----------------------------------------------------------------------
*
* Poll --
*
* Update given PollData to monitor given socket on next spin.
*
* Results:
* Index into Poll array.
*
* Side effects:
* Min timeout is updated if necessary.
*
*----------------------------------------------------------------------
*/
static int
Poll(PollData *pdataPtr, SOCKET sock, int events, Ns_Time *timeoutPtr)
{
int idx;
/*
* Allocate or grow the pfds array if necessary.
*/
if (pdataPtr->nfds >= pdataPtr->maxfds) {
pdataPtr->maxfds += 100;
pdataPtr->pfds = ns_realloc(pdataPtr->pfds,
pdataPtr->maxfds * sizeof(struct pollfd));
}
/*
* Set the next pollfd struct with this socket.
*/
pdataPtr->pfds[pdataPtr->nfds].fd = sock;
pdataPtr->pfds[pdataPtr->nfds].events = events;
pdataPtr->pfds[pdataPtr->nfds].revents = 0;
idx = pdataPtr->nfds++;
/*
* Check for new minimum timeout.
*/
if (timeoutPtr != NULL && (pdataPtr->timeoutPtr == NULL
|| (Ns_DiffTime(timeoutPtr, pdataPtr->timeoutPtr, NULL) < 0))) {
pdataPtr->timeoutPtr = timeoutPtr;
}
return idx;
}
/*
*----------------------------------------------------------------------
*
* SockWait --
*
* Update Sock timeout and queue on given list.
*
* Results:
* None.
*
* Side effects:
* None.
*
*----------------------------------------------------------------------
*/
static void
SockWait(Sock *sockPtr, Ns_Time *nowPtr, int timeout, Sock **listPtrPtr)
{
sockPtr->timeout = *nowPtr;
Ns_IncrTime(&sockPtr->timeout, timeout, 0);
SockPush(sockPtr, listPtrPtr);
}
/*
*----------------------------------------------------------------------
*
* AppendConn --
*
* Append a connection to the waiting list.
*
* Results:
* None.
*
* Side effects:
* None.
*
*----------------------------------------------------------------------
*/
static void
AppendConn(Driver *drvPtr, Conn *connPtr)
{
if ((connPtr->prevPtr = drvPtr->lastConnPtr) == NULL) {
drvPtr->firstConnPtr = connPtr;
} else {
connPtr->prevPtr->nextPtr = connPtr;
}
drvPtr->lastConnPtr = connPtr;
connPtr->nextPtr = NULL;
}
/*
*----------------------------------------------------------------------
*
* SockAccept --
*
* Accept and initialize a new Sock.
*
* Results:
* Pointer to Sock or NULL on error.
*
* Side effects:
* Socket buffer sizes are set as configured.
*
*----------------------------------------------------------------------
*/
static Sock *
SockAccept(SOCKET lsock, Driver *drvPtr)
{
Sock *sockPtr = drvPtr->freeSockPtr;
int slen;
/*
* Accept the new connection.
*/
slen = sizeof(struct sockaddr_in);
sockPtr->sock = Ns_SockAccept(lsock,
(struct sockaddr *) &sockPtr->sa, &slen);
if (sockPtr->sock == INVALID_SOCKET) {
return NULL;
}
sockPtr->id = drvPtr->nextid++;
sockPtr->drvPtr = drvPtr;
sockPtr->state = SOCK_READWAIT;
sockPtr->arg = NULL;
sockPtr->connPtr = NULL;
/*
* Even though the socket should have inherited
* non-blocking from the accept socket, set again
* just to be sure.
*/
Ns_SockSetNonBlocking(sockPtr->sock);
/*
* Set the send/recv socket bufsizes if required.
*/
if (drvPtr->sndbuf > 0) {
setsockopt(sockPtr->sock, SOL_SOCKET, SO_SNDBUF,
(char *) &drvPtr->sndbuf, sizeof(drvPtr->sndbuf));
}
if (drvPtr->rcvbuf > 0) {
setsockopt(sockPtr->sock, SOL_SOCKET, SO_RCVBUF,
(char *) &drvPtr->rcvbuf, sizeof(drvPtr->rcvbuf));
}
++drvPtr->nactive;
drvPtr->freeSockPtr = sockPtr->nextPtr;
sockPtr->nextPtr = NULL;
return sockPtr;
}
/*
*----------------------------------------------------------------------
*
* SockClose --
*
* Close a socket and release the connection structure for
* re-use.
*
* Results:
* None.
*
* Side effects:
* None.
*
*----------------------------------------------------------------------
*/
static void
SockClose(Sock *sockPtr)
{
Driver *drvPtr = sockPtr->drvPtr;
/*
* Free the Conn if the Sock is still responsible for it.
*/
if (sockPtr->connPtr != NULL) {
FreeConn(sockPtr->connPtr);
sockPtr->connPtr = NULL;
}
(void) (*drvPtr->proc)(DriverClose, (Ns_Sock *) sockPtr, NULL, 0);
ns_sockclose(sockPtr->sock);
sockPtr->sock = INVALID_SOCKET;
drvPtr->stats.reads += sockPtr->nreads;
drvPtr->stats.writes += sockPtr->nwrites;
sockPtr->nextPtr = drvPtr->freeSockPtr;
drvPtr->freeSockPtr = sockPtr;
--drvPtr->nactive;
}
/*
*----------------------------------------------------------------------
*
* TriggerDriver --
*
* Wakeup driver from blocking Poll().
*
* Results:
* None.
*
* Side effects:
* Given driver will wakeup.
*
*----------------------------------------------------------------------
*/
static void
TriggerDriver(Driver *drvPtr)
{
if (send(drvPtr->trigger[1], "", 1, 0) != 1) {
Ns_Fatal("driver: trigger send() failed: %s",
ns_sockstrerror(ns_sockerrno));
}
}
/*
*----------------------------------------------------------------------
*
* SockRead --
*
* Read content from the given Sock, processing the input as
* necessary. This is the core callback routine designed to
* either be called repeatedly from a driver thread during
* an async read-ahead or in a blocking loop in ReaderThread.
*
* Results:
* None.
*
* Side effects:
* Will set sockPtr->state to next state as needed. Also,
* the next byte to read mark and bytes available are set
* to the beginning of the content, just beyond the headers,
* when all content has been read.
*
*----------------------------------------------------------------------
*/
static void
SockRead(Sock *sockPtr)
{
Driver *drvPtr = sockPtr->drvPtr;
Conn *connPtr = sockPtr->connPtr;
Ns_Sock *sock = (Ns_Sock *) sockPtr;
int err;
++sockPtr->nreads;
if ((connPtr->flags & NS_CONN_READHDRS)) {
err = SockReadContent(drvPtr, sock, connPtr);
} else {
err = SockReadLine(drvPtr, sock, connPtr);
}
/*
* Rewind content and mark the connection ready if all input received.
*/
if (!err && (connPtr->flags & NS_CONN_READHDRS) &&
connPtr->avail >= connPtr->contentLength) {
connPtr->avail = connPtr->contentLength;
if (!(connPtr->flags & NS_CONN_FILECONTENT)) {
connPtr->content[connPtr->avail] = '\0';
connPtr->next = connPtr->content;
} else if (lseek(connPtr->tfd, (off_t) 0, SEEK_SET) != 0) {
err = E_FDSEEK;
}
if (!err) {
sockPtr->state = SOCK_PREQUE;
}
}
if (err) {
LogError(sockPtr, err);
sockPtr->state = SOCK_ERROR;
}
}
static int
SockReadContent(Driver *drvPtr, Ns_Sock *sock, Conn *connPtr)
{
struct iovec buf;
char fbuf[4096];
int n;
/*
* When reading content, allow for 2 bytes more then the expected
* content to absorb the extra \r\n, if any, at the end of a POST
* request. Note this isn't guaranteed to work in the case the two
* bytes would have arrived in the next packet.
*/
buf.iov_len = connPtr->contentLength - connPtr->avail + 2;
if (!(connPtr->flags & NS_CONN_FILECONTENT)) {
buf.iov_base = connPtr->content + connPtr->avail;
} else {
if (buf.iov_len > sizeof(fbuf)) {
buf.iov_len = sizeof(fbuf);
}
buf.iov_base = fbuf;
}
n = (*drvPtr->proc)(DriverRecv, sock, &buf, 1);
if (n <= 0) {
return E_RECV;
}
if ((connPtr->flags & NS_CONN_FILECONTENT) &&
write(connPtr->tfd, fbuf, n) != n) {
return E_FDWRITE;
}
connPtr->avail += n;
return 0;
}
static int
SockReadLine(Driver *drvPtr, Ns_Sock *sock, Conn *connPtr)
{
Tcl_DString *bufPtr;
NsServer *servPtr;
Ns_Request *request;
ServerMap *mapPtr;
Tcl_HashEntry *hPtr;
struct iovec buf;
char *s, *e, *hdr, save;
int len, n, max, nbuf, major;
/*
* Setup the request buffer and read more input.
*/
bufPtr = &connPtr->ibuf;
len = bufPtr->length;
max = bufPtr->spaceAvl - 1;
if (len == drvPtr->maxinput) {
return E_RRANGE;
}
if (max < drvPtr->bufsize) {
max += drvPtr->bufsize;
if (max > drvPtr->maxinput) {
max = drvPtr->maxinput;
}
}
Tcl_DStringSetLength(bufPtr, max);
buf.iov_base = bufPtr->string + len;
buf.iov_len = max - len;
n = (*drvPtr->proc)(DriverRecv, sock, &buf, 1);
if (n <= 0) {
return E_RECV;
}
len += n;
Tcl_DStringSetLength(bufPtr, len);
/*
* Scan available content for lines until end-of-headers.
*/
while (!(connPtr->flags & NS_CONN_READHDRS)) {
/*
* Look for a newline past the current read offset. If the buffer
* does not include a full line, return now to request more input.
*/
s = bufPtr->string + connPtr->roff;
e = strchr(s, '\n');
if (e == NULL) {
return 0;
}
/*
* Check for max single line overflow.
*/
if ((e - s) > drvPtr->maxline) {
return E_LRANGE;
}
/*
* Update next read pointer to end of this line, trim any
* expected \r before the \n, and temporarily null-terminate
* the line.
*/
connPtr->roff += (e - s + 1);
if (e > s && e[-1] == '\r') {
--e;
}
save = *e;
*e = '\0';
/*
* On the first line, save the offsets for later request parsing,
* checking for pre-HTTP/1.0 requests. Otherwise, parse the line
* as the next header.
*/
if (connPtr->rstart == NULL) {
connPtr->rstart = s;
connPtr->rend = e;
if (NsFindVersion(s, &major, NULL) == NULL || major < 1) {
connPtr->flags |= NS_CONN_SKIPHDRS;
e = s;
}
} else if (e > s) {
if (Ns_ParseHeader(connPtr->headers, s, Preserve) != NS_OK) {
return E_HINVAL;
}
}
/*
* Restore the line and check for end-of-headers.
*/
*e = save;
if (e == s) {
connPtr->flags |= NS_CONN_READHDRS;
}
}
/*
* With the request and headers read, setup the connection. First,
* determine the virtual server and driver location, handling
* Host: header based multi-server drivers.
*/
servPtr = connPtr->drvPtr->servPtr;
if (servPtr != NULL) {
connPtr->location = connPtr->drvPtr->location;
} else {
hdr = Ns_SetIGet(connPtr->headers, "host");
if (hdr == NULL) {
return E_NOHOST;
}
hPtr = Tcl_FindHashEntry(&hosts, hdr);
if (hPtr == NULL) {
mapPtr = defMapPtr;
} else {
mapPtr = Tcl_GetHashValue(hPtr);
}
if (mapPtr == NULL) {
return E_NOSERV;
}
servPtr = mapPtr->servPtr;
connPtr->location = mapPtr->location;
}
connPtr->servPtr = servPtr;
connPtr->server = servPtr->server;
/*
* Next, parse the request using the server-default URL encoding.
*/
save = *connPtr->rend;
*connPtr->rend = '\0';
connPtr->request = request = Ns_ParseRequestEx(connPtr->rstart,
servPtr->urlEncoding);
*connPtr->rend = save;
if (request == NULL || request->method == NULL) {
return E_RINVAL;
}
if (STREQ(request->method, "HEAD")) {
connPtr->flags |= NS_CONN_SKIPBODY;
}
/*
* Parse authorization header, if any.
*/
hdr = Ns_SetIGet(connPtr->headers, "authorization");
if (hdr != NULL) {
s = hdr;
while (*s != '\0' && !isspace(UCHAR(*s))) {
++s;
}
if (*s != '\0') {
save = *s;
*s = '\0';
if (STRIEQ(hdr, "basic")) {
e = s + 1;
while (*e != '\0' && isspace(UCHAR(*e))) {
++e;
}
len = strlen(e) + 3;
connPtr->authUser = ns_malloc((size_t) len);
len = Ns_HtuuDecode(e, connPtr->authUser, len);
connPtr->authUser[len] = '\0';
e = strchr(connPtr->authUser, ':');
if (e != NULL) {
*e++ = '\0';
connPtr->authPasswd = e;
}
}
*s = save;
}
}
/*
* Get limits and content length, checking for overflow.
*/
connPtr->limitsPtr = NsGetRequestLimits(connPtr->server,
request->method, request->url);
hdr = Ns_SetIGet(connPtr->headers, "content-length");
if (hdr == NULL) {
len = 0;
} else if (sscanf(hdr, "%d", &len) != 1 || len < 0) {
return E_NINVAL;
}
if (len > connPtr->limitsPtr->maxupload) {
return E_CRANGE;
}
connPtr->contentLength = len;
/*
* Setup the connection to read remaining content (if any).
*/
connPtr->avail = bufPtr->length - connPtr->roff;
nbuf = connPtr->roff + connPtr->contentLength;
if (nbuf < connPtr->drvPtr->maxinput) {
/*
* Content will fit at end of request buffer.
*/
Tcl_DStringSetLength(bufPtr, nbuf);
connPtr->content = bufPtr->string + connPtr->roff;
} else {
/*
* Content must overflow to a temp file.
*/
connPtr->flags |= NS_CONN_FILECONTENT;
connPtr->tfd = Ns_GetTemp();
if (connPtr->tfd < 0) {
return E_FDAGAIN;
}
if (write(connPtr->tfd, bufPtr->string + connPtr->roff,
connPtr->avail) != connPtr->avail) {
return E_FDWRITE;
}
Tcl_DStringSetLength(bufPtr, connPtr->roff);
}
return 0;
}
/*
*----------------------------------------------------------------------
*
* RunQueWaits --
*
* Run Sock queue wait callbacks.
*
* Results:
* nsReadDone: Conn is ready for processing.
* nsReadMore: More callbacks are pending.
* nsReadFail: Client drop or timeout.
*
* Side effects:
* Depends on callbacks which may, e.g., register more queue wait
* callbacks.
*
*----------------------------------------------------------------------
*/
static int
RunQueWaits(PollData *pdataPtr, Ns_Time *nowPtr, Sock *sockPtr)
{
Conn *connPtr = sockPtr->connPtr;
QueWait *queWaitPtr, *nextPtr;
int revents, why, dropped;
if (PollIn(pdataPtr, sockPtr->pidx)) {
dropped = 1;
} else {
dropped = 0;
}
queWaitPtr = connPtr->queWaitPtr;
connPtr->queWaitPtr = NULL;
while (queWaitPtr != NULL) {
nextPtr = queWaitPtr->nextPtr;
if (dropped) {
why = NS_SOCK_DROP;
} else {
why = 0;
revents = pdataPtr->pfds[queWaitPtr->pidx].revents;
if (revents & POLLIN) {
why |= NS_SOCK_READ;
}
if (revents & POLLOUT) {
why |= NS_SOCK_WRITE;
}
}
if (why == 0
&& Ns_DiffTime(&queWaitPtr->timeout, nowPtr, NULL) > 0) {
queWaitPtr->nextPtr = connPtr->queWaitPtr;
connPtr->queWaitPtr = queWaitPtr;
} else {
(*queWaitPtr->proc)((Ns_Conn *) connPtr, queWaitPtr->sock,
queWaitPtr->arg, why);
ns_free(queWaitPtr);
}
queWaitPtr = nextPtr;
}
return (dropped ? 0 : 1);
}
/*
*----------------------------------------------------------------------
*
* AllocConn --
*
* Allocate a Conn structure and basic I/O related members.
*
* Results:
* Pointer to new Conn.
*
* Side effects:
* None.
*
*----------------------------------------------------------------------
*/
static Conn *
AllocConn(Driver *drvPtr, Ns_Time *nowPtr, Sock *sockPtr)
{
Conn *connPtr;
static int nextid = 0;
int id;
Ns_MutexLock(&connlock);
id = nextid++;
connPtr = firstConnPtr;
if (connPtr != NULL) {
firstConnPtr = connPtr->nextPtr;
}
Ns_MutexUnlock(&connlock);
if (connPtr == NULL) {
connPtr = ns_calloc(1, sizeof(Conn));
Tcl_DStringInit(&connPtr->ibuf);
Tcl_DStringInit(&connPtr->obuf);
Tcl_InitHashTable(&connPtr->files, TCL_STRING_KEYS);
connPtr->headers = Ns_SetCreate(NULL);
connPtr->outputheaders = Ns_SetCreate(NULL);
connPtr->tfd = -1;
}
connPtr->drvPtr = drvPtr;
connPtr->times.accept = *nowPtr;
connPtr->id = id;
sprintf(connPtr->idstr, "cns%d", connPtr->id);
connPtr->port = ntohs(sockPtr->sa.sin_port);
strcpy(connPtr->peer, ns_inet_ntoa(sockPtr->sa.sin_addr));
connPtr->times.accept = sockPtr->acceptTime;
connPtr->sockPtr = sockPtr;
connPtr->nextPtr = connPtr->prevPtr = NULL;
return connPtr;
}
/*
*----------------------------------------------------------------------
*
* FreeConn --
*
* Free a Conn structure and members allocated by AllocConn.
*
* Results:
* None.
*
* Side effects:
* None.
*
*----------------------------------------------------------------------
*/
static void
FreeConn(Conn *connPtr)
{
Ns_Conn *conn = (Ns_Conn *) connPtr;
Tcl_HashEntry *hPtr;
Tcl_HashSearch search;
FormFile *filePtr;
/*
* Call CLS cleanups.
*/
NsClsCleanup(connPtr);
/*
* Call public reset routines.
*/
Ns_ConnClearQuery(conn);
Ns_ConnSetType(conn, NULL);
Ns_ConnSetStatus(conn, 0);
Ns_ConnSetEncoding(conn, NULL);
Ns_ConnSetUrlEncoding(conn, NULL);
/*
* Cleanup public elements.
*/
if (conn->request != NULL) {
Ns_FreeRequest(conn->request);
conn->request = NULL;
}
Ns_SetTrunc(conn->headers, 0);
Ns_SetTrunc(conn->outputheaders, 0);
if (conn->authUser != NULL) {
ns_free(conn->authUser);
conn->authUser = conn->authPasswd = NULL;
}
conn->flags = 0;
conn->contentLength = 0;
/*
* Cleanup private elements.
*/
hPtr = Tcl_FirstHashEntry(&connPtr->files, &search);
while (hPtr != NULL) {
filePtr = Tcl_GetHashValue(hPtr);
Ns_SetFree(filePtr->hdrs);
ns_free(filePtr);
hPtr = Tcl_NextHashEntry(&search);
}
Tcl_DeleteHashTable(&connPtr->files);
Tcl_InitHashTable(&connPtr->files, TCL_STRING_KEYS);
connPtr->nContentSent = 0;
/*
* Cleanup content buffers.
*/
if (connPtr->tfd >= 0) {
Ns_ReleaseTemp(connPtr->tfd);
connPtr->tfd = -1;
}
connPtr->rstart = connPtr->rend = NULL;
connPtr->avail = 0;
connPtr->roff = 0;
connPtr->content = NULL;
connPtr->next = NULL;
Ns_DStringTrunc(&connPtr->ibuf, 0);
Ns_DStringTrunc(&connPtr->obuf, 0);
/*
* Dump on the free list.
*/
Ns_MutexLock(&connlock);
connPtr->nextPtr = firstConnPtr;
firstConnPtr = connPtr;
Ns_MutexUnlock(&connlock);
}
/*
*----------------------------------------------------------------------
*
* ReaderThread --
*
* Thread main for blocking connection reads.
*
* Results:
* None.
*
* Side effects:
* None.
*
*----------------------------------------------------------------------
*/
static void
ReaderThread(void *arg)
{
Driver *drvPtr = arg;
Sock *sockPtr;
ThreadName(drvPtr, "reader");
Ns_MutexLock(&drvPtr->lock);
while (1) {
while (!(drvPtr->flags & DRIVER_SHUTDOWN)
&& drvPtr->readSockPtr == NULL) {
Ns_CondWait(&drvPtr->cond, &drvPtr->lock);
}
sockPtr = drvPtr->readSockPtr;
if (sockPtr == NULL) {
break;
}
drvPtr->readSockPtr = sockPtr->nextPtr;
if (drvPtr->readSockPtr != NULL) {
Ns_CondSignal(&drvPtr->cond);
}
--drvPtr->idlereaders;
Ns_MutexUnlock(&drvPtr->lock);
/*
* Read the connection until complete or error.
*/
do {
SockRead(sockPtr);
} while (sockPtr->state == SOCK_READWAIT);
/*
* Return the connection to the driver thread.
*/
Ns_MutexLock(&drvPtr->lock);
sockPtr->nextPtr = drvPtr->runSockPtr;
drvPtr->runSockPtr = sockPtr;
TriggerDriver(drvPtr);
++drvPtr->idlereaders;
}
Ns_MutexUnlock(&drvPtr->lock);
Ns_Log(Notice, "exiting");
}
/*
*----------------------------------------------------------------------
*
* ThreadName --
*
* Set name of driver or reader thread.
*
* Results:
* None.
*
* Side effects:
* Thread name will show up in log messages.
*
*----------------------------------------------------------------------
*/
static void
ThreadName(Driver *drvPtr, char *name)
{
Tcl_DString ds;
Tcl_DStringInit(&ds);
Ns_DStringVarAppend(&ds, "-", drvPtr->module, ":", name, "-", NULL);
Ns_ThreadSetName(ds.string);
Tcl_DStringFree(&ds);
Ns_Log(Notice, "starting");
}
static void
LogError(Sock *sockPtr, DrvErr err)
{
char *msg;
switch (err) {
case E_NOERROR:
msg = "no error";
break;
case E_RECV:
msg = "recv failed";
break;
case E_FDAGAIN:
msg = "fd unavailable";
break;
case E_FDWRITE:
msg = "fd write failed";
break;
case E_FDSEEK:
msg = "fd seek failed";
break;
case E_NOHOST:
msg = "no host header";
break;
case E_NOSERV:
msg = "no such host";
break;
case E_HINVAL:
msg = "invalid header";
break;
case E_RINVAL:
msg = "invalid request";
break;
case E_NINVAL:
msg = "invalid content-length";
break;
case E_LRANGE:
msg = "max line exceeded";
break;
case E_RRANGE:
msg = "max request exceeded";
break;
case E_CRANGE:
msg = "max content exceeded";
break;
}
if (1 /*drvPtr->debug */) {
Ns_Log(Error, "%d: %s", sockPtr->connPtr->id, msg);
}
}
|
Back to SourceForge.net Powered by ViewCVS 1.0-dev |