source: trunk/src/kernel32/overlappedio.cpp@ 7560

Last change on this file since 7560 was 7560, checked in by sandervl, 24 years ago

overlapped io updates

File size: 17.6 KB
Line 
1/* $Id: overlappedio.cpp,v 1.5 2001-12-06 15:57:52 sandervl Exp $ */
2
3/*
4 * Win32 overlapped IO class
5 *
6 * Copyright 2001 Sander van Leeuwen <sandervl@xs4all.nl>
7 *
8 * Project Odin Software License can be found in LICENSE.TXT
9 *
10 */
11
12
13
14#include <os2win.h>
15#include <string.h>
16#include <handlemanager.h>
17#include <heapstring.h>
18#include "overlappedio.h"
19#include "oslibdos.h"
20
21#define DBG_LOCALLOG DBG_overlappedio
22#include "dbglocal.h"
23
24
25//******************************************************************************
26//******************************************************************************
27OverlappedIOHandler::OverlappedIOHandler(LPOVERLAPPED_HANDLER lpReadHandler,
28 LPOVERLAPPED_HANDLER lpWriteHandler,
29 LPOVERLAPPED_HANDLER lpPollHandler) :
30 hThreadRead(0), hThreadWrite(0), hThreadPoll(0)
31{
32 OverlappedIOError errcode = OutOfMemory;
33
34 if(lpReadHandler == NULL || lpPollHandler == NULL) {
35 throw(InvalidParameter);
36 }
37
38 pending[ASYNC_INDEX_READ] = pending[ASYNC_INDEX_WRITE] = pending [ASYNC_INDEX_POLL] = NULL;
39
40 this->lpReadHandler = lpReadHandler;
41 this->lpWriteHandler = lpWriteHandler;
42 this->lpPollHandler = lpPollHandler;
43
44 ::InitializeCriticalSection(&critsect);
45 //poll, read & write event semaphores are auto-reset (one thread wakes up
46 //after a SetEvent call)
47 hEventPoll = ::CreateEventA(NULL, FALSE, FALSE, NULL);
48 hEventRead = ::CreateEventA(NULL, FALSE, FALSE, NULL);
49 hEventWrite = ::CreateEventA(NULL, FALSE, FALSE, NULL);
50
51 //the exit event semaphore is manual reset, because signalling this event
52 //must be able to wake up multiple threads
53 hEventExit = ::CreateEventA(NULL, TRUE, FALSE, NULL);
54 if(!hEventPoll || !hEventRead || !hEventWrite || !hEventExit)
55 {
56 DebugInt3();
57 errcode = EventCreationFailed;
58 goto failed;
59 }
60
61 DWORD dwThreadId;
62 LPOVERLAPPED_THREAD_PARAM threadparam;
63
64 dwAsyncType = (lpWriteHandler) ? ASYNCIO_READ : ASYNCIO_READWRITE;
65 threadparam = (LPOVERLAPPED_THREAD_PARAM)malloc(sizeof(OVERLAPPED_THREAD_PARAM));
66 if(!threadparam) goto outofmem;
67 threadparam->dwOperation = dwAsyncType;
68 threadparam->lpOverlappedObj = this;
69 hThreadRead = ::CreateThread(NULL, 32*1024, OverlappedIOThread, (LPVOID)threadparam, 0, &dwThreadId);
70
71 if(lpWriteHandler) {
72 dwAsyncType |= ASYNCIO_WRITE;
73
74 threadparam = (LPOVERLAPPED_THREAD_PARAM)malloc(sizeof(OVERLAPPED_THREAD_PARAM));
75 if(!threadparam) goto outofmem;
76 threadparam->dwOperation = ASYNCIO_WRITE;
77 threadparam->lpOverlappedObj = this;
78 hThreadWrite = ::CreateThread(NULL, 32*1024, OverlappedIOThread, (LPVOID)threadparam, 0, &dwThreadId);
79 }
80
81 if(lpPollHandler) {
82 dwAsyncType |= ASYNCIO_POLL;
83
84 threadparam = (LPOVERLAPPED_THREAD_PARAM)malloc(sizeof(OVERLAPPED_THREAD_PARAM));
85 if(!threadparam) goto outofmem;
86 threadparam->dwOperation = ASYNCIO_POLL;
87 threadparam->lpOverlappedObj = this;
88 hThreadPoll = ::CreateThread(NULL, 32*1024, OverlappedIOThread, (LPVOID)threadparam, 0, &dwThreadId);
89 }
90
91 if((lpPollHandler && !hThreadPoll) || !hThreadRead || (lpWriteHandler && !hThreadWrite))
92 {
93 DebugInt3();
94 errcode = ThreadCreationFailed;
95 goto failed;
96 }
97 return;
98
99outofmem:
100 errcode = OutOfMemory;
101 //fall through
102failed:
103 //SvL: NOTE: We might not fail gracefully when threads have already been
104 // created. (thread accessing memory that has been freed)
105 // Don't feel like wasting time to fix this as this should never
106 // happen anyway.
107 if(hEventExit) {
108 ::SetEvent(hEventExit);
109 ::CloseHandle(hEventExit);
110 }
111
112 if(hEventRead) ::CloseHandle(hEventRead);
113 if(hEventWrite) ::CloseHandle(hEventWrite);
114 if(hEventPoll) ::CloseHandle(hEventPoll);
115
116 if(hThreadRead) ::CloseHandle(hThreadRead);
117 if(hThreadPoll) ::CloseHandle(hThreadPoll);
118 if(hThreadWrite) ::CloseHandle(hThreadWrite);
119 ::DeleteCriticalSection(&critsect);
120
121 throw(errcode);
122}
123//******************************************************************************
124//******************************************************************************
125OverlappedIOHandler::~OverlappedIOHandler()
126{
127 dprintf(("~OverlappedIOHandler: signalling overlapped threads"));
128 ::SetEvent(hEventExit);
129
130 ::CloseHandle(hEventExit);
131 ::CloseHandle(hEventRead);
132 ::CloseHandle(hEventWrite);
133 ::CloseHandle(hEventPoll);
134
135 ::CloseHandle(hThreadRead);
136 if(hThreadPoll) ::CloseHandle(hThreadPoll);
137 if(hThreadWrite) ::CloseHandle(hThreadWrite);
138
139 DeleteCriticalSection(&critsect);
140}
141//******************************************************************************
142//******************************************************************************
143DWORD CALLBACK OverlappedIOThread(LPVOID lpThreadParam)
144{
145 LPOVERLAPPED_THREAD_PARAM threadparam = (LPOVERLAPPED_THREAD_PARAM)lpThreadParam;
146 DWORD dwOperation;
147 OverlappedIOHandler *lpOverlappedObj;
148
149 if(threadparam == NULL) {
150 DebugInt3();
151 return 0;
152 }
153 lpOverlappedObj = threadparam->lpOverlappedObj;
154 dwOperation = threadparam->dwOperation;
155 //free thread parameter first
156 free(threadparam);
157
158 return lpOverlappedObj->threadHandler(dwOperation);
159}
160//******************************************************************************
161//******************************************************************************
162DWORD OverlappedIOHandler::threadHandler(DWORD dwOperation)
163{
164 LPASYNCIOREQUEST lpRequest;
165 LPOVERLAPPED lpOverlapped;
166 HANDLE hEvents[2];
167 DWORD ret, dwTimeOut, dwResult;
168 int index;
169
170 dprintf(("OverlappedIOThread: started for event %d", dwOperation));
171 switch(dwOperation) {
172 case ASYNCIO_READ:
173 case ASYNCIO_READWRITE:
174 hEvents[0] = hEventRead;
175 index = ASYNC_INDEX_READ;
176 break;
177
178 case ASYNCIO_WRITE:
179 hEvents[0] = hEventWrite;
180 index = ASYNC_INDEX_WRITE;
181 break;
182
183 case ASYNCIO_POLL:
184 hEvents[0] = hEventPoll;
185 index = ASYNC_INDEX_POLL;
186 break;
187 default:
188 DebugInt3();
189 }
190 hEvents[1] = hEventExit;
191
192 while(TRUE) {
193 ret = WaitForMultipleObjects(2, hEvents, FALSE, INFINITE);
194 if(ret == WAIT_FAILED) {
195 dprintf(("!WARNING!: WaitForMultipleObjects -> WAIT_FAILED!"));
196 break;
197 }
198 if(ret == WAIT_FAILED) {
199 dprintf(("!WARNING!: WaitForMultipleObjects -> WAIT_FAILED!"));
200 break;
201 }
202 //if hEventExit has been signalled, then we are told to exit
203 if(ret == (WAIT_OBJECT_0+1)) {
204 dprintf(("end of threadHandler signalled"));
205 break;
206 }
207 ::EnterCriticalSection(&critsect);
208 if(pending[index] == NULL) {
209 //oh, oh
210 ::LeaveCriticalSection(&critsect);
211 dprintf(("!ERROR!: overlapped thread woken up, but no tasks pending!!"));
212 DebugInt3();
213 continue;
214 }
215 lpRequest = pending[index];
216 pending[index] = lpRequest->next;
217 lpRequest->next = NULL;
218 ::LeaveCriticalSection(&critsect);
219
220 lpOverlapped = lpRequest->lpOverlapped;;
221
222 switch(dwOperation) {
223 case ASYNCIO_READ:
224 case ASYNCIO_READWRITE:
225 lpReadHandler(lpRequest, &dwResult, NULL);
226 lpOverlapped->Internal = lpRequest->dwLastError;
227 lpOverlapped->InternalHigh = dwResult;
228 if(lpRequest->lpdwResult) {
229 *lpRequest->lpdwResult = dwResult;
230 }
231 //wake up user thread
232 ::SetEvent(lpOverlapped->hEvent);
233 delete lpRequest;
234 break;
235
236 case ASYNCIO_WRITE:
237 lpWriteHandler(lpRequest, &dwResult, NULL);
238 lpOverlapped->Internal = lpRequest->dwLastError;
239 lpOverlapped->InternalHigh = dwResult;
240 if(lpRequest->lpdwResult) {
241 *lpRequest->lpdwResult = dwResult;
242 }
243 //wake up user thread
244 ::SetEvent(lpOverlapped->hEvent);
245 delete lpRequest;
246 break;
247
248 case ASYNCIO_POLL:
249 while(TRUE)
250 {
251 dwTimeOut = 0;
252 if(lpPollHandler(lpRequest, &dwResult, &dwTimeOut) == TRUE) {
253 break;
254 }
255 if(dwTimeOut == 0) {
256 dprintf(("!ERROR!: lpPollHandler returned timeout 0!!"));
257 DebugInt3();
258 break;
259 }
260 Sleep(dwTimeOut);
261 }
262 lpOverlapped->Internal = lpRequest->dwLastError;
263 lpOverlapped->InternalHigh = dwResult;
264 if(lpRequest->lpdwResult) {
265 *lpRequest->lpdwResult = dwResult;
266 }
267 //wake up user thread
268 ::SetEvent(lpOverlapped->hEvent);
269 delete lpRequest;
270 break;
271 }
272 }
273 return 0;
274}
275//******************************************************************************
276//******************************************************************************
277BOOL OverlappedIOHandler::WriteFile(HANDLE hHandle,
278 LPCVOID lpBuffer,
279 DWORD nNumberOfBytesToWrite,
280 LPDWORD lpNumberOfBytesWritten,
281 LPOVERLAPPED lpOverlapped,
282 LPOVERLAPPED_COMPLETION_ROUTINE lpCompletionRoutine,
283 DWORD dwUserData)
284{
285 LPASYNCIOREQUEST lpRequest, current;
286 int index;
287
288 if(!lpOverlapped || lpOverlapped->hEvent == 0) {
289 ::SetLastError(ERROR_INVALID_PARAMETER);
290 return FALSE;
291 }
292
293 lpRequest = new ASYNCIOREQUEST;
294 if(lpRequest == NULL) {
295 ::SetLastError(ERROR_NOT_ENOUGH_MEMORY);
296 return FALSE;
297 }
298 lpRequest->dwAsyncType = ASYNCIO_WRITE;
299 lpRequest->hHandle = hHandle;
300 lpRequest->lpBuffer = lpBuffer;
301 lpRequest->nNumberOfBytes = nNumberOfBytesToWrite;
302 lpRequest->lpdwResult = lpNumberOfBytesWritten;
303 lpRequest->lpOverlapped = lpOverlapped;
304 lpRequest->lpCompletionRoutine = lpCompletionRoutine;
305 lpRequest->dwUserData = dwUserData;
306 lpRequest->next = NULL;
307
308 if(dwAsyncType == ASYNCIO_READWRITE) {
309 index = ASYNC_INDEX_READ;
310 }
311 else index = ASYNC_INDEX_WRITE;
312
313 ::EnterCriticalSection(&critsect);
314 if(pending[index]) {
315 current = pending[index];
316 while(current->next) {
317 current = current->next;
318 }
319 current->next = lpRequest;
320 }
321 else pending[index] = lpRequest;
322 ::LeaveCriticalSection(&critsect);
323
324 lpOverlapped->Internal = STATUS_PENDING;
325 lpOverlapped->InternalHigh = 0;
326 lpOverlapped->Offset = 0;
327 lpOverlapped->OffsetHigh = 0;
328 //reset overlapped semaphore to non-signalled
329 ::ResetEvent(lpOverlapped->hEvent);
330
331 //wake up async thread
332 ::SetEvent((dwAsyncType == ASYNCIO_READWRITE) ? hEventRead : hEventWrite);
333
334 ::SetLastError(ERROR_IO_PENDING);
335 return FALSE;
336}
337//******************************************************************************
338//******************************************************************************
339BOOL OverlappedIOHandler::ReadFile(HANDLE hHandle,
340 LPCVOID lpBuffer,
341 DWORD nNumberOfBytesToRead,
342 LPDWORD lpNumberOfBytesRead,
343 LPOVERLAPPED lpOverlapped,
344 LPOVERLAPPED_COMPLETION_ROUTINE lpCompletionRoutine,
345 DWORD dwUserData)
346{
347 LPASYNCIOREQUEST lpRequest, current;
348
349 if(!lpOverlapped || lpOverlapped->hEvent == 0) {
350 ::SetLastError(ERROR_INVALID_PARAMETER);
351 return FALSE;
352 }
353
354 lpRequest = new ASYNCIOREQUEST;
355 if(lpRequest == NULL) {
356 ::SetLastError(ERROR_NOT_ENOUGH_MEMORY);
357 return FALSE;
358 }
359 lpRequest->dwAsyncType = ASYNCIO_READ;
360 lpRequest->hHandle = hHandle;
361 lpRequest->lpBuffer = lpBuffer;
362 lpRequest->nNumberOfBytes = nNumberOfBytesToRead;
363 lpRequest->lpdwResult = lpNumberOfBytesRead;
364 lpRequest->lpOverlapped = lpOverlapped;
365 lpRequest->lpCompletionRoutine = lpCompletionRoutine;
366 lpRequest->dwUserData = dwUserData;
367 lpRequest->next = NULL;
368
369 ::EnterCriticalSection(&critsect);
370 if(pending[ASYNC_INDEX_READ]) {
371 current = pending[ASYNC_INDEX_READ];
372 while(current->next) {
373 current = current->next;
374 }
375 current->next = lpRequest;
376 }
377 else pending[ASYNC_INDEX_READ] = lpRequest;
378 ::LeaveCriticalSection(&critsect);
379
380 lpOverlapped->Internal = STATUS_PENDING;
381 lpOverlapped->InternalHigh = 0;
382 lpOverlapped->Offset = 0;
383 lpOverlapped->OffsetHigh = 0;
384 //reset overlapped semaphore to non-signalled
385 ::ResetEvent(lpOverlapped->hEvent);
386
387 //wake up async thread
388 ::SetEvent(hEventRead);
389 ::SetLastError(ERROR_IO_PENDING);
390 return FALSE;
391}
392//******************************************************************************
393//******************************************************************************
394BOOL OverlappedIOHandler::WaitForEvent(HANDLE hHandle,
395 LPDWORD lpfdwEvtMask,
396 LPOVERLAPPED lpOverlapped,
397 LPOVERLAPPED_COMPLETION_ROUTINE lpCompletionRoutine,
398 DWORD dwUserData)
399{
400 LPASYNCIOREQUEST lpRequest, current;
401
402 if(!lpOverlapped || lpOverlapped->hEvent == 0) {
403 ::SetLastError(ERROR_INVALID_PARAMETER);
404 return FALSE;
405 }
406
407 lpRequest = new ASYNCIOREQUEST;
408 if(lpRequest == NULL) {
409 ::SetLastError(ERROR_NOT_ENOUGH_MEMORY);
410 return FALSE;
411 }
412 lpRequest->dwAsyncType = ASYNCIO_POLL;
413 lpRequest->hHandle = hHandle;
414 lpRequest->lpBuffer = NULL;
415 lpRequest->nNumberOfBytes = 0;
416 lpRequest->lpdwResult = lpfdwEvtMask;
417 lpRequest->lpOverlapped = lpOverlapped;
418 lpRequest->lpCompletionRoutine = lpCompletionRoutine;
419 lpRequest->dwUserData = dwUserData;
420 lpRequest->next = NULL;
421
422 ::EnterCriticalSection(&critsect);
423 if(pending[ASYNC_INDEX_POLL]) {
424 current = pending[ASYNC_INDEX_READ];
425 while(current->next) {
426 current = current->next;
427 }
428 current->next = lpRequest;
429 }
430 else pending[ASYNC_INDEX_POLL] = lpRequest;
431 ::LeaveCriticalSection(&critsect);
432
433 lpOverlapped->Internal = STATUS_PENDING;
434 lpOverlapped->InternalHigh = 0;
435 lpOverlapped->Offset = 0;
436 lpOverlapped->OffsetHigh = 0;
437 //reset overlapped semaphore to non-signalled
438 ::ResetEvent(lpOverlapped->hEvent);
439
440 //wake up async thread
441 ::SetEvent(hEventPoll);
442 ::SetLastError(ERROR_IO_PENDING);
443 return FALSE;
444}
445//******************************************************************************
446//******************************************************************************
447BOOL OverlappedIOHandler::CancelIo(HANDLE hHandle)
448{
449 LPASYNCIOREQUEST lpRequest;
450
451 for(int i=ASYNC_INDEX_READ;i<NR_ASYNC_OPERATIONS;i++)
452 {
453 while(TRUE) {
454 lpRequest = findAndRemoveRequest(i, hHandle);
455 if(lpRequest) {
456 delete lpRequest;
457 }
458 else break;
459 }
460 }
461 //TODO: return error if there were no pending requests???
462 ::SetLastError(ERROR_SUCCESS);
463 return TRUE;
464}
465//******************************************************************************
466//******************************************************************************
467BOOL OverlappedIOHandler::GetOverlappedResult(HANDLE hHandle,
468 LPOVERLAPPED lpOverlapped,
469 LPDWORD lpcbTransfer,
470 DWORD dwTimeout)
471{
472 DWORD ret;
473
474 ret = ::WaitForSingleObject(lpOverlapped->hEvent, dwTimeout);
475
476 if(lpcbTransfer)
477 *lpcbTransfer = lpOverlapped->InternalHigh;
478
479 ::SetLastError(lpOverlapped->Internal);
480
481 return (ret == WAIT_OBJECT_0);
482}
483//******************************************************************************
484//******************************************************************************
485LPASYNCIOREQUEST OverlappedIOHandler::findAndRemoveRequest(int index, HANDLE hHandle)
486{
487 LPASYNCIOREQUEST lpRequest, lpFound = NULL;
488
489 ::EnterCriticalSection(&critsect);
490 if(pending[index])
491 {
492 if(pending[index]->hHandle != hHandle)
493 {
494 lpRequest = pending[index];
495 while(lpRequest->next) {
496 if(lpRequest->next->hHandle == hHandle) {
497 lpFound = lpRequest->next;
498 lpRequest->next = lpFound->next;
499 break;
500 }
501 lpRequest = lpRequest->next;
502 }
503 }
504 else {
505 lpFound = pending[index];
506 pending[index] = lpFound->next;
507 }
508 }
509 ::LeaveCriticalSection(&critsect);
510 return lpFound;
511}
512//******************************************************************************
513//******************************************************************************
Note: See TracBrowser for help on using the repository browser.