您可以使用 psutil.Process.suspend() 暂停超过给定内存阈值的正在运行的进程。监控部分只是反复将正在运行的进程的 psutil.Process().memory_info().rss(“驻留集大小”)与您给定的阈值进行比较。如何安排后续处理则取决于您自己。
psutil.Process.suspend()
psutil.Process().memory_info().rss
在下面的示例中,我会暂停超出阈值的“肇事”进程,直到其余进程全部完成,然后再逐个恢复之前被暂停的进程。这是展示一般机制的一种简单方法。
import time
import random
from threading import Thread
from multiprocessing import Process, active_children

import psutil


def format_mib(mem_bytes):
    """Format bytes into mebibyte-string."""
    return f'{mem_bytes / 2 ** 20:.2f} MiB'


def f(append_length):
    """Main function in child-process. Appends random floats to list.

    Runs ten rounds; each round extends a list by `append_length` random
    floats, prints the current resident set size and sleeps two seconds.
    """
    p = psutil.Process()
    li = []
    for i in range(10):
        li.extend([random.random() for _ in range(append_length)])
        # memory_info() already provides `rss`; memory_full_info() would
        # walk the whole address space just to report the same value.
        print(f'i: {i} | pid: {p.pid} | '
              f'{format_mib(p.memory_info().rss)}')
        time.sleep(2)


def monitored(running_processes, max_mib):
    """Monitor memory usage for running processes.

    Suspend execution for processes surpassing `max_mib` and complete
    one by one after behaving processes have finished.

    Args:
        running_processes: started `multiprocessing.Process` objects.
        max_mib: resident-set-size threshold in MiB.
    """
    running_processes = [psutil.Process(pid=p.pid) for p in running_processes]
    suspended_processes = []

    while running_processes:
        active_children()  # Joins all finished processes.
        # Without it, p.is_running() below on Unix would not return `False`
        # for finished processes.
        for p in running_processes.copy():
            if not p.is_running():
                running_processes.remove(p)
                print(f'removed finished process: {p}')
                continue

            try:
                over_limit = p.memory_info().rss / 2 ** 20 > max_mib
                if over_limit:
                    print(f'suspending process: {p}')
                    p.suspend()
            except psutil.NoSuchProcess:
                # The process exited between the is_running() check and
                # the query/suspend call; treat it as finished instead of
                # letting the exception kill the monitor thread.
                running_processes.remove(p)
                print(f'removed finished process: {p}')
                continue

            if over_limit:
                running_processes.remove(p)
                suspended_processes.append(p)

        time.sleep(1)

    for p in suspended_processes:
        print(f'\nresuming process: {p}')
        try:
            p.resume()
        except psutil.NoSuchProcess:
            # Process is gone (e.g. it was killed while suspended);
            # nothing left to resume or wait for.
            continue
        p.wait()


if __name__ == '__main__':

    MAX_MiB = 200

    append_lengths = [100000, 500000, 1000000, 2000000, 300000]
    processes = [Process(target=f, args=(append_length,))
                 for append_length in append_lengths]

    for p in processes:
        p.start()

    m = Thread(target=monitored, args=(processes, MAX_MiB))
    m.start()
    m.join()
示例输出(已缩短):两个进程因超过 200 MiB 阈值而被暂停,并在表现正常的进程执行完成后恢复:
i: 0 | pid: 17997 | 13.53 MiB i: 0 | pid: 18001 | 19.70 MiB i: 0 | pid: 17998 | 25.88 MiB i: 0 | pid: 17999 | 41.32 MiB i: 0 | pid: 18000 | 72.21 MiB ... i: 2 | pid: 17997 | 20.84 MiB i: 2 | pid: 18001 | 42.02 MiB i: 2 | pid: 17998 | 60.56 MiB i: 2 | pid: 17999 | 103.36 MiB i: 2 | pid: 18000 | 215.70 MiB suspending process: psutil.Process(pid=18000, name='python', started='18:20:09') i: 3 | pid: 17997 | 23.93 MiB i: 3 | pid: 18001 | 47.75 MiB i: 3 | pid: 17998 | 76.00 MiB i: 3 | pid: 17999 | 141.59 MiB ... i: 5 | pid: 17997 | 30.11 MiB i: 5 | pid: 18001 | 68.24 MiB i: 5 | pid: 17998 | 107.23 MiB i: 5 | pid: 17999 | 203.52 MiB suspending process: psutil.Process(pid=17999, name='python', started='18:20:09') i: 6 | pid: 17997 | 33.19 MiB i: 6 | pid: 18001 | 77.49 MiB i: 6 | pid: 17998 | 122.59 MiB ... i: 9 | pid: 17997 | 42.47 MiB i: 9 | pid: 18001 | 105.68 MiB i: 9 | pid: 17998 | 168.96 MiB removed finished process: psutil.Process(pid=17997, status='terminated') removed finished process: psutil.Process(pid=17998, status='terminated') removed finished process: psutil.Process(pid=18001, status='terminated') resuming process: psutil.Process(pid=18000, name='python', started='18:20:09') i: 3 | pid: 18000 | 277.46 MiB i: 4 | pid: 18000 | 339.22 MiB i: 5 | pid: 18000 | 400.84 MiB ... i: 9 | pid: 18000 | 648.00 MiB resuming process: psutil.Process(pid=17999, name='python', started='18:20:09') i: 6 | pid: 17999 | 234.55 MiB ... i: 9 | pid: 17999 | 327.31 MiB Process finished with exit code 0
编辑:
我认为我唯一还没解决的问题是:怎样让它在同一时刻只启动一定数量的线程[sic!],随着任务完成再添加剩余的线程,最后再把所有被暂停的线程执行完?
我扩展了上面的代码,使其能在旧进程完成时启动新进程,并将同时运行的进程数上限设置为核心数。我还把它重构成了一个类,否则需要管理的各种状态会让代码变得混乱。在下面的代码中,为简洁起见,我交替使用“任务”和“进程”这两个名称。请注意进程启动方法的变更以及代码中附带的注释。
import time
import random
from threading import Thread
from collections import deque
from multiprocessing import Process, active_children, set_start_method

import psutil


# `def format_mib` and `def f` from above unchanged...

class TaskProcessor(Thread):
    """Processor class which monitors memory usage for running
    tasks (processes). Suspends execution for tasks surpassing
    `max_mib` and completes them one by one, after behaving
    tasks have finished.
    """
    def __init__(self, n_cores, max_mib, tasks):
        """Store the configuration and queue the not-yet-started tasks.

        Args:
            n_cores: maximum number of tasks running at the same time.
            max_mib: resident-set-size threshold in MiB.
            tasks: iterable of unstarted `multiprocessing.Process` objects.
        """
        super().__init__()
        self.n_cores = n_cores
        self.max_mib = max_mib  # memory threshold
        self.tasks = deque(tasks)

        self._running_tasks = []
        self._suspended_tasks = []

    def run(self):
        """Main-function in new thread."""
        self._update_running_tasks()
        self._monitor_running_tasks()
        self._process_suspended_tasks()

    def _update_running_tasks(self):
        """Start new tasks if we have less running tasks than cores."""
        while len(self._running_tasks) < self.n_cores and self.tasks:
            p = self.tasks.popleft()
            p.start()
            # for further process-management we here just need the
            # psutil.Process wrapper
            self._running_tasks.append(psutil.Process(pid=p.pid))
            print(f'Started process: {self._running_tasks[-1]}')

    def _monitor_running_tasks(self):
        """Monitor running tasks. Replace completed tasks and suspend tasks
        which exceed the memory threshold `self.max_mib`.
        """
        # loop while we have running or non-started tasks
        while self._running_tasks or self.tasks:
            active_children()  # Joins all finished processes.
            # Without it, p.is_running() below on Unix would not return
            # `False` for finished processes.
            self._update_running_tasks()

            for p in self._running_tasks.copy():
                if not p.is_running():  # process has finished
                    self._running_tasks.remove(p)
                    print(f'Removed finished process: {p}')
                    continue

                try:
                    if p.memory_info().rss / 2 ** 20 <= self.max_mib:
                        continue
                    p.suspend()
                except psutil.NoSuchProcess:
                    # The process exited between the is_running() check
                    # and the query/suspend call; treat it as finished
                    # instead of letting the exception kill this thread.
                    self._running_tasks.remove(p)
                    print(f'Removed finished process: {p}')
                    continue

                self._running_tasks.remove(p)
                self._suspended_tasks.append(p)
                print(f'Suspended process: {p}')

            time.sleep(1)

    def _process_suspended_tasks(self):
        """Resume processing of suspended tasks, one after another."""
        for p in self._suspended_tasks:
            print(f'\nResuming process: {p}')
            try:
                p.resume()
            except psutil.NoSuchProcess:
                # Process is gone (e.g. it was killed while suspended);
                # nothing left to resume or wait for.
                continue
            p.wait()


if __name__ == '__main__':

    # Forking (default on Unix-y systems) an already multithreaded process is
    # error-prone. Since we intend to start processes after we are already
    # multithreaded, we switch to another start-method.
    set_start_method('spawn')  # or 'forkserver' (a bit faster start up) if available

    MAX_MiB = 200
    N_CORES = 2

    append_lengths = [100000, 500000, 1000000, 2000000, 300000]
    tasks = [Process(target=f, args=(append_length,))
             for append_length in append_lengths]

    tp = TaskProcessor(n_cores=N_CORES, max_mib=MAX_MiB, tasks=tasks)
    tp.start()
    tp.join()
示例输出(缩短):
Started process: psutil.Process(pid=9422, name='python', started='13:45:53') Started process: psutil.Process(pid=9423, name='python', started='13:45:53') i: 0 | pid: 9422 | 18.95 MiB i: 0 | pid: 9423 | 31.45 MiB ... i: 9 | pid: 9422 | 47.36 MiB i: 9 | pid: 9423 | 175.41 MiB Removed finished process: psutil.Process(pid=9422, status='terminated') Removed finished process: psutil.Process(pid=9423, status='terminated') Started process: psutil.Process(pid=9445, name='python', started='13:46:15') Started process: psutil.Process(pid=9446, name='python', started='13:46:15') i: 0 | pid: 9445 | 46.86 MiB i: 0 | pid: 9446 | 77.74 MiB ... i: 2 | pid: 9445 | 117.41 MiB i: 2 | pid: 9446 | 220.99 MiB Suspended process: psutil.Process(pid=9446, name='python', started='13:46:15') Started process: psutil.Process(pid=9450, name='python', started='13:46:21') i: 0 | pid: 9450 | 25.16 MiB i: 3 | pid: 9445 | 148.29 MiB i: 1 | pid: 9450 | 36.47 MiB i: 4 | pid: 9445 | 179.17 MiB i: 2 | pid: 9450 | 45.74 MiB i: 5 | pid: 9445 | 211.14 MiB Suspended process: psutil.Process(pid=9445, name='python', started='13:46:15') i: 3 | pid: 9450 | 55.00 MiB ... i: 9 | pid: 9450 | 110.62 MiB Removed finished process: psutil.Process(pid=9450, status='terminated') Resuming process: psutil.Process(pid=9446, name='python', started='13:46:15') i: 3 | pid: 9446 | 282.75 MiB ... i: 9 | pid: 9446 | 655.35 MiB Resuming process: psutil.Process(pid=9445, name='python', started='13:46:15') i: 6 | pid: 9445 | 242.12 MiB ... i: 9 | pid: 9445 | 334.88 MiB Process finished with exit code 0
parallel --memfree 是为这种情况而建的:
parallel --memfree
parallel --memfree 1G doit ::: {1..100}
仅当空闲内存大于 1 GB 时,它才会启动新进程。如果空闲内存低于 0.5 × 1 GB,它会终止最新启动的作业,并将该作业重新放回队列。
曾经考虑过只是暂停/挂起最新启动的作业,但经验表明,把该进程换出再换入内存,往往比直接重新启动作业慢得多。