source: trunk/src/kernel32/overlappedio.cpp@ 8706

Last change on this file since 8706 was 8647, checked in by sandervl, 23 years ago

shorter polling delay + time critical poll thread

File size: 22.5 KB
Line 
1/* $Id: overlappedio.cpp,v 1.17 2002-06-11 16:36:06 sandervl Exp $ */
2
3/*
4 * Win32 overlapped IO class
5 *
6 * Copyright 2001 Sander van Leeuwen <sandervl@xs4all.nl>
7 *
8 * Project Odin Software License can be found in LICENSE.TXT
9 *
10 */
11
12
13
14#include <os2win.h>
15#include <string.h>
16#include <handlemanager.h>
17#include <heapstring.h>
18#include <overlappedio.h>
19#include "oslibdos.h"
20
21#define DBG_LOCALLOG DBG_overlappedio
22#include "dbglocal.h"
23
24
25//******************************************************************************
26//******************************************************************************
27OverlappedIOHandler::OverlappedIOHandler(LPOVERLAPPED_HANDLER lpReadHandler,
28 LPOVERLAPPED_HANDLER lpWriteHandler,
29 LPOVERLAPPED_HANDLER lpPollHandler,
30 BOOL fFullDuplex) :
31 hThreadRead(0), hThreadWrite(0), hThreadPoll(0)
32{
33 OverlappedIOError errcode = OutOfMemory;
34
35 this->fFullDuplex = fFullDuplex;
36
37 if(lpReadHandler == NULL) {
38 throw(InvalidParameter);
39 }
40
41 pending[ASYNC_INDEX_READ] = pending[ASYNC_INDEX_WRITE] = NULL;
42 pending [ASYNC_INDEX_POLL] = pending [ASYNC_INDEX_BUSY] = NULL;
43
44 this->lpReadHandler = lpReadHandler;
45 this->lpWriteHandler = lpWriteHandler;
46 this->lpPollHandler = lpPollHandler;
47
48 ::InitializeCriticalSection(&critsect);
49 //poll, read & write event semaphores are auto-reset (one thread wakes up
50 //after a SetEvent call)
51 hEventPoll = ::CreateEventA(NULL, FALSE, FALSE, NULL);
52 hEventRead = ::CreateEventA(NULL, FALSE, FALSE, NULL);
53 hEventWrite = ::CreateEventA(NULL, FALSE, FALSE, NULL);
54
55 dprintf(("OverlappedIOThread: hEventRead %x hEventWrite %x hEventPoll %x", hEventRead, hEventWrite, hEventPoll));
56
57 //the exit & cancel event semaphores are manual reset, because these events
58 //must be able to wake up multiple threads
59 hEventExit = ::CreateEventA(NULL, TRUE, FALSE, NULL);
60 hEventCancel = ::CreateEventA(NULL, TRUE, FALSE, NULL);
61 if(!hEventPoll || !hEventRead || !hEventWrite || !hEventExit || !hEventCancel)
62 {
63 DebugInt3();
64 errcode = EventCreationFailed;
65 goto failed;
66 }
67
68 DWORD dwThreadId;
69 LPOVERLAPPED_THREAD_PARAM threadparam;
70
71 dwAsyncType = (lpWriteHandler && fFullDuplex) ? ASYNCIO_READ : ASYNCIO_READWRITE;
72 threadparam = (LPOVERLAPPED_THREAD_PARAM)malloc(sizeof(OVERLAPPED_THREAD_PARAM));
73 if(!threadparam) goto outofmem;
74 threadparam->dwOperation = dwAsyncType;
75 threadparam->lpOverlappedObj = this;
76 hThreadRead = ::CreateThread(NULL, 32*1024, OverlappedIOThread, (LPVOID)threadparam, 0, &dwThreadId);
77
78 if(lpWriteHandler && fFullDuplex) {
79 dwAsyncType |= ASYNCIO_WRITE;
80
81 threadparam = (LPOVERLAPPED_THREAD_PARAM)malloc(sizeof(OVERLAPPED_THREAD_PARAM));
82 if(!threadparam) goto outofmem;
83 threadparam->dwOperation = ASYNCIO_WRITE;
84 threadparam->lpOverlappedObj = this;
85 hThreadWrite = ::CreateThread(NULL, 32*1024, OverlappedIOThread, (LPVOID)threadparam, 0, &dwThreadId);
86 }
87
88 if(lpPollHandler) {
89 dwAsyncType |= ASYNCIO_POLL;
90
91 threadparam = (LPOVERLAPPED_THREAD_PARAM)malloc(sizeof(OVERLAPPED_THREAD_PARAM));
92 if(!threadparam) goto outofmem;
93 threadparam->dwOperation = ASYNCIO_POLL;
94 threadparam->lpOverlappedObj = this;
95 hThreadPoll = ::CreateThread(NULL, 32*1024, OverlappedIOThread, (LPVOID)threadparam, 0, &dwThreadId);
96 SetThreadPriority(hThreadPoll, THREAD_PRIORITY_TIME_CRITICAL);
97 }
98
99 if((lpPollHandler && !hThreadPoll) || !hThreadRead || (lpWriteHandler && fFullDuplex && !hThreadWrite))
100 {
101 DebugInt3();
102 errcode = ThreadCreationFailed;
103 goto failed;
104 }
105 return;
106
107outofmem:
108 errcode = OutOfMemory;
109 //fall through
110failed:
111 //SvL: NOTE: We might not fail gracefully when threads have already been
112 // created. (thread accessing memory that has been freed)
113 // Don't feel like wasting time to fix this as this should never
114 // happen anyway.
115 if(hEventExit) {
116 ::SetEvent(hEventExit);
117 ::CloseHandle(hEventExit);
118 }
119
120 if(hEventRead) ::CloseHandle(hEventRead);
121 if(hEventWrite) ::CloseHandle(hEventWrite);
122 if(hEventPoll) ::CloseHandle(hEventPoll);
123
124 if(hThreadRead) ::CloseHandle(hThreadRead);
125 if(hThreadPoll) ::CloseHandle(hThreadPoll);
126 if(hThreadWrite) ::CloseHandle(hThreadWrite);
127 ::DeleteCriticalSection(&critsect);
128
129 throw(errcode);
130}
131//******************************************************************************
132//******************************************************************************
133OverlappedIOHandler::~OverlappedIOHandler()
134{
135 dprintf(("~OverlappedIOHandler: signalling overlapped threads"));
136 ::SetEvent(hEventExit);
137
138 ::CloseHandle(hEventExit);
139 ::CloseHandle(hEventRead);
140 ::CloseHandle(hEventWrite);
141 ::CloseHandle(hEventPoll);
142
143 ::CloseHandle(hThreadRead);
144 if(hThreadPoll) ::CloseHandle(hThreadPoll);
145 if(hThreadWrite) ::CloseHandle(hThreadWrite);
146
147 DeleteCriticalSection(&critsect);
148}
149//******************************************************************************
150//******************************************************************************
151DWORD CALLBACK OverlappedIOThread(LPVOID lpThreadParam)
152{
153 LPOVERLAPPED_THREAD_PARAM threadparam = (LPOVERLAPPED_THREAD_PARAM)lpThreadParam;
154 DWORD dwOperation;
155 OverlappedIOHandler *lpOverlappedObj;
156
157 if(threadparam == NULL || threadparam->lpOverlappedObj == NULL) {
158 DebugInt3();
159 return 0;
160 }
161 lpOverlappedObj = threadparam->lpOverlappedObj;
162 dwOperation = threadparam->dwOperation;
163 //free thread parameter first
164 free(threadparam);
165
166 return lpOverlappedObj->threadHandler(dwOperation);
167}
168//******************************************************************************
169//******************************************************************************
170DWORD OverlappedIOHandler::threadHandler(DWORD dwOperation)
171{
172 LPASYNCIOREQUEST lpRequest;
173 LPOVERLAPPED lpOverlapped;
174 HANDLE hEvents[2];
175 HANDLE hEventsWait[2];
176 HANDLE hHandle;
177 DWORD ret, dwTimeOut, dwResult;
178 int index;
179
180 dprintf(("OverlappedIOThread: started for event %d", dwOperation));
181 switch(dwOperation) {
182 case ASYNCIO_READ:
183 case ASYNCIO_READWRITE:
184 hEvents[0] = hEventRead;
185 index = ASYNC_INDEX_READ;
186 break;
187
188 case ASYNCIO_WRITE:
189 hEvents[0] = hEventWrite;
190 index = ASYNC_INDEX_WRITE;
191 break;
192
193 case ASYNCIO_POLL:
194 hEvents[0] = hEventPoll;
195 index = ASYNC_INDEX_POLL;
196 break;
197 default:
198 DebugInt3();
199 }
200 hEvents[1] = hEventExit;
201
202 while(TRUE)
203 {
204 ret = WaitForMultipleObjects(2, hEvents, FALSE, INFINITE);
205 if(ret == WAIT_FAILED) {
206 dprintf(("!WARNING!: WaitForMultipleObjects -> WAIT_FAILED!"));
207 break;
208 }
209 //if hEventExit has been signalled, then we are told to exit
210 if(ret == (WAIT_OBJECT_0+1)) {
211 dprintf(("end of threadHandler signalled"));
212 break;
213 }
214 //process all pending jobs
215 while(TRUE)
216 {
217 ::EnterCriticalSection(&critsect);
218 if(pending[index] == NULL) {
219 ::LeaveCriticalSection(&critsect);
220 break;
221 }
222 lpRequest = pending[index];
223 pending[index] = lpRequest->next;
224
225 //add to the head of process list
226 lpRequest->next = pending[ASYNC_INDEX_BUSY];
227 pending[ASYNC_INDEX_BUSY] = lpRequest;
228 ::LeaveCriticalSection(&critsect);
229
230 lpOverlapped = lpRequest->lpOverlapped;;
231 hHandle = lpRequest->hHandle;
232
233#ifdef DEBUG
234 switch(lpRequest->dwAsyncType) {
235 case ASYNCIO_READ:
236 case ASYNCIO_WRITE:
237 case ASYNCIO_POLL:
238 break;
239 default:
240 DebugInt3();
241 break;
242 }
243#endif
244 switch(dwOperation) {
245 case ASYNCIO_READ:
246 case ASYNCIO_READWRITE:
247 case ASYNCIO_WRITE:
248 if(lpRequest->dwAsyncType == ASYNCIO_READ || lpWriteHandler == NULL) {
249 lpRequest->dwLastError = lpReadHandler(lpRequest, &dwResult, NULL);
250 }
251 else lpRequest->dwLastError = lpWriteHandler(lpRequest, &dwResult, NULL);
252
253 if(!lpRequest->fCancelled)
254 {
255 lpOverlapped->Internal = lpRequest->dwLastError;
256 lpOverlapped->InternalHigh = dwResult;
257
258 //must NOT store result in specified location!!! (stack corruption)
259 //if(lpRequest->lpdwResult) {
260 // *lpRequest->lpdwResult = dwResult;
261 //}
262#ifdef DEBUG
263 if(lpRequest->dwAsyncType == ASYNCIO_READ) {
264 dprintf(("ASYNCIO_READ %x finished; result %x, last error %d", lpOverlapped, dwResult, lpRequest->dwLastError));
265 }
266 else dprintf(("ASYNCIO_WRITE %x finished; result %x, last error %d", lpOverlapped, dwResult, lpRequest->dwLastError));
267#endif
268 //wake up user thread
269 ::SetEvent(lpOverlapped->hEvent);
270 }
271 break;
272
273 case ASYNCIO_POLL:
274 hEventsWait[0] = hEventCancel;
275 hEventsWait[1] = hEventExit;
276 ret = WAIT_TIMEOUT;
277 while(TRUE)
278 {
279 dwTimeOut = 0;
280 lpRequest->dwLastError = lpPollHandler(lpRequest, &dwResult, &dwTimeOut);
281 if(lpRequest->dwLastError != ERROR_IO_PENDING) {
282 break;
283 }
284 if(dwTimeOut == 0) {
285 dprintf(("!ERROR!: lpPollHandler returned timeout 0!!"));
286 DebugInt3();
287 break;
288 }
289 //sleep a while to avoid wasting too many cpu cycles; we are woken up when a timeout occurs,
290 //when the operation is cancelled or when the process exits
291 ret = WaitForMultipleObjects(2, hEventsWait, FALSE, dwTimeOut);
292 if(ret != WAIT_TIMEOUT) {
293 dprintf(("ASYNCIO_POLL: WaitForSingleObject didn't time out, abort (ret = %x)", ret));
294 break;
295 }
296 }
297 //Don't access the overlapped & result memory when CancelIo was used to cancel the operation
298 if(ret == WAIT_TIMEOUT && !lpRequest->fCancelled)
299 {
300 dprintf(("ASYNCIO_POLL %x: result %x, last error %d", lpOverlapped, dwResult, lpRequest->dwLastError));
301 lpOverlapped->Internal = lpRequest->dwLastError;
302 lpOverlapped->InternalHigh = dwResult;
303 if(lpRequest->lpdwResult) {
304 *lpRequest->lpdwResult = dwResult;
305 }
306 //wake up user thread
307 ::SetEvent(lpOverlapped->hEvent);
308 }
309 break;
310 }
311 //remove from in-process list and delete async request object
312 removeRequest(ASYNC_INDEX_BUSY, lpRequest);
313 delete lpRequest;
314 } //while(TRUE)
315 }
316 return 0;
317}
318//******************************************************************************
319//******************************************************************************
320BOOL OverlappedIOHandler::WriteFile(HANDLE hHandle,
321 LPCVOID lpBuffer,
322 DWORD nNumberOfBytesToWrite,
323 LPDWORD lpNumberOfBytesWritten,
324 LPOVERLAPPED lpOverlapped,
325 LPOVERLAPPED_COMPLETION_ROUTINE lpCompletionRoutine,
326 DWORD dwUserData,
327 DWORD dwTimeOut)
328{
329 LPASYNCIOREQUEST lpRequest, current;
330 int index;
331
332 if(!lpOverlapped || lpOverlapped->hEvent == 0) {
333 ::SetLastError(ERROR_INVALID_PARAMETER);
334 return FALSE;
335 }
336
337 lpRequest = new ASYNCIOREQUEST;
338 if(lpRequest == NULL) {
339 ::SetLastError(ERROR_NOT_ENOUGH_MEMORY);
340 return FALSE;
341 }
342 lpRequest->dwAsyncType = ASYNCIO_WRITE;
343 lpRequest->hHandle = hHandle;
344 lpRequest->lpBuffer = lpBuffer;
345 lpRequest->nNumberOfBytes = nNumberOfBytesToWrite;
346 //must NOT store result in specified location!!! (stack corruption)
347//// lpRequest->lpdwResult = lpNumberOfBytesWritten;
348 lpRequest->lpdwResult = NULL;
349 lpRequest->lpOverlapped = lpOverlapped;
350 lpRequest->lpCompletionRoutine = lpCompletionRoutine;
351 lpRequest->dwUserData = dwUserData;
352 lpRequest->dwTimeOut = dwTimeOut;
353 lpRequest->next = NULL;
354
355 if(dwAsyncType & ASYNCIO_READWRITE) {
356 index = ASYNC_INDEX_READ;
357 }
358 else index = ASYNC_INDEX_WRITE;
359
360 addRequest(index, lpRequest);
361
362 lpOverlapped->Internal = STATUS_PENDING;
363 lpOverlapped->InternalHigh = 0;
364 //reset overlapped semaphore to non-signalled
365 ::ResetEvent(lpOverlapped->hEvent);
366
367 //wake up async thread
368 ::SetEvent((dwAsyncType & ASYNCIO_READWRITE) ? hEventRead : hEventWrite);
369
370 ::SetLastError(ERROR_IO_PENDING);
371 return FALSE;
372}
373//******************************************************************************
374//******************************************************************************
375BOOL OverlappedIOHandler::ReadFile(HANDLE hHandle,
376 LPCVOID lpBuffer,
377 DWORD nNumberOfBytesToRead,
378 LPDWORD lpNumberOfBytesRead,
379 LPOVERLAPPED lpOverlapped,
380 LPOVERLAPPED_COMPLETION_ROUTINE lpCompletionRoutine,
381 DWORD dwUserData,
382 DWORD dwTimeOut)
383{
384 LPASYNCIOREQUEST lpRequest, current;
385
386 if(!lpOverlapped || lpOverlapped->hEvent == 0) {
387 ::SetLastError(ERROR_INVALID_PARAMETER);
388 return FALSE;
389 }
390
391 lpRequest = new ASYNCIOREQUEST;
392 if(lpRequest == NULL) {
393 ::SetLastError(ERROR_NOT_ENOUGH_MEMORY);
394 return FALSE;
395 }
396 lpRequest->dwAsyncType = ASYNCIO_READ;
397 lpRequest->hHandle = hHandle;
398 lpRequest->lpBuffer = lpBuffer;
399 lpRequest->nNumberOfBytes = nNumberOfBytesToRead;
400 //must NOT store result in specified location!!! (stack corruption)
401//// lpRequest->lpdwResult = lpNumberOfBytesRead;
402 lpRequest->lpdwResult = NULL;
403 lpRequest->lpOverlapped = lpOverlapped;
404 lpRequest->lpCompletionRoutine = lpCompletionRoutine;
405 lpRequest->dwUserData = dwUserData;
406 lpRequest->dwTimeOut = dwTimeOut;
407 lpRequest->next = NULL;
408
409 addRequest(ASYNC_INDEX_READ, lpRequest);
410
411 lpOverlapped->Internal = STATUS_PENDING;
412 lpOverlapped->InternalHigh = 0;
413 //reset overlapped semaphore to non-signalled
414 ::ResetEvent(lpOverlapped->hEvent);
415
416 //wake up async thread
417 ::SetEvent(hEventRead);
418 ::SetLastError(ERROR_IO_PENDING);
419 return FALSE;
420}
421//******************************************************************************
422//******************************************************************************
423BOOL OverlappedIOHandler::WaitForEvent(HANDLE hHandle,
424 DWORD dwEventMask,
425 LPDWORD lpfdwEvtMask,
426 LPOVERLAPPED lpOverlapped,
427 LPOVERLAPPED_COMPLETION_ROUTINE lpCompletionRoutine,
428 DWORD dwUserData,
429 DWORD dwTimeOut)
430{
431 LPASYNCIOREQUEST lpRequest, current;
432 DWORD dwLastError, dwResult;
433
434 if(!lpOverlapped || lpOverlapped->hEvent == 0) {
435 ::SetLastError(ERROR_INVALID_PARAMETER);
436 return FALSE;
437 }
438 if(!lpPollHandler) {
439 DebugInt3();
440 ::SetLastError(ERROR_INVALID_PARAMETER);
441 return FALSE;
442 }
443
444 lpRequest = new ASYNCIOREQUEST;
445 if(lpRequest == NULL) {
446 ::SetLastError(ERROR_NOT_ENOUGH_MEMORY);
447 return FALSE;
448 }
449 lpRequest->dwAsyncType = ASYNCIO_POLL;
450 lpRequest->hHandle = hHandle;
451 lpRequest->lpBuffer = NULL;
452 lpRequest->nNumberOfBytes = 0;
453 //must store result also in specified location
454 lpRequest->lpdwResult = lpfdwEvtMask;
455 lpRequest->lpOverlapped = lpOverlapped;
456 lpRequest->lpCompletionRoutine = lpCompletionRoutine;
457 lpRequest->dwUserData = dwUserData;
458 lpRequest->dwTimeOut = dwTimeOut;
459 lpRequest->dwEventMask = dwEventMask;
460 lpRequest->next = NULL;
461
462 lpOverlapped->Internal = STATUS_PENDING;
463 lpOverlapped->InternalHigh = 0;
464 //reset overlapped semaphore to non-signalled
465 ::ResetEvent(lpOverlapped->hEvent);
466
467 //first check if the event has already occured; if so, return result
468 //immediately
469 dwLastError = lpPollHandler(lpRequest, &dwResult, &dwTimeOut);
470 if(dwLastError != ERROR_IO_PENDING)
471 {
472 dprintf(("OverlappedIOHandler::WaitForEvent %x: result %x, last error %d", lpOverlapped, dwResult, dwLastError));
473 lpOverlapped->Internal = dwLastError;
474 lpOverlapped->InternalHigh = dwResult;
475 if(lpfdwEvtMask) {
476 *lpfdwEvtMask = dwResult;
477 }
478 //wake up user thread
479 ::SetEvent(lpOverlapped->hEvent);
480
481 delete lpRequest;
482 ::SetLastError(dwLastError);
483 return (dwLastError == ERROR_SUCCESS);
484 }
485
486 addRequest(ASYNC_INDEX_POLL, lpRequest);
487
488 //wake up async thread
489 ::SetEvent(hEventPoll);
490 ::SetLastError(ERROR_IO_PENDING);
491 return FALSE;
492}
493//******************************************************************************
494//******************************************************************************
495BOOL OverlappedIOHandler::CancelIo(HANDLE hHandle)
496{
497 LPASYNCIOREQUEST lpRequest;
498
499 for(int i=ASYNC_INDEX_READ;i<NR_ASYNC_OPERATIONS;i++)
500 {
501 while(TRUE)
502 {
503 lpRequest = findAndRemoveRequest(i, hHandle);
504
505 if(lpRequest) {
506 //TODO: This doesn't work if multiple handles share the
507 // same OverlappedIOHandler
508 lpRequest->fCancelled = TRUE;
509 ::SetEvent(hEventCancel); //cancel pending operation
510 if(i != ASYNC_INDEX_BUSY) {//thread that handles the request will delete it
511 delete lpRequest;
512 }
513 }
514 else break;
515 }
516 }
517 //TODO: return error if there were no pending requests???
518 ::SetLastError(ERROR_SUCCESS);
519 return TRUE;
520}
521//******************************************************************************
522//******************************************************************************
523BOOL OverlappedIOHandler::GetOverlappedResult(HANDLE hHandle,
524 LPOVERLAPPED lpOverlapped,
525 LPDWORD lpcbTransfer,
526 BOOL fWait)
527{
528 DWORD ret;
529
530 ret = ::WaitForSingleObject(lpOverlapped->hEvent, (fWait) ? INFINITE : 0);
531
532 if(lpcbTransfer)
533 *lpcbTransfer = lpOverlapped->InternalHigh;
534
535 ::SetLastError(lpOverlapped->Internal);
536
537 dprintf(("GetOverlappedResult %x -> result %d last error %d", hHandle, lpOverlapped->InternalHigh, lpOverlapped->Internal));
538 return (ret == WAIT_OBJECT_0);
539}
540//******************************************************************************
541//******************************************************************************
542LPASYNCIOREQUEST OverlappedIOHandler::findAndRemoveRequest(int index, HANDLE hHandle)
543{
544 LPASYNCIOREQUEST lpRequest, lpFound = NULL;
545
546 ::EnterCriticalSection(&critsect);
547 if(pending[index])
548 {
549 if(pending[index]->hHandle != hHandle)
550 {
551 lpRequest = pending[index];
552 while(lpRequest->next) {
553 if(lpRequest->next->hHandle == hHandle) {
554 lpFound = lpRequest->next;
555 lpRequest->next = lpFound->next;
556 break;
557 }
558 lpRequest = lpRequest->next;
559 }
560 }
561 else {
562 lpFound = pending[index];
563 pending[index] = lpFound->next;
564 }
565 }
566 ::LeaveCriticalSection(&critsect);
567 return lpFound;
568}
569//******************************************************************************
570//******************************************************************************
571void OverlappedIOHandler::addRequest(int index, LPASYNCIOREQUEST lpRequest)
572{
573 LPASYNCIOREQUEST current;
574
575 ::EnterCriticalSection(&critsect);
576 if(pending[index]) {
577 current = pending[index];
578 while(current->next) {
579 current = current->next;
580 }
581 current->next = lpRequest;
582 }
583 else pending[index] = lpRequest;
584 ::LeaveCriticalSection(&critsect);
585}
586//******************************************************************************
587//******************************************************************************
588void OverlappedIOHandler::removeRequest(int index, LPASYNCIOREQUEST lpRequest)
589{
590 LPASYNCIOREQUEST current;
591
592 ::EnterCriticalSection(&critsect);
593 if(pending[index]) {
594 if(pending[index] == lpRequest) {
595 pending[index] = lpRequest->next;
596 }
597 else {
598 current = pending[index];
599 while(current->next && current->next != lpRequest) {
600 current = current->next;
601 }
602 if(current->next) {
603 current->next = lpRequest->next;
604 }
605 else {
606 dprintf(("!ERROR!: request %x not found!!!!!!", lpRequest));
607 DebugInt3();
608 }
609 }
610 }
611 //else removed from list by cancelio
612 ::LeaveCriticalSection(&critsect);
613}
614//******************************************************************************
615//******************************************************************************
616
Note: See TracBrowser for help on using the repository browser.