Fixed to handle POLLHUP returned from Poll as observed on OS/X 10.4.
/*
* The contents of this file are subject to the AOLserver Public License
* Version 1.1 (the "License"); you may not use this file except in
* compliance with the License. You may obtain a copy of the License at
* http://aolserver.com/.
*
* Software distributed under the License is distributed on an "AS IS"
* basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
* the License for the specific language governing rights and limitations
* under the License.
*
* The Original Code is AOLserver Code and related documentation
* distributed by AOL.
*
* The Initial Developer of the Original Code is America Online,
* Inc. Portions created by AOL are Copyright (C) 1999 America Online,
* Inc. All Rights Reserved.
*
* Alternatively, the contents of this file may be used under the terms
* of the GNU General Public License (the "GPL"), in which case the
* provisions of GPL are applicable instead of those above. If you wish
* to allow use of your version of this file only under the terms of the
* GPL and not to allow others to use your version of this file under the
* License, indicate your decision by deleting the provisions above and
* replace them with the notice and other provisions required by the GPL.
* If you do not delete the provisions above, a recipient may use your
* version of this file under either the License or the GPL.
*/
/*
* task.c --
*
* Support for I/O tasks.
*/
static const char *RCSID = "@(#) $Header: /cvsroot/aolserver/aolserver/nsd/task.c,v 1.2 2005/06/22 00:02:15 jgdavidson Exp $, compiled: " __DATE__ " " __TIME__;
#include "nsd.h"
/*
* The following defines a task queue.
*/
#define NAME_SIZE 31
typedef struct Queue {
struct Queue *nextPtr; /* Next in list of all queues. */
struct Task *firstSignalPtr; /* First in list of task signals. */
Ns_Thread tid; /* Thread id. */
Ns_Mutex lock; /* Queue list and signal lock. */
Ns_Cond cond; /* Task and queue signal condition. */
int shutdown; /* Shutdown flag. */
int stopped; /* Stop flag. */
SOCKET trigger[2]; /* Trigger pipe. */
char name[NAME_SIZE+1]; /* String name. */
} Queue;
/*
* The following bits are used to send signals to a task queue
* and manage the state tasks.
*/
#define TASK_INIT 0x01
#define TASK_CANCEL 0x02
#define TASK_WAIT 0x04
#define TASK_TIMEOUT 0x08
#define TASK_DONE 0x10
#define TASK_PENDING 0x20
/*
* The following defines a task.
*/
typedef struct Task {
struct Queue *queuePtr; /* Monitoring queue. */
struct Task *nextWaitPtr; /* Next on wait queue. */
struct Task *nextSignalPtr; /* Next on signal queue. */
SOCKET sock; /* Underlying socket. */
Ns_TaskProc *proc; /* Queue callback. */
void *arg; /* Callback data. */
int idx; /* Poll index. */
int events; /* Poll events. */
Ns_Time timeout; /* Non-null timeout data. */
int signal; /* Signal bits sent to/from queue thread. */
int flags; /* Flags private to queue. */
} Task;
/*
* Local functions defined in this file
*/
static void TriggerQueue(Queue *queuePtr);
static void JoinQueue(Queue *queuePtr);
static void StopQueue(Queue *queuePtr);
static int SignalQueue(Task *taskPtr, int bit);
static Ns_ThreadProc TaskQueue;
static void RunTask(Task *taskPtr, int revents, Ns_Time *nowPtr);
#define Call(tp,w) ((*((tp)->proc))((Ns_Task *)(tp),(tp)->sock,(tp)->arg,(w)))
/*
* Static variables defined in this file
*/
static Queue *firstQueuePtr; /* List of all queues. */
static Ns_Mutex lock; /* Lock for queue list. */
/*
* The following maps AOLserver sock "when" bits to Poll event bits.
* The order is significant and determines the order of callbacks
* when multiple events are ready.
*/
static struct {
int when; /* AOLserver when bit. */
int event; /* Poll event bit. */
} map[] = {
{NS_SOCK_EXCEPTION, POLLPRI},
{NS_SOCK_WRITE, POLLOUT},
{NS_SOCK_READ, POLLIN}
};
/*
*----------------------------------------------------------------------
*
* Ns_CreateTaskQueue --
*
* Create a new task queue.
*
* Results:
* Handle to task queue..
*
* Side effects:
* None.
*
*----------------------------------------------------------------------
*/
Ns_TaskQueue *
Ns_CreateTaskQueue(char *name)
{
Queue *queuePtr;
queuePtr = ns_calloc(1, sizeof(Queue));
strncpy(queuePtr->name, name ? name : "", NAME_SIZE);
if (ns_sockpair(queuePtr->trigger) != 0) {
Ns_Fatal("queue: ns_sockpair() failed: %s",
ns_sockstrerror(ns_sockerrno));
}
Ns_MutexLock(&lock);
queuePtr->nextPtr = firstQueuePtr;
firstQueuePtr = queuePtr;
Ns_ThreadCreate(TaskQueue, queuePtr, 0, &queuePtr->tid);
Ns_MutexUnlock(&lock);
return (Ns_TaskQueue *) queuePtr;
}
/*
*----------------------------------------------------------------------
*
* Ns_DestoryTaskQueue --
*
* Stop and join a task queue.
*
* Results:
* None.
*
* Side effects:
* Pending tasks callbacks, if any, are cancelled.
*
*----------------------------------------------------------------------
*/
void
Ns_DestroyTaskQueue(Ns_TaskQueue *queue)
{
Queue *queuePtr = (Queue *) queue;
Queue **nextPtrPtr;
/*
* Remove queue from list of all queues.
*/
Ns_MutexLock(&lock);
nextPtrPtr = &firstQueuePtr;
while (*nextPtrPtr != queuePtr) {
nextPtrPtr = &(*nextPtrPtr)->nextPtr;
}
*nextPtrPtr = queuePtr->nextPtr;
Ns_MutexUnlock(&lock);
/*
* Signal stop and wait for join.
*/
StopQueue(queuePtr);
JoinQueue(queuePtr);
}
/*
*----------------------------------------------------------------------
*
* Ns_TaskCreate --
*
* Create a new task.
*
* Results:
* Handle to task.
*
* Side effects:
* None
*
*----------------------------------------------------------------------
*/
Ns_Task *
Ns_TaskCreate(SOCKET sock, Ns_TaskProc *proc, void *arg)
{
Task *taskPtr;
taskPtr = ns_calloc(1, sizeof(Task));
taskPtr->sock = sock;
taskPtr->proc = proc;
taskPtr->arg = arg;
return (Ns_Task *) taskPtr;
}
/*
*----------------------------------------------------------------------
*
* Ns_TaskEnqueue --
*
* Add a task to a queue.
*
* Results:
* NS_OK if task sent, NS_ERROR otherwise.
*
* Side effects:
* Queue will begin running the task.
*
*----------------------------------------------------------------------
*/
int
Ns_TaskEnqueue(Ns_Task *task, Ns_TaskQueue *queue)
{
Task *taskPtr = (Task *) task;
Queue *queuePtr = (Queue *) queue;
taskPtr->queuePtr = queuePtr;
if (!SignalQueue(taskPtr, TASK_INIT)) {
return NS_ERROR;
}
return NS_OK;
}
/*
*----------------------------------------------------------------------
*
* Ns_TaskRun --
*
* Run a task directly, waiting for completion.
*
* Results:
* None.
*
* Side effects:
* Depends on task callback.
*
*----------------------------------------------------------------------
*/
void
Ns_TaskRun(Ns_Task *task)
{
Task *taskPtr = (Task *) task;
struct pollfd pfd;
Ns_Time now, *timeoutPtr;
pfd.fd = taskPtr->sock;
Call(taskPtr, NS_SOCK_INIT);
while (!(taskPtr->flags & TASK_DONE)) {
if (taskPtr->flags & TASK_TIMEOUT) {
timeoutPtr = &taskPtr->timeout;
} else {
timeoutPtr = NULL;
}
pfd.revents = 0;
pfd.events = taskPtr->events;
if (NsPoll(&pfd, 1, timeoutPtr) != 1) {
break;
}
Ns_GetTime(&now);
RunTask(taskPtr, pfd.revents, &now);
}
taskPtr->signal |= TASK_DONE;
}
/*
*----------------------------------------------------------------------
*
* Ns_TaskCancel --
*
* Signal a task queue to stop running a task.
*
* Results:
* NS_OK if cancel sent, NS_ERROR otherwise.
*
* Side effects:
* Task callback will be invoke with NS_SOCK_CANCEL and is
* expected to call Ns_TaskDone to indicate completion.
*
*----------------------------------------------------------------------
*/
int
Ns_TaskCancel(Ns_Task *task)
{
Task *taskPtr = (Task *) task;
if (taskPtr->queuePtr == NULL) {
taskPtr->signal |= TASK_CANCEL;
} else if (!SignalQueue(taskPtr, TASK_CANCEL)) {
return NS_ERROR;
}
return NS_OK;
}
/*
*----------------------------------------------------------------------
*
* Ns_TaskWait --
*
* Wait for a task to complete. Infinite wait is indicated
* by a NULL timeoutPtr.
*
* Results:
* NS_TIMEOUT if task did not complete by absolute time,
* NS_OK otherwise.
*
* Side effects:
* May wait up to specified timeout.
*
*----------------------------------------------------------------------
*/
int
Ns_TaskWait(Ns_Task *task, Ns_Time *timeoutPtr)
{
Task *taskPtr = (Task *) task;
Queue *queuePtr = taskPtr->queuePtr;
int status = NS_OK;
if (queuePtr == NULL) {
if (!(taskPtr->signal & TASK_DONE)) {
status = NS_TIMEOUT;
}
} else {
Ns_MutexLock(&queuePtr->lock);
while (status == NS_OK && !(taskPtr->signal & TASK_DONE)) {
status = Ns_CondTimedWait(&queuePtr->cond, &queuePtr->lock,
timeoutPtr);
}
Ns_MutexUnlock(&queuePtr->lock);
if (status == NS_OK) {
taskPtr->queuePtr = NULL;
}
}
return status;
}
/*
*----------------------------------------------------------------------
*
* Ns_TaskCallback --
*
* Update pending conditions and timeout for a task. This
* routine is expected to be called from within the task
* callback proc including to set the initial wait conditions
* from within the NS_SOCK_INIT callback.
*
* Results:
* None.
*
* Side effects:
* Task callback will be invoked when ready or on timeout.
*
*----------------------------------------------------------------------
*/
void
Ns_TaskCallback(Ns_Task *task, int when, Ns_Time *timeoutPtr)
{
Task *taskPtr = (Task *) task;
int i;
/*
* Map from AOLserver when bits to Poll event bits.
*/
taskPtr->events = 0;
for (i = 0; i < 3; ++i) {
if (when & map[i].when) {
taskPtr->events |= map[i].event;
}
}
/*
* Copy timeout, if any.
*/
if (timeoutPtr == NULL) {
taskPtr->flags &= ~TASK_TIMEOUT;
} else {
taskPtr->flags |= TASK_TIMEOUT;
taskPtr->timeout = *timeoutPtr;
}
/*
* Mark as waiting if there are events or a timeout.
*/
if (taskPtr->events || timeoutPtr) {
taskPtr->flags |= TASK_WAIT;
} else {
taskPtr->flags &= ~TASK_WAIT;
}
}
/*
*----------------------------------------------------------------------
*
* Ns_TaskDone --
*
* Mark a task as done. This routine should be called from
* within the task callback. The task queue thread will signal
* other waiting threads, if any, on next spin.
*
* Results:
* None.
*
* Side effects:
* Task queue will signal this task is done on next spin.
*
*----------------------------------------------------------------------
*/
void
Ns_TaskDone(Ns_Task *event)
{
Task *taskPtr = (Task *) event;
taskPtr->flags |= TASK_DONE;
}
/*
*----------------------------------------------------------------------
*
* Ns_TaskFree --
*
* Free task structure. The caller is responsible for
* ensuring the task is no longer being run or monitored
* a task queue.
*
* Results:
* The task SOCKET which the caller is responsible for closing
* or reusing.
*
* Side effects:
* None.
*
*----------------------------------------------------------------------
*/
SOCKET
Ns_TaskFree(Ns_Task *task)
{
Task *taskPtr = (Task *) task;
SOCKET sock = taskPtr->sock;
ns_free(taskPtr);
return sock;
}
/*
*----------------------------------------------------------------------
*
* NsStartQueueShutdown --
*
* Trigger all task queues to begin shutdown.
*
* Results:
* None.
*
* Side effects:
* None.
*
*----------------------------------------------------------------------
*/
void
NsStartQueueShutdown(void)
{
Queue *queuePtr;
/*
* Trigger all queues to shutdown.
*/
Ns_MutexLock(&lock);
queuePtr = firstQueuePtr;
while (queuePtr != NULL) {
StopQueue(queuePtr);
queuePtr = queuePtr->nextPtr;
}
Ns_MutexUnlock(&lock);
}
/*
*----------------------------------------------------------------------
*
* NsWaitQueueShutdown --
*
* Wait for all task queues to shutdown.
*
* Results:
* None.
*
* Side effects:
* May timeout waiting for shutdown.
*
*----------------------------------------------------------------------
*/
void
NsWaitQueueShutdown(Ns_Time *toPtr)
{
Queue *queuePtr, *nextPtr;
int status;
/*
* Clear out list of any remaining task queues.
*/
Ns_MutexLock(&lock);
queuePtr = firstQueuePtr;
firstQueuePtr = NULL;
Ns_MutexUnlock(&lock);
/*
* Join all queues possible within total allowed time.
*/
status = NS_OK;
while (status == NS_OK && queuePtr != NULL) {
nextPtr = queuePtr->nextPtr;
Ns_MutexLock(&queuePtr->lock);
while (status == NS_OK && !queuePtr->stopped) {
status = Ns_CondTimedWait(&queuePtr->cond, &queuePtr->lock, toPtr);
}
Ns_MutexUnlock(&queuePtr->lock);
if (status == NS_OK) {
JoinQueue(queuePtr);
}
queuePtr = nextPtr;
}
if (status != NS_OK) {
Ns_Log(Warning, "timeout waiting for event queue shutdown");
}
}
/*
*----------------------------------------------------------------------
*
* RunTask --
*
* Run a single task from either a task queue or a directly via
* Ns_TaskRun.
*
* Results:
* None.
*
* Side effects:
* Depends on callbacks of given task.
*
*----------------------------------------------------------------------
*/
static void
RunTask(Task *taskPtr, int revents, Ns_Time *nowPtr)
{
int i;
/*
* NB: Treat POLLHUP as POLLIN on systems which return it.
*/
if (revents & POLLHUP) {
revents |= POLLIN;
}
if (revents) {
for (i = 0; i < 3; ++i) {
if (revents & map[i].event) {
Call(taskPtr, map[i].when);
}
}
} else if ((taskPtr->flags & TASK_TIMEOUT)
&& Ns_DiffTime(&taskPtr->timeout, nowPtr, NULL) < 0) {
taskPtr->flags &= ~ TASK_WAIT;
Call(taskPtr, NS_SOCK_TIMEOUT);
}
}
/*
*----------------------------------------------------------------------
*
* SignalQueue --
*
* Send a signal for a task to a task queue.
*
* Results:
* None.
*
* Side effects:
* Task queue will process signal on next spin.
*
*----------------------------------------------------------------------
*/
static int
SignalQueue(Task *taskPtr, int bit)
{
Queue *queuePtr = taskPtr->queuePtr;
int pending, shutdown;
Ns_MutexLock(&queuePtr->lock);
shutdown = queuePtr->shutdown;
if (!shutdown) {
/*
* Mark the signal and add event to signal list if not
* already there.
*/
taskPtr->signal |= bit;
pending = (taskPtr->signal & TASK_PENDING);
if (!pending) {
taskPtr->signal |= TASK_PENDING;
taskPtr->nextSignalPtr = queuePtr->firstSignalPtr;
queuePtr->firstSignalPtr = taskPtr;
}
}
Ns_MutexUnlock(&queuePtr->lock);
if (shutdown) {
return 0;
}
if (!pending) {
TriggerQueue(queuePtr);
}
return 1;
}
/*
*----------------------------------------------------------------------
*
* TriggerQueue --
*
* Wakeup a task queue.
*
* Results:
* None.
*
* Side effects:
* None.
*
*----------------------------------------------------------------------
*/
static void
TriggerQueue(Queue *queuePtr)
{
if (send(queuePtr->trigger[1], "", 1, 0) != 1) {
Ns_Fatal("queue: trigger send() failed: %s",
ns_sockstrerror(ns_sockerrno));
}
}
/*
*----------------------------------------------------------------------
*
* StopQueue --
*
* Signal a task queue to shutdown.
*
* Results:
* None.
*
* Side effects:
* Queue will exit on next spin and call remaining tasks
* with NS_SOCK_EXIT.
*
*----------------------------------------------------------------------
*/
static void
StopQueue(Queue *queuePtr)
{
Ns_MutexLock(&queuePtr->lock);
queuePtr->shutdown = 1;
Ns_MutexUnlock(&queuePtr->lock);
TriggerQueue(queuePtr);
}
/*
*----------------------------------------------------------------------
*
* JoinQueue --
*
* Cleanup resources of a task queue.
*
* Results:
* None.
*
* Side effects:
* None.
*
*----------------------------------------------------------------------
*/
static void
JoinQueue(Queue *queuePtr)
{
Ns_ThreadJoin(&queuePtr->tid, NULL);
ns_sockclose(queuePtr->trigger[0]);
ns_sockclose(queuePtr->trigger[1]);
Ns_MutexDestroy(&queuePtr->lock);
ns_free(queuePtr);
}
/*
*----------------------------------------------------------------------
*
* TaskQueue --
*
* Run an task queue.
*
* Results:
* None.
*
* Side effects:
* Depends on callbacks of given tasks.
*
*----------------------------------------------------------------------
*/
static void
TaskQueue(void *arg)
{
Queue *queuePtr = arg;
char c;
int n, broadcast, max, nfds, shutdown;
Task *taskPtr, *nextPtr, *firstWaitPtr;
struct pollfd *pfds;
Ns_Time now, *timeoutPtr;
char name[NAME_SIZE+10];
sprintf(name, "task:%s", queuePtr->name);
Ns_ThreadSetName(name);
Ns_Log(Notice, "starting");
max = 100;
pfds = ns_malloc(sizeof(struct pollfd) * max);
firstWaitPtr = NULL;
while (1) {
/*
* Get the shutdown flag and process any incoming signals.
*/
Ns_MutexLock(&queuePtr->lock);
shutdown = queuePtr->shutdown;
while ((taskPtr = queuePtr->firstSignalPtr) != NULL) {
queuePtr->firstSignalPtr = taskPtr->nextSignalPtr;
taskPtr->nextSignalPtr = NULL;
if (!(taskPtr->flags & TASK_WAIT)) {
taskPtr->flags |= TASK_WAIT;
taskPtr->nextWaitPtr = firstWaitPtr;
firstWaitPtr = taskPtr;
}
if (taskPtr->signal & TASK_INIT) {
taskPtr->signal &= ~TASK_INIT;
taskPtr->flags |= TASK_INIT;
}
if (taskPtr->signal & TASK_CANCEL) {
taskPtr->signal &= ~TASK_CANCEL;
taskPtr->flags |= TASK_CANCEL;
}
taskPtr->signal &= ~TASK_PENDING;
}
Ns_MutexUnlock(&queuePtr->lock);
/*
* Invoke pre-Poll callbacks, determine minimum timeout, and set
* the pollfd structs for all waiting tasks.
*/
pfds[0].fd = queuePtr->trigger[0];
pfds[0].events = POLLIN;
pfds[0].revents = 0;
nfds = 1;
timeoutPtr = NULL;
taskPtr = firstWaitPtr;
firstWaitPtr = NULL;
broadcast = 0;
while (taskPtr != NULL) {
nextPtr = taskPtr->nextWaitPtr;
/*
* Call init and/or cancel and signal done if necessary.
* Note that a task can go from init to done immediately
* so all required callbacks are invoked before determining
* if a wait is required.
*/
if (taskPtr->flags & TASK_INIT) {
taskPtr->flags &= ~TASK_INIT;
Call(taskPtr, NS_SOCK_INIT);
}
if (taskPtr->flags & TASK_CANCEL) {
taskPtr->flags &= ~(TASK_CANCEL|TASK_WAIT);
taskPtr->flags |= TASK_DONE;
Call(taskPtr, NS_SOCK_CANCEL);
}
if (taskPtr->flags & TASK_DONE) {
taskPtr->flags &= ~(TASK_DONE|TASK_WAIT);
Ns_MutexLock(&queuePtr->lock);
taskPtr->signal |= TASK_DONE;
Ns_MutexUnlock(&queuePtr->lock);
broadcast = 1;
}
if (taskPtr->flags & TASK_WAIT) {
if (max <= nfds) {
max = nfds + 100;
pfds = ns_realloc(pfds, (size_t) max);
}
taskPtr->idx = nfds;
pfds[nfds].fd = taskPtr->sock;
pfds[nfds].events = taskPtr->events;
pfds[nfds].revents = 0;
if ((taskPtr->flags & TASK_TIMEOUT) && (timeoutPtr == NULL
|| Ns_DiffTime(&taskPtr->timeout,
timeoutPtr, NULL) < 0)) {
timeoutPtr = &taskPtr->timeout;
}
taskPtr->nextWaitPtr = firstWaitPtr;
firstWaitPtr = taskPtr;
++nfds;
}
taskPtr = nextPtr;
}
/*
* Signal other threads which may be waiting on tasks to complete.
*/
if (broadcast) {
Ns_CondBroadcast(&queuePtr->cond);
}
/*
* Break now if shutting down now that all signals have been processed.
*/
if (shutdown) {
break;
}
/*
* Poll sockets and drain the trigger pipe if necessary.
*/
n = NsPoll(pfds, nfds, timeoutPtr);
if ((pfds[0].revents & POLLIN) && recv(pfds[0].fd, &c, 1, 0) != 1) {
Ns_Fatal("queue: trigger read() failed: %s",
ns_sockstrerror(ns_sockerrno));
}
/*
* Execute any ready events or timeouts for waiting tasks.
*/
Ns_GetTime(&now);
taskPtr = firstWaitPtr;
while (taskPtr != NULL) {
RunTask(taskPtr, pfds[taskPtr->idx].revents, &now);
taskPtr = taskPtr->nextWaitPtr;
}
}
Ns_Log(Notice, "shutdown pending");
/*
* Call exit for all remaining tasks.
*/
taskPtr = firstWaitPtr;
while (taskPtr != NULL) {
Call(taskPtr, NS_SOCK_EXIT);
taskPtr = taskPtr->nextWaitPtr;
}
/*
* Signal all tasks done and shutdown complete.
*/
Ns_MutexLock(&queuePtr->lock);
while ((taskPtr = firstWaitPtr) != NULL) {
firstWaitPtr = taskPtr->nextWaitPtr;
taskPtr->signal |= TASK_DONE;
}
queuePtr->stopped = 1;
Ns_MutexUnlock(&queuePtr->lock);
Ns_CondBroadcast(&queuePtr->cond);
Ns_Log(Notice, "shutdown complete");
}
|
Back to SourceForge.net Powered by ViewCVS 1.0-dev |