PyXR

c:\python24\lib \ pty.py



0001 """Pseudo terminal utilities."""
0002 
0003 # Bugs: No signal handling.  Doesn't set slave termios and window size.
0004 #       Only tested on Linux.
0005 # See:  W. Richard Stevens. 1992.  Advanced Programming in the
0006 #       UNIX Environment.  Chapter 19.
0007 # Author: Steen Lumholt -- with additions by Guido.
0008 
0009 from select import select
0010 import os
0011 import tty
0012 
0013 __all__ = ["openpty","fork","spawn"]
0014 
0015 STDIN_FILENO = 0
0016 STDOUT_FILENO = 1
0017 STDERR_FILENO = 2
0018 
0019 CHILD = 0
0020 
0021 def openpty():
0022     """openpty() -> (master_fd, slave_fd)
0023     Open a pty master/slave pair, using os.openpty() if possible."""
0024 
0025     try:
0026         return os.openpty()
0027     except (AttributeError, OSError):
0028         pass
0029     master_fd, slave_name = _open_terminal()
0030     slave_fd = slave_open(slave_name)
0031     return master_fd, slave_fd
0032 
0033 def master_open():
0034     """master_open() -> (master_fd, slave_name)
0035     Open a pty master and return the fd, and the filename of the slave end.
0036     Deprecated, use openpty() instead."""
0037 
0038     try:
0039         master_fd, slave_fd = os.openpty()
0040     except (AttributeError, OSError):
0041         pass
0042     else:
0043         slave_name = os.ttyname(slave_fd)
0044         os.close(slave_fd)
0045         return master_fd, slave_name
0046 
0047     return _open_terminal()
0048 
0049 def _open_terminal():
0050     """Open pty master and return (master_fd, tty_name).
0051     SGI and generic BSD version, for when openpty() fails."""
0052     try:
0053         import sgi
0054     except ImportError:
0055         pass
0056     else:
0057         try:
0058             tty_name, master_fd = sgi._getpty(os.O_RDWR, 0666, 0)
0059         except IOError, msg:
0060             raise os.error, msg
0061         return master_fd, tty_name
0062     for x in 'pqrstuvwxyzPQRST':
0063         for y in '0123456789abcdef':
0064             pty_name = '/dev/pty' + x + y
0065             try:
0066                 fd = os.open(pty_name, os.O_RDWR)
0067             except os.error:
0068                 continue
0069             return (fd, '/dev/tty' + x + y)
0070     raise os.error, 'out of pty devices'
0071 
0072 def slave_open(tty_name):
0073     """slave_open(tty_name) -> slave_fd
0074     Open the pty slave and acquire the controlling terminal, returning
0075     opened filedescriptor.
0076     Deprecated, use openpty() instead."""
0077 
0078     result = os.open(tty_name, os.O_RDWR)
0079     try:
0080         from fcntl import ioctl, I_PUSH
0081     except ImportError:
0082         return result
0083     try:
0084         ioctl(result, I_PUSH, "ptem")
0085         ioctl(result, I_PUSH, "ldterm")
0086     except IOError:
0087         pass
0088     return result
0089 
0090 def fork():
0091     """fork() -> (pid, master_fd)
0092     Fork and make the child a session leader with a controlling terminal."""
0093 
0094     try:
0095         pid, fd = os.forkpty()
0096     except (AttributeError, OSError):
0097         pass
0098     else:
0099         if pid == CHILD:
0100             try:
0101                 os.setsid()
0102             except OSError:
0103                 # os.forkpty() already set us session leader
0104                 pass
0105         return pid, fd
0106 
0107     master_fd, slave_fd = openpty()
0108     pid = os.fork()
0109     if pid == CHILD:
0110         # Establish a new session.
0111         os.setsid()
0112         os.close(master_fd)
0113 
0114         # Slave becomes stdin/stdout/stderr of child.
0115         os.dup2(slave_fd, STDIN_FILENO)
0116         os.dup2(slave_fd, STDOUT_FILENO)
0117         os.dup2(slave_fd, STDERR_FILENO)
0118         if (slave_fd > STDERR_FILENO):
0119             os.close (slave_fd)
0120 
0121     # Parent and child process.
0122     return pid, master_fd
0123 
0124 def _writen(fd, data):
0125     """Write all the data to a descriptor."""
0126     while data != '':
0127         n = os.write(fd, data)
0128         data = data[n:]
0129 
0130 def _read(fd):
0131     """Default read function."""
0132     return os.read(fd, 1024)
0133 
0134 def _copy(master_fd, master_read=_read, stdin_read=_read):
0135     """Parent copy loop.
0136     Copies
0137             pty master -> standard output   (master_read)
0138             standard input -> pty master    (stdin_read)"""
0139     while 1:
0140         rfds, wfds, xfds = select(
0141                 [master_fd, STDIN_FILENO], [], [])
0142         if master_fd in rfds:
0143             data = master_read(master_fd)
0144             os.write(STDOUT_FILENO, data)
0145         if STDIN_FILENO in rfds:
0146             data = stdin_read(STDIN_FILENO)
0147             _writen(master_fd, data)
0148 
0149 def spawn(argv, master_read=_read, stdin_read=_read):
0150     """Create a spawned process."""
0151     if type(argv) == type(''):
0152         argv = (argv,)
0153     pid, master_fd = fork()
0154     if pid == CHILD:
0155         os.execlp(argv[0], *argv)
0156     try:
0157         mode = tty.tcgetattr(STDIN_FILENO)
0158         tty.setraw(STDIN_FILENO)
0159         restore = 1
0160     except tty.error:    # This is the same as termios.error
0161         restore = 0
0162     try:
0163         _copy(master_fd, master_read, stdin_read)
0164     except (IOError, OSError):
0165         if restore:
0166             tty.tcsetattr(STDIN_FILENO, tty.TCSAFLUSH, mode)
0167 
0168     os.close(master_fd)
0169 

Generated by PyXR 0.9.4
SourceForge.net Logo