Changeset 913 for trunk/src


Ignore:
Timestamp:
Jul 28, 2011, 12:43:59 AM (14 years ago)
Author:
Dmitry A. Kuminov
Message:

OS/2: QProcess: New I/O pipe notification mechanism.

This remake is to fix a number of problems such as hangs
and truncated data during interprocess communication using
redirection of standard I/O channels through the QIODevice
interface provided by QProcess. See #199.

Location:
trunk/src/corelib/io
Files:
4 edited

Legend:

Unmodified
Added
Removed
  • trunk/src/corelib/io/qprocess.cpp

    r911 r913  
    842842    destroyPipe(childStartedPipe);
    843843    destroyPipe(deathPipe);
    844 #else
    845     pipeData[InPipe].bytesLeft = 0;
    846     pipeData[OutPipe].bytesLeft = pipeData[ErrPipe].bytesLeft = 0;
    847     pipeData[InPipe].newBytes = 0;
    848     pipeData[OutPipe].newBytes = pipeData[ErrPipe].newBytes = 0;
    849844#endif
    850845#ifdef Q_OS_UNIX
     
    18771872            d->stdinChannel.notifier->setEnabled(true);
    18781873#if defined QPROCESS_DEBUG
    1879     qDebug("QProcess::writeData(%p \"%s\", %lld) == 1 (written to buffer)",
    1880            data, qt_prettyDebug(data, len, 16).constData(), len);
     1874        qDebug("QProcess::writeData(%p \"%s\", %lld) == 1 (written to buffer)",
     1875               data, qt_prettyDebug(data, len, 16).constData(), len);
    18811876#endif
    18821877#if defined(Q_OS_OS2)
    18831878        // try to write some bytes (there may be space in the pipe)
    1884         d->_q_canWrite();
     1879        d->_q_notified(QProcessPrivate::CanWrite);
    18851880#endif
    18861881        return 1;
     
    18961891#endif
    18971892#if defined(Q_OS_OS2)
    1898         // try to write some bytes (there may be space in the pipe)
    1899         d->_q_canWrite();
     1893    // try to write some bytes (there may be space in the pipe)
     1894    d->_q_notified(QProcessPrivate::CanWrite);
    19001895#endif
    19011896    return len;
  • trunk/src/corelib/io/qprocess.h

    r846 r913  
    231231    Q_PRIVATE_SLOT(d_func(), bool _q_startupNotification())
    232232    Q_PRIVATE_SLOT(d_func(), bool _q_processDied())
     233#if defined(Q_OS_OS2)
     234    Q_PRIVATE_SLOT(d_func(), void _q_notified(int))
     235#else
    233236    Q_PRIVATE_SLOT(d_func(), void _q_notified())
     237#endif
    234238    friend class QProcessManager;
    235239};
  • trunk/src/corelib/io/qprocess_os2.cpp

    r911 r913  
    126126    static void removeProcess(USHORT procKey);
    127127
     128    static QMutex *pipeStateLock() {
     129        Q_ASSERT(instance);
     130        return &instance->pipeStatLock;
     131    }
     132
    128133private:
    129134
     
    141146    QAtomicInt eventSemGuard;
    142147    QAtomicInt deathFlag;
     148    QMutex pipeStatLock;
    143149
    144150    typedef QHash<USHORT, QProcess *> ProcessList;
     
    171177{
    172178#if defined (QPROCESS_DEBUG)
    173     fprintf(stderr, "*** SIGCHLD\n");
     179    //fprintf(stderr, "*** SIGCHLD\n");
    174180#endif
    175181
     
    381387    DEBUG(() << "QProcessManager::run() BEGIN");
    382388
    383     // Note: the rationale behind using a worker thread so far is that
    384     // calling complex functions from a signal handler is not really a good
     389    // Note: the rationale behind using a worker thread for death detection is
     390    // that calling complex functions from a signal handler is not really a good
    385391    // idea unless there is a 100% guarantee that they are reentrant. So, the
    386392    // handler only posts a semaphore (I *hope* DosPostEventSem is reentrant)
     
    405411            DEBUG(() << "QProcessManager::run(): child death signaled");
    406412            foreach (QProcess *proc, processes) {
    407                 QProcessPrivate::WaitMode mode = (QProcessPrivate::WaitMode)
    408                     proc->d_func()->waitMode.fetchAndStoreRelaxed(QProcessPrivate::SigChild);
    409                 switch (mode) {
    410                 case QProcessPrivate::Semaphore:
     413                if (proc->d_func()->waitMode) {
    411414                    DosPostEventSem(proc->d_func()->waitSem);
    412                     break;
    413                 case QProcessPrivate::Async:
    414                 case QProcessPrivate::SigChild:
    415                     QMetaObject::invokeMethod(proc, "_q_processDied", Qt::QueuedConnection);
    416                     break;
     415                } else {
     416                    QMetaObject::invokeMethod(proc, "_q_notified", Qt::QueuedConnection,
     417                                              Q_ARG(int, QProcessPrivate::CanDie));
    417418                }
    418419            }
     
    435436        }
    436437
     438        // Note: OS/2 posts the semaphore once for each new pipe event but
     439        // DosQueryNPipeSemState() returns all previous events (already posted
     440        // on their own) that are still up to date. This makes it impossible to
     441        // distinguish between the repeated events and unique ones here. This is
     442        // done in _q_notified() using some heuristic. Due to the same reason of
     443        // duplicate events we do a lock here to be in sync with writeToStdin().
     444        // Note that all this hassle would not be necessary at all if there were
     445        // a method to get the number of free bytes in the write buf of the pipe
     446        // (similar to DosPeekNPipe() that checks the read buf) but there isn't.
     447
     448        QMutexLocker lock(pipeStateLock());
     449
    437450        arc = DosQueryNPipeSemState((HSEM)eventSem, pipeStates, pipeStatesSize);
    438451        if (arc != NO_ERROR) {
     
    444457        // same pipe key may be mixed. We need CLOSE messages to be posted after
    445458        // READ messages, so we do two passes.
     459       
     460        // @todo We don't need two passes any more!
    446461
    447462        int pass = 0;
     
    457472            if (pass == 1 && status != NPSS_CLOSE)
    458473                continue;
    459             DEBUG((" %d/%d: fStatus %u fFlag f%02X usKey %04hX usAvail %hu",
     474            DEBUG((" %d/%d: fStatus %u fFlag %02X usKey %04hX usAvail %hu",
    460475                   pass, i, (uint) pipeStates[i].fStatus,
    461476                   (uint) pipeStates[i].fFlag, pipeStates[i].usKey,
     
    467482            Q_ASSERT(proc);
    468483            DEBUG(("  process %p", proc));
    469             QProcessPrivate *d = proc->d_func();
    470 
    471             if (status == NPSS_CLOSE) {
    472                 // nothing to do but notify the object; it should close the
    473                 // pipe if it sees that it has been notified but there is no
    474                 // data to read, or if it fails to write to a (closed) pipe
     484
     485            int flags = 0;
     486            switch(type) {
     487            case QProcessPrivate::InPipe:
     488                Q_ASSERT(status == NPSS_CLOSE || status == NPSS_WSPACE);
     489                flags |= QProcessPrivate::CanWrite;
     490                // save the current number of free bytes in the pipe to let
     491                // _q_canWrite() go (this is also used in tryCloseStdinPipe())
     492                proc->d_func()->pipeData[type].bytes = pipeStates[i].usAvail;
     493                break;
     494            case QProcessPrivate::OutPipe:
     495                Q_ASSERT(status == NPSS_CLOSE || status == NPSS_RDATA);
     496                flags |= QProcessPrivate::CanReadStdOut;
     497                break;
     498            case QProcessPrivate::ErrPipe:
     499                Q_ASSERT(status == NPSS_CLOSE || status == NPSS_RDATA);
     500                flags |= QProcessPrivate::CanReadStdOut;
     501                break;
     502            }
     503
     504            if (proc->d_func()->waitMode) {
     505                DosPostEventSem(proc->d_func()->waitSem);
    475506            } else {
    476                 // update the counter
    477                 if (d->pipeData[type].newBytes.fetchAndStoreRelaxed(pipeStates[i].usAvail) != 0) {
    478                     // the object didn't process the previous notification yet;
    479                     // there is no point to send a new one
    480                     continue;
    481                 }
    482             }
    483 
    484             // signal the process object
    485             QProcessPrivate::WaitMode mode = (QProcessPrivate::WaitMode)
    486                 proc->d_func()->waitMode.fetchAndStoreRelaxed(QProcessPrivate::SigChild);
    487             switch (mode) {
    488             case QProcessPrivate::Semaphore:
    489                 DosPostEventSem(proc->d_func()->waitSem);
    490                 break;
    491             case QProcessPrivate::Async:
    492             case QProcessPrivate::SigChild:
    493                 const char *method = 0;
    494                 switch(type) {
    495                 case QProcessPrivate::InPipe:
    496                     Q_ASSERT(status == NPSS_CLOSE || status == NPSS_WSPACE);
    497                     method = "_q_canWrite";
    498                     break;
    499                 case QProcessPrivate::OutPipe:
    500                     Q_ASSERT(status == NPSS_CLOSE || status == NPSS_RDATA);
    501                     method = "_q_canReadStandardOutput";
    502                     break;
    503                 case QProcessPrivate::ErrPipe:
    504                     Q_ASSERT(status == NPSS_CLOSE || status == NPSS_RDATA);
    505                     method = "_q_canReadStandardError";
    506                     break;
    507                 }
    508                 QMetaObject::invokeMethod(proc, method, Qt::QueuedConnection);
    509                 break;
     507                QMetaObject::invokeMethod(proc, "_q_notified", Qt::QueuedConnection,
     508                                          Q_ARG(int, flags));
    510509            }
    511510        }
     
    520519void QProcessPrivate::init()
    521520{
    522     waitMode = Async;
     521    waitMode = false;
    523522    waitSem = NULLHANDLE;
     523    memset(pipeData, 0, sizeof(pipeData));
    524524
    525525    procKey = QProcessManager::InvalidProcKey;
     
    577577                         OPEN_ACCESS_READONLY | OPEN_SHARE_DENYREADWRITE |
    578578                         OPEN_FLAGS_NOINHERIT, (PEAOP2)NULL);
     579            if (rc == NO_ERROR) {
     580                // set the initial number of free bytes
     581                pipeData[type].bytes = PIPE_SIZE_STDIN;
     582            }
    579583        }
    580584        break;
     
    10641068    }
    10651069
    1066     pipeData[InPipe].bytesLeft = PIPE_SIZE_STDIN;
    1067     pipeData[OutPipe].bytesLeft = 0;
    1068     pipeData[ErrPipe].bytesLeft = 0;
     1070    pipeData[InPipe].signaled = false;
     1071    pipeData[OutPipe].signaled = false;
     1072    pipeData[ErrPipe].signaled = false;
    10691073
    10701074    procKey = QProcessManager::addProcess(q);
     
    12461250qint64 QProcessPrivate::bytesAvailableFromStdout() const
    12471251{
    1248     QProcessPrivate* that = const_cast<QProcessPrivate*>(this);
    1249 
    1250     int newBytes = 0;
    1251     if (dying) {
    1252         // we are dying and won't get notifications from QProcessManager
    1253         // anymore, look manually if there's stiil something in the pipe
    1254         APIRET arc;
    1255         ULONG state;
     1252    if (!dying && pipeData[OutPipe].signaled) {
     1253        // reuse the number we stored in _q_notified()
     1254        DEBUG(("QProcessPrivate::bytesAvailableFromStdout() == %lld (reused)",
     1255               pipeData[OutPipe].bytes));
     1256        return pipeData[OutPipe].bytes;
     1257    }
     1258
     1259    qint64 bytes = 0;
     1260    if (stdoutChannel.pipe.server != HPIPE(~0) &&
     1261        (!dying || stdoutChannel.type == QProcessPrivate::Channel::Normal)) {
     1262        ULONG state, dummy;
    12561263        AVAILDATA avail;
    1257         if (that->stdoutChannel.type == QProcessPrivate::Channel::Normal &&
    1258             that->stdoutChannel.pipe.server != HPIPE(~0)) {
    1259             arc = DosPeekNPipe(that->stdoutChannel.pipe.server, 0, 0, 0, &avail, &state);
    1260             Q_ASSERT(arc == NO_ERROR || arc == ERROR_INVALID_PARAMETER);
    1261             // note that even if ERROR_INVALID_PARAMETER, it seems to return the
    1262             // correct values in avail and state (undocumented)
    1263             newBytes = avail.cbpipe;
    1264             that->pipeData[OutPipe].newBytes = 0;
    1265         }
    1266     } else {
    1267         // grab new bytes from QProcessManager (if any)
    1268         newBytes = that->pipeData[OutPipe].newBytes.fetchAndStoreRelaxed(0);
    1269     }
    1270 
    1271     if (newBytes)
    1272         that->pipeData[OutPipe].bytesLeft = newBytes;
    1273 
    1274     DEBUG(("QProcessPrivate::bytesAvailableFromStdout() == %lld",
    1275            that->pipeData[OutPipe].bytesLeft));
    1276     return that->pipeData[OutPipe].bytesLeft;
     1264        APIRET arc = DosPeekNPipe(stdoutChannel.pipe.server, 0, 0, &dummy,
     1265                                  &avail, &state);
     1266        Q_ASSERT(arc == NO_ERROR);
     1267        bytes = (qint64)avail.cbpipe;
     1268        if (!dying && state != NP_STATE_CONNECTED) {
     1269            // if the pipe is closed, we negate the result (to preserve the
     1270            // number of bytes remaining) and add -1 to it (to distinguish from
     1271            // the case when there's no data byt the pipe is not closed)
     1272            bytes = -1 - bytes;
     1273        }
     1274    }
     1275
     1276    DEBUG(("QProcessPrivate::bytesAvailableFromStdout() == %lld", bytes));
     1277    return bytes;
    12771278}
    12781279
    12791280qint64 QProcessPrivate::bytesAvailableFromStderr() const
    12801281{
    1281     QProcessPrivate* that = const_cast<QProcessPrivate*>(this);
    1282 
    1283     int newBytes = 0;
    1284     if (dying) {
    1285         // we are dying and won't get notifications from QProcessManager
    1286         // anymore, look manually if there's stiil something in the pipe
    1287         APIRET arc;
    1288         ULONG state;
     1282    if (!dying && pipeData[ErrPipe].signaled) {
     1283        // reuse the number we stored in _q_notified()
     1284        DEBUG(("QProcessPrivate::bytesAvailableFromStderr() == %lld (reused)",
     1285               pipeData[ErrPipe].bytes));
     1286        return pipeData[ErrPipe].bytes;
     1287    }
     1288
     1289    qint64 bytes = 0;
     1290    if (stderrChannel.pipe.server != HPIPE(~0) &&
     1291        (!dying || stderrChannel.type == QProcessPrivate::Channel::Normal)) {
     1292        ULONG state, dummy;
    12891293        AVAILDATA avail;
    1290         if (that->stderrChannel.type == QProcessPrivate::Channel::Normal &&
    1291             that->stderrChannel.pipe.server != HPIPE(~0)) {
    1292             arc = DosPeekNPipe(that->stderrChannel.pipe.server, 0, 0, 0, &avail, &state);
    1293             Q_ASSERT(arc == NO_ERROR || arc == ERROR_INVALID_PARAMETER);
    1294             // note that even if ERROR_INVALID_PARAMETER, it seems to return the
    1295             // correct values in avail and state (undocumented)
    1296             newBytes = avail.cbpipe;
    1297             that->pipeData[ErrPipe].newBytes = 0;
    1298         }
    1299     } else {
    1300         // grab new bytes from QProcessManager (if any)
    1301         newBytes = that->pipeData[ErrPipe].newBytes.fetchAndStoreRelaxed(0);
    1302     }
    1303 
    1304     if (newBytes)
    1305         that->pipeData[ErrPipe].bytesLeft = newBytes;
    1306 
    1307     DEBUG(("QProcessPrivate::bytesAvailableFromStderr() == %lld",
    1308            that->pipeData[ErrPipe].bytesLeft));
    1309     return that->pipeData[ErrPipe].bytesLeft;
     1294        APIRET arc = DosPeekNPipe(stderrChannel.pipe.server, 0, 0, &dummy,
     1295                                  &avail, &state);
     1296        Q_ASSERT(arc == NO_ERROR);
     1297        bytes = (qint64)avail.cbpipe;
     1298        if (!dying && state != NP_STATE_CONNECTED) {
     1299            // if the pipe is closed, we negate the result (to preserve the
     1300            // number of bytes remaining) and add -1 to it (to distinguish from
     1301            // the case when there's no data byt the pipe is not closed)
     1302            bytes = -1 - bytes;
     1303        }
     1304    }
     1305
     1306    DEBUG(("QProcessPrivate::bytesAvailableFromStderr() == %lld", bytes));
     1307    return bytes;
    13101308}
    13111309
     
    13181316    if (arc == NO_ERROR) {
    13191317        bytesRead = (qint64)actual;
    1320         // update our counter
    1321         Q_ASSERT(pipeData[OutPipe].bytesLeft >= bytesRead);
    1322         pipeData[OutPipe].bytesLeft -= bytesRead;
    13231318    }
    13241319
     
    13361331    if (arc == NO_ERROR) {
    13371332        bytesRead = (qint64)actual;
    1338         // update our counter
    1339         Q_ASSERT(pipeData[ErrPipe].bytesLeft >= bytesRead);
    1340         pipeData[ErrPipe].bytesLeft -= bytesRead;
    13411333    }
    13421334
     
    13511343
    13521344    if (stdinChannel.pipe.closePending) {
    1353         // grab new bytes from QProcessManager (if any)
    1354         int newBytes = pipeData[InPipe].newBytes.fetchAndStoreRelaxed(0);
    1355         if (newBytes)
    1356             pipeData[InPipe].bytesLeft = newBytes;
    1357         if (pipeData[InPipe].bytesLeft == PIPE_SIZE_STDIN) {
    1358             // the other end has read everything from the pipe buf so that it's
    1359             // safe to close it now (satisfy the closeWriteChannel() request)
     1345        // check if the other end has read everything from the pipe buf so that
     1346        // it's safe to close it (satisfy the closeWriteChannel() request) now
     1347        if (pipeData[InPipe].bytes == PIPE_SIZE_STDIN) {
    13601348            destroyPipe(stdinChannel.pipe);
    13611349        }
     
    13651353qint64 QProcessPrivate::writeToStdin(const char *data, qint64 maxlen)
    13661354{
    1367     QProcessPrivate* that = const_cast<QProcessPrivate*>(this);
    1368 
    1369     // grab new bytes from QProcessManager (if any)
    1370     int newBytes = that->pipeData[InPipe].newBytes.fetchAndStoreRelaxed(0);
    1371     if (newBytes)
    1372         that->pipeData[InPipe].bytesLeft = newBytes;
    1373 
    1374     DEBUG(("QProcessPrivate::writeToStdin(): %lld free bytes in pipe",
    1375            that->pipeData[InPipe].bytesLeft));
    1376     if (!that->pipeData[InPipe].bytesLeft)
    1377         return 0;
     1355    QMutexLocker lock(QProcessManager::pipeStateLock());
     1356
     1357    // Reset the number of bytes before writing to the pipe. This makes sure
     1358    // this method will be called again only after QProcessManager::run() sets
     1359    // bytes to non-zero and sends us a signal. Note that we do it under the
     1360    // lock to avoid cases when ProcessManager::run() overwrites this bytes
     1361    // field with an outdated value from the previous pipe event (that could get
     1362    // reported again due to another event fired on behalf of some other pipe).
     1363    pipeData[InPipe].bytes = 0;
    13781364
    13791365    ULONG actual = 0;
     
    13831369    if (arc == NO_ERROR) {
    13841370        written = (qint64)actual;
    1385         // update our counter
    1386         Q_ASSERT(pipeData[InPipe].bytesLeft >= written);
    1387         pipeData[InPipe].bytesLeft -= written;
    13881371    }
    13891372
     
    14571440}
    14581441
    1459 bool QProcessPrivate::waitForReadyRead(int msecs)
     1442bool QProcessPrivate::waitFor(WaitCond cond, int msecs)
    14601443{
    14611444    Q_Q(QProcess);
    1462     DEBUG(("QProcessPrivate::waitForReadyRead(%d)", msecs));
     1445
     1446#if defined QPROCESS_DEBUG
     1447    const char *condStr = cond == WaitReadyRead ? "ReadyRead" :
     1448                          cond == WaitBytesWritten ? "BytesWritten" :
     1449                          cond == WaitFinished ? "Finished" : "???";
     1450    DEBUG(("QProcessPrivate::waitFor(%s, %d)", condStr, msecs));
     1451#endif
    14631452
    14641453    QTime stopWatch;
     
    14661455
    14671456    APIRET arc;
     1457    bool ret = false;
     1458
     1459    waitMode = true;
    14681460    ensureWaitSem();
    14691461
     1462    // QProcessManager::run() could post a method invocation before noticing we
     1463    // entered waitMode, process it now to avoid an endless hang in wait state
     1464    // due to the absense of the notification via the semaphore
     1465    bool firstTime = true;
     1466    QCoreApplication::sendPostedEvents(q, QEvent::MetaCall);
     1467    if (QCoreApplication::instance() == NULL) {
     1468        // however, if there is no QApplication, _q_notified() won't be called
     1469        // by the above, only removed from the queue. So we need a manual call.
     1470        firstTime = false;
     1471    }
     1472
    14701473    forever {
    1471         bool timedOut = false, failed = false;
    1472         if (waitMode.testAndSetRelaxed(Async, Semaphore)) {
    1473             int timeout = qt_timeout_value(msecs, stopWatch.elapsed());
    1474             qDosNI(arc = DosWaitEventSem(waitSem, (ULONG)timeout));
    1475             if (arc == ERROR_TIMEOUT) {
    1476                 timedOut = true;
    1477             } else if (arc != NO_ERROR) {
    1478                 Q_ASSERT(arc == NO_ERROR);
    1479                 failed = true;
    1480             } else {
    1481                 bool readyReadEmitted = false;
    1482                 if (pipeData[OutPipe].newBytes.fetchAndAddRelaxed(0)) {
    1483                     bool canRead = _q_canReadStandardOutput();
    1484                     if (processChannel == QProcess::StandardOutput && canRead)
    1485                         readyReadEmitted = true;
     1474        if (firstTime) {
     1475            firstTime = false;
     1476        } else {
     1477            // check all conditions upon the signal from QProcessManager::run()
     1478            _q_notified(CanAll);
     1479        }
     1480
     1481        bool done = false;
     1482
     1483        switch (cond)
     1484        {
     1485            case WaitReadyRead: {
     1486                // check if there was a _q_canReadStandardOutput/Error() signal
     1487                // that got something from the pipe
     1488                if (processChannel == QProcess::StandardOutput &&
     1489                    pipeData[OutPipe].signaled) {
     1490                    ret = pipeData[OutPipe].result;
     1491                    done = true;
     1492                    break;
    14861493                }
    1487                 if (pipeData[ErrPipe].newBytes.fetchAndAddRelaxed(0)) {
    1488                     bool canRead = _q_canReadStandardError();
    1489                     if (processChannel == QProcess::StandardError && canRead)
    1490                         readyReadEmitted = true;
     1494                if (processChannel == QProcess::StandardError &&
     1495                    pipeData[ErrPipe].signaled) {
     1496                    ret = pipeData[ErrPipe].result;
     1497                    done = true;
     1498                    break;
    14911499                }
    1492                 if (readyReadEmitted) {
    1493                     waitMode.fetchAndStoreRelaxed(Async);
    1494                     return true;
     1500
     1501                // check if there was a _q_processDied() signal
     1502                if (dying || processState == QProcess::NotRunning) {
     1503                    done = true;
     1504                    break;
    14951505                }
    14961506
    1497                 if (pipeData[InPipe].newBytes.fetchAndAddRelaxed(0))
    1498                     _q_canWrite();
    1499 
    1500                 if (!pid)
    1501                     failed = true;
    1502             }
    1503         } else {
    1504             // we've got SIGCHLD, proceeed to _q_processDied()
    1505         }
    1506 
    1507         waitMode.fetchAndStoreRelaxed(Async);
    1508 
    1509         if (timedOut || failed) {
    1510             if (timedOut) {
    1511                 processError = QProcess::Timedout;
    1512                 q->setErrorString(QProcess::tr("Process operation timed out"));
    1513             }
     1507                break;
     1508            }
     1509
     1510            case WaitBytesWritten: {
     1511                // check if there was a _q_canWrite() signal that wrote
     1512                // something to the pipe
     1513                if (pipeData[InPipe].signaled) {
     1514                    ret = pipeData[InPipe].result;
     1515                    done = true;
     1516                    break;
     1517                }
     1518
     1519                // check if there was a _q_processDied() signal
     1520                if (dying || processState == QProcess::NotRunning) {
     1521                    done = true;
     1522                    break;
     1523                }
     1524
     1525                if (writeBuffer.isEmpty()) {
     1526                    done = true;
     1527                    break;
     1528                }
     1529
     1530                break;
     1531            }
     1532
     1533            case WaitFinished: {
     1534                // check if there was a _q_processDied() signal
     1535                if (dying || processState == QProcess::NotRunning) {
     1536                    ret = true;
     1537                    done = true;
     1538                    break;
     1539                }
     1540
     1541                break;
     1542            }
     1543        }
     1544
     1545        // reset all signaled flags
     1546        pipeData[OutPipe].signaled = false;
     1547        pipeData[ErrPipe].signaled = false;
     1548        pipeData[InPipe].signaled = false;
     1549
     1550        if (done)
    15141551            break;
    1515         }
    1516 
    1517         if (_q_processDied())
    1518             return false;
    1519     }
     1552
     1553        // wait for the new signals
     1554        int timeout = qt_timeout_value(msecs, stopWatch.elapsed());
     1555        qDosNI(arc = DosWaitEventSem(waitSem, (ULONG)timeout));
     1556
     1557        if (arc == ERROR_TIMEOUT) {
     1558            processError = QProcess::Timedout;
     1559            q->setErrorString(QProcess::tr("Process operation timed out"));
     1560            break;
     1561        } else if (arc != NO_ERROR) {
     1562            Q_ASSERT(arc == NO_ERROR);
     1563            break;
     1564        }
     1565    }
     1566
     1567    waitMode = false;
     1568
     1569    ULONG postCnt = 0;
     1570    arc = DosResetEventSem(waitSem, &postCnt);
     1571    if (arc == NO_ERROR && postCnt) {
     1572        // QProcessManager::run() posted the semaphore before seeing that we
     1573        // left waitMode, repost it as a method invocation to avoid signal loss
     1574        QMetaObject::invokeMethod(q, "_q_notified", Qt::QueuedConnection,
     1575                                  Q_ARG(int, QProcessPrivate::CanAll));
     1576    }
     1577
     1578    DEBUG(("QProcessPrivate::waitFor(%s, %d) returns %d", condStr, msecs, ret));
     1579    return ret;
     1580}
     1581
     1582bool QProcessPrivate::waitForReadyRead(int msecs)
     1583{
     1584    return waitFor(WaitReadyRead, msecs);
     1585}
     1586
     1587bool QProcessPrivate::waitForBytesWritten(int msecs)
     1588{
     1589    return waitFor(WaitBytesWritten, msecs);
     1590}
     1591
     1592bool QProcessPrivate::waitForFinished(int msecs)
     1593{
     1594    return waitFor(WaitFinished, msecs);
     1595}
     1596
     1597bool QProcessPrivate::waitForWrite(int msecs)
     1598{
     1599    // ### this function isn't actually used in OS/2 and Unix code paths
    15201600    return false;
    1521 }
    1522 
    1523 bool QProcessPrivate::waitForBytesWritten(int msecs)
    1524 {
    1525     Q_Q(QProcess);
    1526     DEBUG(("QProcessPrivate::waitForBytesWritten(%d)", msecs));
    1527 
    1528     QTime stopWatch;
    1529     stopWatch.start();
    1530 
    1531     APIRET arc;
    1532     ensureWaitSem();
    1533 
    1534     while (!writeBuffer.isEmpty()) {
    1535         bool timedOut = false, failed = false;
    1536         if (waitMode.testAndSetRelaxed(Async, Semaphore)) {
    1537             int timeout = qt_timeout_value(msecs, stopWatch.elapsed());
    1538             qDosNI(arc = DosWaitEventSem(waitSem, (ULONG)timeout));
    1539             if (arc == ERROR_TIMEOUT) {
    1540                 timedOut = true;
    1541             } else if (arc != NO_ERROR) {
    1542                 Q_ASSERT(arc == NO_ERROR);
    1543                 failed = true;
    1544             } else {
    1545                 if (pipeData[InPipe].newBytes.fetchAndAddRelaxed(0)) {
    1546                     waitMode.fetchAndStoreRelaxed(Async);
    1547                     return _q_canWrite();
    1548                 }
    1549 
    1550                 if (pipeData[OutPipe].newBytes.fetchAndAddRelaxed(0))
    1551                     _q_canReadStandardOutput();
    1552 
    1553                 if (pipeData[ErrPipe].newBytes.fetchAndAddRelaxed(0))
    1554                     _q_canReadStandardError();
    1555 
    1556                 if (!pid)
    1557                     failed = true;
    1558             }
    1559         } else {
    1560             // we've got SIGCHLD, proceeed to _q_processDied()
    1561         }
    1562 
    1563         waitMode.fetchAndStoreRelaxed(Async);
    1564 
    1565         if (timedOut || failed) {
    1566             if (timedOut) {
    1567                 processError = QProcess::Timedout;
    1568                 q->setErrorString(QProcess::tr("Process operation timed out"));
    1569             }
    1570             break;
    1571         }
    1572 
    1573         if (_q_processDied())
    1574             return false;
    1575     }
    1576 
    1577     return false;
    1578 }
    1579 
    1580 bool QProcessPrivate::waitForFinished(int msecs)
    1581 {
    1582     Q_Q(QProcess);
    1583     DEBUG(("QProcessPrivate::waitForFinished(%d)", msecs));
    1584 
    1585     QTime stopWatch;
    1586     stopWatch.start();
    1587 
    1588     APIRET arc;
    1589     ensureWaitSem();
    1590 
    1591     forever {
    1592         bool timedOut = false, failed = false;
    1593         if (waitMode.testAndSetRelaxed(Async, Semaphore)) {
    1594             int timeout = qt_timeout_value(msecs, stopWatch.elapsed());
    1595             qDosNI(arc = DosWaitEventSem(waitSem, (ULONG)timeout));
    1596             if (arc == ERROR_TIMEOUT) {
    1597                 timedOut = true;
    1598             } else if (arc != NO_ERROR) {
    1599                 Q_ASSERT(arc == NO_ERROR);
    1600                 failed = true;
    1601             } else {
    1602                 if (pipeData[InPipe].newBytes.fetchAndAddRelaxed(0))
    1603                     _q_canWrite();
    1604 
    1605                 if (pipeData[OutPipe].newBytes.fetchAndAddRelaxed(0))
    1606                     _q_canReadStandardOutput();
    1607 
    1608                 if (pipeData[ErrPipe].newBytes.fetchAndAddRelaxed(0))
    1609                     _q_canReadStandardError();
    1610 
    1611                 if (!pid) {
    1612                     waitMode.fetchAndStoreRelaxed(Async);
    1613                     return true;
    1614                 }
    1615             }
    1616         } else {
    1617             // we've got SIGCHLD, proceeed to _q_processDied()
    1618         }
    1619 
    1620         waitMode.fetchAndStoreRelaxed(Async);
    1621 
    1622         if (timedOut || failed) {
    1623             if (timedOut) {
    1624                 processError = QProcess::Timedout;
    1625                 q->setErrorString(QProcess::tr("Process operation timed out"));
    1626             }
    1627             break;
    1628         }
    1629 
    1630         if (_q_processDied())
    1631             return true;
    1632     }
    1633     return false;
    1634 }
    1635 
    1636 bool QProcessPrivate::waitForWrite(int msecs)
    1637 {
    1638     // ### this function isn't actually used in OS/2 and Unix code paths
    1639 
    1640     APIRET arc;
    1641     ensureWaitSem();
    1642 
    1643     bool ret = false;
    1644     if (waitMode.testAndSetRelaxed(Async, Semaphore)) {
    1645         qDosNI(arc = DosWaitEventSem(waitSem, (ULONG)msecs));
    1646         if (arc == NO_ERROR) {
    1647             ret = pipeData[InPipe].newBytes.fetchAndAddRelaxed(0);
    1648         }
    1649     }
    1650     waitMode.fetchAndStoreRelaxed(Async);
    1651     return ret;
    16521601}
    16531602
     
    16861635}
    16871636
    1688 void QProcessPrivate::_q_notified()
    1689 {
     1637void QProcessPrivate::_q_notified(int flags)
     1638{
     1639    DEBUG(("QProcessPrivate::_q_notified: flags %x", flags));
     1640
     1641    // note: in all read cases below, we look for the number of bytes actually
     1642    // available to sort out (ignore) outdated notifications from
     1643    // QProcessManager::run() indicated by 0 bytes and to detect closures of
     1644    // the remote end of the pipe indicated by bytes < 0
     1645
     1646    if (flags & CanReadStdOut) {
     1647        qint64 bytes = bytesAvailableFromStdout();
     1648        if (bytes) {
     1649            bool closed = false;
     1650            if (bytes < 0) {
     1651                bytes = -1 -bytes;
     1652                closed = true;
     1653            }
     1654            pipeData[OutPipe].bytes = bytes;
     1655            pipeData[OutPipe].signaled = true;
     1656            pipeData[OutPipe].result = _q_canReadStandardOutput();
     1657            if (closed && bytes) {
     1658                // ask _q_canReadStandardOutput() to close the pipe by setting
     1659                // bytes to 0 (only if not already done by the previous call)
     1660                pipeData[OutPipe].bytes = 0;
     1661                _q_canReadStandardOutput();
     1662            }
     1663            if (!waitMode) {
     1664                // reset the signaled flag
     1665                pipeData[OutPipe].signaled = false;
     1666            }
     1667#if defined QPROCESS_DEBUG
     1668        } else if (!waitMode) {
     1669            DEBUG(("QProcessPrivate::_q_notified: stale CanReadStdOut signal!"));
     1670#endif
     1671        }
     1672    }
     1673
     1674    if (flags & CanReadStdErr) {
     1675        qint64 bytes = bytesAvailableFromStderr();
     1676        if (bytes) {
     1677            bool closed = false;
     1678            if (bytes < 0) {
     1679                bytes = -1 -bytes;
     1680                closed = true;
     1681            }
     1682            pipeData[ErrPipe].bytes = bytes;
     1683            pipeData[ErrPipe].signaled = true;
     1684            pipeData[ErrPipe].result = _q_canReadStandardError();
     1685            if (closed && bytes) {
     1686                // ask _q_canReadStandardError() to close the pipe by setting
     1687                // bytes to 0 (only if not already done by the previous call)
     1688                pipeData[ErrPipe].bytes = 0;
     1689                _q_canReadStandardError();
     1690            }
     1691            if (!waitMode) {
     1692                // reset the signaled flag
     1693                pipeData[ErrPipe].signaled = false;
     1694            }
     1695#if defined QPROCESS_DEBUG
     1696        } else if (!waitMode) {
     1697            DEBUG(("QProcessPrivate::_q_notified: stale CanReadStdErr signal!"));
     1698#endif
     1699        }
     1700    }
     1701
     1702    if (flags & CanWrite) {
     1703        if (pipeData[InPipe].bytes) {
     1704            pipeData[InPipe].signaled = true;
     1705            pipeData[InPipe].result = _q_canWrite();
     1706            if (!waitMode) {
     1707                // reset the signaled flag
     1708                pipeData[InPipe].signaled = false;
     1709            }
     1710        }
     1711    }
     1712
     1713    if (flags & CanDie) {
     1714        _q_processDied();
     1715    }
    16901716}
    16911717
  • trunk/src/corelib/io/qprocess_p.h

    r911 r913  
    177177    bool _q_startupNotification();
    178178    bool _q_processDied();
     179#if defined(Q_OS_OS2)
     180    void _q_notified(int flags);
     181#else
    179182    void _q_notified();
     183#endif
    180184
    181185    QProcess::ProcessChannel processChannel;
     
    246250    void ensureWaitSem();
    247251    void tryCloseStdinPipe();
     252    enum WaitCond { WaitReadyRead, WaitBytesWritten, WaitFinished };
     253    bool waitFor(WaitCond cond, int msecs);
    248254#endif
    249255
     
    258264#endif
    259265#ifdef Q_OS_OS2
    260     enum WaitMode { Async, Semaphore, SigChild };
    261     QAtomicInt waitMode;
     266    enum { CanReadStdOut = 1, CanReadStdErr = 2, CanWrite = 4, CanDie = 8, CanAll = 0x0F };
     267    bool waitMode : 1;
    262268    HEV waitSem;
    263269    struct PipeData {
    264         QAtomicInt newBytes;
    265         qint64 bytesLeft;
     270        bool signaled: 1;
     271        bool result : 1;
     272        qint64 bytes;
    266273    };
    267274    PipeData pipeData[3]; // size must match PipeType!
Note: See TracChangeset for help on using the changeset viewer.