source: trunk/src/kernel32/overlappedio.cpp@ 7631

Last change on this file since 7631 was 7631, checked in by sandervl, 24 years ago

enabled overlapped io for custom device drivers

File size: 21.1 KB
Line 
1/* $Id: overlappedio.cpp,v 1.13 2001-12-14 12:44:57 sandervl Exp $ */
2
3/*
4 * Win32 overlapped IO class
5 *
6 * Copyright 2001 Sander van Leeuwen <sandervl@xs4all.nl>
7 *
8 * Project Odin Software License can be found in LICENSE.TXT
9 *
10 */
11
12
13
14#include <os2win.h>
15#include <string.h>
16#include <handlemanager.h>
17#include <heapstring.h>
18#include <overlappedio.h>
19#include "oslibdos.h"
20
21#define DBG_LOCALLOG DBG_overlappedio
22#include "dbglocal.h"
23
24
25//******************************************************************************
26//******************************************************************************
27OverlappedIOHandler::OverlappedIOHandler(LPOVERLAPPED_HANDLER lpReadHandler,
28 LPOVERLAPPED_HANDLER lpWriteHandler,
29 LPOVERLAPPED_HANDLER lpPollHandler,
30 BOOL fFullDuplex) :
31 hThreadRead(0), hThreadWrite(0), hThreadPoll(0)
32{
33 OverlappedIOError errcode = OutOfMemory;
34
35 this->fFullDuplex = fFullDuplex;
36
37 if(lpReadHandler == NULL || lpPollHandler == NULL) {
38 throw(InvalidParameter);
39 }
40
41 pending[ASYNC_INDEX_READ] = pending[ASYNC_INDEX_WRITE] = NULL;
42 pending [ASYNC_INDEX_POLL] = pending [ASYNC_INDEX_BUSY] = NULL;
43
44 this->lpReadHandler = lpReadHandler;
45 this->lpWriteHandler = lpWriteHandler;
46 this->lpPollHandler = lpPollHandler;
47
48 ::InitializeCriticalSection(&critsect);
49 //poll, read & write event semaphores are auto-reset (one thread wakes up
50 //after a SetEvent call)
51 hEventPoll = ::CreateEventA(NULL, FALSE, FALSE, NULL);
52 hEventRead = ::CreateEventA(NULL, FALSE, FALSE, NULL);
53 hEventWrite = ::CreateEventA(NULL, FALSE, FALSE, NULL);
54
55 dprintf(("OverlappedIOThread: hEventRead %x hEventWrite %x hEventPoll %x", hEventRead, hEventWrite, hEventPoll));
56
57 //the exit & cancel event semaphores are manual reset, because these events
58 //must be able to wake up multiple threads
59 hEventExit = ::CreateEventA(NULL, TRUE, FALSE, NULL);
60 hEventCancel = ::CreateEventA(NULL, TRUE, FALSE, NULL);
61 if(!hEventPoll || !hEventRead || !hEventWrite || !hEventExit || !hEventCancel)
62 {
63 DebugInt3();
64 errcode = EventCreationFailed;
65 goto failed;
66 }
67
68 DWORD dwThreadId;
69 LPOVERLAPPED_THREAD_PARAM threadparam;
70
71 dwAsyncType = (lpWriteHandler && fFullDuplex) ? ASYNCIO_READ : ASYNCIO_READWRITE;
72 threadparam = (LPOVERLAPPED_THREAD_PARAM)malloc(sizeof(OVERLAPPED_THREAD_PARAM));
73 if(!threadparam) goto outofmem;
74 threadparam->dwOperation = dwAsyncType;
75 threadparam->lpOverlappedObj = this;
76 hThreadRead = ::CreateThread(NULL, 32*1024, OverlappedIOThread, (LPVOID)threadparam, 0, &dwThreadId);
77
78 if(lpWriteHandler && fFullDuplex) {
79 dwAsyncType |= ASYNCIO_WRITE;
80
81 threadparam = (LPOVERLAPPED_THREAD_PARAM)malloc(sizeof(OVERLAPPED_THREAD_PARAM));
82 if(!threadparam) goto outofmem;
83 threadparam->dwOperation = ASYNCIO_WRITE;
84 threadparam->lpOverlappedObj = this;
85 hThreadWrite = ::CreateThread(NULL, 32*1024, OverlappedIOThread, (LPVOID)threadparam, 0, &dwThreadId);
86 }
87
88 if(lpPollHandler) {
89 dwAsyncType |= ASYNCIO_POLL;
90
91 threadparam = (LPOVERLAPPED_THREAD_PARAM)malloc(sizeof(OVERLAPPED_THREAD_PARAM));
92 if(!threadparam) goto outofmem;
93 threadparam->dwOperation = ASYNCIO_POLL;
94 threadparam->lpOverlappedObj = this;
95 hThreadPoll = ::CreateThread(NULL, 32*1024, OverlappedIOThread, (LPVOID)threadparam, 0, &dwThreadId);
96 }
97
98 if((lpPollHandler && !hThreadPoll) || !hThreadRead || (lpWriteHandler && fFullDuplex && !hThreadWrite))
99 {
100 DebugInt3();
101 errcode = ThreadCreationFailed;
102 goto failed;
103 }
104 return;
105
106outofmem:
107 errcode = OutOfMemory;
108 //fall through
109failed:
110 //SvL: NOTE: We might not fail gracefully when threads have already been
111 // created. (thread accessing memory that has been freed)
112 // Don't feel like wasting time to fix this as this should never
113 // happen anyway.
114 if(hEventExit) {
115 ::SetEvent(hEventExit);
116 ::CloseHandle(hEventExit);
117 }
118
119 if(hEventRead) ::CloseHandle(hEventRead);
120 if(hEventWrite) ::CloseHandle(hEventWrite);
121 if(hEventPoll) ::CloseHandle(hEventPoll);
122
123 if(hThreadRead) ::CloseHandle(hThreadRead);
124 if(hThreadPoll) ::CloseHandle(hThreadPoll);
125 if(hThreadWrite) ::CloseHandle(hThreadWrite);
126 ::DeleteCriticalSection(&critsect);
127
128 throw(errcode);
129}
130//******************************************************************************
131//******************************************************************************
132OverlappedIOHandler::~OverlappedIOHandler()
133{
134 dprintf(("~OverlappedIOHandler: signalling overlapped threads"));
135 ::SetEvent(hEventExit);
136
137 ::CloseHandle(hEventExit);
138 ::CloseHandle(hEventRead);
139 ::CloseHandle(hEventWrite);
140 ::CloseHandle(hEventPoll);
141
142 ::CloseHandle(hThreadRead);
143 if(hThreadPoll) ::CloseHandle(hThreadPoll);
144 if(hThreadWrite) ::CloseHandle(hThreadWrite);
145
146 DeleteCriticalSection(&critsect);
147}
148//******************************************************************************
149//******************************************************************************
150DWORD CALLBACK OverlappedIOThread(LPVOID lpThreadParam)
151{
152 LPOVERLAPPED_THREAD_PARAM threadparam = (LPOVERLAPPED_THREAD_PARAM)lpThreadParam;
153 DWORD dwOperation;
154 OverlappedIOHandler *lpOverlappedObj;
155
156 if(threadparam == NULL) {
157 DebugInt3();
158 return 0;
159 }
160 lpOverlappedObj = threadparam->lpOverlappedObj;
161 dwOperation = threadparam->dwOperation;
162 //free thread parameter first
163 free(threadparam);
164
165 return lpOverlappedObj->threadHandler(dwOperation);
166}
167//******************************************************************************
168//******************************************************************************
169DWORD OverlappedIOHandler::threadHandler(DWORD dwOperation)
170{
171 LPASYNCIOREQUEST lpRequest;
172 LPOVERLAPPED lpOverlapped;
173 HANDLE hEvents[2];
174 HANDLE hEventsWait[2];
175 HANDLE hHandle;
176 DWORD ret, dwTimeOut, dwResult;
177 int index;
178
179 dprintf(("OverlappedIOThread: started for event %d", dwOperation));
180 switch(dwOperation) {
181 case ASYNCIO_READ:
182 case ASYNCIO_READWRITE:
183 hEvents[0] = hEventRead;
184 index = ASYNC_INDEX_READ;
185 break;
186
187 case ASYNCIO_WRITE:
188 hEvents[0] = hEventWrite;
189 index = ASYNC_INDEX_WRITE;
190 break;
191
192 case ASYNCIO_POLL:
193 hEvents[0] = hEventPoll;
194 index = ASYNC_INDEX_POLL;
195 break;
196 default:
197 DebugInt3();
198 }
199 hEvents[1] = hEventExit;
200
201 while(TRUE)
202 {
203 ret = WaitForMultipleObjects(2, hEvents, FALSE, INFINITE);
204 if(ret == WAIT_FAILED) {
205 dprintf(("!WARNING!: WaitForMultipleObjects -> WAIT_FAILED!"));
206 break;
207 }
208 //if hEventExit has been signalled, then we are told to exit
209 if(ret == (WAIT_OBJECT_0+1)) {
210 dprintf(("end of threadHandler signalled"));
211 break;
212 }
213 //process all pending jobs
214 while(TRUE)
215 {
216 ::EnterCriticalSection(&critsect);
217 if(pending[index] == NULL) {
218 ::LeaveCriticalSection(&critsect);
219 break;
220 }
221 lpRequest = pending[index];
222 pending[index] = lpRequest->next;
223
224 //add to in process list
225 lpRequest->next = pending[ASYNC_INDEX_BUSY];
226 pending[ASYNC_INDEX_BUSY] = lpRequest;
227 ::LeaveCriticalSection(&critsect);
228
229 lpOverlapped = lpRequest->lpOverlapped;;
230 hHandle = lpRequest->hHandle;
231
232 switch(dwOperation) {
233 case ASYNCIO_READ:
234 case ASYNCIO_READWRITE:
235 case ASYNCIO_WRITE:
236 if(lpRequest->dwAsyncType == ASYNCIO_READ || lpWriteHandler == NULL) {
237 lpRequest->dwLastError = lpReadHandler(lpRequest, &dwResult, NULL);
238 }
239 else lpRequest->dwLastError = lpWriteHandler(lpRequest, &dwResult, NULL);
240
241 if(!lpRequest->fCancelled)
242 {
243 lpOverlapped->Internal = lpRequest->dwLastError;
244 lpOverlapped->InternalHigh = dwResult;
245 if(lpRequest->lpdwResult) {
246 *lpRequest->lpdwResult = dwResult;
247 }
248#ifdef DEBUG
249 if(lpRequest->dwAsyncType == ASYNCIO_READ) {
250 dprintf(("ASYNCIO_READ %x finished; result %x, last error %d", lpOverlapped, dwResult, lpRequest->dwLastError));
251 }
252 else dprintf(("ASYNCIO_WRITE %x finished; result %x, last error %d", lpOverlapped, dwResult, lpRequest->dwLastError));
253#endif
254 //wake up user thread
255 ::SetEvent(lpOverlapped->hEvent);
256 }
257 break;
258
259 case ASYNCIO_POLL:
260 hEventsWait[0] = hEventCancel;
261 hEventsWait[1] = hEventExit;
262 ret = WAIT_TIMEOUT;
263 while(TRUE)
264 {
265 dwTimeOut = 0;
266 lpRequest->dwLastError = lpPollHandler(lpRequest, &dwResult, &dwTimeOut);
267 if(lpRequest->dwLastError != ERROR_IO_PENDING) {
268 break;
269 }
270 if(dwTimeOut == 0) {
271 dprintf(("!ERROR!: lpPollHandler returned timeout 0!!"));
272 DebugInt3();
273 break;
274 }
275 //sleep a while to avoid wasting too many cpu cycles; we are woken up when a timeout occurs,
276 //when the operation is cancelled or when the process exits
277 ret = WaitForMultipleObjects(2, hEventsWait, FALSE, dwTimeOut);
278 if(ret != WAIT_TIMEOUT) {
279 dprintf(("ASYNCIO_POLL: WaitForSingleObject didn't time out, abort (ret = %x)", ret));
280 break;
281 }
282 }
283 //Don't access the overlapped & result memory when CancelIo was used to cancel the operation
284 if(ret == WAIT_TIMEOUT && !lpRequest->fCancelled)
285 {
286 dprintf(("ASYNCIO_POLL %x: result %x, last error %d", lpOverlapped, dwResult, lpRequest->dwLastError));
287 lpOverlapped->Internal = lpRequest->dwLastError;
288 lpOverlapped->InternalHigh = dwResult;
289 if(lpRequest->lpdwResult) {
290 *lpRequest->lpdwResult = dwResult;
291 }
292 //wake up user thread
293 ::SetEvent(lpOverlapped->hEvent);
294 }
295 break;
296 }
297 //remove from in-process list and delete async request object
298 findAndRemoveRequest(ASYNC_INDEX_BUSY, hHandle);
299 delete lpRequest;
300 } //while(TRUE)
301 }
302 return 0;
303}
304//******************************************************************************
305//******************************************************************************
306BOOL OverlappedIOHandler::WriteFile(HANDLE hHandle,
307 LPCVOID lpBuffer,
308 DWORD nNumberOfBytesToWrite,
309 LPDWORD lpNumberOfBytesWritten,
310 LPOVERLAPPED lpOverlapped,
311 LPOVERLAPPED_COMPLETION_ROUTINE lpCompletionRoutine,
312 DWORD dwUserData,
313 DWORD dwTimeOut)
314{
315 LPASYNCIOREQUEST lpRequest, current;
316 int index;
317
318 if(!lpOverlapped || lpOverlapped->hEvent == 0) {
319 ::SetLastError(ERROR_INVALID_PARAMETER);
320 return FALSE;
321 }
322
323 lpRequest = new ASYNCIOREQUEST;
324 if(lpRequest == NULL) {
325 ::SetLastError(ERROR_NOT_ENOUGH_MEMORY);
326 return FALSE;
327 }
328 lpRequest->dwAsyncType = ASYNCIO_WRITE;
329 lpRequest->hHandle = hHandle;
330 lpRequest->lpBuffer = lpBuffer;
331 lpRequest->nNumberOfBytes = nNumberOfBytesToWrite;
332 lpRequest->lpdwResult = lpNumberOfBytesWritten;
333 lpRequest->lpOverlapped = lpOverlapped;
334 lpRequest->lpCompletionRoutine = lpCompletionRoutine;
335 lpRequest->dwUserData = dwUserData;
336 lpRequest->dwTimeOut = dwTimeOut;
337 lpRequest->next = NULL;
338
339 if(dwAsyncType & ASYNCIO_READWRITE) {
340 index = ASYNC_INDEX_READ;
341 }
342 else index = ASYNC_INDEX_WRITE;
343
344 ::EnterCriticalSection(&critsect);
345 if(pending[index]) {
346 current = pending[index];
347 while(current->next) {
348 current = current->next;
349 }
350 current->next = lpRequest;
351 }
352 else pending[index] = lpRequest;
353 ::LeaveCriticalSection(&critsect);
354
355 lpOverlapped->Internal = STATUS_PENDING;
356 lpOverlapped->InternalHigh = 0;
357 //reset overlapped semaphore to non-signalled
358 ::ResetEvent(lpOverlapped->hEvent);
359
360 //wake up async thread
361 ::SetEvent((dwAsyncType & ASYNCIO_READWRITE) ? hEventRead : hEventWrite);
362
363 ::SetLastError(ERROR_IO_PENDING);
364 return FALSE;
365}
366//******************************************************************************
367//******************************************************************************
368BOOL OverlappedIOHandler::ReadFile(HANDLE hHandle,
369 LPCVOID lpBuffer,
370 DWORD nNumberOfBytesToRead,
371 LPDWORD lpNumberOfBytesRead,
372 LPOVERLAPPED lpOverlapped,
373 LPOVERLAPPED_COMPLETION_ROUTINE lpCompletionRoutine,
374 DWORD dwUserData,
375 DWORD dwTimeOut)
376{
377 LPASYNCIOREQUEST lpRequest, current;
378
379 if(!lpOverlapped || lpOverlapped->hEvent == 0) {
380 ::SetLastError(ERROR_INVALID_PARAMETER);
381 return FALSE;
382 }
383
384 lpRequest = new ASYNCIOREQUEST;
385 if(lpRequest == NULL) {
386 ::SetLastError(ERROR_NOT_ENOUGH_MEMORY);
387 return FALSE;
388 }
389 lpRequest->dwAsyncType = ASYNCIO_READ;
390 lpRequest->hHandle = hHandle;
391 lpRequest->lpBuffer = lpBuffer;
392 lpRequest->nNumberOfBytes = nNumberOfBytesToRead;
393 lpRequest->lpdwResult = lpNumberOfBytesRead;
394 lpRequest->lpOverlapped = lpOverlapped;
395 lpRequest->lpCompletionRoutine = lpCompletionRoutine;
396 lpRequest->dwUserData = dwUserData;
397 lpRequest->dwTimeOut = dwTimeOut;
398 lpRequest->next = NULL;
399
400 ::EnterCriticalSection(&critsect);
401 if(pending[ASYNC_INDEX_READ]) {
402 current = pending[ASYNC_INDEX_READ];
403 while(current->next) {
404 current = current->next;
405 }
406 current->next = lpRequest;
407 }
408 else pending[ASYNC_INDEX_READ] = lpRequest;
409 ::LeaveCriticalSection(&critsect);
410
411 lpOverlapped->Internal = STATUS_PENDING;
412 lpOverlapped->InternalHigh = 0;
413 //reset overlapped semaphore to non-signalled
414 ::ResetEvent(lpOverlapped->hEvent);
415
416 //wake up async thread
417 ::SetEvent(hEventRead);
418 ::SetLastError(ERROR_IO_PENDING);
419 return FALSE;
420}
421//******************************************************************************
422//******************************************************************************
423BOOL OverlappedIOHandler::WaitForEvent(HANDLE hHandle,
424 DWORD dwEventMask,
425 LPDWORD lpfdwEvtMask,
426 LPOVERLAPPED lpOverlapped,
427 LPOVERLAPPED_COMPLETION_ROUTINE lpCompletionRoutine,
428 DWORD dwUserData,
429 DWORD dwTimeOut)
430{
431 LPASYNCIOREQUEST lpRequest, current;
432 DWORD dwLastError, dwResult;
433
434 if(!lpOverlapped || lpOverlapped->hEvent == 0) {
435 ::SetLastError(ERROR_INVALID_PARAMETER);
436 return FALSE;
437 }
438 if(!lpPollHandler) {
439 DebugInt3();
440 ::SetLastError(ERROR_INVALID_PARAMETER);
441 return FALSE;
442 }
443
444 lpRequest = new ASYNCIOREQUEST;
445 if(lpRequest == NULL) {
446 ::SetLastError(ERROR_NOT_ENOUGH_MEMORY);
447 return FALSE;
448 }
449 lpRequest->dwAsyncType = ASYNCIO_POLL;
450 lpRequest->hHandle = hHandle;
451 lpRequest->lpBuffer = NULL;
452 lpRequest->nNumberOfBytes = 0;
453 lpRequest->lpdwResult = lpfdwEvtMask;
454 lpRequest->lpOverlapped = lpOverlapped;
455 lpRequest->lpCompletionRoutine = lpCompletionRoutine;
456 lpRequest->dwUserData = dwUserData;
457 lpRequest->dwTimeOut = dwTimeOut;
458 lpRequest->dwEventMask = dwEventMask;
459 lpRequest->next = NULL;
460
461 lpOverlapped->Internal = STATUS_PENDING;
462 lpOverlapped->InternalHigh = 0;
463 //reset overlapped semaphore to non-signalled
464 ::ResetEvent(lpOverlapped->hEvent);
465
466 //first check if the event has already occured; if so, return result
467 //immediately
468 dwLastError = lpPollHandler(lpRequest, &dwResult, &dwTimeOut);
469 if(dwLastError != ERROR_IO_PENDING)
470 {
471 dprintf(("OverlappedIOHandler::WaitForEvent %x: result %x, last error %d", lpOverlapped, dwResult, dwLastError));
472 lpOverlapped->Internal = dwLastError;
473 lpOverlapped->InternalHigh = dwResult;
474 if(lpfdwEvtMask) {
475 *lpfdwEvtMask = dwResult;
476 }
477 //wake up user thread
478 ::SetEvent(lpOverlapped->hEvent);
479
480 delete lpRequest;
481 ::SetLastError(dwLastError);
482 return (dwLastError == ERROR_SUCCESS);
483 }
484
485 ::EnterCriticalSection(&critsect);
486 if(pending[ASYNC_INDEX_POLL]) {
487 current = pending[ASYNC_INDEX_READ];
488 while(current->next) {
489 current = current->next;
490 }
491 current->next = lpRequest;
492 }
493 else pending[ASYNC_INDEX_POLL] = lpRequest;
494 ::LeaveCriticalSection(&critsect);
495
496 //wake up async thread
497 ::SetEvent(hEventPoll);
498 ::SetLastError(ERROR_IO_PENDING);
499 return FALSE;
500}
501//******************************************************************************
502//******************************************************************************
503BOOL OverlappedIOHandler::CancelIo(HANDLE hHandle)
504{
505 LPASYNCIOREQUEST lpRequest;
506
507 for(int i=ASYNC_INDEX_READ;i<NR_ASYNC_OPERATIONS;i++)
508 {
509 while(TRUE)
510 {
511 lpRequest = findAndRemoveRequest(i, hHandle);
512
513 if(lpRequest) {
514 //TODO: This doesn't work if multiple handles share the
515 // same OverlappedIOHandler
516 lpRequest->fCancelled = TRUE;
517 ::SetEvent(hEventCancel); //cancel pending operation
518 if(i != ASYNC_INDEX_BUSY) {//thread that handles the request will delete it
519 delete lpRequest;
520 }
521 }
522 else break;
523 }
524 }
525 //TODO: return error if there were no pending requests???
526 ::SetLastError(ERROR_SUCCESS);
527 return TRUE;
528}
529//******************************************************************************
530//******************************************************************************
531BOOL OverlappedIOHandler::GetOverlappedResult(HANDLE hHandle,
532 LPOVERLAPPED lpOverlapped,
533 LPDWORD lpcbTransfer,
534 BOOL fWait)
535{
536 DWORD ret;
537
538 ret = ::WaitForSingleObject(lpOverlapped->hEvent, (fWait) ? INFINITE : 0);
539
540 if(lpcbTransfer)
541 *lpcbTransfer = lpOverlapped->InternalHigh;
542
543 ::SetLastError(lpOverlapped->Internal);
544
545 dprintf(("GetOverlappedResult %x -> result %d last error %d", hHandle, lpOverlapped->InternalHigh, lpOverlapped->Internal));
546 return (ret == WAIT_OBJECT_0);
547}
548//******************************************************************************
549//******************************************************************************
550LPASYNCIOREQUEST OverlappedIOHandler::findAndRemoveRequest(int index, HANDLE hHandle)
551{
552 LPASYNCIOREQUEST lpRequest, lpFound = NULL;
553
554 ::EnterCriticalSection(&critsect);
555 if(pending[index])
556 {
557 if(pending[index]->hHandle != hHandle)
558 {
559 lpRequest = pending[index];
560 while(lpRequest->next) {
561 if(lpRequest->next->hHandle == hHandle) {
562 lpFound = lpRequest->next;
563 lpRequest->next = lpFound->next;
564 break;
565 }
566 lpRequest = lpRequest->next;
567 }
568 }
569 else {
570 lpFound = pending[index];
571 pending[index] = lpFound->next;
572 }
573 }
574 ::LeaveCriticalSection(&critsect);
575 return lpFound;
576}
577//******************************************************************************
578//******************************************************************************
579
Note: See TracBrowser for help on using the repository browser.