source: trunk/src/wsock32/asyncthread.cpp@ 7146

Last change on this file since 7146 was 7146, checked in by sandervl, 24 years ago

fixes for sending FD_CLOSE notification when connection is closed at the remote end

File size: 9.1 KB
Line 
1/* $Id: asyncthread.cpp,v 1.13 2001-10-21 13:43:51 sandervl Exp $ */
2
3/*
4 * Async thread help functions
5 *
6 * Copyright 2000 Sander van Leeuwen (sandervl@xs4all.nl)
7 *
8 * TODO: Not everything is 100% thread safe (i.e. async parameter updates)
9 *
10 * Project Odin Software License can be found in LICENSE.TXT
11 *
12 */
13#define INCL_BASE
14#include <os2wrap.h>
15#include <os2sel.h>
16#include <stdlib.h>
17#include <wprocess.h>
18#include <win32api.h>
19#include <misc.h>
20#include <string.h>
21#include <vmutex.h>
22#include "wsock32.h"
23#include "asyncthread.h"
24
//Head of the singly linked list (chained through the 'next' member) of all
//queued async job parameter blocks; new entries are pushed at the front
25static PASYNCTHREADPARM threadList = NULL;
//Mutex entered by the functions in this file around accesses to threadList
26VMutex asyncThreadMutex;
27
//Links a job parameter block into threadList (defined further down)
28static void AddToQueue(PASYNCTHREADPARM pThreadParm);
29
30//******************************************************************************
31//******************************************************************************
32static void _Optlink AsyncThread(void *arg)
33{
34 PASYNCTHREADPARM pThreadParm = (PASYNCTHREADPARM)arg;
35
36 pThreadParm->asyncProc((PVOID)arg);
37
38//only for blocking hooks (currently not implemented)
39//// if(pThreadParm->request == ASYNC_BLOCKHOOK)
40//// WSASetBlocking(FALSE, pThreadParm->hThread);
41
42 dprintf(("AsyncThread %x socket %x exit", pThreadParm->hAsyncTaskHandle, pThreadParm->u.asyncselect.s));
43 free((PVOID)pThreadParm);
44}
45//******************************************************************************
46//******************************************************************************
47ULONG QueueAsyncJob(ASYNCTHREADPROC asyncproc, PASYNCTHREADPARM pThreadParm, BOOL fSetBlocking)
48{
49 TID tid;
50
51 dprintf(("QueueAsyncJob for socket %x", pThreadParm->u.asyncselect.s));
52
53 pThreadParm->asyncProc = asyncproc;
54 pThreadParm->fActive = TRUE;
55 pThreadParm->fCancelled = FALSE;
56 pThreadParm->next = NULL;
57 pThreadParm->hAsyncTaskHandle = 0;
58
59 pThreadParm->hThread = GetCurrentThread();
60 AddToQueue(pThreadParm);
61
62 if(fSetBlocking) WSASetBlocking(TRUE);
63
64 USHORT sel = GetFS();
65 tid = _beginthread(AsyncThread, NULL, 16384, (PVOID)pThreadParm);
66 SetFS(sel);
67 if (tid == -1) {
68 dprintf(("QueueAsyncJob: _beginthread failed"));
69 if(fSetBlocking) WSASetBlocking(FALSE);
70 RemoveFromQueue(pThreadParm);
71 WSASetLastError(WSAEFAULT);
72 return 0;
73 }
74 pThreadParm->hAsyncTaskHandle = tid;
75 WSASetLastError(NO_ERROR);
76 return pThreadParm->hAsyncTaskHandle;
77}
78//******************************************************************************
79//******************************************************************************
80void AddToQueue(PASYNCTHREADPARM pThreadParm)
81{
82 asyncThreadMutex.enter();
83 pThreadParm->next = threadList;
84 threadList = pThreadParm;
85 asyncThreadMutex.leave();
86}
87//******************************************************************************
88//******************************************************************************
89void RemoveFromQueue(PASYNCTHREADPARM pThreadParm)
90{
91 PASYNCTHREADPARM pThreadInfo;
92
93 asyncThreadMutex.enter();
94 pThreadInfo = threadList;
95
96 if(pThreadInfo == pThreadParm) {
97 threadList = pThreadParm->next;
98 }
99 else {
100 while(pThreadInfo->next) {
101 if(pThreadInfo->next == pThreadParm) {
102 pThreadInfo->next = pThreadParm->next;
103 break;
104 }
105 pThreadInfo = pThreadInfo->next;
106 }
107 if(pThreadInfo == NULL) {
108 dprintf(("RemoveFromQueue: parm %x not found!!", pThreadParm));
109 DebugInt3();
110 }
111 }
112 memset(pThreadParm, 0, sizeof(*pThreadParm));
113 asyncThreadMutex.leave();
114}
115//******************************************************************************
116//******************************************************************************
117void WSACancelAllAsyncRequests()
118{
119 PASYNCTHREADPARM pThreadInfo;
120 BOOL found = FALSE;
121
122 dprintf(("WSACancelAllAsyncRequests"));
123 asyncThreadMutex.enter();
124 pThreadInfo = threadList;
125
126 while(pThreadInfo) {
127 dprintf(("WSACancelAllAsyncRequests %x", pThreadInfo->hAsyncTaskHandle));
128 pThreadInfo->fCancelled = TRUE;
129 pThreadInfo->u.asyncselect.asyncSem->post();
130 pThreadInfo = pThreadInfo->next;
131 }
132 asyncThreadMutex.leave();
133 //TODO: not the right way to wait for the async threads to die
134 DosSleep(250);
135}
136//******************************************************************************
137//******************************************************************************
138int WIN32API WSACancelAsyncRequest(LHANDLE hAsyncTaskHandle)
139{
140 PASYNCTHREADPARM pThreadInfo;
141 BOOL found = FALSE;
142
143 dprintf(("WSACancelAsyncRequest: cancel task %x", hAsyncTaskHandle));
144 asyncThreadMutex.enter();
145 pThreadInfo = threadList;
146
147 while(pThreadInfo) {
148 if(pThreadInfo->hAsyncTaskHandle == hAsyncTaskHandle) {
149 pThreadInfo->fCancelled = TRUE;
150 found = TRUE;
151 break;
152 }
153 pThreadInfo = pThreadInfo->next;
154 }
155 asyncThreadMutex.leave();
156 if(found == FALSE) {
157 WSASetLastError(WSAEINVAL);
158 dprintf(("WSACancelAsyncRequest: task not found!!"));
159 }
160 return (found) ? NO_ERROR : SOCKET_ERROR;
161}
162//******************************************************************************
163//Only to cancel blocking hooks
164//******************************************************************************
165int WIN32API WSACancelBlockingCall()
166{
167 HANDLE hThread = GetCurrentThread();
168
169 dprintf(("WSACancelBlockingCall"));
170#if 0
171 asyncThreadMutex.enter();
172 pThreadInfo = threadList;
173
174 while(pThreadInfo) {
175 if(pThreadInfo->hThread == hThread) {
176 pThreadInfo->fCancelled = TRUE;
177
178 if(pThreadInfo->request == ASYNC_BLOCKHOOK) {
179 ret = so_cancel(pThreadInfo->blockedsocket);
180 }
181
182 found = TRUE;
183 break;
184 }
185 pThreadInfo = pThreadInfo->next;
186 }
187 asyncThreadMutex.leave();
188#endif
189 return SOCKET_ERROR;
190}
191//******************************************************************************
192//dump queue
193//******************************************************************************
194void DumpQueue(void)
195{
196 HANDLE hThread = GetCurrentThread();
197 PASYNCTHREADPARM pThreadInfo;
198
199 dprintf(("DumpQueue"));
200
201 asyncThreadMutex.enter();
202 pThreadInfo = threadList;
203
204 while(pThreadInfo) {
205 dprintf(( "SOCKET %08xh thread#%d events %xh pending %xh, select %d",
206 pThreadInfo->u.asyncselect.s,
207 pThreadInfo->hAsyncTaskHandle,
208 pThreadInfo->u.asyncselect.lEvents,
209 pThreadInfo->u.asyncselect.lEventsPending,
210 pThreadInfo->fWaitSelect));
211 pThreadInfo = pThreadInfo->next;
212 }
213 asyncThreadMutex.leave();
214 dprintf(("DumpQueue done"));
215}
216//******************************************************************************
217//Assumes caller owns async thread mutex!
218//******************************************************************************
219PASYNCTHREADPARM FindAsyncEvent(SOCKET s)
220{
221 PASYNCTHREADPARM pThreadInfo;
222
223 pThreadInfo = threadList;
224 while(pThreadInfo) {
225 if(pThreadInfo->u.asyncselect.s == s && !pThreadInfo->fRemoved) {
226 return pThreadInfo;
227 }
228 pThreadInfo = pThreadInfo->next;
229 }
230 return NULL;
231}
232//******************************************************************************
233//******************************************************************************
234BOOL FindAndSetAsyncEvent(SOCKET s, int mode, int notifyHandle, int notifyData, ULONG lEventMask)
235{
236 PASYNCTHREADPARM pThreadInfo;
237
238 asyncThreadMutex.enter();
239 pThreadInfo = FindAsyncEvent(s);
240 if(pThreadInfo) {
241 pThreadInfo->u.asyncselect.mode = mode;
242 pThreadInfo->u.asyncselect.lEvents = lEventMask;
243 pThreadInfo->u.asyncselect.lEventsPending = lEventMask;
244 pThreadInfo->notifyHandle = notifyHandle;
245 pThreadInfo->notifyData = notifyData;
246 if(lEventMask == 0) {
247 //make sure this thread isn't used anymore
248 pThreadInfo->fRemoved = TRUE;
249 }
250 if(pThreadInfo->fWaitSelect) {
251 //cancel pending select in async select thread (if any)
252 so_cancel(s);
253 }
254
255 //unblock async thread if it was waiting
256 pThreadInfo->u.asyncselect.asyncSem->post();
257 }
258 asyncThreadMutex.leave();
259 return(pThreadInfo != NULL);
260}
261//******************************************************************************
262//******************************************************************************
263void EnableAsyncEvent(SOCKET s, ULONG flags)
264{
265 PASYNCTHREADPARM pThreadInfo;
266
267 asyncThreadMutex.enter();
268 pThreadInfo = FindAsyncEvent(s);
269 if(pThreadInfo) {
270 pThreadInfo->u.asyncselect.lEventsPending |= (pThreadInfo->u.asyncselect.lEvents & flags);
271 //unblock async thread if it was waiting
272 pThreadInfo->u.asyncselect.asyncSem->post();
273
274 //cancel pending select in async select thread (if any)
275 if(pThreadInfo->fWaitSelect) {
276 so_cancel(s);
277 }
278 }
279 asyncThreadMutex.leave();
280}
281//******************************************************************************
282//******************************************************************************
283BOOL QueryAsyncEvent(SOCKET s, HWND *pHwnd, int *pMsg, ULONG *plEvent)
284{
285 PASYNCTHREADPARM pThreadInfo;
286
287 asyncThreadMutex.enter();
288 pThreadInfo = FindAsyncEvent(s);
289 if(pThreadInfo) {
290 *pHwnd = (HWND)pThreadInfo->notifyHandle;
291 *pMsg = pThreadInfo->notifyData;
292 *plEvent = pThreadInfo->u.asyncselect.lEvents;
293 }
294 asyncThreadMutex.leave();
295 return(pThreadInfo != NULL);
296}
297//******************************************************************************
298//******************************************************************************
Note: See TracBrowser for help on using the repository browser.