source: trunk/src/kernel32/overlappedio.cpp@ 7598

Last change on this file since 7598 was 7598, checked in by sandervl, 24 years ago

added half or full duplex support

File size: 21.2 KB
Line 
1/* $Id: overlappedio.cpp,v 1.9 2001-12-09 21:55:17 sandervl Exp $ */
2
3/*
4 * Win32 overlapped IO class
5 *
6 * Copyright 2001 Sander van Leeuwen <sandervl@xs4all.nl>
7 *
8 * Project Odin Software License can be found in LICENSE.TXT
9 *
10 */
11
12
13
14#include <os2win.h>
15#include <string.h>
16#include <handlemanager.h>
17#include <heapstring.h>
18#include "overlappedio.h"
19#include "oslibdos.h"
20
21#define DBG_LOCALLOG DBG_overlappedio
22#include "dbglocal.h"
23
24
25//******************************************************************************
26//******************************************************************************
27OverlappedIOHandler::OverlappedIOHandler(LPOVERLAPPED_HANDLER lpReadHandler,
28 LPOVERLAPPED_HANDLER lpWriteHandler,
29 LPOVERLAPPED_HANDLER lpPollHandler,
30 BOOL fFullDuplex) :
31 hThreadRead(0), hThreadWrite(0), hThreadPoll(0)
32{
33 OverlappedIOError errcode = OutOfMemory;
34
35 this->fFullDuplex = fFullDuplex;
36
37 if(lpReadHandler == NULL || lpPollHandler == NULL) {
38 throw(InvalidParameter);
39 }
40
41 pending[ASYNC_INDEX_READ] = pending[ASYNC_INDEX_WRITE] = NULL;
42 pending [ASYNC_INDEX_POLL] = pending [ASYNC_INDEX_BUSY] = NULL;
43
44 this->lpReadHandler = lpReadHandler;
45 this->lpWriteHandler = lpWriteHandler;
46 this->lpPollHandler = lpPollHandler;
47
48 ::InitializeCriticalSection(&critsect);
49 //poll, read & write event semaphores are auto-reset (one thread wakes up
50 //after a SetEvent call)
51 hEventPoll = ::CreateEventA(NULL, FALSE, FALSE, NULL);
52 hEventRead = ::CreateEventA(NULL, FALSE, FALSE, NULL);
53 hEventWrite = ::CreateEventA(NULL, FALSE, FALSE, NULL);
54
55 dprintf(("OverlappedIOThread: hEventRead %x hEventWrite %x hEventPoll %x", hEventRead, hEventWrite, hEventPoll));
56
57 //the exit event semaphore is manual reset, because this event
58 //must be able to wake up multiple threads
59 hEventExit = ::CreateEventA(NULL, TRUE, FALSE, NULL);
60 if(!hEventPoll || !hEventRead || !hEventWrite || !hEventExit)
61 {
62 DebugInt3();
63 errcode = EventCreationFailed;
64 goto failed;
65 }
66
67 DWORD dwThreadId;
68 LPOVERLAPPED_THREAD_PARAM threadparam;
69
70 dwAsyncType = (lpWriteHandler && fFullDuplex) ? ASYNCIO_READ : ASYNCIO_READWRITE;
71 threadparam = (LPOVERLAPPED_THREAD_PARAM)malloc(sizeof(OVERLAPPED_THREAD_PARAM));
72 if(!threadparam) goto outofmem;
73 threadparam->dwOperation = dwAsyncType;
74 threadparam->lpOverlappedObj = this;
75 hThreadRead = ::CreateThread(NULL, 32*1024, OverlappedIOThread, (LPVOID)threadparam, 0, &dwThreadId);
76
77 if(lpWriteHandler && fFullDuplex) {
78 dwAsyncType |= ASYNCIO_WRITE;
79
80 threadparam = (LPOVERLAPPED_THREAD_PARAM)malloc(sizeof(OVERLAPPED_THREAD_PARAM));
81 if(!threadparam) goto outofmem;
82 threadparam->dwOperation = ASYNCIO_WRITE;
83 threadparam->lpOverlappedObj = this;
84 hThreadWrite = ::CreateThread(NULL, 32*1024, OverlappedIOThread, (LPVOID)threadparam, 0, &dwThreadId);
85 }
86
87 if(lpPollHandler) {
88 dwAsyncType |= ASYNCIO_POLL;
89
90 threadparam = (LPOVERLAPPED_THREAD_PARAM)malloc(sizeof(OVERLAPPED_THREAD_PARAM));
91 if(!threadparam) goto outofmem;
92 threadparam->dwOperation = ASYNCIO_POLL;
93 threadparam->lpOverlappedObj = this;
94 hThreadPoll = ::CreateThread(NULL, 32*1024, OverlappedIOThread, (LPVOID)threadparam, 0, &dwThreadId);
95 }
96
97 if((lpPollHandler && !hThreadPoll) || !hThreadRead || (lpWriteHandler && fFullDuplex && !hThreadWrite))
98 {
99 DebugInt3();
100 errcode = ThreadCreationFailed;
101 goto failed;
102 }
103 return;
104
105outofmem:
106 errcode = OutOfMemory;
107 //fall through
108failed:
109 //SvL: NOTE: We might not fail gracefully when threads have already been
110 // created. (thread accessing memory that has been freed)
111 // Don't feel like wasting time to fix this as this should never
112 // happen anyway.
113 if(hEventExit) {
114 ::SetEvent(hEventExit);
115 ::CloseHandle(hEventExit);
116 }
117
118 if(hEventRead) ::CloseHandle(hEventRead);
119 if(hEventWrite) ::CloseHandle(hEventWrite);
120 if(hEventPoll) ::CloseHandle(hEventPoll);
121
122 if(hThreadRead) ::CloseHandle(hThreadRead);
123 if(hThreadPoll) ::CloseHandle(hThreadPoll);
124 if(hThreadWrite) ::CloseHandle(hThreadWrite);
125 ::DeleteCriticalSection(&critsect);
126
127 throw(errcode);
128}
129//******************************************************************************
130//******************************************************************************
131OverlappedIOHandler::~OverlappedIOHandler()
132{
133 dprintf(("~OverlappedIOHandler: signalling overlapped threads"));
134 ::SetEvent(hEventExit);
135
136 ::CloseHandle(hEventExit);
137 ::CloseHandle(hEventRead);
138 ::CloseHandle(hEventWrite);
139 ::CloseHandle(hEventPoll);
140
141 ::CloseHandle(hThreadRead);
142 if(hThreadPoll) ::CloseHandle(hThreadPoll);
143 if(hThreadWrite) ::CloseHandle(hThreadWrite);
144
145 DeleteCriticalSection(&critsect);
146}
147//******************************************************************************
148//******************************************************************************
149DWORD CALLBACK OverlappedIOThread(LPVOID lpThreadParam)
150{
151 LPOVERLAPPED_THREAD_PARAM threadparam = (LPOVERLAPPED_THREAD_PARAM)lpThreadParam;
152 DWORD dwOperation;
153 OverlappedIOHandler *lpOverlappedObj;
154
155 if(threadparam == NULL) {
156 DebugInt3();
157 return 0;
158 }
159 lpOverlappedObj = threadparam->lpOverlappedObj;
160 dwOperation = threadparam->dwOperation;
161 //free thread parameter first
162 free(threadparam);
163
164 return lpOverlappedObj->threadHandler(dwOperation);
165}
166//******************************************************************************
167//******************************************************************************
168DWORD OverlappedIOHandler::threadHandler(DWORD dwOperation)
169{
170 LPASYNCIOREQUEST lpRequest;
171 LPOVERLAPPED lpOverlapped;
172 HANDLE hEvents[2];
173 HANDLE hEventsWait[2];
174 HANDLE hHandle;
175 DWORD ret, dwTimeOut, dwResult;
176 int index;
177
178 dprintf(("OverlappedIOThread: started for event %d", dwOperation));
179 switch(dwOperation) {
180 case ASYNCIO_READ:
181 case ASYNCIO_READWRITE:
182 hEvents[0] = hEventRead;
183 index = ASYNC_INDEX_READ;
184 break;
185
186 case ASYNCIO_WRITE:
187 hEvents[0] = hEventWrite;
188 index = ASYNC_INDEX_WRITE;
189 break;
190
191 case ASYNCIO_POLL:
192 hEvents[0] = hEventPoll;
193 index = ASYNC_INDEX_POLL;
194 break;
195 default:
196 DebugInt3();
197 }
198 hEvents[1] = hEventExit;
199
200 while(TRUE) {
201 ret = WaitForMultipleObjects(2, hEvents, FALSE, INFINITE);
202 if(ret == WAIT_FAILED) {
203 dprintf(("!WARNING!: WaitForMultipleObjects -> WAIT_FAILED!"));
204 break;
205 }
206 //if hEventExit has been signalled, then we are told to exit
207 if(ret == (WAIT_OBJECT_0+1)) {
208 dprintf(("end of threadHandler signalled"));
209 break;
210 }
211 ::EnterCriticalSection(&critsect);
212 if(pending[index] == NULL) {
213 //oh, oh
214 ::LeaveCriticalSection(&critsect);
215 dprintf(("!ERROR!: overlapped thread woken up, but no tasks pending!!"));
216 DebugInt3();
217 continue;
218 }
219 lpRequest = pending[index];
220 pending[index] = lpRequest->next;
221
222 //add to in process list
223 lpRequest->next = pending[ASYNC_INDEX_BUSY];
224 pending[ASYNC_INDEX_BUSY] = lpRequest;
225 ::LeaveCriticalSection(&critsect);
226
227 lpOverlapped = lpRequest->lpOverlapped;;
228 hHandle = lpRequest->hHandle;
229
230 switch(dwOperation) {
231 case ASYNCIO_READ:
232 case ASYNCIO_READWRITE:
233 if(lpRequest->dwAsyncType == ASYNCIO_READ || lpWriteHandler == NULL) {
234 lpRequest->dwLastError = lpReadHandler(lpRequest, &dwResult, NULL);
235 }
236 else lpRequest->dwLastError = lpWriteHandler(lpRequest, &dwResult, NULL);
237
238 lpOverlapped->Internal = lpRequest->dwLastError;
239 lpOverlapped->InternalHigh = dwResult;
240 if(lpRequest->lpdwResult) {
241 *lpRequest->lpdwResult = dwResult;
242 }
243 if(lpRequest->dwAsyncType == ASYNCIO_READ) {
244 dprintf(("ASYNCIO_READ %x finished; result %x, last error %d", lpOverlapped, dwResult, lpRequest->dwLastError));
245 }
246 else dprintf(("ASYNCIO_WRITE %x finished; result %x, last error %d", lpOverlapped, dwResult, lpRequest->dwLastError));
247 //wake up user thread
248 ::SetEvent(lpOverlapped->hEvent);
249 break;
250
251 case ASYNCIO_WRITE:
252 lpRequest->dwLastError = lpWriteHandler(lpRequest, &dwResult, NULL);
253 lpOverlapped->Internal = lpRequest->dwLastError;
254 lpOverlapped->InternalHigh = dwResult;
255 if(lpRequest->lpdwResult) {
256 *lpRequest->lpdwResult = dwResult;
257 }
258 dprintf(("ASYNCIO_WRITE %x finished; result %x, last error %d", lpOverlapped, dwResult, lpRequest->dwLastError));
259 //wake up user thread
260 ::SetEvent(lpOverlapped->hEvent);
261 break;
262
263 case ASYNCIO_POLL:
264 hEventsWait[0] = lpRequest->hEventCancel;
265 hEventsWait[1] = hEventExit;
266 ret = WAIT_TIMEOUT;
267 while(TRUE)
268 {
269 dwTimeOut = 0;
270 lpRequest->dwLastError = lpPollHandler(lpRequest, &dwResult, &dwTimeOut);
271 if(lpRequest->dwLastError != ERROR_IO_PENDING) {
272 break;
273 }
274 if(dwTimeOut == 0) {
275 dprintf(("!ERROR!: lpPollHandler returned timeout 0!!"));
276 DebugInt3();
277 break;
278 }
279 //sleep a while to avoid wasting too many cpu cycles; we are woken up when a timeout occurs,
280 //when the operation is cancelled or when the process exits
281 ret = WaitForMultipleObjects(2, hEventsWait, FALSE, dwTimeOut);
282 if(ret != WAIT_TIMEOUT) {
283 dprintf(("ASYNCIO_POLL: WaitForSingleObject didn't time out, abort (ret = %x)", ret));
284 break;
285 }
286 }
287 //Don't access the overlapped & result memory when CancelIo was used to cancel the operation
288 if(ret == WAIT_TIMEOUT)
289 {
290 dprintf(("ASYNCIO_POLL %x: result %x, last error %d", lpOverlapped, dwResult, lpRequest->dwLastError));
291 lpOverlapped->Internal = lpRequest->dwLastError;
292 lpOverlapped->InternalHigh = dwResult;
293 if(lpRequest->lpdwResult) {
294 *lpRequest->lpdwResult = dwResult;
295 }
296 //wake up user thread
297 ::SetEvent(lpOverlapped->hEvent);
298 }
299 break;
300 }
301 //remove from in-process list and delete async request object
302 findAndRemoveRequest(ASYNC_INDEX_BUSY, hHandle);
303 delete lpRequest;
304 }
305 return 0;
306}
307//******************************************************************************
308//******************************************************************************
309BOOL OverlappedIOHandler::WriteFile(HANDLE hHandle,
310 LPCVOID lpBuffer,
311 DWORD nNumberOfBytesToWrite,
312 LPDWORD lpNumberOfBytesWritten,
313 LPOVERLAPPED lpOverlapped,
314 LPOVERLAPPED_COMPLETION_ROUTINE lpCompletionRoutine,
315 DWORD dwUserData,
316 DWORD dwTimeOut)
317{
318 LPASYNCIOREQUEST lpRequest, current;
319 int index;
320
321 if(!lpOverlapped || lpOverlapped->hEvent == 0) {
322 ::SetLastError(ERROR_INVALID_PARAMETER);
323 return FALSE;
324 }
325
326 lpRequest = new ASYNCIOREQUEST;
327 if(lpRequest == NULL) {
328 ::SetLastError(ERROR_NOT_ENOUGH_MEMORY);
329 return FALSE;
330 }
331 lpRequest->dwAsyncType = ASYNCIO_WRITE;
332 lpRequest->hHandle = hHandle;
333 lpRequest->lpBuffer = lpBuffer;
334 lpRequest->nNumberOfBytes = nNumberOfBytesToWrite;
335 lpRequest->lpdwResult = lpNumberOfBytesWritten;
336 lpRequest->lpOverlapped = lpOverlapped;
337 lpRequest->lpCompletionRoutine = lpCompletionRoutine;
338 lpRequest->dwUserData = dwUserData;
339 lpRequest->dwTimeOut = dwTimeOut;
340 lpRequest->next = NULL;
341
342 if(dwAsyncType & ASYNCIO_READWRITE) {
343 index = ASYNC_INDEX_READ;
344 }
345 else index = ASYNC_INDEX_WRITE;
346
347 ::EnterCriticalSection(&critsect);
348 if(pending[index]) {
349 current = pending[index];
350 while(current->next) {
351 current = current->next;
352 }
353 current->next = lpRequest;
354 }
355 else pending[index] = lpRequest;
356 ::LeaveCriticalSection(&critsect);
357
358 lpOverlapped->Internal = ERROR_IO_PENDING;
359 lpOverlapped->InternalHigh = 0;
360 lpOverlapped->Offset = 0;
361 lpOverlapped->OffsetHigh = 0;
362 //reset overlapped semaphore to non-signalled
363 ::ResetEvent(lpOverlapped->hEvent);
364
365 //wake up async thread
366 ::SetEvent((dwAsyncType & ASYNCIO_READWRITE) ? hEventRead : hEventWrite);
367
368 ::SetLastError(ERROR_IO_PENDING);
369 return FALSE;
370}
371//******************************************************************************
372//******************************************************************************
373BOOL OverlappedIOHandler::ReadFile(HANDLE hHandle,
374 LPCVOID lpBuffer,
375 DWORD nNumberOfBytesToRead,
376 LPDWORD lpNumberOfBytesRead,
377 LPOVERLAPPED lpOverlapped,
378 LPOVERLAPPED_COMPLETION_ROUTINE lpCompletionRoutine,
379 DWORD dwUserData,
380 DWORD dwTimeOut)
381{
382 LPASYNCIOREQUEST lpRequest, current;
383
384 if(!lpOverlapped || lpOverlapped->hEvent == 0) {
385 ::SetLastError(ERROR_INVALID_PARAMETER);
386 return FALSE;
387 }
388
389 lpRequest = new ASYNCIOREQUEST;
390 if(lpRequest == NULL) {
391 ::SetLastError(ERROR_NOT_ENOUGH_MEMORY);
392 return FALSE;
393 }
394 lpRequest->dwAsyncType = ASYNCIO_READ;
395 lpRequest->hHandle = hHandle;
396 lpRequest->lpBuffer = lpBuffer;
397 lpRequest->nNumberOfBytes = nNumberOfBytesToRead;
398 lpRequest->lpdwResult = lpNumberOfBytesRead;
399 lpRequest->lpOverlapped = lpOverlapped;
400 lpRequest->lpCompletionRoutine = lpCompletionRoutine;
401 lpRequest->dwUserData = dwUserData;
402 lpRequest->dwTimeOut = dwTimeOut;
403 lpRequest->next = NULL;
404
405 ::EnterCriticalSection(&critsect);
406 if(pending[ASYNC_INDEX_READ]) {
407 current = pending[ASYNC_INDEX_READ];
408 while(current->next) {
409 current = current->next;
410 }
411 current->next = lpRequest;
412 }
413 else pending[ASYNC_INDEX_READ] = lpRequest;
414 ::LeaveCriticalSection(&critsect);
415
416 lpOverlapped->Internal = ERROR_IO_PENDING;
417 lpOverlapped->InternalHigh = 0;
418 lpOverlapped->Offset = 0;
419 lpOverlapped->OffsetHigh = 0;
420 //reset overlapped semaphore to non-signalled
421 ::ResetEvent(lpOverlapped->hEvent);
422
423 //wake up async thread
424 ::SetEvent(hEventRead);
425 ::SetLastError(ERROR_IO_PENDING);
426 return FALSE;
427}
428//******************************************************************************
429//******************************************************************************
430BOOL OverlappedIOHandler::WaitForEvent(HANDLE hHandle,
431 DWORD dwEventMask,
432 LPDWORD lpfdwEvtMask,
433 LPOVERLAPPED lpOverlapped,
434 LPOVERLAPPED_COMPLETION_ROUTINE lpCompletionRoutine,
435 DWORD dwUserData,
436 DWORD dwTimeOut)
437{
438 LPASYNCIOREQUEST lpRequest, current;
439 DWORD dwLastError, dwResult;
440
441 if(!lpOverlapped || lpOverlapped->hEvent == 0) {
442 ::SetLastError(ERROR_INVALID_PARAMETER);
443 return FALSE;
444 }
445 if(!lpPollHandler) {
446 DebugInt3();
447 ::SetLastError(ERROR_INVALID_PARAMETER);
448 return FALSE;
449 }
450
451 lpRequest = new ASYNCIOREQUEST;
452 if(lpRequest == NULL) {
453 ::SetLastError(ERROR_NOT_ENOUGH_MEMORY);
454 return FALSE;
455 }
456 lpRequest->dwAsyncType = ASYNCIO_POLL;
457 lpRequest->hHandle = hHandle;
458 lpRequest->lpBuffer = NULL;
459 lpRequest->nNumberOfBytes = 0;
460 lpRequest->lpdwResult = lpfdwEvtMask;
461 lpRequest->lpOverlapped = lpOverlapped;
462 lpRequest->lpCompletionRoutine = lpCompletionRoutine;
463 lpRequest->dwUserData = dwUserData;
464 lpRequest->dwTimeOut = dwTimeOut;
465 lpRequest->dwEventMask = dwEventMask;
466 lpRequest->next = NULL;
467
468 lpOverlapped->Internal = ERROR_IO_PENDING;
469 lpOverlapped->InternalHigh = 0;
470 lpOverlapped->Offset = 0;
471 lpOverlapped->OffsetHigh = 0;
472 //reset overlapped semaphore to non-signalled
473 ::ResetEvent(lpOverlapped->hEvent);
474
475 //first check if the event has already occured; if so, return result
476 //immediately
477 dwLastError = lpPollHandler(lpRequest, &dwResult, &dwTimeOut);
478 if(dwLastError != ERROR_IO_PENDING)
479 {
480 dprintf(("OverlappedIOHandler::WaitForEvent %x: result %x, last error %d", lpOverlapped, dwResult, dwLastError));
481 lpOverlapped->Internal = dwLastError;
482 lpOverlapped->InternalHigh = dwResult;
483 if(lpfdwEvtMask) {
484 *lpfdwEvtMask = dwResult;
485 }
486 //wake up user thread
487 ::SetEvent(lpOverlapped->hEvent);
488
489 delete lpRequest;
490 ::SetLastError(dwLastError);
491 return (dwLastError == ERROR_SUCCESS);
492 }
493
494 ::EnterCriticalSection(&critsect);
495 if(pending[ASYNC_INDEX_POLL]) {
496 current = pending[ASYNC_INDEX_READ];
497 while(current->next) {
498 current = current->next;
499 }
500 current->next = lpRequest;
501 }
502 else pending[ASYNC_INDEX_POLL] = lpRequest;
503 ::LeaveCriticalSection(&critsect);
504
505 //wake up async thread
506 ::SetEvent(hEventPoll);
507 ::SetLastError(ERROR_IO_PENDING);
508 return FALSE;
509}
510//******************************************************************************
511//******************************************************************************
512BOOL OverlappedIOHandler::CancelIo(HANDLE hHandle)
513{
514 LPASYNCIOREQUEST lpRequest;
515
516 for(int i=ASYNC_INDEX_READ;i<NR_ASYNC_OPERATIONS;i++)
517 {
518 while(TRUE) {
519 lpRequest = findAndRemoveRequest(i, hHandle);
520
521 if(lpRequest) {
522 ::SetEvent(lpRequest->hEventCancel); //cancel pending operation
523 if(i != ASYNC_INDEX_BUSY) {//thread that handles the request will delete it
524 delete lpRequest;
525 }
526 }
527 else break;
528 }
529 }
530 //TODO: return error if there were no pending requests???
531 ::SetLastError(ERROR_SUCCESS);
532 return TRUE;
533}
534//******************************************************************************
535//******************************************************************************
536BOOL OverlappedIOHandler::GetOverlappedResult(HANDLE hHandle,
537 LPOVERLAPPED lpOverlapped,
538 LPDWORD lpcbTransfer,
539 BOOL fWait)
540{
541 DWORD ret;
542
543 ret = ::WaitForSingleObject(lpOverlapped->hEvent, (fWait) ? INFINITE : 0);
544
545 if(lpcbTransfer)
546 *lpcbTransfer = lpOverlapped->InternalHigh;
547
548 ::SetLastError(lpOverlapped->Internal);
549
550 dprintf(("GetOverlappedResult %x -> result %d last error %d", hHandle, lpOverlapped->InternalHigh, lpOverlapped->Internal));
551 return (ret == WAIT_OBJECT_0);
552}
553//******************************************************************************
554//******************************************************************************
555LPASYNCIOREQUEST OverlappedIOHandler::findAndRemoveRequest(int index, HANDLE hHandle)
556{
557 LPASYNCIOREQUEST lpRequest, lpFound = NULL;
558
559 ::EnterCriticalSection(&critsect);
560 if(pending[index])
561 {
562 if(pending[index]->hHandle != hHandle)
563 {
564 lpRequest = pending[index];
565 while(lpRequest->next) {
566 if(lpRequest->next->hHandle == hHandle) {
567 lpFound = lpRequest->next;
568 lpRequest->next = lpFound->next;
569 break;
570 }
571 lpRequest = lpRequest->next;
572 }
573 }
574 else {
575 lpFound = pending[index];
576 pending[index] = lpFound->next;
577 }
578 }
579 ::LeaveCriticalSection(&critsect);
580 return lpFound;
581}
582//******************************************************************************
583//******************************************************************************
584
Note: See TracBrowser for help on using the repository browser.