#!/usr/bin/env python """ exec_command Implements exec_command function that is (almost) equivalent to commands.getstatusoutput function but on NT, DOS systems the returned status is actually correct (though, the returned status values may be different by a factor). In addition, exec_command takes keyword arguments for (re-)defining environment variables. Provides functions: exec_command --- execute command in a specified directory and in the modified environment. find_executable --- locate a command using info from environment variable PATH. Equivalent to posix `which` command. Author: Pearu Peterson Created: 11 January 2003 Requires: Python 2.x Succesfully tested on: ======== ============ ================================================= os.name sys.platform comments ======== ============ ================================================= posix linux2 Debian (sid) Linux, Python 2.1.3+, 2.2.3+, 2.3.3 PyCrust 0.9.3, Idle 1.0.2 posix linux2 Red Hat 9 Linux, Python 2.1.3, 2.2.2, 2.3.2 posix sunos5 SunOS 5.9, Python 2.2, 2.3.2 posix darwin Darwin 7.2.0, Python 2.3 nt win32 Windows Me Python 2.3(EE), Idle 1.0, PyCrust 0.7.2 Python 2.1.1 Idle 0.8 nt win32 Windows 98, Python 2.1.1. Idle 0.8 nt win32 Cygwin 98-4.10, Python 2.1.1(MSC) - echo tests fail i.e. redefining environment variables may not work. FIXED: don't use cygwin echo! Comment: also `cmd /c echo` will not work but redefining environment variables do work. posix cygwin Cygwin 98-4.10, Python 2.3.3(cygming special) nt win32 Windows XP, Python 2.3.3 ======== ============ ================================================= Known bugs: * Tests, that send messages to stderr, fail when executed from MSYS prompt because the messages are lost at some point. """ from __future__ import division, absolute_import, print_function __all__ = ['exec_command', 'find_executable'] import os import sys import shlex from numpy.distutils.misc_util import is_sequence, make_temp_file from numpy.distutils import log from numpy.distutils.compat import get_exception from numpy.compat import open_latin1 def temp_file_name(): fo, name = make_temp_file() fo.close() return name def get_pythonexe(): pythonexe = sys.executable if os.name in ['nt', 'dos']: fdir, fn = os.path.split(pythonexe) fn = fn.upper().replace('PYTHONW', 'PYTHON') pythonexe = os.path.join(fdir, fn) assert os.path.isfile(pythonexe), '%r is not a file' % (pythonexe,) return pythonexe def find_executable(exe, path=None, _cache={}): """Return full path of a executable or None. Symbolic links are not followed. """ key = exe, path try: return _cache[key] except KeyError: pass log.debug('find_executable(%r)' % exe) orig_exe = exe if path is None: path = os.environ.get('PATH', os.defpath) if os.name=='posix': realpath = os.path.realpath else: realpath = lambda a:a if exe.startswith('"'): exe = exe[1:-1] suffixes = [''] if os.name in ['nt', 'dos', 'os2']: fn, ext = os.path.splitext(exe) extra_suffixes = ['.exe', '.com', '.bat'] if ext.lower() not in extra_suffixes: suffixes = extra_suffixes if os.path.isabs(exe): paths = [''] else: paths = [ os.path.abspath(p) for p in path.split(os.pathsep) ] for path in paths: fn = os.path.join(path, exe) for s in suffixes: f_ext = fn+s if not os.path.islink(f_ext): f_ext = realpath(f_ext) if os.path.isfile(f_ext) and os.access(f_ext, os.X_OK): log.info('Found executable %s' % f_ext) _cache[key] = f_ext return f_ext log.warn('Could not locate executable %s' % orig_exe) return None ############################################################ def _preserve_environment( names ): log.debug('_preserve_environment(%r)' % (names)) env = {} for name in names: env[name] = os.environ.get(name) return env def _update_environment( **env ): log.debug('_update_environment(...)') for name, value in env.items(): os.environ[name] = value or '' def _supports_fileno(stream): """ Returns True if 'stream' supports the file descriptor and allows fileno(). """ if hasattr(stream, 'fileno'): try: r = stream.fileno() return True except IOError: return False else: return False def exec_command(command, execute_in='', use_shell=None, use_tee=None, _with_python = 1, **env ): """ Return (status,output) of executed command. Parameters ---------- command : str A concatenated string of executable and arguments. execute_in : str Before running command ``cd execute_in`` and after ``cd -``. use_shell : {bool, None}, optional If True, execute ``sh -c command``. Default None (True) use_tee : {bool, None}, optional If True use tee. Default None (True) Returns ------- res : str Both stdout and stderr messages. Notes ----- On NT, DOS systems the returned status is correct for external commands. Wild cards will not work for non-posix systems or when use_shell=0. """ log.debug('exec_command(%r,%s)' % (command,\ ','.join(['%s=%r'%kv for kv in env.items()]))) if use_tee is None: use_tee = os.name=='posix' if use_shell is None: use_shell = os.name=='posix' execute_in = os.path.abspath(execute_in) oldcwd = os.path.abspath(os.getcwd()) if __name__[-12:] == 'exec_command': exec_dir = os.path.dirname(os.path.abspath(__file__)) elif os.path.isfile('exec_command.py'): exec_dir = os.path.abspath('.') else: exec_dir = os.path.abspath(sys.argv[0]) if os.path.isfile(exec_dir): exec_dir = os.path.dirname(exec_dir) if oldcwd!=execute_in: os.chdir(execute_in) log.debug('New cwd: %s' % execute_in) else: log.debug('Retaining cwd: %s' % oldcwd) oldenv = _preserve_environment( list(env.keys()) ) _update_environment( **env ) try: # _exec_command is robust but slow, it relies on # usable sys.std*.fileno() descriptors. If they # are bad (like in win32 Idle, PyCrust environments) # then _exec_command_python (even slower) # will be used as a last resort. # # _exec_command_posix uses os.system and is faster # but not on all platforms os.system will return # a correct status. if (_with_python and _supports_fileno(sys.stdout) and sys.stdout.fileno() == -1): st = _exec_command_python(command, exec_command_dir = exec_dir, **env) elif os.name=='posix': st = _exec_command_posix(command, use_shell=use_shell, use_tee=use_tee, **env) else: st = _exec_command(command, use_shell=use_shell, use_tee=use_tee,**env) finally: if oldcwd!=execute_in: os.chdir(oldcwd) log.debug('Restored cwd to %s' % oldcwd) _update_environment(**oldenv) return st def _exec_command_posix( command, use_shell = None, use_tee = None, **env ): log.debug('_exec_command_posix(...)') if is_sequence(command): command_str = ' '.join(list(command)) else: command_str = command tmpfile = temp_file_name() stsfile = None if use_tee: stsfile = temp_file_name() filter = '' if use_tee == 2: filter = r'| tr -cd "\n" | tr "\n" "."; echo' command_posix = '( %s ; echo $? > %s ) 2>&1 | tee %s %s'\ % (command_str, stsfile, tmpfile, filter) else: stsfile = temp_file_name() command_posix = '( %s ; echo $? > %s ) > %s 2>&1'\ % (command_str, stsfile, tmpfile) #command_posix = '( %s ) > %s 2>&1' % (command_str,tmpfile) log.debug('Running os.system(%r)' % (command_posix)) status = os.system(command_posix) if use_tee: if status: # if command_tee fails then fall back to robust exec_command log.warn('_exec_command_posix failed (status=%s)' % status) return _exec_command(command, use_shell=use_shell, **env) if stsfile is not None: f = open_latin1(stsfile, 'r') status_text = f.read() status = int(status_text) f.close() os.remove(stsfile) f = open_latin1(tmpfile, 'r') text = f.read() f.close() os.remove(tmpfile) if text[-1:]=='\n': text = text[:-1] return status, text def _exec_command_python(command, exec_command_dir='', **env): log.debug('_exec_command_python(...)') python_exe = get_pythonexe() cmdfile = temp_file_name() stsfile = temp_file_name() outfile = temp_file_name() f = open(cmdfile, 'w') f.write('import os\n') f.write('import sys\n') f.write('sys.path.insert(0,%r)\n' % (exec_command_dir)) f.write('from exec_command import exec_command\n') f.write('del sys.path[0]\n') f.write('cmd = %r\n' % command) f.write('os.environ = %r\n' % (os.environ)) f.write('s,o = exec_command(cmd, _with_python=0, **%r)\n' % (env)) f.write('f=open(%r,"w")\nf.write(str(s))\nf.close()\n' % (stsfile)) f.write('f=open(%r,"w")\nf.write(o)\nf.close()\n' % (outfile)) f.close() cmd = '%s %s' % (python_exe, cmdfile) status = os.system(cmd) if status: raise RuntimeError("%r failed" % (cmd,)) os.remove(cmdfile) f = open_latin1(stsfile, 'r') status = int(f.read()) f.close() os.remove(stsfile) f = open_latin1(outfile, 'r') text = f.read() f.close() os.remove(outfile) return status, text def quote_arg(arg): if arg[0]!='"' and ' ' in arg: return '"%s"' % arg return arg def _exec_command( command, use_shell=None, use_tee = None, **env ): log.debug('_exec_command(...)') if use_shell is None: use_shell = os.name=='posix' if use_tee is None: use_tee = os.name=='posix' using_command = 0 if use_shell: # We use shell (unless use_shell==0) so that wildcards can be # used. sh = os.environ.get('SHELL', '/bin/sh') if is_sequence(command): argv = [sh, '-c', ' '.join(list(command))] else: argv = [sh, '-c', command] else: # On NT, DOS we avoid using command.com as it's exit status is # not related to the exit status of a command. if is_sequence(command): argv = command[:] else: argv = shlex.split(command) if hasattr(os, 'spawnvpe'): spawn_command = os.spawnvpe else: spawn_command = os.spawnve argv[0] = find_executable(argv[0]) or argv[0] if not os.path.isfile(argv[0]): log.warn('Executable %s does not exist' % (argv[0])) if os.name in ['nt', 'dos']: # argv[0] might be internal command argv = [os.environ['COMSPEC'], '/C'] + argv using_command = 1 _so_has_fileno = _supports_fileno(sys.stdout) _se_has_fileno = _supports_fileno(sys.stderr) so_flush = sys.stdout.flush se_flush = sys.stderr.flush if _so_has_fileno: so_fileno = sys.stdout.fileno() so_dup = os.dup(so_fileno) if _se_has_fileno: se_fileno = sys.stderr.fileno() se_dup = os.dup(se_fileno) outfile = temp_file_name() fout = open(outfile, 'w') if using_command: errfile = temp_file_name() ferr = open(errfile, 'w') log.debug('Running %s(%s,%r,%r,os.environ)' \ % (spawn_command.__name__, os.P_WAIT, argv[0], argv)) if sys.version_info[0] >= 3 and os.name == 'nt': # Pre-encode os.environ, discarding un-encodable entries, # to avoid it failing during encoding as part of spawn. Failure # is possible if the environment contains entries that are not # encoded using the system codepage as windows expects. # # This is not necessary on unix, where os.environ is encoded # using the surrogateescape error handler and decoded using # it as part of spawn. encoded_environ = {} for k, v in os.environ.items(): try: encoded_environ[k.encode(sys.getfilesystemencoding())] = v.encode( sys.getfilesystemencoding()) except UnicodeEncodeError: log.debug("ignoring un-encodable env entry %s", k) else: encoded_environ = os.environ argv0 = argv[0] if not using_command: argv[0] = quote_arg(argv0) so_flush() se_flush() if _so_has_fileno: os.dup2(fout.fileno(), so_fileno) if _se_has_fileno: if using_command: #XXX: disabled for now as it does not work from cmd under win32. # Tests fail on msys os.dup2(ferr.fileno(), se_fileno) else: os.dup2(fout.fileno(), se_fileno) try: status = spawn_command(os.P_WAIT, argv0, argv, encoded_environ) except Exception: errmess = str(get_exception()) status = 999 sys.stderr.write('%s: %s'%(errmess, argv[0])) so_flush() se_flush() if _so_has_fileno: os.dup2(so_dup, so_fileno) os.close(so_dup) if _se_has_fileno: os.dup2(se_dup, se_fileno) os.close(se_dup) fout.close() fout = open_latin1(outfile, 'r') text = fout.read() fout.close() os.remove(outfile) if using_command: ferr.close() ferr = open_latin1(errfile, 'r') errmess = ferr.read() ferr.close() os.remove(errfile) if errmess and not status: # Not sure how to handle the case where errmess # contains only warning messages and that should # not be treated as errors. #status = 998 if text: text = text + '\n' #text = '%sCOMMAND %r FAILED: %s' %(text,command,errmess) text = text + errmess print (errmess) if text[-1:]=='\n': text = text[:-1] if status is None: status = 0 if use_tee: print (text) return status, text def test_nt(**kws): pythonexe = get_pythonexe() echo = find_executable('echo') using_cygwin_echo = echo != 'echo' if using_cygwin_echo: log.warn('Using cygwin echo in win32 environment is not supported') s, o=exec_command(pythonexe\ +' -c "import os;print os.environ.get(\'AAA\',\'\')"') assert s==0 and o=='', (s, o) s, o=exec_command(pythonexe\ +' -c "import os;print os.environ.get(\'AAA\')"', AAA='Tere') assert s==0 and o=='Tere', (s, o) os.environ['BBB'] = 'Hi' s, o=exec_command(pythonexe\ +' -c "import os;print os.environ.get(\'BBB\',\'\')"') assert s==0 and o=='Hi', (s, o) s, o=exec_command(pythonexe\ +' -c "import os;print os.environ.get(\'BBB\',\'\')"', BBB='Hey') assert s==0 and o=='Hey', (s, o) s, o=exec_command(pythonexe\ +' -c "import os;print os.environ.get(\'BBB\',\'\')"') assert s==0 and o=='Hi', (s, o) elif 0: s, o=exec_command('echo Hello') assert s==0 and o=='Hello', (s, o) s, o=exec_command('echo a%AAA%') assert s==0 and o=='a', (s, o) s, o=exec_command('echo a%AAA%', AAA='Tere') assert s==0 and o=='aTere', (s, o) os.environ['BBB'] = 'Hi' s, o=exec_command('echo a%BBB%') assert s==0 and o=='aHi', (s, o) s, o=exec_command('echo a%BBB%', BBB='Hey') assert s==0 and o=='aHey', (s, o) s, o=exec_command('echo a%BBB%') assert s==0 and o=='aHi', (s, o) s, o=exec_command('this_is_not_a_command') assert s and o!='', (s, o) s, o=exec_command('type not_existing_file') assert s and o!='', (s, o) s, o=exec_command('echo path=%path%') assert s==0 and o!='', (s, o) s, o=exec_command('%s -c "import sys;sys.stderr.write(sys.platform)"' \ % pythonexe) assert s==0 and o=='win32', (s, o) s, o=exec_command('%s -c "raise \'Ignore me.\'"' % pythonexe) assert s==1 and o, (s, o) s, o=exec_command('%s -c "import sys;sys.stderr.write(\'0\');sys.stderr.write(\'1\');sys.stderr.write(\'2\')"'\ % pythonexe) assert s==0 and o=='012', (s, o) s, o=exec_command('%s -c "import sys;sys.exit(15)"' % pythonexe) assert s==15 and o=='', (s, o) s, o=exec_command('%s -c "print \'Heipa\'"' % pythonexe) assert s==0 and o=='Heipa', (s, o) print ('ok') def test_posix(**kws): s, o=exec_command("echo Hello",**kws) assert s==0 and o=='Hello', (s, o) s, o=exec_command('echo $AAA',**kws) assert s==0 and o=='', (s, o) s, o=exec_command('echo "$AAA"',AAA='Tere',**kws) assert s==0 and o=='Tere', (s, o) s, o=exec_command('echo "$AAA"',**kws) assert s==0 and o=='', (s, o) os.environ['BBB'] = 'Hi' s, o=exec_command('echo "$BBB"',**kws) assert s==0 and o=='Hi', (s, o) s, o=exec_command('echo "$BBB"',BBB='Hey',**kws) assert s==0 and o=='Hey', (s, o) s, o=exec_command('echo "$BBB"',**kws) assert s==0 and o=='Hi', (s, o) s, o=exec_command('this_is_not_a_command',**kws) assert s!=0 and o!='', (s, o) s, o=exec_command('echo path=$PATH',**kws) assert s==0 and o!='', (s, o) s, o=exec_command('python -c "import sys,os;sys.stderr.write(os.name)"',**kws) assert s==0 and o=='posix', (s, o) s, o=exec_command('python -c "raise \'Ignore me.\'"',**kws) assert s==1 and o, (s, o) s, o=exec_command('python -c "import sys;sys.stderr.write(\'0\');sys.stderr.write(\'1\');sys.stderr.write(\'2\')"',**kws) assert s==0 and o=='012', (s, o) s, o=exec_command('python -c "import sys;sys.exit(15)"',**kws) assert s==15 and o=='', (s, o) s, o=exec_command('python -c "print \'Heipa\'"',**kws) assert s==0 and o=='Heipa', (s, o) print ('ok') def test_execute_in(**kws): pythonexe = get_pythonexe() tmpfile = temp_file_name() fn = os.path.basename(tmpfile) tmpdir = os.path.dirname(tmpfile) f = open(tmpfile, 'w') f.write('Hello') f.close() s, o = exec_command('%s -c "print \'Ignore the following IOError:\','\ 'open(%r,\'r\')"' % (pythonexe, fn),**kws) assert s and o!='', (s, o) s, o = exec_command('%s -c "print open(%r,\'r\').read()"' % (pythonexe, fn), execute_in = tmpdir,**kws) assert s==0 and o=='Hello', (s, o) os.remove(tmpfile) print ('ok') def test_svn(**kws): s, o = exec_command(['svn', 'status'],**kws) assert s, (s, o) print ('svn ok') def test_cl(**kws): if os.name=='nt': s, o = exec_command(['cl', '/V'],**kws) assert s, (s, o) print ('cl ok') if os.name=='posix': test = test_posix elif os.name in ['nt', 'dos']: test = test_nt else: raise NotImplementedError('exec_command tests for ', os.name) ############################################################ if __name__ == "__main__": test(use_tee=0) test(use_tee=1) test_execute_in(use_tee=0) test_execute_in(use_tee=1) test_svn(use_tee=1) test_cl(use_tee=1)