Python multiprocessing 多线程示例

发布时间: 更新时间: 总字数:1981 阅读时间:4m 作者: IP上海 分享 网址

Python 多线程示例

介绍

多进程间通信,可以使用 multiprocessing 模块提供了 QueuePipes

  • os.fork() 创建子进程,如果返回值为 0,则是子进程,否则是父进程
  • 由于 windows 平台不支持 fork 方法,所以 python 提供 multiprocessing 模块来跨平台多进程,multiprocessing 模块封装了 fork() 调用,multiprocessing模块提供 Process 类代表进程
    • 创建子进程时,只需要传入一个执行函数和函数的参数,创建一个 Process 实例,用 start() 方法启动
  • 如果要启动大量的子进程,可以用进程池的方式批量创建子进程,from multiprocessing import Pool
  • multiprocessing.Pipe() 一般用于进程或线程间的通信,创建管道时返回两个连接对象,代表管道的两端
    • 类似的 os.pipe(),但它主要用来创建两个文件描述符,一个读,一个写,单向通信

示例

multiprocessing.Pipe

#!/usr/bin/env python
# -*- coding: utf-8 -*-

from multiprocessing import Process, Pipe


def send(pipe):
    pipe.send('i am something ...')
    print(f'send got: {pipe.recv()}')
    pipe.close()


def recv(pipe):
    pipe.send(dict(name='xianbin', age=18))
    print(f'recv got: {pipe.recv()}')
    pipe.close()


if __name__ == '__main__':
    (con1, con2) = Pipe()
    sender = Process(target=send, name='send', args=(con1,))
    sender.start()

    recver = Process(target=recv, name='talk', args=(con2,))
    recver.start()

multiprocessing.Pool

#!/usr/bin/env python
# -*- coding: utf-8 -*-

import os
import time
import random
from multiprocessing import Pool


def sub_task(name):
    spid = os.getpid()
    print('run sub task name is {}, sub pid is {}...'.format(name, spid))
    start = time.time()
    time.sleep(random.random() * 3)
    end = time.time()
    print('sub task %s runs %0.2f seconds.' % (name, (end - start)))
    return spid


if __name__=='__main__':
    print('main process %s.' % os.getpid())
    p = Pool(4)
    results = []
    for i in range(10):
        r = p.apply_async(sub_task, args=(i,))
        results.append(r)
    print('Waiting for all subprocesses done...')

    for r in results:
        print("-- result {}".format(r.get()))
    p.close()
    p.join()
    print('All sub proceess done.')
import os
import multiprocessing

def worker_function(arg):
    # 访问加载的环境变量
    print(os.environ["MY_ENV_VAR"])

def initializer(env):
    # 将环境变量加载到工作进程的环境中
    os.environ.update(env)

if __name__ == "__main__":
    # 在主进程中设置环境变量
    env = {"MY_ENV_VAR": "my_value"}

    # 启动进程池并加载环境变量
    pool = multiprocessing.Pool(processes=4, initializer=initializer, initargs=(env,))

    # 向进程池提交工作函数
    pool.map(worker_function, range(10))

    # 关闭进程池
    pool.close()
    pool.join()

multiprocessing.Queue

import time

from multiprocessing import Process, Queue


def reader_proc(queue):
    """Read from the queue; this spawns as a separate Process"""
    while True:
        msg = queue.get()  # Read from the queue and do nothing
        if msg == "DONE":
            break


def writer(count, num_of_reader_procs, queue):
    """Write integers into the queue.  A reader_proc() will read them from the queue"""
    for ii in range(0, count):
        queue.put(ii)  # Put 'count' numbers into queue

    ### Tell all readers to stop...
    for ii in range(0, num_of_reader_procs):
        queue.put("DONE")


def start_reader_procs(qq, num_of_reader_procs):
    """Start the reader processes and return all in a list to the caller"""
    all_reader_procs = list()
    for ii in range(0, num_of_reader_procs):
        ### reader_p() reads from qq as a separate process...
        ###    you can spawn as many reader_p() as you like
        ###    however, there is usually a point of diminishing returns
        reader_p = Process(target=reader_proc, args=((qq),))
        reader_p.daemon = True
        reader_p.start()  # Launch reader_p() as another proc

        all_reader_procs.append(reader_p)

    return all_reader_procs


if __name__ == "__main__":
    num_of_reader_procs = 2
    qq = Queue()  # writer() writes to qq from _this_ process
    for count in [10**4, 10**5, 10**6]:
        assert 0 < num_of_reader_procs < 4
        all_reader_procs = start_reader_procs(qq, num_of_reader_procs)

        writer(count, len(all_reader_procs), qq)  # Queue stuff to all reader_p()
        print("All reader processes are pulling numbers from the queue...")

        _start = time.time()
        for idx, a_reader_proc in enumerate(all_reader_procs):
            print("    Waiting for reader_p.join() index %s" % idx)
            a_reader_proc.join()  # Wait for a_reader_proc() to finish

            print("        reader_p() idx:%s is done" % idx)

        print(
            "Sending {0} integers through Queue() took {1} seconds".format(
                count, (time.time() - _start)
            )
        )
        print("")

共享变量

from multiprocessing import Pool, Manager

def worker(d, lock):
    with lock:
        d['count'] += 1

if __name__ == "__main__":
    with Manager() as manager:
        d = manager.dict()
        d['count'] = 0
        lock = manager.Lock()

        with Pool(processes=4) as pool:
            for _ in range(1000):
                pool.apply_async(worker, args=(d, lock))
            pool.close()
            pool.join()

        print(d['count'])  # 输出应为1000,因为有1000次调用worker函数

复杂示例

import multiprocessing
from functools import partial
import os
import time

def calculate_square_with_prefix(number, prefix):
    """
    计算一个数字的平方,并添加一个前缀。
    模拟一个耗时操作。
    """
    pid = os.getpid() # 获取当前进程ID
    print(f"进程ID: {pid} 正在处理 {prefix} - {number}...")
    time.sleep(0.1) # 模拟耗时操作
    result = number * number
    print(f"进程ID: {pid} 完成了 {prefix} - {number},结果是 {result}")
    return result

if __name__ == "__main__":
    # 任务数据列表
    numbers_to_process = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]

    # 我们想要固定的参数值
    my_prefix = "计算任务"

    # 使用 functools.partial 来“冻结”函数中的 prefix 参数。
    # 这创建了一个新的函数,它只需要一个参数 (number)。
    # 这样就可以和 pool.map() 配合使用。
    # 新函数:partial_calculate_func(number)
    partial_calculate_func = partial(calculate_square_with_prefix, prefix=my_prefix)

    # 确定要使用的进程数,这里设置为4个
    process_num = 4
    print(f"主进程ID: {os.getpid()}")
    print(f"开始使用 {process_num} 个进程处理 {len(numbers_to_process)} 个任务...")

    # 使用 with 语句来创建进程池,确保它在使用后被正确关闭
    with multiprocessing.Pool(processes=process_num) as pool:
        # pool.map() 会将 numbers_to_process 列表中的每个元素,
        # 传递给 partial_calculate_func 函数,并并行执行。
        # 结果会按照输入列表的顺序返回。
        results = pool.map(partial_calculate_func, numbers_to_process)

    print("\n所有任务已完成!")
    print("最终结果列表:", results)

运行结果:

主进程ID: 25491
开始使用 4 个进程处理 10 个任务...
进程ID: 25493 正在处理 计算任务 - 1...
...
进程ID: 25496 正在处理 计算任务 - 4...
进程ID: 25494 完成了 计算任务 - 2,结果是 4
...
进程ID: 25494 正在处理 计算任务 - 10...
进程ID: 25496 完成了 计算任务 - 8,结果是 64
进程ID: 25493 完成了 计算任务 - 9,结果是 81
进程ID: 25494 完成了 计算任务 - 10,结果是 100

所有任务已完成!
最终结果列表: [1, 4, 9, 16, 25, 36, 49, 64, 81, 100]

说明:

  • multiprocessing.Pool(processes=process_num): 这行代码创建了一个包含 4 个独立进程的进程池。这些进程是并行运行的,它们有各自独立的内存空间。
  • functools.partial: calculate_square_with_prefix 函数需要两个参数 (numberprefix)。但 pool.map() 只能接受一个可迭代对象作为输入,并将其中的每个元素作为单个参数传递给函数。partial 解决了这个问题,它将 prefix='计算任务' 固定住了,生成了一个新的单参数函数 partial_calculate_func
  • pool.map(): 这是核心。它高效地将 numbers_to_process 列表中的 10 个任务分发给进程池中的 4 个进程。一旦一个进程完成一个任务,它就会立即领取下一个任务,直到所有任务都被处理完毕。
  • 并行执行: 从输出的进程 ID 可以看出,多个任务是在不同的进程中同时运行的,这比单线程顺序执行要快得多。
  • 结果顺序: 尽管任务是并行处理的,pool.map() 保证了返回的 results 列表的顺序与原始输入 numbers_to_process 的顺序是一致的。

扩展

concurrent.futures

  • 错误日志
concurrent.futures.process.BrokenProcessPool: A process in the process pool was terminated abruptly while the future was running or pending.
  • 解决方式,防止 if __name__ == '__main__': 中调用
本文总阅读量 次 本站总访问量 次 本站总访客数
Home Archives Categories Tags Statistics