Changeset 1254 for python


Ignore:
Timestamp:
Nov 10, 2017, 12:00:15 AM (8 years ago)
Author:
dmik
Message:

python: Use spawn instead of fork in subprocess.

This significantly improves performance when running many external programs.
See #275 for more details.

Note that this commit also makes os.pipe use pipe() instead of socketpair() and
this in turn fixes #267. Note that socketpair() is still available via socket.socketpair()
in Python.

Location:
python/trunk
Files:
2 edited

Legend:

Unmodified
Added
Removed
  • python/trunk/Lib/subprocess.py

    r603 r1254  
    396396import signal
    397397import errno
     398
     399os2 = (os.name == "os2")
    398400
    399401import sysconfig
     
    427429    class pywintypes:
    428430        error = IOError
     431elif os2:
     432    import threading
     433    import fcntl
     434    import time
    429435else:
    430436    import select
     
    661667            raise TypeError("bufsize must be an integer")
    662668
    663         if mswindows:
     669        if mswindows or os2:
    664670            if preexec_fn is not None:
    665671                raise ValueError("preexec_fn is not supported on Windows "
    666                                  "platforms")
    667             if close_fds and (stdin is not None or stdout is not None or
    668                               stderr is not None):
     672                                 "and OS/2 platforms")
     673            if not os2 and close_fds and (stdin is not None or stdout is not None or
     674                                          stderr is not None):
    669675                raise ValueError("close_fds is not supported on Windows "
    670676                                 "platforms if you redirect stdin/stdout/stderr")
    671         else:
     677        if not mswindows:
    672678            # POSIX
    673679            if startupinfo is not None:
     
    12121218                to_close.remove(fd)
    12131219
    1214             # For transferring possible exec failure from child to parent
    1215             # The first char specifies the exception type: 0 means
    1216             # OSError, 1 means some other error.
    1217             errpipe_read, errpipe_write = self.pipe_cloexec()
    1218             try:
     1220            if os2:
     1221                # We use spawn on OS/2 instead of fork because fork is very
     1222                # inefficient there (mostly due to the lack of COW page support
     1223                # in the kernel).
     1224                mode = os.P_NOWAIT
     1225                oldcwd = None
     1226                dups = [ None, None, None ]
     1227                cloexec_fds = set()
     1228
    12191229                try:
    1220                     gc_was_enabled = gc.isenabled()
    1221                     # Disable gc to avoid bug where gc -> file_dealloc ->
    1222                     # write to stderr -> hang.  http://bugs.python.org/issue1336
    1223                     gc.disable()
    1224                     try:
    1225                         self.pid = os.fork()
    1226                     except:
    1227                         if gc_was_enabled:
    1228                             gc.enable()
    1229                         raise
    1230                     self._child_created = True
    1231                     if self.pid == 0:
    1232                         # Child
    1233                         try:
    1234                             # Close parent's pipe ends
    1235                             if p2cwrite is not None:
    1236                                 os.close(p2cwrite)
    1237                             if c2pread is not None:
    1238                                 os.close(c2pread)
    1239                             if errread is not None:
    1240                                 os.close(errread)
    1241                             os.close(errpipe_read)
    1242 
    1243                             # When duping fds, if there arises a situation
    1244                             # where one of the fds is either 0, 1 or 2, it
    1245                             # is possible that it is overwritten (#12607).
    1246                             if c2pwrite == 0:
    1247                                 c2pwrite = os.dup(c2pwrite)
    1248                             if errwrite == 0 or errwrite == 1:
    1249                                 errwrite = os.dup(errwrite)
    1250 
    1251                             # Dup fds for child
    1252                             def _dup2(a, b):
    1253                                 # dup2() removes the CLOEXEC flag but
    1254                                 # we must do it ourselves if dup2()
    1255                                 # would be a no-op (issue #10806).
    1256                                 if a == b:
    1257                                     self._set_cloexec_flag(a, False)
    1258                                 elif a is not None:
    1259                                     os.dup2(a, b)
    1260                             _dup2(p2cread, 0)
    1261                             _dup2(c2pwrite, 1)
    1262                             _dup2(errwrite, 2)
    1263 
    1264                             # Close pipe fds.  Make sure we don't close the
    1265                             # same fd more than once, or standard fds.
    1266                             closed = { None }
    1267                             for fd in [p2cread, c2pwrite, errwrite]:
    1268                                 if fd not in closed and fd > 2:
    1269                                     os.close(fd)
    1270                                     closed.add(fd)
    1271 
    1272                             if cwd is not None:
    1273                                 os.chdir(cwd)
    1274 
    1275                             if preexec_fn:
    1276                                 preexec_fn()
    1277 
    1278                             # Close all other fds, if asked for - after
    1279                             # preexec_fn(), which may open FDs.
    1280                             if close_fds:
    1281                                 self._close_fds(but=errpipe_write)
    1282 
    1283                             if env is None:
    1284                                 os.execvp(executable, args)
    1285                             else:
    1286                                 os.execvpe(executable, args, env)
    1287 
    1288                         except:
    1289                             exc_type, exc_value, tb = sys.exc_info()
    1290                             # Save the traceback and attach it to the exception object
    1291                             exc_lines = traceback.format_exception(exc_type,
    1292                                                                    exc_value,
    1293                                                                    tb)
    1294                             exc_value.child_traceback = ''.join(exc_lines)
    1295                             os.write(errpipe_write, pickle.dumps(exc_value))
    1296 
    1297                         # This exitcode won't be reported to applications, so it
    1298                         # really doesn't matter what we return.
    1299                         os._exit(255)
    1300 
    1301                     # Parent
    1302                     if gc_was_enabled:
    1303                         gc.enable()
     1230                    # Change the directory if asked
     1231                    if cwd is not None:
     1232                        oldcwd = os.getcwd()
     1233                        os.chdir(cwd)
     1234
     1235                    # Duplicate stdio if needed
     1236                    if p2cread is not None:
     1237                        dups[0] = os.dup(0)
     1238                        os.dup2(p2cread, 0)
     1239                        self._set_cloexec_flag(dups[0])
     1240                    if c2pwrite is not None:
     1241                        dups[1] = os.dup(1)
     1242                        os.dup2(c2pwrite, 1)
     1243                        self._set_cloexec_flag(dups[1])
     1244                    if errwrite is not None:
     1245                        dups[2] = os.dup(2)
     1246                        os.dup2(errwrite, 2)
     1247                        self._set_cloexec_flag(dups[2])
     1248
     1249                    # Disable inheritance
     1250                    if close_fds:
     1251                        for i in xrange(3, MAXFD):
     1252                            try:
     1253                                f = fcntl.fcntl(i, fcntl.F_GETFD)
     1254                                if not (f & fcntl.FD_CLOEXEC):
     1255                                    cloexec_fds.add(i)
     1256                                    self._set_cloexec_flag(i)
     1257                            except:
     1258                                pass
     1259
     1260                    if env is None:
     1261                        pid = os.spawnvp(mode, executable, args)
     1262                    else:
     1263                        pid = os.spawnvpe(mode, executable, args, env)
    13041264                finally:
    1305                     # be sure the FD is closed no matter what
    1306                     os.close(errpipe_write)
    1307 
    1308                 # Wait for exec to fail or succeed; possibly raising exception
    1309                 # Exception limited to 1M
    1310                 data = _eintr_retry_call(os.read, errpipe_read, 1048576)
    1311             finally:
     1265                    # Restore inheritance
     1266                    for i in cloexec_fds:
     1267                        self._set_cloexec_flag(i, False)
     1268
     1269                    # Restore the parent stdio
     1270                    for i, fd in enumerate(dups):
     1271                        if fd is not None:
     1272                            os.dup2(fd, i)
     1273                            os.close(fd)
     1274
     1275                    # Restore the current directory
     1276                    if oldcwd is not None:
     1277                        os.chdir(oldcwd)
     1278
     1279                # Child is launched. Close the parent's copy of those pipe
     1280                # handles that only the child should have open.  You need
     1281                # to make sure that no handles to the write end of the
     1282                # output pipe are maintained in this process or else the
     1283                # pipe will not close when the child process exits and the
     1284                # ReadFile will hang.
    13121285                if p2cread is not None and p2cwrite is not None:
    13131286                    _close_in_parent(p2cread)
     
    13171290                    _close_in_parent(errwrite)
    13181291
    1319                 # be sure the FD is closed no matter what
    1320                 os.close(errpipe_read)
    1321 
    1322             if data != "":
     1292                self._child_created = True
     1293                self.pid = pid
     1294
     1295            else:
     1296                # For transferring possible exec failure from child to parent
     1297                # The first char specifies the exception type: 0 means
     1298                # OSError, 1 means some other error.
     1299                errpipe_read, errpipe_write = self.pipe_cloexec()
    13231300                try:
    1324                     _eintr_retry_call(os.waitpid, self.pid, 0)
    1325                 except OSError as e:
    1326                     if e.errno != errno.ECHILD:
    1327                         raise
    1328                 child_exception = pickle.loads(data)
    1329                 raise child_exception
     1301                    try:
     1302                        gc_was_enabled = gc.isenabled()
     1303                        # Disable gc to avoid bug where gc -> file_dealloc ->
     1304                        # write to stderr -> hang.  http://bugs.python.org/issue1336
     1305                        gc.disable()
     1306                        try:
     1307                            self.pid = os.fork()
     1308                        except:
     1309                            if gc_was_enabled:
     1310                                gc.enable()
     1311                            raise
     1312                        self._child_created = True
     1313                        if self.pid == 0:
     1314                            # Child
     1315                            try:
     1316                                # Close parent's pipe ends
     1317                                if p2cwrite is not None:
     1318                                    os.close(p2cwrite)
     1319                                if c2pread is not None:
     1320                                    os.close(c2pread)
     1321                                if errread is not None:
     1322                                    os.close(errread)
     1323                                os.close(errpipe_read)
     1324
     1325                                # When duping fds, if there arises a situation
     1326                                # where one of the fds is either 0, 1 or 2, it
     1327                                # is possible that it is overwritten (#12607).
     1328                                if c2pwrite == 0:
     1329                                    c2pwrite = os.dup(c2pwrite)
     1330                                if errwrite == 0 or errwrite == 1:
     1331                                    errwrite = os.dup(errwrite)
     1332
     1333                                # Dup fds for child
     1334                                def _dup2(a, b):
     1335                                    # dup2() removes the CLOEXEC flag but
     1336                                    # we must do it ourselves if dup2()
     1337                                    # would be a no-op (issue #10806).
     1338                                    if a == b:
     1339                                        self._set_cloexec_flag(a, False)
     1340                                    elif a is not None:
     1341                                        os.dup2(a, b)
     1342                                _dup2(p2cread, 0)
     1343                                _dup2(c2pwrite, 1)
     1344                                _dup2(errwrite, 2)
     1345
     1346                                # Close pipe fds.  Make sure we don't close the
     1347                                # same fd more than once, or standard fds.
     1348                                closed = { None }
     1349                                for fd in [p2cread, c2pwrite, errwrite]:
     1350                                    if fd not in closed and fd > 2:
     1351                                        os.close(fd)
     1352                                        closed.add(fd)
     1353
     1354                                if cwd is not None:
     1355                                    os.chdir(cwd)
     1356
     1357                                if preexec_fn:
     1358                                    preexec_fn()
     1359
     1360                                # Close all other fds, if asked for - after
     1361                                # preexec_fn(), which may open FDs.
     1362                                if close_fds:
     1363                                    self._close_fds(but=errpipe_write)
     1364
     1365                                if env is None:
     1366                                    os.execvp(executable, args)
     1367                                else:
     1368                                    os.execvpe(executable, args, env)
     1369
     1370                            except:
     1371                                exc_type, exc_value, tb = sys.exc_info()
     1372                                # Save the traceback and attach it to the exception object
     1373                                exc_lines = traceback.format_exception(exc_type,
     1374                                                                       exc_value,
     1375                                                                       tb)
     1376                                exc_value.child_traceback = ''.join(exc_lines)
     1377                                os.write(errpipe_write, pickle.dumps(exc_value))
     1378
     1379                            # This exitcode won't be reported to applications, so it
     1380                            # really doesn't matter what we return.
     1381                            os._exit(255)
     1382
     1383                        # Parent
     1384                        if gc_was_enabled:
     1385                            gc.enable()
     1386                    finally:
     1387                        # be sure the FD is closed no matter what
     1388                        os.close(errpipe_write)
     1389
     1390                    # Wait for exec to fail or succeed; possibly raising exception
     1391                    # Exception limited to 1M
     1392                    data = _eintr_retry_call(os.read, errpipe_read, 1048576)
     1393                finally:
     1394                    if p2cread is not None and p2cwrite is not None:
     1395                        _close_in_parent(p2cread)
     1396                    if c2pwrite is not None and c2pread is not None:
     1397                        _close_in_parent(c2pwrite)
     1398                    if errwrite is not None and errread is not None:
     1399                        _close_in_parent(errwrite)
     1400
     1401                    # be sure the FD is closed no matter what
     1402                    os.close(errpipe_read)
     1403
     1404                if data != "":
     1405                    try:
     1406                        _eintr_retry_call(os.waitpid, self.pid, 0)
     1407                    except OSError as e:
     1408                        if e.errno != errno.ECHILD:
     1409                            raise
     1410                    child_exception = pickle.loads(data)
     1411                    raise child_exception
    13301412
    13311413
     
    13931475
    13941476        def _communicate(self, input):
    1395             if self.stdin:
    1396                 # Flush stdio buffer.  This might block, if the user has
    1397                 # been writing to .stdin in an uncontrolled fashion.
    1398                 self.stdin.flush()
    1399                 if not input:
     1477            if os2:
     1478                def _readerthread(file, buffer):
     1479                    while True:
     1480                        data = _eintr_retry_call(file.read)
     1481                        if data == "":
     1482                            file.close()
     1483                            break
     1484                        buffer.append(data)
     1485
     1486                stdout = None # Return
     1487                stderr = None # Return
     1488
     1489                if self.stdout:
     1490                    stdout = []
     1491                    stdout_thread = threading.Thread(target=_readerthread,
     1492                                                     args=(self.stdout, stdout))
     1493                    stdout_thread.setDaemon(True)
     1494                    stdout_thread.start()
     1495
     1496                if self.stderr:
     1497                    stderr = []
     1498                    stderr_thread = threading.Thread(target=_readerthread,
     1499                                                     args=(self.stderr, stderr))
     1500                    stderr_thread.setDaemon(True)
     1501                    stderr_thread.start()
     1502
     1503                if self.stdin:
     1504                    if input is not None:
     1505                        try:
     1506                            self.stdin.write(input)
     1507                        except IOError as e:
     1508                            if e.errno != errno.EPIPE:
     1509                                raise
    14001510                    self.stdin.close()
    14011511
    1402             if _has_poll:
    1403                 stdout, stderr = self._communicate_with_poll(input)
     1512                if self.stdout:
     1513                    stdout_thread.join()
     1514                if self.stderr:
     1515                    stderr_thread.join()
     1516
    14041517            else:
    1405                 stdout, stderr = self._communicate_with_select(input)
     1518                if self.stdin:
     1519                    # Flush stdio buffer.  This might block, if the user has
     1520                    # been writing to .stdin in an uncontrolled fashion.
     1521                    self.stdin.flush()
     1522                    if not input:
     1523                        self.stdin.close()
     1524
     1525                if _has_poll:
     1526                    stdout, stderr = self._communicate_with_poll(input)
     1527                else:
     1528                    stdout, stderr = self._communicate_with_select(input)
    14061529
    14071530            # All data exchanged.  Translate lists into strings.
     
    16171740
    16181741
     1742def _demo_os2():
     1743    #
     1744    # Example 1: Simple redirection: Get current process data
     1745    #
     1746    pdata = Popen(["pstat", "/p:%s" % hex(os.getpid())[2:]], stdout=PIPE, stderr=PIPE).communicate()[0]
     1747    print "Current process data:"
     1748    print pdata
     1749    #
     1750    # Example 2: Connecting several subprocesses
     1751    #
     1752    p1 = Popen(["pstat", "/p:%s" % hex(os.getpid())[2:]], stdout=PIPE)
     1753    p2 = Popen(["grep", "\.EXE"], stdin=p1.stdout, stdout=PIPE)
     1754    print "Current .EXE:"
     1755    print p2.communicate()[0]
     1756
     1757
    16191758if __name__ == "__main__":
    16201759    if mswindows:
    16211760        _demo_windows()
     1761    elif os2:
     1762        _demo_os2()
    16221763    else:
    16231764        _demo_posix()
  • python/trunk/Modules/posixmodule.c

    r1253 r1254  
    48214821    i = pipe_err = 0;
    48224822    while ((pipe_err == 0) && (i < file_count))
    4823 #ifndef __KLIBC__
    48244823        pipe_err = pipe((int *)&p_fd[i++]);
    4825 #else
    4826         pipe_err = socketpair(AF_UNIX, SOCK_STREAM,0,(int *)&p_fd[i++]);
    4827 #endif
    48284824    if (pipe_err < 0)
    48294825    {
     
    69626958    int res;
    69636959    Py_BEGIN_ALLOW_THREADS
    6964 #ifndef __KLIBC__
    69656960    res = pipe(fds);
    6966 #else
    6967     res = socketpair(AF_UNIX, SOCK_STREAM,0, fds);
    6968 #endif
    69696961    Py_END_ALLOW_THREADS
    69706962    if (res != 0)
Note: See TracChangeset for help on using the changeset viewer.