PyXR

c:\python24\lib \ test \ test_subprocess.py



0001 import unittest
0002 from test import test_support
0003 import subprocess
0004 import sys
0005 import signal
0006 import os
0007 import tempfile
0008 import time
0009 import re
0010 
0011 mswindows = (sys.platform == "win32")
0012 
0013 #
0014 # Depends on the following external programs: Python
0015 #
0016 
0017 if mswindows:
0018     SETBINARY = ('import msvcrt; msvcrt.setmode(sys.stdout.fileno(), '
0019                                                 'os.O_BINARY);')
0020 else:
0021     SETBINARY = ''
0022 
0023 # In a debug build, stuff like "[6580 refs]" is printed to stderr at
0024 # shutdown time.  That frustrates tests trying to check stderr produced
0025 # from a spawned Python process.
0026 def remove_stderr_debug_decorations(stderr):
0027     return re.sub(r"\[\d+ refs\]\r?\n?$", "", stderr)
0028 
0029 class ProcessTestCase(unittest.TestCase):
0030     def mkstemp(self):
0031         """wrapper for mkstemp, calling mktemp if mkstemp is not available"""
0032         if hasattr(tempfile, "mkstemp"):
0033             return tempfile.mkstemp()
0034         else:
0035             fname = tempfile.mktemp()
0036             return os.open(fname, os.O_RDWR|os.O_CREAT), fname
0037 
0038     #
0039     # Generic tests
0040     #
0041     def test_call_seq(self):
0042         # call() function with sequence argument
0043         rc = subprocess.call([sys.executable, "-c",
0044                               "import sys; sys.exit(47)"])
0045         self.assertEqual(rc, 47)
0046 
0047     def test_call_kwargs(self):
0048         # call() function with keyword args
0049         newenv = os.environ.copy()
0050         newenv["FRUIT"] = "banana"
0051         rc = subprocess.call([sys.executable, "-c",
0052                           'import sys, os;' \
0053                           'sys.exit(os.getenv("FRUIT")=="banana")'],
0054                         env=newenv)
0055         self.assertEqual(rc, 1)
0056 
0057     def test_stdin_none(self):
0058         # .stdin is None when not redirected
0059         p = subprocess.Popen([sys.executable, "-c", 'print "banana"'],
0060                          stdout=subprocess.PIPE, stderr=subprocess.PIPE)
0061         p.wait()
0062         self.assertEqual(p.stdin, None)
0063 
0064     def test_stdout_none(self):
0065         # .stdout is None when not redirected
0066         p = subprocess.Popen([sys.executable, "-c",
0067                              'print "    this bit of output is from a '
0068                              'test of stdout in a different '
0069                              'process ..."'],
0070                              stdin=subprocess.PIPE, stderr=subprocess.PIPE)
0071         p.wait()
0072         self.assertEqual(p.stdout, None)
0073 
0074     def test_stderr_none(self):
0075         # .stderr is None when not redirected
0076         p = subprocess.Popen([sys.executable, "-c", 'print "banana"'],
0077                          stdin=subprocess.PIPE, stdout=subprocess.PIPE)
0078         p.wait()
0079         self.assertEqual(p.stderr, None)
0080 
0081     def test_executable(self):
0082         p = subprocess.Popen(["somethingyoudonthave",
0083                               "-c", "import sys; sys.exit(47)"],
0084                              executable=sys.executable)
0085         p.wait()
0086         self.assertEqual(p.returncode, 47)
0087 
0088     def test_stdin_pipe(self):
0089         # stdin redirection
0090         p = subprocess.Popen([sys.executable, "-c",
0091                          'import sys; sys.exit(sys.stdin.read() == "pear")'],
0092                         stdin=subprocess.PIPE)
0093         p.stdin.write("pear")
0094         p.stdin.close()
0095         p.wait()
0096         self.assertEqual(p.returncode, 1)
0097 
0098     def test_stdin_filedes(self):
0099         # stdin is set to open file descriptor
0100         tf = tempfile.TemporaryFile()
0101         d = tf.fileno()
0102         os.write(d, "pear")
0103         os.lseek(d, 0, 0)
0104         p = subprocess.Popen([sys.executable, "-c",
0105                          'import sys; sys.exit(sys.stdin.read() == "pear")'],
0106                          stdin=d)
0107         p.wait()
0108         self.assertEqual(p.returncode, 1)
0109 
0110     def test_stdin_fileobj(self):
0111         # stdin is set to open file object
0112         tf = tempfile.TemporaryFile()
0113         tf.write("pear")
0114         tf.seek(0)
0115         p = subprocess.Popen([sys.executable, "-c",
0116                          'import sys; sys.exit(sys.stdin.read() == "pear")'],
0117                          stdin=tf)
0118         p.wait()
0119         self.assertEqual(p.returncode, 1)
0120 
0121     def test_stdout_pipe(self):
0122         # stdout redirection
0123         p = subprocess.Popen([sys.executable, "-c",
0124                           'import sys; sys.stdout.write("orange")'],
0125                          stdout=subprocess.PIPE)
0126         self.assertEqual(p.stdout.read(), "orange")
0127 
0128     def test_stdout_filedes(self):
0129         # stdout is set to open file descriptor
0130         tf = tempfile.TemporaryFile()
0131         d = tf.fileno()
0132         p = subprocess.Popen([sys.executable, "-c",
0133                           'import sys; sys.stdout.write("orange")'],
0134                          stdout=d)
0135         p.wait()
0136         os.lseek(d, 0, 0)
0137         self.assertEqual(os.read(d, 1024), "orange")
0138 
0139     def test_stdout_fileobj(self):
0140         # stdout is set to open file object
0141         tf = tempfile.TemporaryFile()
0142         p = subprocess.Popen([sys.executable, "-c",
0143                           'import sys; sys.stdout.write("orange")'],
0144                          stdout=tf)
0145         p.wait()
0146         tf.seek(0)
0147         self.assertEqual(tf.read(), "orange")
0148 
0149     def test_stderr_pipe(self):
0150         # stderr redirection
0151         p = subprocess.Popen([sys.executable, "-c",
0152                           'import sys; sys.stderr.write("strawberry")'],
0153                          stderr=subprocess.PIPE)
0154         self.assertEqual(remove_stderr_debug_decorations(p.stderr.read()),
0155                          "strawberry")
0156 
0157     def test_stderr_filedes(self):
0158         # stderr is set to open file descriptor
0159         tf = tempfile.TemporaryFile()
0160         d = tf.fileno()
0161         p = subprocess.Popen([sys.executable, "-c",
0162                           'import sys; sys.stderr.write("strawberry")'],
0163                          stderr=d)
0164         p.wait()
0165         os.lseek(d, 0, 0)
0166         self.assertEqual(remove_stderr_debug_decorations(os.read(d, 1024)),
0167                          "strawberry")
0168 
0169     def test_stderr_fileobj(self):
0170         # stderr is set to open file object
0171         tf = tempfile.TemporaryFile()
0172         p = subprocess.Popen([sys.executable, "-c",
0173                           'import sys; sys.stderr.write("strawberry")'],
0174                          stderr=tf)
0175         p.wait()
0176         tf.seek(0)
0177         self.assertEqual(remove_stderr_debug_decorations(tf.read()),
0178                          "strawberry")
0179 
0180     def test_stdout_stderr_pipe(self):
0181         # capture stdout and stderr to the same pipe
0182         p = subprocess.Popen([sys.executable, "-c",
0183                           'import sys;' \
0184                           'sys.stdout.write("apple");' \
0185                           'sys.stdout.flush();' \
0186                           'sys.stderr.write("orange")'],
0187                          stdout=subprocess.PIPE,
0188                          stderr=subprocess.STDOUT)
0189         output = p.stdout.read()
0190         stripped = remove_stderr_debug_decorations(output)
0191         self.assertEqual(stripped, "appleorange")
0192 
0193     def test_stdout_stderr_file(self):
0194         # capture stdout and stderr to the same open file
0195         tf = tempfile.TemporaryFile()
0196         p = subprocess.Popen([sys.executable, "-c",
0197                           'import sys;' \
0198                           'sys.stdout.write("apple");' \
0199                           'sys.stdout.flush();' \
0200                           'sys.stderr.write("orange")'],
0201                          stdout=tf,
0202                          stderr=tf)
0203         p.wait()
0204         tf.seek(0)
0205         output = tf.read()
0206         stripped = remove_stderr_debug_decorations(output)
0207         self.assertEqual(stripped, "appleorange")
0208 
0209     def test_cwd(self):
0210         tmpdir = os.getenv("TEMP", "/tmp")
0211         tmpdir = os.path.realpath(tmpdir)
0212         p = subprocess.Popen([sys.executable, "-c",
0213                           'import sys,os;' \
0214                           'sys.stdout.write(os.getcwd())'],
0215                          stdout=subprocess.PIPE,
0216                          cwd=tmpdir)
0217         normcase = os.path.normcase
0218         self.assertEqual(normcase(p.stdout.read()), normcase(tmpdir))
0219 
0220     def test_env(self):
0221         newenv = os.environ.copy()
0222         newenv["FRUIT"] = "orange"
0223         p = subprocess.Popen([sys.executable, "-c",
0224                           'import sys,os;' \
0225                           'sys.stdout.write(os.getenv("FRUIT"))'],
0226                          stdout=subprocess.PIPE,
0227                          env=newenv)
0228         self.assertEqual(p.stdout.read(), "orange")
0229 
0230     def test_communicate(self):
0231         p = subprocess.Popen([sys.executable, "-c",
0232                           'import sys,os;' \
0233                           'sys.stderr.write("pineapple");' \
0234                           'sys.stdout.write(sys.stdin.read())'],
0235                          stdin=subprocess.PIPE,
0236                          stdout=subprocess.PIPE,
0237                          stderr=subprocess.PIPE)
0238         (stdout, stderr) = p.communicate("banana")
0239         self.assertEqual(stdout, "banana")
0240         self.assertEqual(remove_stderr_debug_decorations(stderr),
0241                          "pineapple")
0242 
0243     def test_communicate_returns(self):
0244         # communicate() should return None if no redirection is active
0245         p = subprocess.Popen([sys.executable, "-c",
0246                               "import sys; sys.exit(47)"])
0247         (stdout, stderr) = p.communicate()
0248         self.assertEqual(stdout, None)
0249         self.assertEqual(stderr, None)
0250 
0251     def test_communicate_pipe_buf(self):
0252         # communicate() with writes larger than pipe_buf
0253         # This test will probably deadlock rather than fail, if
0254         # communicate() does not work properly.
0255         x, y = os.pipe()
0256         if mswindows:
0257             pipe_buf = 512
0258         else:
0259             pipe_buf = os.fpathconf(x, "PC_PIPE_BUF")
0260         os.close(x)
0261         os.close(y)
0262         p = subprocess.Popen([sys.executable, "-c",
0263                           'import sys,os;'
0264                           'sys.stdout.write(sys.stdin.read(47));' \
0265                           'sys.stderr.write("xyz"*%d);' \
0266                           'sys.stdout.write(sys.stdin.read())' % pipe_buf],
0267                          stdin=subprocess.PIPE,
0268                          stdout=subprocess.PIPE,
0269                          stderr=subprocess.PIPE)
0270         string_to_write = "abc"*pipe_buf
0271         (stdout, stderr) = p.communicate(string_to_write)
0272         self.assertEqual(stdout, string_to_write)
0273 
0274     def test_writes_before_communicate(self):
0275         # stdin.write before communicate()
0276         p = subprocess.Popen([sys.executable, "-c",
0277                           'import sys,os;' \
0278                           'sys.stdout.write(sys.stdin.read())'],
0279                          stdin=subprocess.PIPE,
0280                          stdout=subprocess.PIPE,
0281                          stderr=subprocess.PIPE)
0282         p.stdin.write("banana")
0283         (stdout, stderr) = p.communicate("split")
0284         self.assertEqual(stdout, "bananasplit")
0285         self.assertEqual(remove_stderr_debug_decorations(stderr), "")
0286 
0287     def test_universal_newlines(self):
0288         p = subprocess.Popen([sys.executable, "-c",
0289                           'import sys,os;' + SETBINARY +
0290                           'sys.stdout.write("line1\\n");'
0291                           'sys.stdout.flush();'
0292                           'sys.stdout.write("line2\\r");'
0293                           'sys.stdout.flush();'
0294                           'sys.stdout.write("line3\\r\\n");'
0295                           'sys.stdout.flush();'
0296                           'sys.stdout.write("line4\\r");'
0297                           'sys.stdout.flush();'
0298                           'sys.stdout.write("\\nline5");'
0299                           'sys.stdout.flush();'
0300                           'sys.stdout.write("\\nline6");'],
0301                          stdout=subprocess.PIPE,
0302                          universal_newlines=1)
0303         stdout = p.stdout.read()
0304         if hasattr(open, 'newlines'):
0305             # Interpreter with universal newline support
0306             self.assertEqual(stdout,
0307                              "line1\nline2\nline3\nline4\nline5\nline6")
0308         else:
0309             # Interpreter without universal newline support
0310             self.assertEqual(stdout,
0311                              "line1\nline2\rline3\r\nline4\r\nline5\nline6")
0312 
0313     def test_universal_newlines_communicate(self):
0314         # universal newlines through communicate()
0315         p = subprocess.Popen([sys.executable, "-c",
0316                           'import sys,os;' + SETBINARY +
0317                           'sys.stdout.write("line1\\n");'
0318                           'sys.stdout.flush();'
0319                           'sys.stdout.write("line2\\r");'
0320                           'sys.stdout.flush();'
0321                           'sys.stdout.write("line3\\r\\n");'
0322                           'sys.stdout.flush();'
0323                           'sys.stdout.write("line4\\r");'
0324                           'sys.stdout.flush();'
0325                           'sys.stdout.write("\\nline5");'
0326                           'sys.stdout.flush();'
0327                           'sys.stdout.write("\\nline6");'],
0328                          stdout=subprocess.PIPE, stderr=subprocess.PIPE,
0329                          universal_newlines=1)
0330         (stdout, stderr) = p.communicate()
0331         if hasattr(open, 'newlines'):
0332             # Interpreter with universal newline support
0333             self.assertEqual(stdout,
0334                              "line1\nline2\nline3\nline4\nline5\nline6")
0335         else:
0336             # Interpreter without universal newline support
0337             self.assertEqual(stdout, "line1\nline2\rline3\r\nline4\r\nline5\nline6")
0338 
0339     def test_no_leaking(self):
0340         # Make sure we leak no resources
0341         max_handles = 1026 # too much for most UNIX systems
0342         if mswindows:
0343             max_handles = 65 # a full test is too slow on Windows
0344         for i in range(max_handles):
0345             p = subprocess.Popen([sys.executable, "-c",
0346                     "import sys;sys.stdout.write(sys.stdin.read())"],
0347                     stdin=subprocess.PIPE,
0348                     stdout=subprocess.PIPE,
0349                     stderr=subprocess.PIPE)
0350             data = p.communicate("lime")[0]
0351             self.assertEqual(data, "lime")
0352 
0353 
0354     def test_list2cmdline(self):
0355         self.assertEqual(subprocess.list2cmdline(['a b c', 'd', 'e']),
0356                          '"a b c" d e')
0357         self.assertEqual(subprocess.list2cmdline(['ab"c', '\\', 'd']),
0358                          'ab\\"c \\ d')
0359         self.assertEqual(subprocess.list2cmdline(['a\\\\\\b', 'de fg', 'h']),
0360                          'a\\\\\\b "de fg" h')
0361         self.assertEqual(subprocess.list2cmdline(['a\\"b', 'c', 'd']),
0362                          'a\\\\\\"b c d')
0363         self.assertEqual(subprocess.list2cmdline(['a\\\\b c', 'd', 'e']),
0364                          '"a\\\\b c" d e')
0365         self.assertEqual(subprocess.list2cmdline(['a\\\\b\\ c', 'd', 'e']),
0366                          '"a\\\\b\\ c" d e')
0367 
0368 
0369     def test_poll(self):
0370         p = subprocess.Popen([sys.executable,
0371                           "-c", "import time; time.sleep(1)"])
0372         count = 0
0373         while p.poll() is None:
0374             time.sleep(0.1)
0375             count += 1
0376         # We expect that the poll loop probably went around about 10 times,
0377         # but, based on system scheduling we can't control, it's possible
0378         # poll() never returned None.  It "should be" very rare that it
0379         # didn't go around at least twice.
0380         self.assert_(count >= 2)
0381         # Subsequent invocations should just return the returncode
0382         self.assertEqual(p.poll(), 0)
0383 
0384 
0385     def test_wait(self):
0386         p = subprocess.Popen([sys.executable,
0387                           "-c", "import time; time.sleep(2)"])
0388         self.assertEqual(p.wait(), 0)
0389         # Subsequent invocations should just return the returncode
0390         self.assertEqual(p.wait(), 0)
0391 
0392     #
0393     # POSIX tests
0394     #
0395     if not mswindows:
0396         def test_exceptions(self):
0397             # catched & re-raised exceptions
0398             try:
0399                 p = subprocess.Popen([sys.executable, "-c", ""],
0400                                  cwd="/this/path/does/not/exist")
0401             except OSError, e:
0402                 # The attribute child_traceback should contain "os.chdir"
0403                 # somewhere.
0404                 self.assertNotEqual(e.child_traceback.find("os.chdir"), -1)
0405             else:
0406                 self.fail("Expected OSError")
0407 
0408         def test_run_abort(self):
0409             # returncode handles signal termination
0410             p = subprocess.Popen([sys.executable,
0411                                   "-c", "import os; os.abort()"])
0412             p.wait()
0413             self.assertEqual(-p.returncode, signal.SIGABRT)
0414 
0415         def test_preexec(self):
0416             # preexec function
0417             p = subprocess.Popen([sys.executable, "-c",
0418                               'import sys,os;' \
0419                               'sys.stdout.write(os.getenv("FRUIT"))'],
0420                              stdout=subprocess.PIPE,
0421                              preexec_fn=lambda: os.putenv("FRUIT", "apple"))
0422             self.assertEqual(p.stdout.read(), "apple")
0423 
0424         def test_args_string(self):
0425             # args is a string
0426             f, fname = self.mkstemp()
0427             os.write(f, "#!/bin/sh\n")
0428             os.write(f, "exec %s -c 'import sys; sys.exit(47)'\n" %
0429                         sys.executable)
0430             os.close(f)
0431             os.chmod(fname, 0700)
0432             p = subprocess.Popen(fname)
0433             p.wait()
0434             self.assertEqual(p.returncode, 47)
0435             os.remove(fname)
0436 
0437         def test_invalid_args(self):
0438             # invalid arguments should raise ValueError
0439             self.assertRaises(ValueError, subprocess.call,
0440                               [sys.executable,
0441                                "-c", "import sys; sys.exit(47)"],
0442                               startupinfo=47)
0443             self.assertRaises(ValueError, subprocess.call,
0444                               [sys.executable,
0445                                "-c", "import sys; sys.exit(47)"],
0446                               creationflags=47)
0447 
0448         def test_shell_sequence(self):
0449             # Run command through the shell (sequence)
0450             newenv = os.environ.copy()
0451             newenv["FRUIT"] = "apple"
0452             p = subprocess.Popen(["echo $FRUIT"], shell=1,
0453                                  stdout=subprocess.PIPE,
0454                                  env=newenv)
0455             self.assertEqual(p.stdout.read().strip(), "apple")
0456 
0457         def test_shell_string(self):
0458             # Run command through the shell (string)
0459             newenv = os.environ.copy()
0460             newenv["FRUIT"] = "apple"
0461             p = subprocess.Popen("echo $FRUIT", shell=1,
0462                                  stdout=subprocess.PIPE,
0463                                  env=newenv)
0464             self.assertEqual(p.stdout.read().strip(), "apple")
0465 
0466         def test_call_string(self):
0467             # call() function with string argument on UNIX
0468             f, fname = self.mkstemp()
0469             os.write(f, "#!/bin/sh\n")
0470             os.write(f, "exec %s -c 'import sys; sys.exit(47)'\n" %
0471                         sys.executable)
0472             os.close(f)
0473             os.chmod(fname, 0700)
0474             rc = subprocess.call(fname)
0475             self.assertEqual(rc, 47)
0476 
0477 
0478     #
0479     # Windows tests
0480     #
0481     if mswindows:
0482         def test_startupinfo(self):
0483             # startupinfo argument
0484             # We uses hardcoded constants, because we do not want to
0485             # depend on win32all.
0486             STARTF_USESHOWWINDOW = 1
0487             SW_MAXIMIZE = 3
0488             startupinfo = subprocess.STARTUPINFO()
0489             startupinfo.dwFlags = STARTF_USESHOWWINDOW
0490             startupinfo.wShowWindow = SW_MAXIMIZE
0491             # Since Python is a console process, it won't be affected
0492             # by wShowWindow, but the argument should be silently
0493             # ignored
0494             subprocess.call([sys.executable, "-c", "import sys; sys.exit(0)"],
0495                         startupinfo=startupinfo)
0496 
0497         def test_creationflags(self):
0498             # creationflags argument
0499             CREATE_NEW_CONSOLE = 16
0500             sys.stderr.write("    a DOS box should flash briefly ...\n")
0501             subprocess.call(sys.executable +
0502                                 ' -c "import time; time.sleep(0.25)"',
0503                             creationflags=CREATE_NEW_CONSOLE)
0504 
0505         def test_invalid_args(self):
0506             # invalid arguments should raise ValueError
0507             self.assertRaises(ValueError, subprocess.call,
0508                               [sys.executable,
0509                                "-c", "import sys; sys.exit(47)"],
0510                               preexec_fn=lambda: 1)
0511             self.assertRaises(ValueError, subprocess.call,
0512                               [sys.executable,
0513                                "-c", "import sys; sys.exit(47)"],
0514                               close_fds=True)
0515 
0516         def test_shell_sequence(self):
0517             # Run command through the shell (sequence)
0518             newenv = os.environ.copy()
0519             newenv["FRUIT"] = "physalis"
0520             p = subprocess.Popen(["set"], shell=1,
0521                                  stdout=subprocess.PIPE,
0522                                  env=newenv)
0523             self.assertNotEqual(p.stdout.read().find("physalis"), -1)
0524 
0525         def test_shell_string(self):
0526             # Run command through the shell (string)
0527             newenv = os.environ.copy()
0528             newenv["FRUIT"] = "physalis"
0529             p = subprocess.Popen("set", shell=1,
0530                                  stdout=subprocess.PIPE,
0531                                  env=newenv)
0532             self.assertNotEqual(p.stdout.read().find("physalis"), -1)
0533 
0534         def test_call_string(self):
0535             # call() function with string argument on Windows
0536             rc = subprocess.call(sys.executable +
0537                                  ' -c "import sys; sys.exit(47)"')
0538             self.assertEqual(rc, 47)
0539 
0540 
0541 def test_main():
0542     test_support.run_unittest(ProcessTestCase)
0543 
0544 if __name__ == "__main__":
0545     test_main()
0546 

Generated by PyXR 0.9.4
SourceForge.net Logo