#!/usr/bin/python3.2 # Runs a command and output its stdout and stderr in a tkinter ScrolledText, # with a button to kill the process. Works with both short and long lived # processes. # # Implements a non-blocking subprocess pipe, since PEP 3145 is not part of # python at the time of writing (2012/01) # # References: # non-blocking popen: # http://www.python.org/dev/peps/pep-3145/ # # Thanks to Peter Otten for his help with tkinter. # http://mail.python.org/pipermail/python-list/2012-January/1286923.html # # Thanks to Sebastien Claeys for his post on Stack Overflow # http://stackoverflow.com/questions/375427/non-blocking-read-on-a-subprocess-pipe-in-python/5749687#5749687 # # # You can run it with any commands available in the path, but this particular # bash script will help demonstrate how it works with a long process that # does not terminate with a regular kill. # # #!/bin/bash # # trap '' SIGTERM # # while ((1)) # do # date # sleep 2 # done # # Yves Dorfsman - a python programer based in Calgary - 2012/01 # import os import sys import fcntl import tkinter import subprocess import tkinter.scrolledtext class nonBlockingPopen(subprocess.Popen): ''' Nonblocking version of suprocess.Popen. Useful while waiting for PEP 3145. In a regular subprocess.Popen, if you read from a pipe, it will block until the process is terminated, even if there is nothing to read. this class will only read what is available in the buffer at the time. This means that you need to schedule reading the output on a regular interval. ''' import os import fcntl def __init__(self, *args, **keywords): super().__init__(*args, **keywords) if self.stdout is not None: fod = self.stdout.fileno() fol = fcntl.fcntl(fod, fcntl.F_GETFL) fcntl.fcntl(fod, fcntl.F_SETFL, fol | os.O_NONBLOCK) if self.stderr is not None: fed = self.stderr.fileno() fel = fcntl.fcntl(fed, fcntl.F_GETFL) fcntl.fcntl(fed, fcntl.F_SETFL, fel | os.O_NONBLOCK) class SelfUpdatePIDLabel(tkinter.Label): ''' tkinter Label which updates itself base on the status of a process. If self.proc is None, or the process is terminated (self.proc.poll is not None) it will display a blank value, otherwise it will display the pid of the process. The same label can be re-used with another process, just update the value of self.proc. ''' def __init__(self, *args, proc=None, **keywords): self.proc = proc self.pid = None self.pidtext ='' self.rc = None self.baselabel = 'PID: ' super().__init__(*args, text=self.baselabel, **keywords) self.auto_update() def auto_update(self): if self.proc is None: self.pid = None newtext = '' else: self.rc = self.proc.poll() if self.rc is None: self.pid = self.proc.pid newtext = str(self.pid) else: self.pid = None newtext = '' if self.pidtext != newtext: self.pidtext = newtext self.config(text = self.baselabel + newtext) self.after(100, self.auto_update) class SelfUpdateKillingButton(tkinter.Button): ''' tkinter Button associated to a process to kill it. The button monitors the state of the process and is only active while the process is running. It sends a SIGTERM (subprocess.terminate) the first time it is activated, and a SIGKILL (subprocess.kill) any subsequent time. The Button can be re-used by other processes, simply update the value of self.proc. ''' def __init__(self, *args, proc=None, **keywords): self.conf_running_not_killed_var = 0 self.conf_running_killed_var = 1 self.conf_not_running_var = 2 self.conf = self.conf_not_running_var self.proc = proc self.rc = None self.killedOnce = False super().__init__(*args, command=self.kill_proc, state=tkinter.DISABLED, **keywords) self.auto_update() def kill_proc(self): if not self.killedOnce: self.killedOnce = True self.proc.terminate() self.conf_running_killed() else: self.proc.kill() def conf_running_not_killed(self): self.conf = self.conf_running_not_killed_var self.configure(fg='black') self.configure(activeforeground='black') self.configure(text='KILL (term)') self.config(state=tkinter.NORMAL) def conf_running_killed(self): self.conf = self.conf_running_killed_var self.configure(fg='red') self.configure(activeforeground='red') self.configure(text='KILL (kill)') self.config(state=tkinter.NORMAL) def conf_not_running(self): self.killedOnce = False self.conf = self.conf_not_running_var self.config(state=tkinter.DISABLED) self.configure(fg='black') self.configure(activeforeground='black') self.configure(text='kill') def auto_update(self): if self.proc is not None: self.rc = self.proc.poll() if self.rc is None: if self.killedOnce is not True: if self.conf != self.conf_running_killed_var: self.conf_running_not_killed() else: if self.conf != self.conf_running_killed_var: self.conf_running_killed() elif self.conf != self.conf_not_running_var: self.conf_not_running() self.after(100, self.auto_update) class ProcessOutputScrolledText(tkinter.scrolledtext.ScrolledText): ''' tkinter ScrolledText which updates itself from the stdout of a suprocess.Popen output. It keeps reading the output of the pipe even once the process is terminated. This is necessary for very short live processes (the process might already be terminated by the time we read its output) as well as processes with output bigger than the buffer. ''' def __init__(self, *args, proc=None, **keywords): self.lastouput = None self.proc = proc self.rc = None self.killedOnce = False super().__init__(*args, **keywords) self.auto_update() def destroy(self): ''' Terminate, then kill the process before destroying itself. ''' if self.proc is not None: self.rc = self.proc.poll() if self.rc is None: self.proc.terminate() if self.proc.poll() is None: self.proc.kill() super().destroy() def auto_update(self): if self.proc is not None: output = self.proc.stdout.read() if output is not None: self.insert(tkinter.END, output) self.see(tkinter.END) # We keep reading regardless of the state of the process. We need # to do this to capture the output from short lived processes. self.after(100, self.auto_update) def main(*args): tk = tkinter.Tk() proc = None st = ProcessOutputScrolledText(tk, proc=proc) st.pack(expand=True) pidlabel = SelfUpdatePIDLabel(tk, proc=proc) pidlabel.pack() killButton = SelfUpdateKillingButton(tk, proc=proc) killButton.pack() close = tkinter.Button(tk, text='close', command=tk.destroy) close.pack() proc = nonBlockingPopen(sys.argv[1:], stdout=subprocess.PIPE, stderr=subprocess.STDOUT) st.proc = proc pidlabel.proc = proc killButton.proc = proc tk.mainloop() if __name__ == '__main__': if len(sys.argv) >= 2: main(*sys.argv) else: raise ValueError("Needs a command to execute.")