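As I said, you can create your own buffers, write STDOUT/STDERR into them, and check the size along the way. For convenience, you can write a small io.BytesIO wrapper that does the checking for you, e.g.: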
    from io import BytesIO

    # let's first create a size-controlled BytesIO buffer for convenience
    class MeasuredStream(BytesIO):
        def __init__(self, maxsize=1024):  # let's use 1 KB as a default
            super(MeasuredStream, self).__init__()
            self.maxsize = maxsize
            self.length = 0

        def write(self, b):
            if self.length + len(b) > self.maxsize:  # o-oh, max size exceeded
                # write only up to maxsize, truncate the rest (and keep length in sync)
                self.length += super(MeasuredStream, self).write(b[:self.maxsize - self.length])
                raise ValueError("Max size reached, excess data is truncated")
            # plenty of space left, write the bytes and increase the length
            self.length += super(MeasuredStream, self).write(b)
            return len(b)  # convention: return the written number of bytes
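Note that if you intend to truncate, seek or replace data in the buffer, you'll have to account for that in your length as well, but this is enough for our purposes.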
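To make the behaviour of such a wrapper concrete, here is a minimal self-contained sketch. The class name CappedBuffer is hypothetical, and it assumes the same raise-on-overflow policy and append-only usage as the wrapper above: it keeps exactly the first maxsize bytes and raises ValueError once the limit is crossed.

```python
from io import BytesIO

class CappedBuffer(BytesIO):
    """Hypothetical size-capped buffer: keeps at most `maxsize` bytes."""

    def __init__(self, maxsize=1024):
        super().__init__()
        self.maxsize = maxsize
        self.length = 0  # bytes stored so far (append-only usage assumed)

    def write(self, b):
        room = self.maxsize - self.length
        if len(b) > room:
            # store only what still fits, then signal the overflow
            self.length += super().write(b[:room])
            raise ValueError("Max size reached, excess data is truncated")
        self.length += super().write(b)
        return len(b)

buf = CappedBuffer(maxsize=8)
buf.write(b"hello")
try:
    buf.write(b" world")  # 6 bytes, but only 3 bytes of room are left
    overflowed = False
except ValueError:
    overflowed = True
print(buf.getvalue(), overflowed)
```

The caller decides what an overflow means (here just a flag); in the subprocess case below it is the signal to kill the child.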
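Anyway, all you need to do now is handle the streams yourself and account for a possible ValueError from MeasuredStream instead of using Popen.communicate(). Unfortunately, this also means you have to handle the timeout yourself. Something like: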
    import time
    from subprocess import Popen, PIPE, STDOUT, TimeoutExpired

    MEMORY_LIMIT = 64 * 1024 * 1024
    TIMEOUT_LIMIT = 5 * 60
    STDOUT_LIMIT = 1024 * 1024  # let's use 1 MB as a STDOUT limit

    __NR_FILE_NOT_FOUND = -1
    __NR_TIMEOUT = -2
    __NR_MEMORY_OUT = -3
    __NR_MAX_STDOUT_EXCEEDED = -4  # let's add a new return code

    # a monotonic clock for measuring the timeout (time.clock was removed in Python 3.8)
    get_timer = time.monotonic

    def limit_memory(memory):
        import resource  # Unix-only, like preexec_fn itself
        return lambda: resource.setrlimit(resource.RLIMIT_AS, (memory, memory))

    def run_program(cmd, sinput='', timeout=TIMEOUT_LIMIT, memory=MEMORY_LIMIT):
        """Run the command line and output (ret, sout, serr)."""
        try:
            # note: Popen() itself does not accept a timeout argument
            proc = Popen(cmd, stdin=PIPE, stdout=PIPE, stderr=STDOUT,
                         preexec_fn=limit_memory(memory))
        except FileNotFoundError:
            return (__NR_FILE_NOT_FOUND, "", "")
        sout = MeasuredStream(STDOUT_LIMIT)  # store STDOUT in a measured stream
        start_time = get_timer()  # store a reference timer for our custom timeout
        try:
            proc.stdin.write(sinput.encode("utf-8"))  # write the input to STDIN
            proc.stdin.close()  # flush and close STDIN so the child sees EOF
            while True:  # our main listener loop
                line = proc.stdout.readline()  # read a line from the STDOUT
                # use proc.stdout.read(buf_size) instead to handle your own buffer
                if line != b"":  # content collected...
                    sout.write(line)  # write it to our stream
                elif proc.poll() is not None:  # process finished, nothing to do
                    break
                # finally, check the current time progress...
                if get_timer() >= start_time + timeout:
                    raise TimeoutExpired(proc.args, timeout)
            ret = proc.poll()  # get the return code
        except TimeoutExpired:
            proc.kill()  # we're no longer interested in the process, kill it
            proc.wait()  # reap it
            ret = __NR_TIMEOUT
        except MemoryError:
            ret = __NR_MEMORY_OUT
        except ValueError:  # max buffer reached
            proc.kill()  # we're no longer interested in the process, kill it
            proc.wait()  # reap it
            ret = __NR_MAX_STDOUT_EXCEEDED
        sout.seek(0)  # rewind the buffer
        # STDERR is merged into STDOUT, so there is no separate STDERR to return
        return ret, sout.read().decode("utf-8", errors="replace"), ""

    if __name__ == "__main__":
        ret, out, err = run_program(['./example.sh'], timeout=8)
        print("return code: %i\n" % ret)
        print("stdout:\n%s" % out)
        print("stderr:\n%s" % err)
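There are two "problems" with this. The first one is obvious: I'm redirecting the subprocess's STDERR into STDOUT, so the results will be mixed. Because reading from the STDOUT and STDERR streams is a blocking operation, if you want to read them separately you'll have to spawn two threads (and handle each one's ValueError when its stream size is exceeded). The second problem is that the subprocess's STDOUT can stall the timeout check: the check only runs after readline() returns, so it depends on the child actually flushing some data to STDOUT. This can also be solved with a separate timer thread that forcibly kills the process once the timeout is exceeded. In fact, Popen.communicate() deals with both issues internally: it reads the pipes concurrently (with helper threads on Windows, with select-style polling on POSIX) and enforces the timeout itself, although on timeout it only raises TimeoutExpired and leaves killing the process to you.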
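The timer-thread idea can be sketched with the standard library's threading.Timer, which calls a function once after a delay. This is a minimal sketch under assumptions: the helper name run_with_kill_timer is hypothetical, and a Python child started via sys.executable stands in for the real program. Because the timer fires independently of the read, a child that stops writing output can no longer stall the timeout.

```python
import subprocess
import sys
import threading

def run_with_kill_timer(cmd, timeout):
    """Hypothetical helper: run `cmd`, killing it after `timeout` seconds."""
    fired = threading.Event()
    with subprocess.Popen(cmd, stdout=subprocess.PIPE,
                          stderr=subprocess.STDOUT) as proc:

        def on_timeout():
            fired.set()
            try:
                proc.kill()  # forcibly terminate the child; its pipe then hits EOF
            except ProcessLookupError:  # it exited on its own in the meantime
                pass

        timer = threading.Timer(timeout, on_timeout)
        timer.start()
        try:
            # a plain blocking read is fine: the timer unblocks it by killing the child
            out = proc.stdout.read()
            ret = proc.wait()
        finally:
            timer.cancel()  # harmless if the timer has already fired
    return ret, out, fired.is_set()

# a child that sleeps without printing anything, and one that exits at once
slow = run_with_kill_timer([sys.executable, "-c", "import time; time.sleep(10)"], 0.5)
fast = run_with_kill_timer([sys.executable, "-c", "print('hi')"], 5)
print(slow, fast)
```

Cancelling the timer in the finally clause matters: otherwise the non-daemon timer thread would keep the interpreter alive until it fires.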
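The principle of operation is basically the same: you just need to outsource the checks to separate threads and join everything together at the end. That's an exercise I'll leave to you ;)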
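One possible shape for that exercise is the sketch below. The names read_limited and run_limited are hypothetical, and the limit is applied per stream. Each pipe gets its own reader thread, so a blocking read on one pipe can never starve the other; a reader that crosses the limit kills the process, and a timeout on wait() kills it too. read1() returns as soon as some data is available instead of waiting for a full chunk.

```python
import subprocess
import sys
import threading

def read_limited(stream, limit, chunks, proc, overflow):
    """Hypothetical reader: keep up to `limit` bytes from one pipe."""
    total = 0
    while True:
        data = stream.read1(4096)  # returns as soon as some data is available
        if not data:  # EOF: the child closed this pipe or was killed
            return
        if total + len(data) > limit:
            chunks.append(data[:limit - total])  # keep only what still fits
            overflow.set()
            try:
                proc.kill()  # we're no longer interested in the process
            except ProcessLookupError:  # it already exited
                pass
            return
        chunks.append(data)
        total += len(data)

def run_limited(cmd, limit, timeout):
    """Run `cmd`, reading stdout and stderr in separate threads."""
    overflow = threading.Event()
    out_chunks, err_chunks = [], []
    with subprocess.Popen(cmd, stdout=subprocess.PIPE,
                          stderr=subprocess.PIPE) as proc:
        readers = [
            threading.Thread(target=read_limited,
                             args=(proc.stdout, limit, out_chunks, proc, overflow)),
            threading.Thread(target=read_limited,
                             args=(proc.stderr, limit, err_chunks, proc, overflow)),
        ]
        for t in readers:
            t.start()
        try:
            ret = proc.wait(timeout=timeout)
        except subprocess.TimeoutExpired:
            proc.kill()
            ret = proc.wait()
        for t in readers:  # both pipes reach EOF once the child is gone
            t.join()
    return ret, b"".join(out_chunks), b"".join(err_chunks), overflow.is_set()

noisy = [sys.executable, "-c", "import sys; sys.stdout.write('x' * 100000)"]
ret, out, err, overflowed = run_limited(noisy, limit=35, timeout=10)
quiet = run_limited([sys.executable, "-c", "print('hi')"], limit=35, timeout=10)
print(ret, out, overflowed)
```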
As for your second missing feature, could you elaborate on what you have in mind?
It seems this problem is more complex than it looks, and I had a hard time finding solutions online and understanding them.
In fact, the complexity of the problem comes from the fact that there are several ways to solve it. I explored three of them (threading, multiprocessing and asyncio).
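For comparison, the asyncio route might look like the following minimal sketch. The coroutine names drain_limited and run_limited_async are hypothetical; the sketch relies only on asyncio.create_subprocess_exec, StreamReader.read() and asyncio.wait_for, and applies the limit per stream. Both pipes are drained concurrently as tasks, so neither can fill up and block the child; after an overflow the reader keeps discarding data until EOF so the pipe is fully closed.

```python
import asyncio
import sys

async def drain_limited(stream, limit, proc):
    """Hypothetical reader: keep up to `limit` bytes from one pipe."""
    data, overflowed = b"", False
    while True:
        chunk = await stream.read(4096)  # b"" means EOF
        if not chunk:
            return data[:limit], overflowed
        if not overflowed:
            data += chunk
            if len(data) > limit:
                overflowed = True
                try:
                    proc.kill()  # stop the child once the limit is crossed
                except ProcessLookupError:  # it already exited
                    pass
        # after an overflow, keep draining (and discarding) until EOF

async def run_limited_async(cmd, limit, timeout):
    proc = await asyncio.create_subprocess_exec(
        *cmd, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE)
    # drain both pipes concurrently so neither can block the child
    out_task = asyncio.create_task(drain_limited(proc.stdout, limit, proc))
    err_task = asyncio.create_task(drain_limited(proc.stderr, limit, proc))
    try:
        ret = await asyncio.wait_for(proc.wait(), timeout)
    except asyncio.TimeoutError:
        try:
            proc.kill()
        except ProcessLookupError:
            pass
        ret = await proc.wait()
    (out, over_out), (err, over_err) = await asyncio.gather(out_task, err_task)
    return ret, out, err, over_out or over_err

noisy = [sys.executable, "-c", "import sys; sys.stdout.write('x' * 100000)"]
ret, out, err, overflowed = asyncio.run(run_limited_async(noisy, 35, 10))
quiet = asyncio.run(run_limited_async([sys.executable, "-c", "print('hi')"], 35, 10))
print(ret, out, overflowed)
```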
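In the end, I chose to use a separate thread that listens to the running subprocess and captures the program's output. In my opinion, this is the simplest, most portable and most efficient approach.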
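So the basic idea behind this solution is to create a thread that listens to stdout and stderr and collects all of the output. Once the limit is reached, you simply kill the process and return.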
Here is a simplified version of my code:
    from subprocess import Popen, PIPE, TimeoutExpired
    from queue import Queue
    from threading import Thread

    MAX_BUF = 35

    def stream_reader(p, q, n):
        stdout_buf, stderr_buf = b'', b''
        while True:
            # read(n) blocks until n bytes (or EOF) are available on that pipe
            out = p.stdout.read(n)
            err = p.stderr.read(n)
            stdout_buf += out
            stderr_buf += err
            if (len(stdout_buf) > n) or (len(stderr_buf) > n):
                stdout_buf, stderr_buf = stdout_buf[:n], stderr_buf[:n]
                try:
                    p.kill()
                except ProcessLookupError:
                    pass
                break
            if not out and not err:  # EOF on both pipes: all output has been read
                break
        q.put((stdout_buf.decode('utf-8', errors="ignore"),
               stderr_buf.decode('utf-8', errors="ignore")))

    # Main function
    cmd = ['./example.sh']
    proc = Popen(cmd, shell=False, stdin=PIPE, stdout=PIPE, stderr=PIPE)
    q = Queue()
    t_io = Thread(target=stream_reader, args=(proc, q, MAX_BUF,), daemon=True)
    t_io.start()

    # Running the process
    try:
        proc.stdin.write(b'AAAAAAA')
        proc.stdin.close()
    except IOError:  # e.g. BrokenPipeError if the process already exited
        pass

    try:
        ret = proc.wait(timeout=20)
    except TimeoutExpired:
        proc.kill()  # otherwise the reader thread may block on the pipes forever
        proc.wait()
        ret = -1  # Or whatever code you decide to give it.

    t_io.join()
    sout, serr = q.get()
    print(ret, sout, serr)
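You can put whatever you like in the example.sh script being run. Note that a few pitfalls are avoided here to prevent deadlocks and broken code (I've tested this script a bit). However, I'm not entirely sure about it, so don't hesitate to point out obvious mistakes or improvements.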