source: trunk/src/kernel32/overlappedio.cpp@ 8644

Last change on this file since 8644 was 8644, checked in by sandervl, 23 years ago

Overlapped IO bugfixes

File size: 22.4 KB
Line 
1/* $Id: overlappedio.cpp,v 1.16 2002-06-11 12:51:43 sandervl Exp $ */
2
3/*
4 * Win32 overlapped IO class
5 *
6 * Copyright 2001 Sander van Leeuwen <sandervl@xs4all.nl>
7 *
8 * Project Odin Software License can be found in LICENSE.TXT
9 *
10 */
11
12
13
14#include <os2win.h>
15#include <string.h>
16#include <handlemanager.h>
17#include <heapstring.h>
18#include <overlappedio.h>
19#include "oslibdos.h"
20
21#define DBG_LOCALLOG DBG_overlappedio
22#include "dbglocal.h"
23
24
25//******************************************************************************
26//******************************************************************************
27OverlappedIOHandler::OverlappedIOHandler(LPOVERLAPPED_HANDLER lpReadHandler,
28 LPOVERLAPPED_HANDLER lpWriteHandler,
29 LPOVERLAPPED_HANDLER lpPollHandler,
30 BOOL fFullDuplex) :
31 hThreadRead(0), hThreadWrite(0), hThreadPoll(0)
32{
33 OverlappedIOError errcode = OutOfMemory;
34
35 this->fFullDuplex = fFullDuplex;
36
37 if(lpReadHandler == NULL) {
38 throw(InvalidParameter);
39 }
40
41 pending[ASYNC_INDEX_READ] = pending[ASYNC_INDEX_WRITE] = NULL;
42 pending [ASYNC_INDEX_POLL] = pending [ASYNC_INDEX_BUSY] = NULL;
43
44 this->lpReadHandler = lpReadHandler;
45 this->lpWriteHandler = lpWriteHandler;
46 this->lpPollHandler = lpPollHandler;
47
48 ::InitializeCriticalSection(&critsect);
49 //poll, read & write event semaphores are auto-reset (one thread wakes up
50 //after a SetEvent call)
51 hEventPoll = ::CreateEventA(NULL, FALSE, FALSE, NULL);
52 hEventRead = ::CreateEventA(NULL, FALSE, FALSE, NULL);
53 hEventWrite = ::CreateEventA(NULL, FALSE, FALSE, NULL);
54
55 dprintf(("OverlappedIOThread: hEventRead %x hEventWrite %x hEventPoll %x", hEventRead, hEventWrite, hEventPoll));
56
57 //the exit & cancel event semaphores are manual reset, because these events
58 //must be able to wake up multiple threads
59 hEventExit = ::CreateEventA(NULL, TRUE, FALSE, NULL);
60 hEventCancel = ::CreateEventA(NULL, TRUE, FALSE, NULL);
61 if(!hEventPoll || !hEventRead || !hEventWrite || !hEventExit || !hEventCancel)
62 {
63 DebugInt3();
64 errcode = EventCreationFailed;
65 goto failed;
66 }
67
68 DWORD dwThreadId;
69 LPOVERLAPPED_THREAD_PARAM threadparam;
70
71 dwAsyncType = (lpWriteHandler && fFullDuplex) ? ASYNCIO_READ : ASYNCIO_READWRITE;
72 threadparam = (LPOVERLAPPED_THREAD_PARAM)malloc(sizeof(OVERLAPPED_THREAD_PARAM));
73 if(!threadparam) goto outofmem;
74 threadparam->dwOperation = dwAsyncType;
75 threadparam->lpOverlappedObj = this;
76 hThreadRead = ::CreateThread(NULL, 32*1024, OverlappedIOThread, (LPVOID)threadparam, 0, &dwThreadId);
77
78 if(lpWriteHandler && fFullDuplex) {
79 dwAsyncType |= ASYNCIO_WRITE;
80
81 threadparam = (LPOVERLAPPED_THREAD_PARAM)malloc(sizeof(OVERLAPPED_THREAD_PARAM));
82 if(!threadparam) goto outofmem;
83 threadparam->dwOperation = ASYNCIO_WRITE;
84 threadparam->lpOverlappedObj = this;
85 hThreadWrite = ::CreateThread(NULL, 32*1024, OverlappedIOThread, (LPVOID)threadparam, 0, &dwThreadId);
86 }
87
88 if(lpPollHandler) {
89 dwAsyncType |= ASYNCIO_POLL;
90
91 threadparam = (LPOVERLAPPED_THREAD_PARAM)malloc(sizeof(OVERLAPPED_THREAD_PARAM));
92 if(!threadparam) goto outofmem;
93 threadparam->dwOperation = ASYNCIO_POLL;
94 threadparam->lpOverlappedObj = this;
95 hThreadPoll = ::CreateThread(NULL, 32*1024, OverlappedIOThread, (LPVOID)threadparam, 0, &dwThreadId);
96 }
97
98 if((lpPollHandler && !hThreadPoll) || !hThreadRead || (lpWriteHandler && fFullDuplex && !hThreadWrite))
99 {
100 DebugInt3();
101 errcode = ThreadCreationFailed;
102 goto failed;
103 }
104 return;
105
106outofmem:
107 errcode = OutOfMemory;
108 //fall through
109failed:
110 //SvL: NOTE: We might not fail gracefully when threads have already been
111 // created. (thread accessing memory that has been freed)
112 // Don't feel like wasting time to fix this as this should never
113 // happen anyway.
114 if(hEventExit) {
115 ::SetEvent(hEventExit);
116 ::CloseHandle(hEventExit);
117 }
118
119 if(hEventRead) ::CloseHandle(hEventRead);
120 if(hEventWrite) ::CloseHandle(hEventWrite);
121 if(hEventPoll) ::CloseHandle(hEventPoll);
122
123 if(hThreadRead) ::CloseHandle(hThreadRead);
124 if(hThreadPoll) ::CloseHandle(hThreadPoll);
125 if(hThreadWrite) ::CloseHandle(hThreadWrite);
126 ::DeleteCriticalSection(&critsect);
127
128 throw(errcode);
129}
130//******************************************************************************
131//******************************************************************************
132OverlappedIOHandler::~OverlappedIOHandler()
133{
134 dprintf(("~OverlappedIOHandler: signalling overlapped threads"));
135 ::SetEvent(hEventExit);
136
137 ::CloseHandle(hEventExit);
138 ::CloseHandle(hEventRead);
139 ::CloseHandle(hEventWrite);
140 ::CloseHandle(hEventPoll);
141
142 ::CloseHandle(hThreadRead);
143 if(hThreadPoll) ::CloseHandle(hThreadPoll);
144 if(hThreadWrite) ::CloseHandle(hThreadWrite);
145
146 DeleteCriticalSection(&critsect);
147}
148//******************************************************************************
149//******************************************************************************
150DWORD CALLBACK OverlappedIOThread(LPVOID lpThreadParam)
151{
152 LPOVERLAPPED_THREAD_PARAM threadparam = (LPOVERLAPPED_THREAD_PARAM)lpThreadParam;
153 DWORD dwOperation;
154 OverlappedIOHandler *lpOverlappedObj;
155
156 if(threadparam == NULL || threadparam->lpOverlappedObj == NULL) {
157 DebugInt3();
158 return 0;
159 }
160 lpOverlappedObj = threadparam->lpOverlappedObj;
161 dwOperation = threadparam->dwOperation;
162 //free thread parameter first
163 free(threadparam);
164
165 return lpOverlappedObj->threadHandler(dwOperation);
166}
167//******************************************************************************
168//******************************************************************************
169DWORD OverlappedIOHandler::threadHandler(DWORD dwOperation)
170{
171 LPASYNCIOREQUEST lpRequest;
172 LPOVERLAPPED lpOverlapped;
173 HANDLE hEvents[2];
174 HANDLE hEventsWait[2];
175 HANDLE hHandle;
176 DWORD ret, dwTimeOut, dwResult;
177 int index;
178
179 dprintf(("OverlappedIOThread: started for event %d", dwOperation));
180 switch(dwOperation) {
181 case ASYNCIO_READ:
182 case ASYNCIO_READWRITE:
183 hEvents[0] = hEventRead;
184 index = ASYNC_INDEX_READ;
185 break;
186
187 case ASYNCIO_WRITE:
188 hEvents[0] = hEventWrite;
189 index = ASYNC_INDEX_WRITE;
190 break;
191
192 case ASYNCIO_POLL:
193 hEvents[0] = hEventPoll;
194 index = ASYNC_INDEX_POLL;
195 break;
196 default:
197 DebugInt3();
198 }
199 hEvents[1] = hEventExit;
200
201 while(TRUE)
202 {
203 ret = WaitForMultipleObjects(2, hEvents, FALSE, INFINITE);
204 if(ret == WAIT_FAILED) {
205 dprintf(("!WARNING!: WaitForMultipleObjects -> WAIT_FAILED!"));
206 break;
207 }
208 //if hEventExit has been signalled, then we are told to exit
209 if(ret == (WAIT_OBJECT_0+1)) {
210 dprintf(("end of threadHandler signalled"));
211 break;
212 }
213 //process all pending jobs
214 while(TRUE)
215 {
216 ::EnterCriticalSection(&critsect);
217 if(pending[index] == NULL) {
218 ::LeaveCriticalSection(&critsect);
219 break;
220 }
221 lpRequest = pending[index];
222 pending[index] = lpRequest->next;
223
224 //add to the head of process list
225 lpRequest->next = pending[ASYNC_INDEX_BUSY];
226 pending[ASYNC_INDEX_BUSY] = lpRequest;
227 ::LeaveCriticalSection(&critsect);
228
229 lpOverlapped = lpRequest->lpOverlapped;;
230 hHandle = lpRequest->hHandle;
231
232#ifdef DEBUG
233 switch(lpRequest->dwAsyncType) {
234 case ASYNCIO_READ:
235 case ASYNCIO_WRITE:
236 case ASYNCIO_POLL:
237 break;
238 default:
239 DebugInt3();
240 break;
241 }
242#endif
243 switch(dwOperation) {
244 case ASYNCIO_READ:
245 case ASYNCIO_READWRITE:
246 case ASYNCIO_WRITE:
247 if(lpRequest->dwAsyncType == ASYNCIO_READ || lpWriteHandler == NULL) {
248 lpRequest->dwLastError = lpReadHandler(lpRequest, &dwResult, NULL);
249 }
250 else lpRequest->dwLastError = lpWriteHandler(lpRequest, &dwResult, NULL);
251
252 if(!lpRequest->fCancelled)
253 {
254 lpOverlapped->Internal = lpRequest->dwLastError;
255 lpOverlapped->InternalHigh = dwResult;
256
257 //must NOT store result in specified location!!! (stack corruption)
258 //if(lpRequest->lpdwResult) {
259 // *lpRequest->lpdwResult = dwResult;
260 //}
261#ifdef DEBUG
262 if(lpRequest->dwAsyncType == ASYNCIO_READ) {
263 dprintf(("ASYNCIO_READ %x finished; result %x, last error %d", lpOverlapped, dwResult, lpRequest->dwLastError));
264 }
265 else dprintf(("ASYNCIO_WRITE %x finished; result %x, last error %d", lpOverlapped, dwResult, lpRequest->dwLastError));
266#endif
267 //wake up user thread
268 ::SetEvent(lpOverlapped->hEvent);
269 }
270 break;
271
272 case ASYNCIO_POLL:
273 hEventsWait[0] = hEventCancel;
274 hEventsWait[1] = hEventExit;
275 ret = WAIT_TIMEOUT;
276 while(TRUE)
277 {
278 dwTimeOut = 0;
279 lpRequest->dwLastError = lpPollHandler(lpRequest, &dwResult, &dwTimeOut);
280 if(lpRequest->dwLastError != ERROR_IO_PENDING) {
281 break;
282 }
283 if(dwTimeOut == 0) {
284 dprintf(("!ERROR!: lpPollHandler returned timeout 0!!"));
285 DebugInt3();
286 break;
287 }
288 //sleep a while to avoid wasting too many cpu cycles; we are woken up when a timeout occurs,
289 //when the operation is cancelled or when the process exits
290 ret = WaitForMultipleObjects(2, hEventsWait, FALSE, dwTimeOut);
291 if(ret != WAIT_TIMEOUT) {
292 dprintf(("ASYNCIO_POLL: WaitForSingleObject didn't time out, abort (ret = %x)", ret));
293 break;
294 }
295 }
296 //Don't access the overlapped & result memory when CancelIo was used to cancel the operation
297 if(ret == WAIT_TIMEOUT && !lpRequest->fCancelled)
298 {
299 dprintf(("ASYNCIO_POLL %x: result %x, last error %d", lpOverlapped, dwResult, lpRequest->dwLastError));
300 lpOverlapped->Internal = lpRequest->dwLastError;
301 lpOverlapped->InternalHigh = dwResult;
302 if(lpRequest->lpdwResult) {
303 *lpRequest->lpdwResult = dwResult;
304 }
305 //wake up user thread
306 ::SetEvent(lpOverlapped->hEvent);
307 }
308 break;
309 }
310 //remove from in-process list and delete async request object
311 removeRequest(ASYNC_INDEX_BUSY, lpRequest);
312 delete lpRequest;
313 } //while(TRUE)
314 }
315 return 0;
316}
317//******************************************************************************
318//******************************************************************************
319BOOL OverlappedIOHandler::WriteFile(HANDLE hHandle,
320 LPCVOID lpBuffer,
321 DWORD nNumberOfBytesToWrite,
322 LPDWORD lpNumberOfBytesWritten,
323 LPOVERLAPPED lpOverlapped,
324 LPOVERLAPPED_COMPLETION_ROUTINE lpCompletionRoutine,
325 DWORD dwUserData,
326 DWORD dwTimeOut)
327{
328 LPASYNCIOREQUEST lpRequest, current;
329 int index;
330
331 if(!lpOverlapped || lpOverlapped->hEvent == 0) {
332 ::SetLastError(ERROR_INVALID_PARAMETER);
333 return FALSE;
334 }
335
336 lpRequest = new ASYNCIOREQUEST;
337 if(lpRequest == NULL) {
338 ::SetLastError(ERROR_NOT_ENOUGH_MEMORY);
339 return FALSE;
340 }
341 lpRequest->dwAsyncType = ASYNCIO_WRITE;
342 lpRequest->hHandle = hHandle;
343 lpRequest->lpBuffer = lpBuffer;
344 lpRequest->nNumberOfBytes = nNumberOfBytesToWrite;
345 //must NOT store result in specified location!!! (stack corruption)
346//// lpRequest->lpdwResult = lpNumberOfBytesWritten;
347 lpRequest->lpdwResult = NULL;
348 lpRequest->lpOverlapped = lpOverlapped;
349 lpRequest->lpCompletionRoutine = lpCompletionRoutine;
350 lpRequest->dwUserData = dwUserData;
351 lpRequest->dwTimeOut = dwTimeOut;
352 lpRequest->next = NULL;
353
354 if(dwAsyncType & ASYNCIO_READWRITE) {
355 index = ASYNC_INDEX_READ;
356 }
357 else index = ASYNC_INDEX_WRITE;
358
359 addRequest(index, lpRequest);
360
361 lpOverlapped->Internal = STATUS_PENDING;
362 lpOverlapped->InternalHigh = 0;
363 //reset overlapped semaphore to non-signalled
364 ::ResetEvent(lpOverlapped->hEvent);
365
366 //wake up async thread
367 ::SetEvent((dwAsyncType & ASYNCIO_READWRITE) ? hEventRead : hEventWrite);
368
369 ::SetLastError(ERROR_IO_PENDING);
370 return FALSE;
371}
372//******************************************************************************
373//******************************************************************************
374BOOL OverlappedIOHandler::ReadFile(HANDLE hHandle,
375 LPCVOID lpBuffer,
376 DWORD nNumberOfBytesToRead,
377 LPDWORD lpNumberOfBytesRead,
378 LPOVERLAPPED lpOverlapped,
379 LPOVERLAPPED_COMPLETION_ROUTINE lpCompletionRoutine,
380 DWORD dwUserData,
381 DWORD dwTimeOut)
382{
383 LPASYNCIOREQUEST lpRequest, current;
384
385 if(!lpOverlapped || lpOverlapped->hEvent == 0) {
386 ::SetLastError(ERROR_INVALID_PARAMETER);
387 return FALSE;
388 }
389
390 lpRequest = new ASYNCIOREQUEST;
391 if(lpRequest == NULL) {
392 ::SetLastError(ERROR_NOT_ENOUGH_MEMORY);
393 return FALSE;
394 }
395 lpRequest->dwAsyncType = ASYNCIO_READ;
396 lpRequest->hHandle = hHandle;
397 lpRequest->lpBuffer = lpBuffer;
398 lpRequest->nNumberOfBytes = nNumberOfBytesToRead;
399 //must NOT store result in specified location!!! (stack corruption)
400//// lpRequest->lpdwResult = lpNumberOfBytesRead;
401 lpRequest->lpdwResult = NULL;
402 lpRequest->lpOverlapped = lpOverlapped;
403 lpRequest->lpCompletionRoutine = lpCompletionRoutine;
404 lpRequest->dwUserData = dwUserData;
405 lpRequest->dwTimeOut = dwTimeOut;
406 lpRequest->next = NULL;
407
408 addRequest(ASYNC_INDEX_READ, lpRequest);
409
410 lpOverlapped->Internal = STATUS_PENDING;
411 lpOverlapped->InternalHigh = 0;
412 //reset overlapped semaphore to non-signalled
413 ::ResetEvent(lpOverlapped->hEvent);
414
415 //wake up async thread
416 ::SetEvent(hEventRead);
417 ::SetLastError(ERROR_IO_PENDING);
418 return FALSE;
419}
420//******************************************************************************
421//******************************************************************************
422BOOL OverlappedIOHandler::WaitForEvent(HANDLE hHandle,
423 DWORD dwEventMask,
424 LPDWORD lpfdwEvtMask,
425 LPOVERLAPPED lpOverlapped,
426 LPOVERLAPPED_COMPLETION_ROUTINE lpCompletionRoutine,
427 DWORD dwUserData,
428 DWORD dwTimeOut)
429{
430 LPASYNCIOREQUEST lpRequest, current;
431 DWORD dwLastError, dwResult;
432
433 if(!lpOverlapped || lpOverlapped->hEvent == 0) {
434 ::SetLastError(ERROR_INVALID_PARAMETER);
435 return FALSE;
436 }
437 if(!lpPollHandler) {
438 DebugInt3();
439 ::SetLastError(ERROR_INVALID_PARAMETER);
440 return FALSE;
441 }
442
443 lpRequest = new ASYNCIOREQUEST;
444 if(lpRequest == NULL) {
445 ::SetLastError(ERROR_NOT_ENOUGH_MEMORY);
446 return FALSE;
447 }
448 lpRequest->dwAsyncType = ASYNCIO_POLL;
449 lpRequest->hHandle = hHandle;
450 lpRequest->lpBuffer = NULL;
451 lpRequest->nNumberOfBytes = 0;
452 //must store result also in specified location
453 lpRequest->lpdwResult = lpfdwEvtMask;
454 lpRequest->lpOverlapped = lpOverlapped;
455 lpRequest->lpCompletionRoutine = lpCompletionRoutine;
456 lpRequest->dwUserData = dwUserData;
457 lpRequest->dwTimeOut = dwTimeOut;
458 lpRequest->dwEventMask = dwEventMask;
459 lpRequest->next = NULL;
460
461 lpOverlapped->Internal = STATUS_PENDING;
462 lpOverlapped->InternalHigh = 0;
463 //reset overlapped semaphore to non-signalled
464 ::ResetEvent(lpOverlapped->hEvent);
465
466 //first check if the event has already occured; if so, return result
467 //immediately
468 dwLastError = lpPollHandler(lpRequest, &dwResult, &dwTimeOut);
469 if(dwLastError != ERROR_IO_PENDING)
470 {
471 dprintf(("OverlappedIOHandler::WaitForEvent %x: result %x, last error %d", lpOverlapped, dwResult, dwLastError));
472 lpOverlapped->Internal = dwLastError;
473 lpOverlapped->InternalHigh = dwResult;
474 if(lpfdwEvtMask) {
475 *lpfdwEvtMask = dwResult;
476 }
477 //wake up user thread
478 ::SetEvent(lpOverlapped->hEvent);
479
480 delete lpRequest;
481 ::SetLastError(dwLastError);
482 return (dwLastError == ERROR_SUCCESS);
483 }
484
485 addRequest(ASYNC_INDEX_POLL, lpRequest);
486
487 //wake up async thread
488 ::SetEvent(hEventPoll);
489 ::SetLastError(ERROR_IO_PENDING);
490 return FALSE;
491}
492//******************************************************************************
493//******************************************************************************
494BOOL OverlappedIOHandler::CancelIo(HANDLE hHandle)
495{
496 LPASYNCIOREQUEST lpRequest;
497
498 for(int i=ASYNC_INDEX_READ;i<NR_ASYNC_OPERATIONS;i++)
499 {
500 while(TRUE)
501 {
502 lpRequest = findAndRemoveRequest(i, hHandle);
503
504 if(lpRequest) {
505 //TODO: This doesn't work if multiple handles share the
506 // same OverlappedIOHandler
507 lpRequest->fCancelled = TRUE;
508 ::SetEvent(hEventCancel); //cancel pending operation
509 if(i != ASYNC_INDEX_BUSY) {//thread that handles the request will delete it
510 delete lpRequest;
511 }
512 }
513 else break;
514 }
515 }
516 //TODO: return error if there were no pending requests???
517 ::SetLastError(ERROR_SUCCESS);
518 return TRUE;
519}
520//******************************************************************************
521//******************************************************************************
522BOOL OverlappedIOHandler::GetOverlappedResult(HANDLE hHandle,
523 LPOVERLAPPED lpOverlapped,
524 LPDWORD lpcbTransfer,
525 BOOL fWait)
526{
527 DWORD ret;
528
529 ret = ::WaitForSingleObject(lpOverlapped->hEvent, (fWait) ? INFINITE : 0);
530
531 if(lpcbTransfer)
532 *lpcbTransfer = lpOverlapped->InternalHigh;
533
534 ::SetLastError(lpOverlapped->Internal);
535
536 dprintf(("GetOverlappedResult %x -> result %d last error %d", hHandle, lpOverlapped->InternalHigh, lpOverlapped->Internal));
537 return (ret == WAIT_OBJECT_0);
538}
539//******************************************************************************
540//******************************************************************************
541LPASYNCIOREQUEST OverlappedIOHandler::findAndRemoveRequest(int index, HANDLE hHandle)
542{
543 LPASYNCIOREQUEST lpRequest, lpFound = NULL;
544
545 ::EnterCriticalSection(&critsect);
546 if(pending[index])
547 {
548 if(pending[index]->hHandle != hHandle)
549 {
550 lpRequest = pending[index];
551 while(lpRequest->next) {
552 if(lpRequest->next->hHandle == hHandle) {
553 lpFound = lpRequest->next;
554 lpRequest->next = lpFound->next;
555 break;
556 }
557 lpRequest = lpRequest->next;
558 }
559 }
560 else {
561 lpFound = pending[index];
562 pending[index] = lpFound->next;
563 }
564 }
565 ::LeaveCriticalSection(&critsect);
566 return lpFound;
567}
568//******************************************************************************
569//******************************************************************************
570void OverlappedIOHandler::addRequest(int index, LPASYNCIOREQUEST lpRequest)
571{
572 LPASYNCIOREQUEST current;
573
574 ::EnterCriticalSection(&critsect);
575 if(pending[index]) {
576 current = pending[index];
577 while(current->next) {
578 current = current->next;
579 }
580 current->next = lpRequest;
581 }
582 else pending[index] = lpRequest;
583 ::LeaveCriticalSection(&critsect);
584}
585//******************************************************************************
586//******************************************************************************
587void OverlappedIOHandler::removeRequest(int index, LPASYNCIOREQUEST lpRequest)
588{
589 LPASYNCIOREQUEST current;
590
591 ::EnterCriticalSection(&critsect);
592 if(pending[index]) {
593 if(pending[index] == lpRequest) {
594 pending[index] = lpRequest->next;
595 }
596 else {
597 current = pending[index];
598 while(current->next && current->next != lpRequest) {
599 current = current->next;
600 }
601 if(current->next) {
602 current->next = lpRequest->next;
603 }
604 else {
605 dprintf(("!ERROR!: request %x not found!!!!!!", lpRequest));
606 DebugInt3();
607 }
608 }
609 }
610 //else removed from list by cancelio
611 ::LeaveCriticalSection(&critsect);
612}
613//******************************************************************************
614//******************************************************************************
615
Note: See TracBrowser for help on using the repository browser.