source: trunk/src/kernel32/overlappedio.cpp@ 7596

Last change on this file since 7596 was 7596, checked in by sandervl, 24 years ago

Overlapped IO fix (poll)

File size: 20.8 KB
Line 
1/* $Id: overlappedio.cpp,v 1.8 2001-12-09 21:19:28 sandervl Exp $ */
2
3/*
4 * Win32 overlapped IO class
5 *
6 * Copyright 2001 Sander van Leeuwen <sandervl@xs4all.nl>
7 *
8 * Project Odin Software License can be found in LICENSE.TXT
9 *
10 */
11
12
13
14#include <os2win.h>
15#include <string.h>
16#include <handlemanager.h>
17#include <heapstring.h>
18#include "overlappedio.h"
19#include "oslibdos.h"
20
21#define DBG_LOCALLOG DBG_overlappedio
22#include "dbglocal.h"
23
24
25//******************************************************************************
26//******************************************************************************
27OverlappedIOHandler::OverlappedIOHandler(LPOVERLAPPED_HANDLER lpReadHandler,
28 LPOVERLAPPED_HANDLER lpWriteHandler,
29 LPOVERLAPPED_HANDLER lpPollHandler) :
30 hThreadRead(0), hThreadWrite(0), hThreadPoll(0)
31{
32 OverlappedIOError errcode = OutOfMemory;
33
34 if(lpReadHandler == NULL || lpPollHandler == NULL) {
35 throw(InvalidParameter);
36 }
37
38 pending[ASYNC_INDEX_READ] = pending[ASYNC_INDEX_WRITE] = NULL;
39 pending [ASYNC_INDEX_POLL] = pending [ASYNC_INDEX_BUSY] = NULL;
40
41 this->lpReadHandler = lpReadHandler;
42 this->lpWriteHandler = lpWriteHandler;
43 this->lpPollHandler = lpPollHandler;
44
45 ::InitializeCriticalSection(&critsect);
46 //poll, read & write event semaphores are auto-reset (one thread wakes up
47 //after a SetEvent call)
48 hEventPoll = ::CreateEventA(NULL, FALSE, FALSE, NULL);
49 hEventRead = ::CreateEventA(NULL, FALSE, FALSE, NULL);
50 hEventWrite = ::CreateEventA(NULL, FALSE, FALSE, NULL);
51
52 //the exit event semaphore is manual reset, because this event
53 //must be able to wake up multiple threads
54 hEventExit = ::CreateEventA(NULL, TRUE, FALSE, NULL);
55 if(!hEventPoll || !hEventRead || !hEventWrite || !hEventExit)
56 {
57 DebugInt3();
58 errcode = EventCreationFailed;
59 goto failed;
60 }
61
62 DWORD dwThreadId;
63 LPOVERLAPPED_THREAD_PARAM threadparam;
64
65 dwAsyncType = (lpWriteHandler) ? ASYNCIO_READ : ASYNCIO_READWRITE;
66 threadparam = (LPOVERLAPPED_THREAD_PARAM)malloc(sizeof(OVERLAPPED_THREAD_PARAM));
67 if(!threadparam) goto outofmem;
68 threadparam->dwOperation = dwAsyncType;
69 threadparam->lpOverlappedObj = this;
70 hThreadRead = ::CreateThread(NULL, 32*1024, OverlappedIOThread, (LPVOID)threadparam, 0, &dwThreadId);
71
72 if(lpWriteHandler) {
73 dwAsyncType |= ASYNCIO_WRITE;
74
75 threadparam = (LPOVERLAPPED_THREAD_PARAM)malloc(sizeof(OVERLAPPED_THREAD_PARAM));
76 if(!threadparam) goto outofmem;
77 threadparam->dwOperation = ASYNCIO_WRITE;
78 threadparam->lpOverlappedObj = this;
79 hThreadWrite = ::CreateThread(NULL, 32*1024, OverlappedIOThread, (LPVOID)threadparam, 0, &dwThreadId);
80 }
81
82 if(lpPollHandler) {
83 dwAsyncType |= ASYNCIO_POLL;
84
85 threadparam = (LPOVERLAPPED_THREAD_PARAM)malloc(sizeof(OVERLAPPED_THREAD_PARAM));
86 if(!threadparam) goto outofmem;
87 threadparam->dwOperation = ASYNCIO_POLL;
88 threadparam->lpOverlappedObj = this;
89 hThreadPoll = ::CreateThread(NULL, 32*1024, OverlappedIOThread, (LPVOID)threadparam, 0, &dwThreadId);
90 }
91
92 if((lpPollHandler && !hThreadPoll) || !hThreadRead || (lpWriteHandler && !hThreadWrite))
93 {
94 DebugInt3();
95 errcode = ThreadCreationFailed;
96 goto failed;
97 }
98 return;
99
100outofmem:
101 errcode = OutOfMemory;
102 //fall through
103failed:
104 //SvL: NOTE: We might not fail gracefully when threads have already been
105 // created. (thread accessing memory that has been freed)
106 // Don't feel like wasting time to fix this as this should never
107 // happen anyway.
108 if(hEventExit) {
109 ::SetEvent(hEventExit);
110 ::CloseHandle(hEventExit);
111 }
112
113 if(hEventRead) ::CloseHandle(hEventRead);
114 if(hEventWrite) ::CloseHandle(hEventWrite);
115 if(hEventPoll) ::CloseHandle(hEventPoll);
116
117 if(hThreadRead) ::CloseHandle(hThreadRead);
118 if(hThreadPoll) ::CloseHandle(hThreadPoll);
119 if(hThreadWrite) ::CloseHandle(hThreadWrite);
120 ::DeleteCriticalSection(&critsect);
121
122 throw(errcode);
123}
124//******************************************************************************
125//******************************************************************************
126OverlappedIOHandler::~OverlappedIOHandler()
127{
128 dprintf(("~OverlappedIOHandler: signalling overlapped threads"));
129 ::SetEvent(hEventExit);
130
131 ::CloseHandle(hEventExit);
132 ::CloseHandle(hEventRead);
133 ::CloseHandle(hEventWrite);
134 ::CloseHandle(hEventPoll);
135
136 ::CloseHandle(hThreadRead);
137 if(hThreadPoll) ::CloseHandle(hThreadPoll);
138 if(hThreadWrite) ::CloseHandle(hThreadWrite);
139
140 DeleteCriticalSection(&critsect);
141}
142//******************************************************************************
143//******************************************************************************
144DWORD CALLBACK OverlappedIOThread(LPVOID lpThreadParam)
145{
146 LPOVERLAPPED_THREAD_PARAM threadparam = (LPOVERLAPPED_THREAD_PARAM)lpThreadParam;
147 DWORD dwOperation;
148 OverlappedIOHandler *lpOverlappedObj;
149
150 if(threadparam == NULL) {
151 DebugInt3();
152 return 0;
153 }
154 lpOverlappedObj = threadparam->lpOverlappedObj;
155 dwOperation = threadparam->dwOperation;
156 //free thread parameter first
157 free(threadparam);
158
159 return lpOverlappedObj->threadHandler(dwOperation);
160}
161//******************************************************************************
162//******************************************************************************
163DWORD OverlappedIOHandler::threadHandler(DWORD dwOperation)
164{
165 LPASYNCIOREQUEST lpRequest;
166 LPOVERLAPPED lpOverlapped;
167 HANDLE hEvents[2];
168 HANDLE hEventsWait[2];
169 HANDLE hHandle;
170 DWORD ret, dwTimeOut, dwResult;
171 int index;
172
173 dprintf(("OverlappedIOThread: started for event %d", dwOperation));
174 switch(dwOperation) {
175 case ASYNCIO_READ:
176 case ASYNCIO_READWRITE:
177 hEvents[0] = hEventRead;
178 index = ASYNC_INDEX_READ;
179 break;
180
181 case ASYNCIO_WRITE:
182 hEvents[0] = hEventWrite;
183 index = ASYNC_INDEX_WRITE;
184 break;
185
186 case ASYNCIO_POLL:
187 hEvents[0] = hEventPoll;
188 index = ASYNC_INDEX_POLL;
189 break;
190 default:
191 DebugInt3();
192 }
193 hEvents[1] = hEventExit;
194
195 while(TRUE) {
196 ret = WaitForMultipleObjects(2, hEvents, FALSE, INFINITE);
197 if(ret == WAIT_FAILED) {
198 dprintf(("!WARNING!: WaitForMultipleObjects -> WAIT_FAILED!"));
199 break;
200 }
201 //if hEventExit has been signalled, then we are told to exit
202 if(ret == (WAIT_OBJECT_0+1)) {
203 dprintf(("end of threadHandler signalled"));
204 break;
205 }
206 ::EnterCriticalSection(&critsect);
207 if(pending[index] == NULL) {
208 //oh, oh
209 ::LeaveCriticalSection(&critsect);
210 dprintf(("!ERROR!: overlapped thread woken up, but no tasks pending!!"));
211 DebugInt3();
212 continue;
213 }
214 lpRequest = pending[index];
215 pending[index] = lpRequest->next;
216
217 //add to in process list
218 lpRequest->next = pending[ASYNC_INDEX_BUSY];
219 pending[ASYNC_INDEX_BUSY] = lpRequest;
220 ::LeaveCriticalSection(&critsect);
221
222 lpOverlapped = lpRequest->lpOverlapped;;
223 hHandle = lpRequest->hHandle;
224
225 switch(dwOperation) {
226 case ASYNCIO_READ:
227 case ASYNCIO_READWRITE:
228 lpRequest->dwLastError = lpReadHandler(lpRequest, &dwResult, NULL);
229 lpOverlapped->Internal = lpRequest->dwLastError;
230 lpOverlapped->InternalHigh = dwResult;
231 if(lpRequest->lpdwResult) {
232 *lpRequest->lpdwResult = dwResult;
233 }
234 if(lpRequest->dwAsyncType == ASYNCIO_READ) {
235 dprintf(("ASYNCIO_READ %x finished; result %x, last error %d", lpOverlapped, dwResult, lpRequest->dwLastError));
236 }
237 else dprintf(("ASYNCIO_WRITE %x finished; result %x, last error %d", lpOverlapped, dwResult, lpRequest->dwLastError));
238 //wake up user thread
239 ::SetEvent(lpOverlapped->hEvent);
240 break;
241
242 case ASYNCIO_WRITE:
243 lpRequest->dwLastError = lpWriteHandler(lpRequest, &dwResult, NULL);
244 lpOverlapped->Internal = lpRequest->dwLastError;
245 lpOverlapped->InternalHigh = dwResult;
246 if(lpRequest->lpdwResult) {
247 *lpRequest->lpdwResult = dwResult;
248 }
249 dprintf(("ASYNCIO_WRITE %x finished; result %x, last error %d", lpOverlapped, dwResult, lpRequest->dwLastError));
250 //wake up user thread
251 ::SetEvent(lpOverlapped->hEvent);
252 break;
253
254 case ASYNCIO_POLL:
255 hEventsWait[0] = lpRequest->hEventCancel;
256 hEventsWait[1] = hEventExit;
257 ret = WAIT_TIMEOUT;
258 while(TRUE)
259 {
260 dwTimeOut = 0;
261 lpRequest->dwLastError = lpPollHandler(lpRequest, &dwResult, &dwTimeOut);
262 if(lpRequest->dwLastError != ERROR_IO_PENDING) {
263 break;
264 }
265 if(dwTimeOut == 0) {
266 dprintf(("!ERROR!: lpPollHandler returned timeout 0!!"));
267 DebugInt3();
268 break;
269 }
270 //sleep a while to avoid wasting too many cpu cycles; we are woken up when a timeout occurs,
271 //when the operation is cancelled or when the process exits
272 ret = WaitForMultipleObjects(2, hEventsWait, FALSE, dwTimeOut);
273 if(ret != WAIT_TIMEOUT) {
274 dprintf(("ASYNCIO_POLL: WaitForSingleObject didn't time out, abort (ret = %x)", ret));
275 break;
276 }
277 }
278 //Don't access the overlapped & result memory when CancelIo was used to cancel the operation
279 if(ret == WAIT_TIMEOUT)
280 {
281 dprintf(("ASYNCIO_POLL %x: result %x, last error %d", lpOverlapped, dwResult, lpRequest->dwLastError));
282 lpOverlapped->Internal = lpRequest->dwLastError;
283 lpOverlapped->InternalHigh = dwResult;
284 if(lpRequest->lpdwResult) {
285 *lpRequest->lpdwResult = dwResult;
286 }
287 //wake up user thread
288 ::SetEvent(lpOverlapped->hEvent);
289 }
290 break;
291 }
292 //remove from in-process list and delete async request object
293 findAndRemoveRequest(ASYNC_INDEX_BUSY, hHandle);
294 delete lpRequest;
295 }
296 return 0;
297}
298//******************************************************************************
299//******************************************************************************
300BOOL OverlappedIOHandler::WriteFile(HANDLE hHandle,
301 LPCVOID lpBuffer,
302 DWORD nNumberOfBytesToWrite,
303 LPDWORD lpNumberOfBytesWritten,
304 LPOVERLAPPED lpOverlapped,
305 LPOVERLAPPED_COMPLETION_ROUTINE lpCompletionRoutine,
306 DWORD dwUserData,
307 DWORD dwTimeOut)
308{
309 LPASYNCIOREQUEST lpRequest, current;
310 int index;
311
312 if(!lpOverlapped || lpOverlapped->hEvent == 0) {
313 ::SetLastError(ERROR_INVALID_PARAMETER);
314 return FALSE;
315 }
316
317 lpRequest = new ASYNCIOREQUEST;
318 if(lpRequest == NULL) {
319 ::SetLastError(ERROR_NOT_ENOUGH_MEMORY);
320 return FALSE;
321 }
322 lpRequest->dwAsyncType = ASYNCIO_WRITE;
323 lpRequest->hHandle = hHandle;
324 lpRequest->lpBuffer = lpBuffer;
325 lpRequest->nNumberOfBytes = nNumberOfBytesToWrite;
326 lpRequest->lpdwResult = lpNumberOfBytesWritten;
327 lpRequest->lpOverlapped = lpOverlapped;
328 lpRequest->lpCompletionRoutine = lpCompletionRoutine;
329 lpRequest->dwUserData = dwUserData;
330 lpRequest->dwTimeOut = dwTimeOut;
331 lpRequest->next = NULL;
332
333 if(dwAsyncType == ASYNCIO_READWRITE) {
334 index = ASYNC_INDEX_READ;
335 }
336 else index = ASYNC_INDEX_WRITE;
337
338 ::EnterCriticalSection(&critsect);
339 if(pending[index]) {
340 current = pending[index];
341 while(current->next) {
342 current = current->next;
343 }
344 current->next = lpRequest;
345 }
346 else pending[index] = lpRequest;
347 ::LeaveCriticalSection(&critsect);
348
349 lpOverlapped->Internal = ERROR_IO_PENDING;
350 lpOverlapped->InternalHigh = 0;
351 lpOverlapped->Offset = 0;
352 lpOverlapped->OffsetHigh = 0;
353 //reset overlapped semaphore to non-signalled
354 ::ResetEvent(lpOverlapped->hEvent);
355
356 //wake up async thread
357 ::SetEvent((dwAsyncType == ASYNCIO_READWRITE) ? hEventRead : hEventWrite);
358
359 ::SetLastError(ERROR_IO_PENDING);
360 return FALSE;
361}
362//******************************************************************************
363//******************************************************************************
364BOOL OverlappedIOHandler::ReadFile(HANDLE hHandle,
365 LPCVOID lpBuffer,
366 DWORD nNumberOfBytesToRead,
367 LPDWORD lpNumberOfBytesRead,
368 LPOVERLAPPED lpOverlapped,
369 LPOVERLAPPED_COMPLETION_ROUTINE lpCompletionRoutine,
370 DWORD dwUserData,
371 DWORD dwTimeOut)
372{
373 LPASYNCIOREQUEST lpRequest, current;
374
375 if(!lpOverlapped || lpOverlapped->hEvent == 0) {
376 ::SetLastError(ERROR_INVALID_PARAMETER);
377 return FALSE;
378 }
379
380 lpRequest = new ASYNCIOREQUEST;
381 if(lpRequest == NULL) {
382 ::SetLastError(ERROR_NOT_ENOUGH_MEMORY);
383 return FALSE;
384 }
385 lpRequest->dwAsyncType = ASYNCIO_READ;
386 lpRequest->hHandle = hHandle;
387 lpRequest->lpBuffer = lpBuffer;
388 lpRequest->nNumberOfBytes = nNumberOfBytesToRead;
389 lpRequest->lpdwResult = lpNumberOfBytesRead;
390 lpRequest->lpOverlapped = lpOverlapped;
391 lpRequest->lpCompletionRoutine = lpCompletionRoutine;
392 lpRequest->dwUserData = dwUserData;
393 lpRequest->dwTimeOut = dwTimeOut;
394 lpRequest->next = NULL;
395
396 ::EnterCriticalSection(&critsect);
397 if(pending[ASYNC_INDEX_READ]) {
398 current = pending[ASYNC_INDEX_READ];
399 while(current->next) {
400 current = current->next;
401 }
402 current->next = lpRequest;
403 }
404 else pending[ASYNC_INDEX_READ] = lpRequest;
405 ::LeaveCriticalSection(&critsect);
406
407 lpOverlapped->Internal = ERROR_IO_PENDING;
408 lpOverlapped->InternalHigh = 0;
409 lpOverlapped->Offset = 0;
410 lpOverlapped->OffsetHigh = 0;
411 //reset overlapped semaphore to non-signalled
412 ::ResetEvent(lpOverlapped->hEvent);
413
414 //wake up async thread
415 ::SetEvent(hEventRead);
416 ::SetLastError(ERROR_IO_PENDING);
417 return FALSE;
418}
419//******************************************************************************
420//******************************************************************************
421BOOL OverlappedIOHandler::WaitForEvent(HANDLE hHandle,
422 DWORD dwEventMask,
423 LPDWORD lpfdwEvtMask,
424 LPOVERLAPPED lpOverlapped,
425 LPOVERLAPPED_COMPLETION_ROUTINE lpCompletionRoutine,
426 DWORD dwUserData,
427 DWORD dwTimeOut)
428{
429 LPASYNCIOREQUEST lpRequest, current;
430 DWORD dwLastError, dwResult;
431
432 if(!lpOverlapped || lpOverlapped->hEvent == 0) {
433 ::SetLastError(ERROR_INVALID_PARAMETER);
434 return FALSE;
435 }
436 if(!lpPollHandler) {
437 DebugInt3();
438 ::SetLastError(ERROR_INVALID_PARAMETER);
439 return FALSE;
440 }
441
442 lpRequest = new ASYNCIOREQUEST;
443 if(lpRequest == NULL) {
444 ::SetLastError(ERROR_NOT_ENOUGH_MEMORY);
445 return FALSE;
446 }
447 lpRequest->dwAsyncType = ASYNCIO_POLL;
448 lpRequest->hHandle = hHandle;
449 lpRequest->lpBuffer = NULL;
450 lpRequest->nNumberOfBytes = 0;
451 lpRequest->lpdwResult = lpfdwEvtMask;
452 lpRequest->lpOverlapped = lpOverlapped;
453 lpRequest->lpCompletionRoutine = lpCompletionRoutine;
454 lpRequest->dwUserData = dwUserData;
455 lpRequest->dwTimeOut = dwTimeOut;
456 lpRequest->dwEventMask = dwEventMask;
457 lpRequest->next = NULL;
458
459 lpOverlapped->Internal = ERROR_IO_PENDING;
460 lpOverlapped->InternalHigh = 0;
461 lpOverlapped->Offset = 0;
462 lpOverlapped->OffsetHigh = 0;
463 //reset overlapped semaphore to non-signalled
464 ::ResetEvent(lpOverlapped->hEvent);
465
466 //first check if the event has already occured; if so, return result
467 //immediately
468 dwLastError = lpPollHandler(lpRequest, &dwResult, &dwTimeOut);
469 if(dwLastError != ERROR_IO_PENDING)
470 {
471 dprintf(("OverlappedIOHandler::WaitForEvent %x: result %x, last error %d", lpOverlapped, dwResult, dwLastError));
472 lpOverlapped->Internal = dwLastError;
473 lpOverlapped->InternalHigh = dwResult;
474 if(lpfdwEvtMask) {
475 *lpfdwEvtMask = dwResult;
476 }
477 //wake up user thread
478 ::SetEvent(lpOverlapped->hEvent);
479
480 delete lpRequest;
481 ::SetLastError(dwLastError);
482 return (dwLastError == ERROR_SUCCESS);
483 }
484
485 ::EnterCriticalSection(&critsect);
486 if(pending[ASYNC_INDEX_POLL]) {
487 current = pending[ASYNC_INDEX_READ];
488 while(current->next) {
489 current = current->next;
490 }
491 current->next = lpRequest;
492 }
493 else pending[ASYNC_INDEX_POLL] = lpRequest;
494 ::LeaveCriticalSection(&critsect);
495
496 //wake up async thread
497 ::SetEvent(hEventPoll);
498 ::SetLastError(ERROR_IO_PENDING);
499 return FALSE;
500}
501//******************************************************************************
502//******************************************************************************
503BOOL OverlappedIOHandler::CancelIo(HANDLE hHandle)
504{
505 LPASYNCIOREQUEST lpRequest;
506
507 for(int i=ASYNC_INDEX_READ;i<NR_ASYNC_OPERATIONS;i++)
508 {
509 while(TRUE) {
510 lpRequest = findAndRemoveRequest(i, hHandle);
511
512 if(lpRequest) {
513 ::SetEvent(lpRequest->hEventCancel); //cancel pending operation
514 if(i != ASYNC_INDEX_BUSY) {//thread that handles the request will delete it
515 delete lpRequest;
516 }
517 }
518 else break;
519 }
520 }
521 //TODO: return error if there were no pending requests???
522 ::SetLastError(ERROR_SUCCESS);
523 return TRUE;
524}
525//******************************************************************************
526//******************************************************************************
527BOOL OverlappedIOHandler::GetOverlappedResult(HANDLE hHandle,
528 LPOVERLAPPED lpOverlapped,
529 LPDWORD lpcbTransfer,
530 BOOL fWait)
531{
532 DWORD ret;
533
534 ret = ::WaitForSingleObject(lpOverlapped->hEvent, (fWait) ? INFINITE : 0);
535
536 if(lpcbTransfer)
537 *lpcbTransfer = lpOverlapped->InternalHigh;
538
539 ::SetLastError(lpOverlapped->Internal);
540
541 dprintf(("GetOverlappedResult %x -> result %d last error %d", hHandle, lpOverlapped->InternalHigh, lpOverlapped->Internal));
542 return (ret == WAIT_OBJECT_0);
543}
544//******************************************************************************
545//******************************************************************************
546LPASYNCIOREQUEST OverlappedIOHandler::findAndRemoveRequest(int index, HANDLE hHandle)
547{
548 LPASYNCIOREQUEST lpRequest, lpFound = NULL;
549
550 ::EnterCriticalSection(&critsect);
551 if(pending[index])
552 {
553 if(pending[index]->hHandle != hHandle)
554 {
555 lpRequest = pending[index];
556 while(lpRequest->next) {
557 if(lpRequest->next->hHandle == hHandle) {
558 lpFound = lpRequest->next;
559 lpRequest->next = lpFound->next;
560 break;
561 }
562 lpRequest = lpRequest->next;
563 }
564 }
565 else {
566 lpFound = pending[index];
567 pending[index] = lpFound->next;
568 }
569 }
570 ::LeaveCriticalSection(&critsect);
571 return lpFound;
572}
573//******************************************************************************
574//******************************************************************************
575
Note: See TracBrowser for help on using the repository browser.