source: trunk/tools/CmdQd/CmdQd.c@ 6543

Last change on this file since 6543 was 6543, checked in by bird, 24 years ago

CmdQd.c

File size: 40.9 KB
Line 
1/* $Id: CmdQd.c,v 1.2 2001-08-16 04:27:44 bird Exp $
2 *
3 * Command Queue Daemon / Client.
4 *
5 * Designed to execute commands asynchronously using multiple workers,
6 * and when all commands are submitted wait for them to complete.
7 *
8 * Copyright (c) 2001 knut st. osmundsen (kosmunds@csc.com)
9 *
10 * GPL
11 *
12 */
13
14
15/** @design Command Queue Daemon.
16 * This command daemon originated as a tool to better exploit SMP and UNI
17 * systems when building large programs, but also when building one specific
18 * component of that program. It is going to work just like the GNU make -j option.
19 *
20 * @subsection Work flow
21 *
22 * 1. Init daemon process. Creates a daemon process with a given number of
23 * workers. This is a detached process.
24 * 2. Submit jobs to the daemon. The daemon will queue the jobs and the
25 * workers will start working at once there is a job for them.
26 * 3. The nmake script will issue a wait command. We will now wait for all
27 * jobs to finish and in the mean time we'll display output from the jobs.
28 * Failing jobs will be queued up and show when all jobs are finished.
29 * 4. Two options: kill the daemon or start at step 2 again.
30 *
31 *
32 * @subsection Client <-> Daemon communication
33 *
34 * Client and daemon is one and the same executable. This has some advantages
35 * like implicit preloading of the client, fewer source files and fewer programs
36 * to install.
37 *
38 * The communication between the client and the daemon will use shared memory
39 * and a mutex semaphore which directs the conversation. The shared memory
40 * block is allocated by the daemon and will have a quite simple layout:
41 * Mutex Semaphore.
42 * Message Type.
43 * Message specific data:
44 * - Submit job:
45 * Return code to ignore. (4 bytes)
46 * Command to execute. (1 Kb)
47 * Current directory. (260 bytes)
48 * Environment block. (about 62KB)
49 * - Submit job response:
50 * Success/failure indicator.
51 *
52 * - Wait:
53 * Nothing.
54 * - Wait response:
55 * More output indicator.
56 * Success/failure indicator.
57 * Job output (about 63KB)
58 *
59 * - Kill:
60 * Nothing.
61 * - Kill response:
62 * Success/failure indicator.
63 *
64 * The shared memory block is 64KB.
65 *
66 *
67 * @subsection The Workers
68 *
69 * The workers are individual threads which wait for a job to be submitted for
70 * execution. Each worker is two threads. The job is executed thru a spawnvpe
71 * call and all stdout is read by the 1st worker thread, stderr is read by the
72 * 2nd thread. (Initially we've merged the two pipes and used one thread.)
73 * The output will be buffered (up to 4 MB). When the job is completed we'll
74 * put the output into either the success queue or the failure queue
75 * depending on the result.
76 *
77 *
78 */
79
80
81/*******************************************************************************
82* Header Files *
83*******************************************************************************/
84#include <stdio.h>
85#include <string.h>
86#include <stdlib.h>
87#include <stdarg.h>
88#include <assert.h>
89#include <direct.h>
90#include <signal.h>
91#include <process.h>
92
93#define INCL_BASE
94#include <os2.h>
95
96
97/*******************************************************************************
98* Defined Constants And Macros *
99*******************************************************************************/
#define SHARED_MEM_NAME "\\SHAREMEM\\CmdQd"    /* Name of the shared memory object. */
#define SHARED_MEM_SIZE 65536                   /* Size of the shared memory object. */
/*
 * Timeout (ms) the daemon passes to DosWaitEventSem while waiting for the
 * next message. -1 is presumably SEM_INDEFINITE_WAIT, i.e. the idle timeout
 * is currently disabled; a three minute timeout would be (60*1000*3).
 */
#define IDLE_TIMEOUT_MS (-1)
#define OUTPUT_CHUNK    (8192-8)                /* Size of JobOutput::szOutput. */

/* Standard file handle numbers. */
#define HF_STDIN        0
#define HF_STDOUT       1
#define HF_STDERR       2
108
109/*******************************************************************************
110* Structures and Typedefs *
111*******************************************************************************/
112typedef struct SharedMem
113{
114 HEV hevClient; /* Client will wait on this. */
115 HEV hevDaemon; /* Daemon will wait on this. */
116 HMTX hmtx; /* Owner of the shared memory. */
117 HMTX hmtxClient; /* Take and release this for each */
118 /* client -> server -> client talk. */
119 enum
120 {
121 msgUnknown = 0,
122 msgSubmit = 1,
123 msgSubmitResponse = 2,
124 msgWait = 3,
125 msgWaitResponse = 4,
126 msgKill = 5,
127 msgKillResponse = 6
128 } enmMsgType;
129
130 union
131 {
132 struct Submit
133 {
134 unsigned rcIgnore; /* max return code to accept as good. */
135 char szCommand[1024]; /* job command. */
136 char szCurrentDir[CCHMAXPATH]; /* current directory. */
137 int cchEnv; /* Size of the environment block */
138 char szzEnv[SHARED_MEM_SIZE - CCHMAXPATH - 1024 - 4 - 4 - 4 - 4 - 4 - 4];
139 /* Environment block. */
140 } Submit;
141
142 struct SubmitResponse
143 {
144 BOOL fRc; /* Success idicator. */
145 } SubmitResponse;
146
147
148 struct Wait
149 {
150 int iNothing; /* Dummy. */
151 } Wait;
152
153 struct WaitResponse
154 {
155 BOOL fMore; /* More data. */
156 int rc; /* return code of first failing job. */
157 /* only valid if fMore == FALSE. */
158 char szOutput[SHARED_MEM_SIZE- 4 - 4 - 4 - 4 - 4 - 4 - 4];
159 /* The output of one or more jobs. */
160 } WaitResponse;
161
162
163 struct Kill
164 {
165 int iNothing; /* dummy. */
166 } Kill;
167
168 struct KillResponse
169 {
170 BOOL fRc; /* Success idicator. */
171 } KillResponse;
172
173 } u1;
174
175} SHAREDMEM, *PSHAREDMEM;
176
177
178typedef struct JobOutput
179{
180 struct JobOutput * pNext; /* Pointer to next output chunk. */
181 int cchOutput; /* Bytes used of the szOutput member. */
182 char szOutput[OUTPUT_CHUNK]; /* Output. */
183} JOBOUTPUT, *PJOBOUTPUT;
184
185
186typedef struct Job
187{
188 struct Job * pNext; /* Pointer to next job. */
189 int rc; /* Result. */
190 PJOBOUTPUT pJobOutput; /* Output. */
191 struct Submit JobInfo; /* Job. */
192} JOB, *PJOB;
193
194
195/*******************************************************************************
196* Global Variables *
197*******************************************************************************/
/* Set by shrmemCreate() / shrmemOpen(); shrmemFree() resets it to NULL. */
198PSHAREDMEM pShrMem; /* Pointer to the shared memory. */
199
/* Queue of submitted jobs: Daemon() appends, Worker() takes from the head. */
200PJOB pJobQueue; /* Linked list of jobs (head). */
201PJOB pJobQueueEnd; /* Last job entry. */
202HMTX hmtxJobQueue; /* Mutex protecting the job queue and cJobs. */
203HEV hevJobQueue; /* Incoming job event sem; posted when a job is queued. */
204ULONG cJobs; /* Count of jobs submitted. */
205
/* Result queues: Worker() appends, the wait handling in Daemon() drains. */
206HMTX hmtxJobQueueFine; /* Mutex protecting the two result queues and cJobsFinished. */
207HEV hevJobQueueFine; /* Posted when there is more output. */
208PJOB pJobCompleted; /* Linked list of successful jobs. */
209PJOB pJobCompletedLast; /* Last successful job entry. */
210PJOB pJobFailed; /* Linked list of failed jobs. */
211PJOB pJobFailedLast; /* Last failed job entry. */
212ULONG cJobsFinished; /* Count of jobs finished (failed or completed). */
213
214HMTX hmtxExec; /* Mutex serializing the start of child processes. Required */
215 /* since we redirect the standard file handles */
216 /* and change the current directory of the process. */
217
218/*******************************************************************************
219* Internal Functions *
220*******************************************************************************/
/* Prints the usage text to stdout. */
221void syntax(void);
222
223/* operations - one per command line command, plus the daemon internals */
224int Init(const char *arg0, int cWorkers);
225int Daemon(int cWorkers);
226int DaemonInit(int cWorkers);
227void signalhandler(int sig);
228void Worker(void * iWorkerId);
229int Submit(int rcIgnore);
230int Wait(void);
231int Kill(void);
232
233/* shared memory helpers - create/open/free the block and pass messages */
234int shrmemCreate(void);
235int shrmemOpen(void);
236void shrmemFree(void);
237int shrmemSendDaemon(BOOL fWait);
238int shrmemSendClient(int enmMsgTypeResponse);
239
240/* error handling - printf style reporting */
241void _Optlink Error(const char *pszFormat, ...);
242
243
/**
 * Program entry point. Dispatches on the command word in argv[1].
 * "submit" and "wait" are matched case insensitively, the other commands
 * case sensitively.
 * @returns The return value of the command handler.
 *          0 when help was requested, -1 on syntax errors and
 *          -2 on a bad internal "!Daemon!" invocation.
 */
int main(int argc, char **argv)
{
    const char *pszCommand;

    /*
     * No command at all is a syntax error, an option in place of the
     * command is treated as a request for help.
     */
    if (argc < 2)
    {
        syntax();
        printf("\n!syntax error!");
        return -1;
    }
    pszCommand = argv[1];
    if (pszCommand[0] == '-')
    {
        syntax();
        return 0;
    }

    /*
     * submit [-<n>] <command> [args]
     */
    if (!stricmp(pszCommand, "submit"))
    {
        int rcIgnore = 0;

        if (argc == 2)
        {
            printf("fatal error: There is no job to submit...\n");
            return -1;
        }
        if (argv[2][0] == '-')
        {
            rcIgnore = atoi(argv[2] + 1);
            if (rcIgnore <= 0)
            {
                printf("syntax error: Invalid ignore return code number...\n");
                return -1;
            }
        }
        return Submit(rcIgnore);
    }

    if (!stricmp(pszCommand, "wait"))
        return Wait();

    if (!strcmp(pszCommand, "kill"))
        return Kill();

    /*
     * init <workers>
     */
    if (!strcmp(pszCommand, "init"))
    {
        int cWorkers = argc == 3 ? atoi(argv[2]) : 0;

        if (cWorkers <= 0 || cWorkers >= 256)
        {
            printf("fatal error: invalid/missing number of workers.\n");
            return -1;
        }
        return Init(argv[0], cWorkers);
    }

    /*
     * !Daemon! <workers> - internal, this is how Init() starts the daemon.
     */
    if (!strcmp(pszCommand, "!Daemon!"))
    {
        int cWorkers = argc == 3 ? atoi(argv[2]) : 0;

        if (cWorkers <= 0)
        {
            printf("fatal error: no worker count specified or to many parameters.\n");
            return -2;
        }
        return Daemon(cWorkers);
    }

    /* Unknown command. */
    syntax();
    printf("\n!invalid command '%s'.\n", pszCommand);
    return -1;
}
315
316
/**
 * Writes the usage text to stdout.
 */
void syntax(void)
{
    static const char s_szUsage[] =
        "Command Queue Daemon v0.0.1\n"
        "---------------------------\n"
        "syntax: CmdQd.exe <command> [args]\n"
        "\n"
        "commands:\n"
        " init <workers>\n"
        " Initiates the command queue daemon with the given number of workers.\n"
        "\n"
        " submit [-<n>] <command> [args]\n"
        " Submits a command to the daemon.\n"
        " Use '-<n>' to tell use to ignore return code 0-n.\n"
        " \n"
        " wait\n"
        " Wait for all commands which are queued up to complete.\n"
        " rc = count of failing commands.\n"
        " \n"
        " kill\n"
        " Kills the daemon. Daemon will automatically die after\n"
        " idling for some time.\n"
        "\n"
        "Copyright (c) 2001 knut st. osmundsen (kosmunds@csc.com)\n";

    /* The text holds no conversion specifications, so it is written verbatim. */
    fputs(s_szUsage, stdout);
}
346
347
348/**
349 * Starts a daemon process.
350 * @returns 0 on success.
351 * -4 on error.
352 * @param arg0 Executable filename.
353 * @param cWorkers Number of workers to start.
354 */
355int Init(const char *arg0, int cWorkers)
356{
357 int rc;
358 RESULTCODES Res; /* dummy, unused */
359 char szArg[CCHMAXPATH + 32];
360
361 if (!getenv("COMSPEC"))
362 {
363 Error("Fatal error: env. var. COMSPEC not found!\n");
364 return 0;
365 }
366
367 sprintf(&szArg[0], "%s\t!Daemon! %d", arg0, cWorkers);
368 szArg[strlen(arg0)] = '\0';
369 rc = DosExecPgm(NULL, 0, EXEC_BACKGROUND, &szArg[0], NULL, &Res, &szArg[0]);
370 if (rc)
371 Error("Fatal error: Failed to start daemon. rc=%d\n");
372 return rc;
373}
374
375
376/**
377 * This process is to be a daemon with a given number of works.
378 * @returns 0 on success.
379 * -4 on error.
380 * @param cWorkers Number of workers to start.
381 * @sketch
382 */
383int Daemon(int cWorkers)
384{
385 int rc;
386
387 /*
388 * Init Shared memory
389 */
390 rc = shrmemCreate();
391 if (rc)
392 return rc;
393
394 /*
395 * Init queues and semaphores.
396 */
397 rc = DaemonInit(cWorkers);
398 if (rc)
399 {
400 shrmemFree();
401 return rc;
402 }
403
404 /*
405 * Do work!
406 */
407 rc = shrmemSendDaemon(TRUE);
408 while (!rc)
409 {
410 switch (pShrMem->enmMsgType)
411 {
412 case msgSubmit:
413 {
414 PJOB pJob;
415
416 /*
417 * Make job entry.
418 */
419 _heap_check();
420 pJob = malloc((int)&((PJOB)0)->JobInfo.szzEnv[pShrMem->u1.Submit.cchEnv]);
421 if (pJob)
422 {
423 _heap_check();
424 memcpy(&pJob->JobInfo, &pShrMem->u1.Submit,
425 (int)&((struct Submit *)0)->szzEnv[pShrMem->u1.Submit.cchEnv]);
426 _heap_check();
427 pJob->rc = -1;
428 pJob->pNext = NULL;
429 pJob->pJobOutput = NULL;
430
431 /*
432 * Insert the entry.
433 */
434 rc = DosRequestMutexSem(hmtxJobQueue, SEM_INDEFINITE_WAIT);
435 if (rc)
436 break;
437 if (!pJobQueue)
438 pJobQueueEnd = pJobQueue = pJob;
439 else
440 {
441 pJobQueueEnd->pNext = pJob;
442 pJobQueueEnd = pJob;
443 }
444 cJobs++;
445 DosReleaseMutexSem(hmtxJobQueue);
446
447 /*
448 * Post the queue to wake up workers.
449 */
450 DosPostEventSem(hevJobQueue);
451 pShrMem->u1.SubmitResponse.fRc = TRUE;
452 }
453 else
454 {
455 Error("Internal Error: Out of memory (line=%d)\n", __LINE__);
456 pShrMem->u1.SubmitResponse.fRc = FALSE;
457 }
458 pShrMem->enmMsgType = msgSubmitResponse;
459 rc = shrmemSendDaemon(TRUE);
460 break;
461 }
462
463
464 case msgWait:
465 {
466 PJOB pJob = NULL;
467 PJOBOUTPUT pJobOutput = NULL;
468 char * psz;
469 int cch = 0;
470 char * pszOutput;
471 int cchOutput;
472 int rcFailure = 0;
473 BOOL fMore = TRUE;
474 ULONG ulIgnore;
475 void * pv;
476
477 DosPostEventSem(hevJobQueueFine); /* just so we don't get stuck in the loop... */
478 do
479 {
480 /* init response message */
481 pShrMem->enmMsgType = msgWaitResponse;
482 pShrMem->u1.WaitResponse.szOutput[0] = '\0';
483 pszOutput = &pShrMem->u1.WaitResponse.szOutput[0];
484 cchOutput = sizeof(pShrMem->u1.WaitResponse.szOutput) - 1;
485
486 /*
487 * Wait for output.
488 */
489 rc = DosWaitEventSem(hevJobQueueFine, SEM_INDEFINITE_WAIT);
490 if (rc)
491 break;
492
493 /*
494 * Copy output - Optimized so we don't cause to many context switches.
495 */
496 do
497 {
498 /*
499 * Next job.
500 */
501 if (!pJobOutput)
502 {
503 rc = DosRequestMutexSem(hmtxJobQueueFine, SEM_INDEFINITE_WAIT);
504 if (rc)
505 break;
506 pv = pJob;
507 pJob = pJobCompleted;
508 if (pJob)
509 {
510 pJobCompleted = pJob->pNext;
511 if (!pJobCompleted)
512 pJobCompletedLast = NULL;
513 }
514
515 if (!pJob && cJobs == cJobsFinished)
516 { /* all jobs finished, we may start output failures. */
517 pJob = pJobFailed;
518 if (pJob)
519 {
520 if (rcFailure == 0)
521 rcFailure = pJob->rc;
522
523 pJobFailed = pJob->pNext;
524 if (!pJobFailed)
525 pJobFailedLast = NULL;
526 }
527 else
528 fMore = FALSE;
529 }
530 else
531 DosResetEventSem(hevJobQueueFine, &ulIgnore); /* No more output, prepare wait. */
532 DosReleaseMutexSem(hmtxJobQueueFine);
533
534 if (pJob && pJob->pJobOutput)
535 {
536 pJobOutput = pJob->pJobOutput;
537 psz = pJobOutput->szOutput;
538 cch = pJobOutput->cchOutput;
539 }
540 if (pv)
541 free(pv);
542 }
543
544 /*
545 * Anything to output?
546 */
547 if (pJobOutput)
548 {
549 /*
550 * Copy output.
551 */
552 do
553 {
554 if (cch)
555 { /* copy */
556 int cchCopy = min(cch, cchOutput);
557 memcpy(pszOutput, psz, cchCopy);
558 psz += cchCopy; cch -= cchCopy;
559 pszOutput += cchCopy; cchOutput -= cchCopy;
560 }
561 if (!cch)
562 { /* next chunk */
563 pv = pJobOutput;
564 pJobOutput = pJobOutput->pNext;
565 if (pJobOutput)
566 {
567 psz = &pJobOutput->szOutput[0];
568 cch = pJobOutput->cchOutput;
569 }
570 free(pv);
571 }
572 } while (cch && cchOutput);
573 }
574 else
575 break; /* no more output, let's send what we got. */
576
577 } while (!rc && fMore && cchOutput);
578
579 /*
580 * We've got a message to send.
581 */
582 if (rc)
583 break;
584 *pszOutput = '\0';
585 pShrMem->u1.WaitResponse.rc = rcFailure;
586 pShrMem->u1.WaitResponse.fMore = fMore;
587 rc = shrmemSendDaemon(TRUE);
588 } while (!rc && fMore);
589 break;
590 }
591
592
593 case msgKill:
594 {
595 pShrMem->enmMsgType = msgKillResponse;
596 pShrMem->u1.KillResponse.fRc = TRUE;
597 shrmemSendDaemon(FALSE);
598 rc = -1;
599 break;
600 }
601
602 default:
603 Error("Internal error: Invalid message id %d\n", pShrMem->enmMsgType);
604 pShrMem->enmMsgType = msgUnknown;
605 rc = shrmemSendDaemon(TRUE);
606 }
607 }
608
609 /*
610 * Cleanup.
611 */
612 shrmemFree();
613 DosCloseMutexSem(hmtxJobQueue);
614 DosCloseMutexSem(hmtxJobQueueFine);
615 DosCloseEventSem(hevJobQueueFine);
616 DosCloseMutexSem(hmtxExec);
617 DosCloseEventSem(hevJobQueue);
618
619 _dump_allocated(16);
620
621 return 0;
622}
623
624
625/**
626 * Help which does most of the daemon init stuff.
627 * @returns 0 on success.
628 * @param cWorkers Number of worker threads to start.
629 */
630int DaemonInit(int cWorkers)
631{
632 int rc;
633 int i;
634
635 /*
636 * Init queues and semaphores.
637 */
638 rc = DosCreateEventSem(NULL, &hevJobQueue, 0, FALSE);
639 if (!rc)
640 {
641 rc = DosCreateMutexSem(NULL, &hmtxJobQueue, 0, FALSE);
642 if (!rc)
643 {
644 rc = DosCreateMutexSem(NULL, &hmtxJobQueueFine, 0, FALSE);
645 if (!rc)
646 {
647 rc = DosCreateEventSem(NULL, &hevJobQueueFine, 0, FALSE);
648 if (!rc)
649 {
650 rc = DosCreateMutexSem(NULL, &hmtxExec, 0, FALSE);
651 if (!rc)
652 {
653 /*
654 * Start workers.
655 */
656 rc = 0;
657 for (i = 0; i < cWorkers; i++)
658 if (_beginthread(Worker, NULL, 64*1024, (void*)i) == -1)
659 {
660 Error("Fatal error: failed to create worker thread no. %d\n", i);
661 rc = -1;
662 break;
663 }
664 if (!rc)
665 {
666 DosSetMaxFH(cWorkers * 3 + 20);
667 return 0; /* success! */
668 }
669
670 /* failure */
671 DosCloseMutexSem(hmtxExec);
672 }
673 else
674 Error("Fatal error: failed to create exec mutex. rc=%d", rc);
675 DosCloseEventSem(hevJobQueueFine);
676 }
677 else
678 Error("Fatal error: failed to create job queue fine event sem. rc=%d", rc);
679 DosCloseMutexSem(hmtxJobQueueFine);
680 }
681 else
682 Error("Fatal error: failed to create job queue fine mutex. rc=%d", rc);
683 DosCloseMutexSem(hmtxJobQueue);
684 }
685 else
686 Error("Fatal error: failed to create job queue mutex. rc=%d", rc);
687 DosCloseEventSem(hevJobQueue);
688 }
689 else
690 Error("Fatal error: failed to create job queue event sem. rc=%d", rc);
691
692 return rc;
693}
694
695
/**
 * Signal handler installed by shrmemCreate() and shrmemOpen().
 * Lets go of the shared memory and its semaphores and then terminates
 * the process with exit code -42.
 * @param   sig     The signal number (not used).
 */
void signalhandler(int sig)
{
    (void)sig;

    shrmemFree();
    exit(-42);
}
704
705
706/**
707 * Worker thread.
708 * @param iWorkerId The worker process id.
709 * @sketch
710 */
711void Worker(void * iWorkerId)
712{
713 while (!DosWaitEventSem(hevJobQueue, SEM_INDEFINITE_WAIT))
714 {
715 PJOB pJob;
716
717 /*
718 * Get job.
719 */
720 if (DosRequestMutexSem(hmtxJobQueue, SEM_INDEFINITE_WAIT))
721 return;
722 pJob = pJobQueue;
723 if (pJob)
724 {
725 if (pJob != pJobQueueEnd)
726 pJobQueue = pJob->pNext;
727 else
728 {
729 ULONG ulIgnore;
730 pJobQueue = pJobQueueEnd = NULL;
731 DosResetEventSem(hevJobQueue, &ulIgnore);
732 }
733 }
734 DosReleaseMutexSem(hmtxJobQueue);
735
736 /*
737 * Execute job.
738 */
739 if (pJob)
740 {
741 int rc;
742 char szArg[4096];
743 char szObj[256];
744 PJOBOUTPUT pJobOutput = NULL;
745 PJOBOUTPUT pJobOutputLast = NULL;
746 RESULTCODES Res;
747 PID pid;
748 HFILE hStdOut = HF_STDOUT;
749 HFILE hStdErr = HF_STDERR;
750 HFILE hStdOutSave = -1;
751 HFILE hStdErrSave = -1;
752 HPIPE hPipeR = NULLHANDLE;
753 HPIPE hPipeW = NULLHANDLE;
754
755 //printf("debug-%d: start %s\n", iWorkerId, pJob->JobInfo.szCommand);
756
757 /*
758 * Redirect output and start process.
759 */
760 sprintf(szArg, "%s\t /C \"%s\"\n", getenv("COMSPEC"), pJob->JobInfo.szCommand);
761 *strchr(szArg, '\t') = '\0';
762 rc = DosCreatePipe(&hPipeR, &hPipeW, sizeof(pJobOutput->szOutput) - 1);
763 if (rc)
764 {
765 Error("Internal Error: Failed to create pipe! rc=%d\n", rc);
766 return;
767 }
768
769 if (DosRequestMutexSem(hmtxExec, SEM_INDEFINITE_WAIT))
770 {
771 DosClose(hPipeR);
772 DosClose(hPipeW);
773 return;
774 }
775
776 pJob->pJobOutput = pJobOutput = pJobOutputLast = malloc(sizeof(JOBOUTPUT));
777 pJobOutput->pNext = NULL;
778 pJobOutput->cchOutput = sprintf(pJobOutput->szOutput, "Job: %s\n", pJob->JobInfo.szCommand);
779
780 rc = DosSetDefaultDisk( pJob->JobInfo.szCurrentDir[0] >= 'a'
781 ? pJob->JobInfo.szCurrentDir[0] - 'a' + 1
782 : pJob->JobInfo.szCurrentDir[0] - 'A' + 1);
783 rc += DosSetCurrentDir(pJob->JobInfo.szCurrentDir);
784 if (!rc)
785 {
786 assert( pJob->JobInfo.szzEnv[pJob->JobInfo.cchEnv-1] == '\0'
787 && pJob->JobInfo.szzEnv[pJob->JobInfo.cchEnv-2] == '\0');
788 DosDupHandle(HF_STDOUT, &hStdOutSave);
789 DosDupHandle(HF_STDERR, &hStdErrSave);
790 DosDupHandle(hPipeW, &hStdOut);
791 DosDupHandle(hPipeW, &hStdErr);
792 rc = DosExecPgm(szObj, sizeof(szObj), EXEC_ASYNCRESULT,
793 szArg, pJob->JobInfo.szzEnv, &Res, szArg);
794 DosClose(hStdOut); hStdOut = HF_STDOUT;
795 DosClose(hStdErr); hStdErr = HF_STDERR;
796 DosDupHandle(hStdOutSave, &hStdOut);
797 DosDupHandle(hStdErrSave, &hStdErr);
798 DosClose(hStdOutSave);
799 DosClose(hStdErrSave);
800 DosReleaseMutexSem(hmtxExec);
801 DosClose(hPipeW);
802
803
804 /*
805 * Read Output.
806 */
807 if (!rc)
808 {
809 ULONG cchRead;
810 ULONG cchRead2 = 0;
811
812 cchRead = sizeof(pJobOutput->szOutput) - 1;
813 while (((rc = DosRead(hPipeR,
814 &pJobOutput->szOutput[pJobOutput->cchOutput],
815 cchRead, &cchRead2)) == NO_ERROR
816 || rc == ERROR_MORE_DATA)
817 && cchRead2 != 0)
818 {
819 pJobOutput->cchOutput += cchRead2;
820 pJobOutput->szOutput[pJobOutput->cchOutput] = '\0';
821
822 /* prepare next read */
823 cchRead = sizeof(pJobOutput->szOutput) - pJobOutput->cchOutput - 1;
824 if (cchRead < 16)
825 {
826 pJobOutput = pJobOutput->pNext = malloc(sizeof(JOBOUTPUT));
827 pJobOutput->pNext = NULL;
828 pJobOutput->cchOutput = 0;
829 cchRead = sizeof(pJobOutput->szOutput) - 1;
830 }
831 cchRead2 = 0;
832 }
833 rc = 0;
834 }
835
836 /* finished reading */
837 DosClose(hPipeR);
838
839 /*
840 * Get result.
841 */
842 if (!rc)
843 {
844 DosWaitChild(DCWA_PROCESS, DCWW_WAIT, &Res, &pid, Res.codeTerminate);
845 if ( Res.codeResult <= pJob->JobInfo.rcIgnore
846 && Res.codeTerminate == TC_EXIT)
847 pJob->rc = 0;
848 else
849 {
850 pJob->rc = -1;
851 rc = sprintf(szArg, "failed with rc=%d term=%d\n", Res.codeResult, Res.codeTerminate);
852 if (rc + pJobOutput->cchOutput + 1 >= sizeof(pJobOutput->szOutput))
853 {
854 pJobOutput = pJobOutput->pNext = malloc(sizeof(JOBOUTPUT));
855 pJobOutput->pNext = NULL;
856 pJobOutput->cchOutput = 0;
857 }
858 strcpy(&pJobOutput->szOutput[pJobOutput->cchOutput], szArg);
859 pJobOutput->cchOutput += rc;
860 }
861 }
862 else
863 {
864 pJobOutput->cchOutput += sprintf(&pJobOutput->szOutput[pJobOutput->cchOutput],
865 "DosExecPgm failed with rc=%d for command %s %s\n"
866 " obj=%s\n",
867 rc, szArg, pJob->JobInfo.szCommand, szObj);
868 pJob->rc = -1;
869 }
870 }
871 else
872 {
873 /*
874 * ChDir failed.
875 */
876 DosReleaseMutexSem(hmtxExec);
877 pJobOutput->cchOutput += sprintf(&pJobOutput->szOutput[pJobOutput->cchOutput ],
878 "Failed to set current directory to: %s\n",
879 pJob->JobInfo.szCurrentDir);
880 pJob->rc = -1;
881 DosClose(hPipeR);
882 }
883
884
885 /*
886 * Insert result in result queue.
887 */
888 if (DosRequestMutexSem(hmtxJobQueueFine, SEM_INDEFINITE_WAIT))
889 return;
890 pJob->pNext = NULL;
891 if (!pJob->rc) /* 0 on success. */
892 {
893 if (pJobCompletedLast)
894 pJobCompletedLast->pNext = pJob;
895 else
896 pJobCompleted = pJob;
897 pJobCompletedLast = pJob;
898 }
899 else
900 {
901 if (pJobFailedLast)
902 pJobFailedLast->pNext = pJob;
903 else
904 pJobFailed = pJob;
905 pJobFailedLast = pJob;
906 }
907 cJobsFinished++;
908 DosReleaseMutexSem(hmtxJobQueueFine);
909 /* wake up Wait. */
910 DosPostEventSem(hevJobQueueFine);
911 //printf("debug-%d: fine\n", iWorkerId);
912 }
913 }
914}
915
916
917/**
918 * Submits a command to the daemon.
919 * @returns 0 on success.
920 * -3 on failure.
921 * @param rcIgnore Ignores returcodes ranging from 0 to rcIgnore.
922 */
923int Submit(int rcIgnore)
924{
925 int i;
926 int cch;
927 int rc;
928 char * psz;
929 PPIB ppib;
930 PTIB ptib;
931
932 DosGetInfoBlocks(&ptib, &ppib);
933 rc = shrmemOpen();
934 if (rc)
935 return rc;
936
937 /*
938 * Build message.
939 */
940 pShrMem->enmMsgType = msgSubmit;
941 pShrMem->u1.Submit.rcIgnore = rcIgnore;
942 _getcwd(pShrMem->u1.Submit.szCurrentDir, sizeof(pShrMem->u1.Submit.szCurrentDir));
943
944 /* command */
945 psz = ppib->pib_pchcmd;
946 psz += strlen(psz) + 1 + 7; /* 7 = strlen("submit ")*/
947 while (*psz == ' ' || *psz == '\t')
948 psz++;
949 if (*psz == '-')
950 {
951 while (*psz != ' ' && *psz != '\t')
952 psz++;
953 while (*psz == ' ' || *psz == '\t')
954 psz++;
955 }
956 cch = strlen(psz) + 1;
957 if (cch > sizeof(pShrMem->u1.Submit.szCommand))
958 {
959 Error("Fatal error: Command too long.\n");
960 shrmemFree();
961 return -1;
962 }
963 memcpy(&pShrMem->u1.Submit.szCommand[0], psz, cch);
964
965 /* environment */
966 for (cch = 1, psz = ppib->pib_pchenv; *psz != '\0';)
967 {
968 int cchVar = strlen(psz) + 1;
969 cch += cchVar;
970 psz += cchVar;
971 }
972 if ( ppib->pib_pchenv[cch-2] != '\0'
973 || ppib->pib_pchenv[cch-1] != '\0')
974 {
975 Error("internal error\n");
976 return -1;
977 }
978 if (cch > sizeof(pShrMem->u1.Submit.szzEnv))
979 {
980 Error("Fatal error: environment is to bit, cchEnv=%d\n", cch);
981 shrmemFree();
982 return -ERROR_BAD_ENVIRONMENT;
983 }
984 pShrMem->u1.Submit.cchEnv = cch;
985 memcpy(&pShrMem->u1.Submit.szzEnv[0], ppib->pib_pchenv, cch);
986
987
988 /*
989 * Send message and get respons.
990 */
991 rc = shrmemSendClient(msgSubmitResponse);
992 if (rc)
993 {
994 shrmemFree();
995 return rc;
996 }
997
998 rc = !pShrMem->u1.SubmitResponse.fRc;
999 shrmemFree();
1000 return rc;
1001}
1002
1003
1004/**
1005 * Waits for the commands to complete.
1006 * Will write all output from completed command to stdout.
1007 * Will write failing commands last.
1008 * @returns Count of failing commands.
1009 */
1010int Wait(void)
1011{
1012 int rc;
1013
1014 rc = shrmemOpen();
1015 if (rc)
1016 return rc;
1017 do
1018 {
1019 pShrMem->enmMsgType = msgWait;
1020 pShrMem->u1.Wait.iNothing = 0;
1021 rc = shrmemSendClient(msgWaitResponse);
1022 if (rc)
1023 {
1024 shrmemFree();
1025 return -1;
1026 }
1027 printf("%s", pShrMem->u1.WaitResponse.szOutput);
1028 /*
1029 * Release the client mutex if more data and yield the CPU.
1030 * So we can submit more work. (Odin nmake lib...)
1031 */
1032 if (pShrMem->u1.WaitResponse.fMore)
1033 {
1034 DosReleaseMutexSem(pShrMem->hmtxClient);
1035 DosSleep(0);
1036 rc = DosRequestMutexSem(pShrMem->hmtxClient, SEM_INDEFINITE_WAIT);
1037 if (rc)
1038 {
1039 Error("Fatal error: failed to get client mutex. rc=%d\n", rc);
1040 shrmemFree();
1041 return -1;
1042 }
1043 }
1044 } while (pShrMem->u1.WaitResponse.fMore);
1045
1046 rc = pShrMem->u1.WaitResponse.rc;
1047 shrmemFree();
1048 return rc;
1049}
1050
1051
1052/**
1053 * Sends a kill command to the daemon to kill it and its workers.
1054 * @returns 0.
1055 */
1056int Kill(void)
1057{
1058 int rc;
1059
1060 rc = shrmemOpen();
1061 if (rc)
1062 return rc;
1063
1064 pShrMem->enmMsgType = msgKill;
1065 pShrMem->u1.Kill.iNothing = 0;
1066 rc = shrmemSendClient(msgKillResponse);
1067 if (!rc)
1068 rc = !pShrMem->u1.KillResponse.fRc;
1069
1070 shrmemFree();
1071 return rc;
1072}
1073
1074
1075/**
1076 * Creates the shared memory area.
1077 * The creator owns the memory when created.
1078 * @returns 0 on success. Error code on error.
1079 */
1080int shrmemCreate(void)
1081{
1082 int rc;
1083 rc = DosAllocSharedMem((PPVOID)&pShrMem,
1084 SHARED_MEM_NAME,
1085 SHARED_MEM_SIZE,
1086 PAG_COMMIT | PAG_READ | PAG_WRITE);
1087 if (rc)
1088 {
1089 Error("Fatal error: Failed to create shared memory object. rc=%d\n", rc);
1090 return rc;
1091 }
1092
1093 rc = DosCreateEventSem(NULL, &pShrMem->hevDaemon, DC_SEM_SHARED, FALSE);
1094 if (rc)
1095 {
1096 Error("Fatal error: Failed to create daemon event semaphore. rc=%d\n", rc);
1097 DosFreeMem(pShrMem);
1098 return rc;
1099 }
1100
1101 rc = DosCreateEventSem(NULL, &pShrMem->hevClient, DC_SEM_SHARED, FALSE);
1102 if (rc)
1103 {
1104 Error("Fatal error: Failed to create client event semaphore. rc=%d\n", rc);
1105 DosCloseEventSem(pShrMem->hevDaemon);
1106 DosFreeMem(pShrMem);
1107 return rc;
1108 }
1109
1110 rc = DosCreateMutexSem(NULL, &pShrMem->hmtx, DC_SEM_SHARED, TRUE);
1111 if (rc)
1112 {
1113 Error("Fatal error: Failed to create mutex semaphore. rc=%d\n", rc);
1114 DosCloseEventSem(pShrMem->hevClient);
1115 DosCloseEventSem(pShrMem->hevDaemon);
1116 DosFreeMem(pShrMem);
1117 return rc;
1118 }
1119
1120 rc = DosCreateMutexSem(NULL, &pShrMem->hmtxClient, DC_SEM_SHARED, FALSE);
1121 if (rc)
1122 {
1123 Error("Fatal error: Failed to create client mutex semaphore. rc=%d\n", rc);
1124 DosCloseEventSem(pShrMem->hevClient);
1125 DosCloseEventSem(pShrMem->hevClient);
1126 DosCloseEventSem(pShrMem->hevDaemon);
1127 DosFreeMem(pShrMem);
1128 return rc;
1129 }
1130
1131
1132 /*
1133 * Install signal handlers.
1134 */
1135 signal(SIGSEGV, signalhandler);
1136 signal(SIGTERM, signalhandler);
1137 signal(SIGABRT, signalhandler);
1138 signal(SIGINT, signalhandler);
1139 signal(SIGBREAK,signalhandler);
1140
1141 return rc;
1142}
1143
1144
1145/**
1146 * Opens the shared memory and the semaphores.
1147 * The caller is owner of the memory upon successful return.
1148 * @returns 0 on success. Error code on error.
1149 */
1150int shrmemOpen(void)
1151{
1152 int rc;
1153 ULONG ulDummy;
1154 rc = DosGetNamedSharedMem((PPVOID)&pShrMem,
1155 SHARED_MEM_NAME,
1156 PAG_READ | PAG_WRITE);
1157 if (rc)
1158 {
1159 Error("Fatal error: Failed to open shared memory. rc=%d\n");
1160 return rc;
1161 }
1162
1163 rc = DosOpenEventSem(NULL, &pShrMem->hevClient);
1164 if (rc)
1165 {
1166 Error("Fatal error: Failed to open client event semaphore. rc=%d\n");
1167 DosFreeMem(pShrMem);
1168 return rc;
1169 }
1170
1171 rc = DosOpenEventSem(NULL, &pShrMem->hevDaemon);
1172 if (rc)
1173 {
1174 Error("Fatal error: Failed to open daemon event semaphore. rc=%d\n");
1175 DosCloseEventSem(pShrMem->hevClient);
1176 DosFreeMem(pShrMem);
1177 return rc;
1178 }
1179
1180 rc = DosOpenMutexSem(NULL, &pShrMem->hmtx);
1181 if (rc)
1182 {
1183 Error("Fatal error: Failed to open mutex semaphore. rc=%d\n");
1184 DosCloseEventSem(pShrMem->hevClient);
1185 DosCloseEventSem(pShrMem->hevDaemon);
1186 DosFreeMem(pShrMem);
1187 return rc;
1188 }
1189
1190 rc = DosOpenMutexSem(NULL, &pShrMem->hmtxClient);
1191 if (rc)
1192 {
1193 Error("Fatal error: Failed to open client mutex semaphore. rc=%d\n");
1194 DosCloseEventSem(pShrMem->hevClient);
1195 DosCloseEventSem(pShrMem->hevDaemon);
1196 DosCloseMutexSem(pShrMem->hmtx);
1197 DosFreeMem(pShrMem);
1198 return rc;
1199 }
1200
1201 rc = DosRequestMutexSem(pShrMem->hmtxClient, SEM_INDEFINITE_WAIT);
1202 if (rc)
1203 {
1204 Error("Fatal error: Failed to open aquire ownership of client mutex semaphore. rc=%d\n");
1205 shrmemFree();
1206 return rc;
1207 }
1208
1209 rc = DosRequestMutexSem(pShrMem->hmtx, SEM_INDEFINITE_WAIT);
1210 if (rc)
1211 {
1212 Error("Fatal error: Failed to open aquire ownership of mutex semaphore. rc=%d\n");
1213 shrmemFree();
1214 return rc;
1215 }
1216
1217
1218 /*
1219 * Install signal handlers.
1220 */
1221 signal(SIGSEGV, signalhandler);
1222 signal(SIGTERM, signalhandler);
1223 signal(SIGABRT, signalhandler);
1224 signal(SIGINT, signalhandler);
1225 signal(SIGBREAK,signalhandler);
1226
1227 return rc;
1228}
1229
1230
1231/**
1232 * Frees the shared memory and the associated semaphores.
1233 */
1234void shrmemFree(void)
1235{
1236 if (!pShrMem)
1237 return;
1238 DosReleaseMutexSem(pShrMem->hmtxClient);
1239 DosReleaseMutexSem(pShrMem->hmtx);
1240 DosCloseMutexSem(pShrMem->hmtxClient);
1241 DosCloseMutexSem(pShrMem->hmtx);
1242 DosCloseEventSem(pShrMem->hevClient);
1243 DosCloseEventSem(pShrMem->hevDaemon);
1244 DosFreeMem(pShrMem);
1245 pShrMem = NULL;
1246}
1247
1248
1249/**
1250 * Daemon sends a message.
1251 * Upon we don't own the shared memory any longer.
1252 * @returns 0 on success. Error code on error.
1253 * -1 on timeout.
1254 * @param fWait Wait for new message.
1255 */
1256int shrmemSendDaemon(BOOL fWait)
1257{
1258 ULONG ulDummy;
1259 int rc;
1260
1261 /* send message */
1262 DosResetEventSem(pShrMem->hevDaemon, &ulDummy);
1263 rc = DosReleaseMutexSem(pShrMem->hmtx);
1264 if (!rc)
1265 rc = DosPostEventSem(pShrMem->hevClient);
1266
1267 /* wait for next message */
1268 if (!rc && fWait)
1269 {
1270 do
1271 {
1272 rc = DosWaitEventSem(pShrMem->hevDaemon, IDLE_TIMEOUT_MS);
1273 } while (rc == ERROR_TIMEOUT && pJobQueue);
1274
1275 if (rc == ERROR_TIMEOUT)
1276 {
1277 DosRequestMutexSem(pShrMem->hmtx, SEM_INDEFINITE_WAIT);
1278 shrmemFree();
1279 return -1;
1280 }
1281
1282 if (!rc)
1283 rc = DosRequestMutexSem(pShrMem->hmtx, SEM_INDEFINITE_WAIT);
1284 if (rc && rc != ERROR_INTERRUPT)
1285 Error("Internal error: failed to get next message from daemon, rc=%d\n", rc);
1286 }
1287 else
1288 Error("Internal error: failed to send message from daemon, rc=%d\n", rc);
1289 return rc;
1290}
1291
1292
1293/**
1294 * Client sends a message.
1295 * Upon we don't own the shared memory any longer.
1296 * @returns 0 on success. Error code on error.
1297 * @param enmMsgTypeResponse The expected response on this message.
1298 */
1299int shrmemSendClient(int enmMsgTypeResponse)
1300{
1301 ULONG ulDummy;
1302 int rc;
1303
1304 /* send message */
1305 DosResetEventSem(pShrMem->hevClient, &ulDummy);
1306 rc = DosReleaseMutexSem(pShrMem->hmtx);
1307 if (!rc)
1308 rc = DosPostEventSem(pShrMem->hevDaemon);
1309
1310 /* wait for response */
1311 if (!rc)
1312 {
1313 rc = DosWaitEventSem(pShrMem->hevClient, SEM_INDEFINITE_WAIT);
1314 if (!rc)
1315 {
1316 rc = DosRequestMutexSem(pShrMem->hmtx, SEM_INDEFINITE_WAIT);
1317 if (!rc && pShrMem->enmMsgType != enmMsgTypeResponse)
1318 {
1319 Error("Internal error: Invalid response message. response=%d expected=%d\n",
1320 pShrMem->enmMsgType, enmMsgTypeResponse);
1321 return -1;
1322 }
1323 }
1324 if (rc && rc != ERROR_INTERRUPT)
1325 Error("Internal error: failed to get response message from daemon, rc=%d\n", rc);
1326 }
1327 else
1328 Error("Internal error: failed to send message to daemon, rc=%d\n", rc);
1329
1330 return rc;
1331}
1332
1333
/**
 * printf lookalike used to report all run-time errors.
 * The message is written to stdout.
 * @param   pszFormat   Format string.
 * @param   ...         Arguments (optional).
 */
void Error(const char *pszFormat, ...)
{
    va_list va;

    va_start(va, pszFormat);
    vprintf(pszFormat, va);
    va_end(va);
}
1347
Note: See TracBrowser for help on using the repository browser.