source: trunk/src/wsock32/asyncthread.cpp@ 10367

Last change on this file since 10367 was 9627, checked in by sandervl, 23 years ago

Better method to cancel all async requests in WSACleanup

File size: 10.1 KB
Line 
1/* $Id: asyncthread.cpp,v 1.16 2003-01-06 13:05:40 sandervl Exp $ */
2
3/*
4 * Async thread help functions
5 *
6 * Copyright 2000 Sander van Leeuwen (sandervl@xs4all.nl)
7 *
8 * TODO: Not everything is 100% thread safe (i.e. async parameter updates)
9 *
10 * Project Odin Software License can be found in LICENSE.TXT
11 *
12 */
13#define INCL_BASE
14#include <os2wrap.h>
15#include <os2sel.h>
16#include <stdlib.h>
17#include <wprocess.h>
18#include <win32api.h>
19#include <misc.h>
20#include <string.h>
21#include <vmutex.h>
22#include "wsock32.h"
23#include "asyncthread.h"
24
25static PASYNCTHREADPARM threadList = NULL;
26VMutex asyncThreadMutex;
27
28static void AddToQueue(PASYNCTHREADPARM pThreadParm);
29
30//******************************************************************************
31//******************************************************************************
32static void _Optlink AsyncThread(void *arg)
33{
34 PASYNCTHREADPARM pThreadParm = (PASYNCTHREADPARM)arg;
35#ifdef DEBUG
36 ULONG ulSocket = pThreadParm->u.asyncselect.s;
37 ULONG hTaskHandle = pThreadParm->hAsyncTaskHandle;
38#endif
39
40 pThreadParm->asyncProc((PVOID)arg);
41
42//only for blocking hooks (currently not implemented)
43//// if(pThreadParm->request == ASYNC_BLOCKHOOK)
44//// WSASetBlocking(FALSE, pThreadParm->hThread);
45
46 dprintf(("AsyncThread %x socket %x exit", hTaskHandle, ulSocket));
47 free((PVOID)pThreadParm);
48}
49//******************************************************************************
50//******************************************************************************
51ULONG QueueAsyncJob(ASYNCTHREADPROC asyncproc, PASYNCTHREADPARM pThreadParm, BOOL fSetBlocking)
52{
53 TID tid;
54
55 dprintf(("QueueAsyncJob for socket %x", pThreadParm->u.asyncselect.s));
56
57 pThreadParm->asyncProc = asyncproc;
58 pThreadParm->fActive = TRUE;
59 pThreadParm->fCancelled = FALSE;
60 pThreadParm->next = NULL;
61 pThreadParm->hAsyncTaskHandle = 0;
62
63 pThreadParm->hThread = GetCurrentThread();
64 AddToQueue(pThreadParm);
65
66 if(fSetBlocking) WSASetBlocking(TRUE);
67
68 USHORT sel = GetFS();
69 tid = _beginthread(AsyncThread, NULL, 16384, (PVOID)pThreadParm);
70 SetFS(sel);
71 if (tid == -1) {
72 dprintf(("QueueAsyncJob: _beginthread failed"));
73 if(fSetBlocking) WSASetBlocking(FALSE);
74 RemoveFromQueue(pThreadParm);
75 WSASetLastError(WSAEFAULT);
76 return 0;
77 }
78 pThreadParm->hAsyncTaskHandle = tid;
79 WSASetLastError(NO_ERROR);
80 return pThreadParm->hAsyncTaskHandle;
81}
82//******************************************************************************
83//******************************************************************************
84void AddToQueue(PASYNCTHREADPARM pThreadParm)
85{
86 asyncThreadMutex.enter();
87 pThreadParm->next = threadList;
88 threadList = pThreadParm;
89 asyncThreadMutex.leave();
90}
91//******************************************************************************
92//******************************************************************************
93void RemoveFromQueue(PASYNCTHREADPARM pThreadParm)
94{
95 PASYNCTHREADPARM pThreadInfo;
96
97 asyncThreadMutex.enter();
98 pThreadInfo = threadList;
99
100 if(pThreadInfo == pThreadParm) {
101 threadList = pThreadParm->next;
102 }
103 else {
104 while(pThreadInfo->next) {
105 if(pThreadInfo->next == pThreadParm) {
106 pThreadInfo->next = pThreadParm->next;
107 break;
108 }
109 pThreadInfo = pThreadInfo->next;
110 }
111 if(pThreadInfo == NULL) {
112 dprintf(("RemoveFromQueue: parm %x not found!!", pThreadParm));
113 DebugInt3();
114 }
115 }
116 memset(pThreadParm, 0, sizeof(*pThreadParm));
117 asyncThreadMutex.leave();
118}
119//******************************************************************************
120//******************************************************************************
121void WSACancelAllAsyncRequests()
122{
123 PASYNCTHREADPARM pThreadInfo;
124
125 dprintf(("WSACancelAllAsyncRequests"));
126 asyncThreadMutex.enter();
127 pThreadInfo = threadList;
128
129 while(pThreadInfo) {
130 dprintf(("WSACancelAllAsyncRequests %x", pThreadInfo->hAsyncTaskHandle));
131 pThreadInfo->fCancelled = TRUE;
132 pThreadInfo->u.asyncselect.asyncSem->post();
133 pThreadInfo = pThreadInfo->next;
134 }
135 asyncThreadMutex.leave();
136}
137//******************************************************************************
138//******************************************************************************
139void WSAWaitForAllAsyncRequests()
140{
141 int wait = 0;
142
143 dprintf(("WSAWaitForAllAsyncRequests"));
144
145 //We don't want to wait forever until all async request threads have died
146 //(just in case something goes wrong in select())
147 //So wait up to one second for all threads to end.
148 while(threadList && wait < 1000) {
149 DosSleep(20);
150 wait += 20;
151 }
152}
153//******************************************************************************
154//******************************************************************************
155int WIN32API WSACancelAsyncRequest(LHANDLE hAsyncTaskHandle)
156{
157 PASYNCTHREADPARM pThreadInfo;
158 BOOL found = FALSE;
159
160 dprintf(("WSACancelAsyncRequest: cancel task %x", hAsyncTaskHandle));
161 asyncThreadMutex.enter();
162 pThreadInfo = threadList;
163
164 while(pThreadInfo) {
165 if(pThreadInfo->hAsyncTaskHandle == hAsyncTaskHandle) {
166 pThreadInfo->fCancelled = TRUE;
167 found = TRUE;
168 break;
169 }
170 pThreadInfo = pThreadInfo->next;
171 }
172 asyncThreadMutex.leave();
173 if(found == FALSE) {
174 WSASetLastError(WSAEINVAL);
175 dprintf(("WSACancelAsyncRequest: task not found!!"));
176 }
177 return (found) ? NO_ERROR : SOCKET_ERROR;
178}
179//******************************************************************************
180//Only to cancel blocking hooks
181//******************************************************************************
182int WIN32API WSACancelBlockingCall()
183{
184 HANDLE hThread = GetCurrentThread();
185
186 dprintf(("WSACancelBlockingCall"));
187#if 0
188 asyncThreadMutex.enter();
189 pThreadInfo = threadList;
190
191 while(pThreadInfo) {
192 if(pThreadInfo->hThread == hThread) {
193 pThreadInfo->fCancelled = TRUE;
194
195 if(pThreadInfo->request == ASYNC_BLOCKHOOK) {
196 ret = so_cancel(pThreadInfo->blockedsocket);
197 }
198
199 found = TRUE;
200 break;
201 }
202 pThreadInfo = pThreadInfo->next;
203 }
204 asyncThreadMutex.leave();
205#endif
206 return SOCKET_ERROR;
207}
208//******************************************************************************
209//dump queue
210//******************************************************************************
211void DumpQueue(void)
212{
213 HANDLE hThread = GetCurrentThread();
214 PASYNCTHREADPARM pThreadInfo;
215
216 dprintf(("DumpQueue"));
217
218 asyncThreadMutex.enter();
219 pThreadInfo = threadList;
220
221 while(pThreadInfo) {
222 dprintf(( "SOCKET %08xh thread#%d events %xh pending %xh, select %d",
223 pThreadInfo->u.asyncselect.s,
224 pThreadInfo->hAsyncTaskHandle,
225 pThreadInfo->u.asyncselect.lEvents,
226 pThreadInfo->u.asyncselect.lEventsPending,
227 pThreadInfo->fWaitSelect));
228 pThreadInfo = pThreadInfo->next;
229 }
230 asyncThreadMutex.leave();
231 dprintf(("DumpQueue done"));
232}
233//******************************************************************************
234//Assumes caller owns async thread mutex!
235//******************************************************************************
236PASYNCTHREADPARM FindAsyncEvent(SOCKET s)
237{
238 PASYNCTHREADPARM pThreadInfo;
239
240 pThreadInfo = threadList;
241 while(pThreadInfo) {
242 if(pThreadInfo->u.asyncselect.s == s && !pThreadInfo->fRemoved) {
243 return pThreadInfo;
244 }
245 pThreadInfo = pThreadInfo->next;
246 }
247 return NULL;
248}
249//******************************************************************************
250//******************************************************************************
251BOOL FindAndSetAsyncEvent(SOCKET s, int mode, int notifyHandle, int notifyData, ULONG lEventMask)
252{
253 PASYNCTHREADPARM pThreadInfo;
254
255 asyncThreadMutex.enter();
256 pThreadInfo = FindAsyncEvent(s);
257 if(pThreadInfo)
258 {
259 int size, state, tmp;
260 state = ioctl(s, FIOBSTATUS, (char *)&tmp, sizeof(tmp));
261 dprintf(("FindAndSetAsyncEvent: state %x", state));
262
263 //Don't send FD_CONNECT is socket was already connected (accept returns connected socket)
264 if(state & SS_ISCONNECTED) {
265 pThreadInfo->fConnected = TRUE;
266 }
267 else pThreadInfo->fConnected = FALSE;
268
269 pThreadInfo->u.asyncselect.mode = mode;
270 pThreadInfo->u.asyncselect.lEvents = lEventMask;
271 pThreadInfo->u.asyncselect.lEventsPending = lEventMask;
272 pThreadInfo->notifyHandle = notifyHandle;
273 pThreadInfo->notifyData = notifyData;
274 if(lEventMask == 0) {
275 //make sure this thread isn't used anymore
276 pThreadInfo->fRemoved = TRUE;
277 }
278 if(pThreadInfo->fWaitSelect) {
279 //cancel pending select in async select thread (if any)
280 so_cancel(s);
281 }
282
283 //unblock async thread if it was waiting
284 pThreadInfo->u.asyncselect.asyncSem->post();
285 }
286 asyncThreadMutex.leave();
287 return(pThreadInfo != NULL);
288}
289//******************************************************************************
290//******************************************************************************
291void EnableAsyncEvent(SOCKET s, ULONG flags)
292{
293 PASYNCTHREADPARM pThreadInfo;
294
295 asyncThreadMutex.enter();
296 pThreadInfo = FindAsyncEvent(s);
297 if(pThreadInfo) {
298 pThreadInfo->u.asyncselect.lEventsPending |= (pThreadInfo->u.asyncselect.lEvents & flags);
299 //unblock async thread if it was waiting
300 pThreadInfo->u.asyncselect.asyncSem->post();
301
302 //cancel pending select in async select thread (if any)
303 if(pThreadInfo->fWaitSelect) {
304 so_cancel(s);
305 }
306 }
307 asyncThreadMutex.leave();
308}
309//******************************************************************************
310//******************************************************************************
311BOOL QueryAsyncEvent(SOCKET s, int *pMode, ULONG *pNofityHandle, ULONG *pNofityData,
312 ULONG *plEvent)
313{
314 PASYNCTHREADPARM pThreadInfo;
315
316 asyncThreadMutex.enter();
317 pThreadInfo = FindAsyncEvent(s);
318 if(pThreadInfo) {
319 *pMode = pThreadInfo->u.asyncselect.mode;
320 *pNofityHandle = pThreadInfo->notifyHandle;
321 *pNofityData = pThreadInfo->notifyData;
322 *plEvent = pThreadInfo->u.asyncselect.lEvents;
323 }
324 asyncThreadMutex.leave();
325 return(pThreadInfo != NULL);
326}
327//******************************************************************************
328//******************************************************************************
Note: See TracBrowser for help on using the repository browser.