Additional change to control loop for sched thread shutdown of event threads.
/*
* The contents of this file are subject to the AOLserver Public License
* Version 1.1 (the "License"); you may not use this file except in
* compliance with the License. You may obtain a copy of the License at
* http://aolserver.com/.
*
* Software distributed under the License is distributed on an "AS IS"
* basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
* the License for the specific language governing rights and limitations
* under the License.
*
* The Original Code is AOLserver Code and related documentation
* distributed by AOL.
*
* The Initial Developer of the Original Code is America Online,
* Inc. Portions created by AOL are Copyright (C) 1999 America Online,
* Inc. All Rights Reserved.
*
* Alternatively, the contents of this file may be used under the terms
* of the GNU General Public License (the "GPL"), in which case the
* provisions of GPL are applicable instead of those above. If you wish
* to allow use of your version of this file only under the terms of the
* GPL and not to allow others to use your version of this file under the
* License, indicate your decision by deleting the provisions above and
* replace them with the notice and other provisions required by the GPL.
* If you do not delete the provisions above, a recipient may use your
* version of this file under either the License or the GPL.
*/
static const char *RCSID = "@(#) $Header: /cvsroot/aolserver/aolserver/nsd/sched.c,v 1.14 2003/03/10 16:02:05 mpagenva Exp $, compiled: " __DATE__ " " __TIME__;
/*
* sched.c --
*
* Support for the background task and scheduled procedure
* interfaces. The implementation is based on the paper:
*
* "A Heap-based Callout Implementation to Meet Real-Time Needs",
* by Barkley and Lee, in "Proceeding of the Summer 1988 USENIX
* Conference".
*
* The heap code in particular is based on:
*
* "Chapter 9. Priority Queues and Heapsort", Sedgewick "Algorithms
* in C, 3rd Edition", Addison-Wesley, 1998.
*/
#include "nsd.h"
/*
* The following structure defines a scheduled event.
*/
typedef struct Event {
struct Event *nextPtr;
Tcl_HashEntry *hPtr; /* Entry in event hash or NULL if deleted. */
unsigned int id; /* Unique event id. */
int qid; /* Current priority queue id. */
time_t nextqueue; /* Next time to queue for run. */
time_t lastqueue; /* Last time queued for run. */
time_t laststart; /* Last time run started. */
time_t lastend; /* Last time run finished. */
int flags; /* One or more of NS_SCHED_ONCE, NS_SCHED_THREAD,
* NS_SCHED_DAILY, or NS_SCHED_WEEKLY. */
int interval; /* Interval specification. */
Ns_SchedProc *proc; /* Procedure to execute. */
void *arg; /* Client data for procedure. */
Ns_SchedProc *deleteProc; /* Procedure to cleanup when done (if any). */
} Event;
/*
* Local functions defined in this file.
*/
static Ns_ThreadProc SchedThread; /* Detached event firing thread. */
static Ns_ThreadProc EventThread; /* Proc for NS_SCHED_THREAD events. */
static void QueueEvent(Event *ePtr, time_t *nowPtr); /* Queue event on heap. */
static Event *DeQueueEvent(int qid); /* Remove event from heap. */
static void FreeEvent(Event *ePtr); /* Free completed or cancelled event. */
/*
* Static variables defined in this file.
*/
static Tcl_HashTable eventsTable; /* Hash table of events. */
static Ns_Mutex lock; /* Lock around heap and hash table. */
static Ns_Cond schedcond; /* Condition to wakeup SchedThread. */
static Ns_Cond eventcond; /* Condition to wakeup EventThread(s). */
static Event **queue; /* Heap priority queue (dynamically re-sized). */
static int nqueue; /* Number of events in queue. */
static int maxqueue; /* Max queue events (dynamically re-sized). */
static int running;
static int shutdownPending;
static Ns_Thread schedThread;
static int nThreads;
static int nIdleThreads;
static Event *threadEventPtr;
static Ns_Thread *eventThreads;
/*
* Macro to exchange two events in the heap, used in QueueEvent() and
* DeQueueEvent().
*/
#define EXCH(i,j) \
{\
Event *tmp = queue[(i)];\
queue[(i)] = queue[(j)], queue[(j)] = tmp;\
queue[(i)]->qid = (i), queue[(j)]->qid = (j);\
}
/*
*----------------------------------------------------------------------
*
* NsInitSched --
*
* Initialize scheduler API.
*
* Results:
* None.
*
* Side effects:
* None.
*
*----------------------------------------------------------------------
*/
void
NsInitSched(void)
{
Ns_MutexInit(&lock);
Ns_MutexSetName(&lock, "ns:sched");
Tcl_InitHashTable(&eventsTable, TCL_ONE_WORD_KEYS);
}
/*
*----------------------------------------------------------------------
*
* Ns_After --
*
* Schedule a one-shot event.
*
* Results:
* Event id or NS_ERROR if delay is out of range.
*
* Side effects:
* See Ns_ScheduleProcEx().
*
*----------------------------------------------------------------------
*/
int
Ns_After(int delay, Ns_Callback *proc, void *arg, Ns_Callback *deleteProc)
{
if (delay < 0) {
return NS_ERROR;
}
return Ns_ScheduleProcEx((Ns_SchedProc *) proc, arg, NS_SCHED_ONCE,
delay, (Ns_SchedProc *) deleteProc);
}
/*
*----------------------------------------------------------------------
*
* Ns_ScheduleProc --
*
* Schedule a proc to run at a given interval.
*
* Results:
* Event id or NS_ERROR if interval is invalid.
*
* Side effects:
* See Ns_ScheduleProcEx().
*
*----------------------------------------------------------------------
*/
int
Ns_ScheduleProc(Ns_Callback *proc, void *arg, int thread, int interval)
{
if (interval < 0) {
return NS_ERROR;
}
return Ns_ScheduleProcEx((Ns_SchedProc *) proc, arg,
thread ? NS_SCHED_THREAD : 0, interval, NULL);
}
/*
*----------------------------------------------------------------------
*
* Ns_ScheduleDaily --
*
* Schedule a proc to run once a day.
*
* Results:
* Event id or NS_ERROR if hour and/or minute is out of range.
*
* Side effects:
* See Ns_ScheduleProcEx
*
*----------------------------------------------------------------------
*/
int
Ns_ScheduleDaily(Ns_SchedProc * proc, void *clientData, int flags,
int hour, int minute, Ns_SchedProc *cleanupProc)
{
int seconds;
if (hour > 23 ||
hour < 0 ||
minute > 59 ||
minute < 0) {
return NS_ERROR;
}
seconds = (hour * 3600) + (minute * 60);
return Ns_ScheduleProcEx(proc, clientData, flags | NS_SCHED_DAILY,
seconds, cleanupProc);
}
/*
*----------------------------------------------------------------------
*
* Ns_ScheduleWeekly --
*
* Schedule a proc to run once a week.
*
* Results:
* Event id or NS_ERROR if day, hour, and/or minute is out of range.
*
* Side effects:
* See Ns_ScheduleProcEx
*
*----------------------------------------------------------------------
*/
int
Ns_ScheduleWeekly(Ns_SchedProc * proc, void *clientData, int flags,
int day, int hour, int minute, Ns_SchedProc *cleanupProc)
{
int seconds;
if (day < 0 ||
day > 6 ||
hour > 23 ||
hour < 0 ||
minute > 59 ||
minute < 0) {
return NS_ERROR;
}
seconds = (((day * 24) + hour) * 3600) + (minute * 60);
return Ns_ScheduleProcEx(proc, clientData, flags | NS_SCHED_WEEKLY,
seconds, cleanupProc);
}
/*
*----------------------------------------------------------------------
*
* Ns_ScheduleProcEx --
*
* Schedule a proc to run at a given interval. The interpretation
* of interval (whether interative, daily, or weekly) is handled
* by QueueEvent.
*
* Results:
* Event id of NS_ERROR if interval is out of range.
*
* Side effects:
* Event is allocated, hashed, and queued.
*
*----------------------------------------------------------------------
*/
int
Ns_ScheduleProcEx(Ns_SchedProc *proc, void *arg, int flags,
int interval, Ns_SchedProc *deleteProc)
{
Event *ePtr;
int id, new;
static int nextId;
time_t now;
if (interval < 0) {
return NS_ERROR;
}
time(&now);
ePtr = ns_malloc(sizeof(Event));
ePtr->flags = flags;
ePtr->nextqueue = 0;
ePtr->lastqueue = ePtr->laststart = ePtr->lastend = -1;
ePtr->interval = interval;
ePtr->proc = proc;
ePtr->deleteProc = deleteProc;
ePtr->arg = arg;
Ns_MutexLock(&lock);
if (shutdownPending) {
id = NS_ERROR;
ns_free(ePtr);
} else {
do {
id = nextId++;
if (nextId < 0) {
nextId = 0;
}
ePtr->hPtr = Tcl_CreateHashEntry(&eventsTable, (char *) id, &new);
} while (!new);
Tcl_SetHashValue(ePtr->hPtr, ePtr);
ePtr->id = (unsigned int)id;
QueueEvent(ePtr, &now);
}
Ns_MutexUnlock(&lock);
return id;
}
/*
*----------------------------------------------------------------------
*
* Ns_Cancel, Ns_UnscheduleProc --
*
* Cancel a previously scheduled event.
*
* Results:
* Ns_UnscheduleProc: None.
* Ns_Cancel: 1 if cancelled, 0 otherwise.
*
* Side effects:
* See FreeEvent().
*
*----------------------------------------------------------------------
*/
void
Ns_UnscheduleProc(int id)
{
(void) Ns_Cancel(id);
}
int
Ns_Cancel(int id)
{
Tcl_HashEntry *hPtr = NULL;
Event *ePtr = NULL;
int cancelled;
cancelled = 0;
Ns_MutexLock(&lock);
if (!shutdownPending) {
hPtr = Tcl_FindHashEntry(&eventsTable, (char *) id);
if (hPtr != NULL) {
ePtr = Tcl_GetHashValue(hPtr);
Tcl_DeleteHashEntry(hPtr);
ePtr->hPtr = NULL;
if (ePtr->qid > 0) {
DeQueueEvent(ePtr->qid);
cancelled = 1;
}
}
}
Ns_MutexUnlock(&lock);
if (cancelled) {
FreeEvent(ePtr);
}
return cancelled;
}
/*
*----------------------------------------------------------------------
*
* Ns_Pause --
*
* Pause a schedule procedure.
*
* Results:
* 1 if proc paused, 0 otherwise.
*
* Side effects:
* Proc will not run at the next scheduled time.
*
*----------------------------------------------------------------------
*/
int
Ns_Pause(int id)
{
Tcl_HashEntry *hPtr;
Event *ePtr;
int paused;
paused = 0;
Ns_MutexLock(&lock);
if (!shutdownPending) {
hPtr = Tcl_FindHashEntry(&eventsTable, (char *) id);
if (hPtr != NULL) {
ePtr = Tcl_GetHashValue(hPtr);
if (!(ePtr->flags & NS_SCHED_PAUSED)) {
ePtr->flags |= NS_SCHED_PAUSED;
if (ePtr->qid > 0) {
DeQueueEvent(ePtr->qid);
}
paused = 1;
}
}
}
Ns_MutexUnlock(&lock);
return paused;
}
/*
*----------------------------------------------------------------------
*
* Ns_Resume --
*
* Resume a scheduled proc.
*
* Results:
* 1 if proc resumed, 0 otherwise.
*
* Side effects:
* Proc will be rescheduled.
*
*----------------------------------------------------------------------
*/
int
Ns_Resume(int id)
{
Tcl_HashEntry *hPtr;
Event *ePtr;
int resumed;
time_t now;
resumed = 0;
Ns_MutexLock(&lock);
if (!shutdownPending) {
hPtr = Tcl_FindHashEntry(&eventsTable, (char *) id);
if (hPtr != NULL) {
ePtr = Tcl_GetHashValue(hPtr);
if ((ePtr->flags & NS_SCHED_PAUSED)) {
ePtr->flags &= ~NS_SCHED_PAUSED;
time(&now);
QueueEvent(ePtr, &now);
resumed = 1;
}
}
}
Ns_MutexUnlock(&lock);
return resumed;
}
/*
*----------------------------------------------------------------------
*
* NsStartSchedShutdown, NsWaitSchedShutdown --
*
* Inititiate and then wait for sched shutdown.
*
* Results:
* None.
*
* Side effects:
* May timeout waiting for sched shutdown.
*
*----------------------------------------------------------------------
*/
void
NsStartSchedShutdown(void)
{
Ns_MutexLock(&lock);
if (running) {
Ns_Log(Notice, "sched: shutdown pending");
shutdownPending = 1;
Ns_CondSignal(&schedcond);
}
Ns_MutexUnlock(&lock);
}
void
NsWaitSchedShutdown(Ns_Time *toPtr)
{
int status;
Ns_MutexLock(&lock);
status = NS_OK;
while (status == NS_OK && running) {
status = Ns_CondTimedWait(&schedcond, &lock, toPtr);
}
Ns_MutexUnlock(&lock);
if (status != NS_OK) {
Ns_Log(Warning, "sched: timeout waiting for sched exit");
} else if (schedThread != NULL) {
Ns_ThreadJoin(&schedThread, NULL);
}
}
/*
*----------------------------------------------------------------------
*
* QueueEvent --
*
* Add an event to the priority queue heap.
*
* Results:
* None.
*
* Side effects:
* SchedThread() may be created and/or signalled.
*
*----------------------------------------------------------------------
*/
static void
QueueEvent(Event *ePtr, time_t *nowPtr)
{
struct tm *tp;
if (ePtr->flags & NS_SCHED_PAUSED) {
return;
}
/*
* Calculate the time from now in seconds this event should run.
*/
if (ePtr->flags & (NS_SCHED_DAILY | NS_SCHED_WEEKLY)) {
tp = ns_localtime(nowPtr);
tp->tm_sec = ePtr->interval;
tp->tm_hour = 0;
tp->tm_min = 0;
if (ePtr->flags & NS_SCHED_WEEKLY) {
tp->tm_mday -= tp->tm_wday;
}
ePtr->nextqueue = mktime(tp);
if (ePtr->nextqueue <= *nowPtr) {
tp->tm_mday += (ePtr->flags & NS_SCHED_WEEKLY) ? 7 : 1;
ePtr->nextqueue = mktime(tp);
}
} else {
ePtr->nextqueue = *nowPtr + ePtr->interval;
}
/*
* Place the new event at the end of the queue array and
* heap it up into place. The queue array is extended
* if necessary.
*/
ePtr->qid = ++nqueue;
if (maxqueue <= nqueue) {
maxqueue += 1000;
queue = ns_realloc(queue, (sizeof(Event *)) * (maxqueue + 1));
}
queue[nqueue] = ePtr;
if (nqueue > 1) {
int j, k;
k = nqueue;
j = k / 2;
while (k > 1 && queue[j]->nextqueue > queue[k]->nextqueue) {
EXCH(j, k);
k = j;
j = k / 2;
}
}
/*
* Signal or create the SchedThread if necessary.
*/
if (running) {
Ns_CondSignal(&schedcond);
} else {
running = 1;
Ns_ThreadCreate(SchedThread, NULL, 0, &schedThread);
}
}
/*
*----------------------------------------------------------------------
*
* DeQueueEvent --
*
* Remove an event from the priority queue heap.
*
* Results:
* Pointer to removed event.
*
* Side effects:
* None.
*
*----------------------------------------------------------------------
*/
static Event *
DeQueueEvent(int k)
{
Event *ePtr;
int j;
/*
* Swap out the event to be removed and heap down to restore the
* order of events to be fired.
*/
EXCH(k, nqueue);
ePtr = queue[nqueue--];
ePtr->qid = 0;
while ((j = 2 * k) <= nqueue) {
if (j < nqueue && queue[j]->nextqueue > queue[j + 1]->nextqueue) {
++j;
}
if (queue[j]->nextqueue > queue[k]->nextqueue) {
break;
}
EXCH(k, j);
k = j;
}
return ePtr;
}
/*
*----------------------------------------------------------------------
*
* EventThread --
*
* Run detached thread events.
*
* Results:
* None.
*
* Side effects:
* See FinishEvent().
*
*----------------------------------------------------------------------
*/
static void
EventThread(void *arg)
{
Event *ePtr;
char name[20], idle[20];
time_t now;
sprintf(idle, "-sched:idle%d-", (int) arg);
Ns_ThreadSetName(idle);
Ns_Log(Notice, "starting");
Ns_MutexLock(&lock);
while (1) {
while (threadEventPtr == NULL && !shutdownPending) {
Ns_CondWait(&eventcond, &lock);
}
if (threadEventPtr == NULL) {
break;
}
ePtr = threadEventPtr;
threadEventPtr = ePtr->nextPtr;
if (threadEventPtr != NULL) {
Ns_CondSignal(&eventcond);
}
--nIdleThreads;
Ns_MutexUnlock(&lock);
sprintf(name, "-sched:%u-", ePtr->id);
Ns_ThreadSetName(name);
(*ePtr->proc) (ePtr->arg, (int)ePtr->id);
Ns_ThreadSetName(idle);
time(&now);
Ns_MutexLock(&lock);
++nIdleThreads;
if (ePtr->hPtr == NULL) {
Ns_MutexUnlock(&lock);
FreeEvent(ePtr);
Ns_MutexLock(&lock);
} else {
ePtr->flags &= ~NS_SCHED_RUNNING;
ePtr->lastend = now;
QueueEvent(ePtr, &now);
}
}
Ns_MutexUnlock(&lock);
Ns_Log(Notice, "exiting");
}
/*
*----------------------------------------------------------------------
*
* FreeEvent --
*
* Free and event after run.
*
* Results:
* None.
*
* Side effects:
* Event is freed or re-queued.
*
*----------------------------------------------------------------------
*/
static void
FreeEvent(Event *ePtr)
{
if (ePtr->deleteProc != NULL) {
(*ePtr->deleteProc) (ePtr->arg, (int)ePtr->id);
}
ns_free(ePtr);
}
/*
*----------------------------------------------------------------------
*
* SchedThread --
*
* Detached thread to fire events on time.
*
* Results:
* None.
*
* Side effects:
* Depends on event procedures.
*
*----------------------------------------------------------------------
*/
static void
SchedThread(void *ignored)
{
Event *ePtr, *readyPtr;
time_t now;
Ns_Time timeout;
int elapsed;
Ns_Thread *joinThreads;
int nJoinThreads;
Ns_WaitForStartup();
Ns_ThreadSetName("-sched-");
Ns_Log(Notice, "sched: starting");
readyPtr = NULL;
Ns_MutexLock(&lock);
while (!shutdownPending) {
/*
* For events ready to run, either create a thread for
* detached events or add to a list of synchronous events.
*/
time(&now);
while (nqueue > 0 && queue[1]->nextqueue <= now) {
ePtr = DeQueueEvent(1);
if (ePtr->flags & NS_SCHED_ONCE) {
Tcl_DeleteHashEntry(ePtr->hPtr);
ePtr->hPtr = NULL;
}
ePtr->lastqueue = now;
if (ePtr->flags & NS_SCHED_THREAD) {
ePtr->flags |= NS_SCHED_RUNNING;
ePtr->laststart = now;
ePtr->nextPtr = threadEventPtr;
threadEventPtr = ePtr;
} else {
ePtr->nextPtr = readyPtr;
readyPtr = ePtr;
}
}
/*
* Dispatch any threaded events.
*/
if (threadEventPtr != NULL) {
if (nIdleThreads == 0) {
eventThreads = ns_realloc(eventThreads, sizeof(Ns_Thread) * (nThreads+1));
Ns_ThreadCreate(EventThread, (void *) nThreads, 0, &eventThreads[nThreads]);
++nIdleThreads;
++nThreads;
}
Ns_CondSignal(&eventcond);
}
/*
* Run and re-queue or free synchronous events.
*/
while ((ePtr = readyPtr) != NULL) {
readyPtr = ePtr->nextPtr;
ePtr->laststart = now;
ePtr->flags |= NS_SCHED_RUNNING;
Ns_MutexUnlock(&lock);
(*ePtr->proc) (ePtr->arg, (int)ePtr->id);
time(&now);
elapsed = (int) difftime(now, ePtr->laststart);
if (elapsed > nsconf.sched.maxelapsed) {
Ns_Log(Warning, "sched: "
"excessive time taken by proc %d (%d seconds)",
ePtr->id, elapsed);
}
if (ePtr->hPtr == NULL) {
FreeEvent(ePtr);
ePtr = NULL;
}
Ns_MutexLock(&lock);
if (ePtr != NULL) {
ePtr->flags &= ~NS_SCHED_RUNNING;
ePtr->lastend = now;
QueueEvent(ePtr, &now);
}
}
/*
* Wait for the next ready event.
*/
if (nqueue == 0) {
Ns_CondWait(&schedcond, &lock);
} else if (!shutdownPending) {
timeout.sec = queue[1]->nextqueue;
timeout.usec = 0;
(void) Ns_CondTimedWait(&schedcond, &lock, &timeout);
}
}
/*
* Wait for any detached event threads to exit
* and then cleanup the scheduler and signal
* shutdown complete.
*/
Ns_Log(Notice, "sched: shutdown started");
if (nThreads > 0) {
Ns_Log(Notice, "sched: waiting for event threads...");
Ns_CondBroadcast(&eventcond);
while (nThreads > 0) {
joinThreads = eventThreads;
nJoinThreads = nThreads;
eventThreads = NULL;
nThreads = 0;
Ns_MutexUnlock(&lock);
while (--nJoinThreads >= 0 ) {
Ns_ThreadJoin(&joinThreads[nJoinThreads], NULL);
}
ns_free(joinThreads);
Ns_MutexLock(&lock);
}
}
Ns_MutexUnlock(&lock);
while (nqueue > 0) {
FreeEvent(queue[nqueue--]);
}
ns_free(queue);
Tcl_DeleteHashTable(&eventsTable);
Ns_Log(Notice, "sched: shutdown complete");
Ns_MutexLock(&lock);
running = 0;
Ns_CondBroadcast(&schedcond);
Ns_MutexUnlock(&lock);
}
void
NsGetScheduled(Tcl_DString *dsPtr)
{
Tcl_HashEntry *hPtr;
Tcl_HashSearch search;
Event *ePtr;
time_t now;
char buf[100];
time(&now);
Ns_MutexLock(&lock);
hPtr = Tcl_FirstHashEntry(&eventsTable, &search);
while (hPtr != NULL) {
ePtr = Tcl_GetHashValue(hPtr);
Tcl_DStringStartSublist(dsPtr);
sprintf(buf, "%u %d %d %ld %ld %ld %ld",
ePtr->id, ePtr->flags, ePtr->interval, ePtr->nextqueue,
ePtr->lastqueue, ePtr->laststart, ePtr->lastend);
Tcl_DStringAppend(dsPtr, buf, -1);
Ns_GetProcInfo(dsPtr, (void *) ePtr->proc, ePtr->arg);
Tcl_DStringEndSublist(dsPtr);
hPtr = Tcl_NextHashEntry(&search);
}
Ns_MutexUnlock(&lock);
}
|
Back to SourceForge.net Powered by ViewCVS 1.0-dev |