Fork me on GitHub

Python进阶下

有两种常用的方法可以使得代码并行执行:多线程和多进程。因为CPython解释器的实现不是线程安全的,具有GIL(全局解释器锁),同一时刻只有一个线程可以获得解释器的锁。因此,Python利用多核心的CPU只能通过多进程,而多线程只适用于IO密集型的程序。

多进程基础

创建并开启进程

可以使用multiprocessing.Process()来创建进程,它接受两个参数:

  • target,一个可调用的函数,当进程开始时会执行
  • args,一个元组,提供目标函数的参数

使用process.start()来开始执行一个进程

调用process.join()来告诉程序等待进程结束再执行后续代码,主进程将会被阻塞

from multiprocessing import Process
import os

def square_numbers():
    """CPU-bound demo workload: square the integers 0..999."""
    for n in range(1000):
        _ = n * n


if __name__ == "__main__":
    # One process per CPU core is usually a good choice.
    worker_count = os.cpu_count()

    # Create one Process per worker, all running the same target function.
    workers = [Process(target=square_numbers) for _ in range(worker_count)]

    # Launch every worker process.
    for worker in workers:
        worker.start()

    # Block the main program until all worker processes have finished.
    for worker in workers:
        worker.join()
进程间分享数据

因为进程的内存空间不同,需要特殊的共享内存对象来分享数据。

数据可以保存在共享内存变量中,使用Value或者Array

  • Value(type, value)创建一个ctype对象
  • Array(type, value)创建一个ctype类型的列表

如下程序演示race condition(资源竞态),每次执行结果都不一样。例如当两个进程读取同一个值,各自对其执行+1操作,然后写回原有地址,其结果并不是预想的加2。

from multiprocessing import Process, Value, Array
import time

def add_100(number):
    """Increment the shared Value 100 times -- deliberately unlocked (racy)."""
    for _ in range(100):
        time.sleep(0.001)
        number.value += 1

def add_100_array(numbers):
    """Increment every slot of the shared Array 100 times -- also unlocked."""
    for _ in range(100):
        time.sleep(0.01)
        for idx in range(len(numbers)):
            numbers[idx] += 1


if __name__ == "__main__":

    shared_number = Value('i', 0)
    print('Value at beginning:', shared_number.value)

    shared_array = Array('d', [0.0, 100.0, 200.0])
    print('Array at beginning:', shared_array[:])

    # Two unsynchronized writers per shared object -> race conditions.
    workers = [
        Process(target=add_100, args=(shared_number,)),
        Process(target=add_100, args=(shared_number,)),
        Process(target=add_100_array, args=(shared_array,)),
        Process(target=add_100_array, args=(shared_array,)),
    ]

    for worker in workers:
        worker.start()

    # Wait for all four processes before reading the final values.
    for worker in workers:
        worker.join()

    print('Value at end:', shared_number.value)
    print('Array at end:', shared_array[:])

    print('end main')

"""
    Value at beginning: 0
    Array at beginning: [0.0, 100.0, 200.0]
    Value at end: 144
    Array at end: [134.0, 237.0, 339.0]
    end main
"""
可以使用锁避免资源竞态

锁(也称为互斥锁)是一种同步机制,用于在存在许多执行进程/线程的环境中强制限制对资源的访问。锁具有两种状态:锁定和解锁。 如果状态为锁定,则在再次解除锁定状态之前,不允许其他并发进程/线程进入此代码段。

# import Lock
from multiprocessing import Lock
from multiprocessing import Process, Value, Array
import time

def add_100(number, lock):
    """Add 100 to the shared Value, guarding every increment with the lock."""
    for _ in range(100):
        time.sleep(0.001)
        # Enter the critical section.
        lock.acquire()

        number.value += 1

        # Leave the critical section so the other process can proceed.
        lock.release()

def add_100_array(numbers, lock):
    """Add 100 to each slot of the shared Array, one locked increment at a time."""
    for _ in range(100):
        time.sleep(0.01)
        for idx in range(len(numbers)):
            lock.acquire()
            numbers[idx] += 1
            lock.release()


if __name__ == "__main__":

    # One lock per shared object keeps the two demos independent.
    number_lock = Lock()
    array_lock = Lock()

    shared_number = Value('i', 0)
    print('Value at beginning:', shared_number.value)

    shared_array = Array('d', [0.0, 100.0, 200.0])
    print('Array at beginning:', shared_array[:])

    # Each pair of writers receives the lock protecting its target object.
    workers = [
        Process(target=add_100, args=(shared_number, number_lock)),
        Process(target=add_100, args=(shared_number, number_lock)),
        Process(target=add_100_array, args=(shared_array, array_lock)),
        Process(target=add_100_array, args=(shared_array, array_lock)),
    ]

    for worker in workers:
        worker.start()

    for worker in workers:
        worker.join()

    print('Value at end:', shared_number.value)
    print('Array at end:', shared_array[:])

    print('end main')

"""
Value at beginning: 0
Array at beginning: [0.0, 100.0, 200.0]
Value at end: 200
Array at end: [200.0, 300.0, 400.0]
end main
"""
在上下文管理器中使用锁

使用上下文管理器管理锁的获取和释放更加安全

def add_100(number, lock):
    """Add 100 to the shared Value; the `with` block owns the lock.

    The context manager acquires on entry and releases on exit,
    even if an exception is raised inside the block.
    """
    for _ in range(100):
        time.sleep(0.01)
        with lock:
            number.value += 1
多进程使用队列通信

使用队列的操作是进程安全的。多进程队列实现了queue.Queue的大部分方法,task_done()和join()除外。

  • q.get():移除队首第一个元素,默认情况,会阻塞直到有元素可用
  • q.put(item)将元素压到队尾,默认情况,阻塞直到队列有空的槽
  • q.empty()如果队列为空,返回True
  • q.close()表明当前进程不会有新的数据放到队列中了
# communicate between processes with the multiprocessing Queue
# Queues are thread and process safe
from multiprocessing import Process, Queue
import time 

def square(numbers, queue):
    """Push the square of each input value onto the shared queue."""
    for value in numbers:
        time.sleep(0.01)
        queue.put(value * value)

def make_negative(numbers, queue):
    """Push the negation of each input value onto the shared queue."""
    for value in numbers:
        time.sleep(0.01)
        queue.put(value * -1)

if __name__ == "__main__":

    numbers = range(1, 6)
    q = Queue()

    # Two producer processes feed the same queue.
    producers = [
        Process(target=square, args=(numbers, q)),
        Process(target=make_negative, args=(numbers, q)),
    ]

    for producer in producers:
        producer.start()

    for producer in producers:
        producer.join()

    # Drain the queue; arrival order is not guaranteed to be sequential.
    while not q.empty():
        print(q.get())

    print('end main')

"""
1
-1
4
-2
9
-3
16
-4
25
-5
end main
"""
进程池

进程池对象控制一些工作进程worker,可以支持超时和回调以实现异步处理,也有一些并行的map实现。它可以自动管理多个处理器,并将数据分成小块,在多个处理器上并行处理。

重要的函数包括:

  • map(func, iterable[, chunksize])将可迭代对象切分成小块,作为独立任务提交到进程池,并行处理。函数将会阻塞,直到返回结果。
  • close()阻止更多任务添加到进程池,一旦任务完成,worker进程将退出
  • join()等待工作进程退出,在调用join()之前需要调用close()或者terminate()
  • apply(func, args)调用func函数,参数是args。阻塞直到返回结果,func函数只在进程池中一个worker中执行
  • map_async()和apply_async()是对应的非阻塞异步版本
from multiprocessing import Pool 
import random
import time

def cube(number):
    """Return the cube of *number* after a short random delay (simulated work)."""
    print("Hi")
    time.sleep(random.randint(1,2))
    return number * number * number


if __name__ == "__main__":
    numbers = range(10)

    # Pool() defaults to one worker per available processor (os.cpu_count()).
    pool = Pool()

    # map() splits the iterable into chunks, farms them out to the workers,
    # and blocks until every result has come back.
    result = pool.map(cube, numbers)

    # Alternative: submit one blocking task at a time.
    # result = [pool.apply(cube, args=(i,)) for i in numbers]

    pool.close()   # no more tasks may be submitted
    pool.join()    # wait for the worker processes to exit

    print(result)

多线程基础

Python多线程相对比较鸡肋,其使用和多进程类似

创建并开始线程

使用threading库实现

from threading import Thread

def square_numbers():
    """CPU-bound demo task: square the integers 0..999."""
    for n in range(1000):
        _ = n * n


if __name__ == "__main__":
    worker_count = 10

    # One Thread object per worker, all running the same function.
    workers = [Thread(target=square_numbers) for _ in range(worker_count)]

    # Launch every thread.
    for thread in workers:
        thread.start()

    # Block the main thread until all workers have finished.
    for thread in workers:
        thread.join()
线程间共享数据

线程间可以通过全局变量来共享数据,因为线程间是共享内存空间的

from threading import Thread
import time

# Shared state: every thread in this process can read and write it.
database_value = 0

def increase():
    """Read-modify-write the global without locking -- racy by design."""
    global database_value  # needed to rebind the module-level name

    # Take a snapshot (simulates fetching data from a database).
    snapshot = database_value

    # Modify the local copy; the sleep widens the race window.
    snapshot += 1
    time.sleep(0.1)

    # Publish the (possibly stale) result back to the global.
    database_value = snapshot


if __name__ == "__main__":

    print('Start value: ', database_value)

    writers = [Thread(target=increase), Thread(target=increase)]

    for writer in writers:
        writer.start()

    for writer in writers:
        writer.join()

    print('End value:', database_value)

    print('end main')

"""
    Start value:  0
    End value: 1
    end main
"""
使用锁处理资源竞态
# import Lock
from threading import Thread, Lock
import time


# Shared state protected by the lock passed to increase().
database_value = 0

def increase(lock):
    """Read-modify-write the global, guarded by an explicit lock."""
    global database_value

    # Enter the critical section.
    lock.acquire()

    snapshot = database_value
    snapshot += 1
    time.sleep(0.1)
    database_value = snapshot

    # Leave the critical section.
    lock.release()


if __name__ == "__main__":

    # A single lock shared by both writer threads.
    lock = Lock()

    print('Start value: ', database_value)

    # args must be a tuple -- note the trailing comma.
    writers = [
        Thread(target=increase, args=(lock,)),
        Thread(target=increase, args=(lock,)),
    ]

    for writer in writers:
        writer.start()

    for writer in writers:
        writer.join()

    print('End value:', database_value)

    print('end main')

使用上下文管理器

def increase(lock):
    """Locked read-modify-write; the context manager owns the lock.

    Acquires on entry and releases on exit, even if the body raises.
    """
    global database_value

    with lock:
        snapshot = database_value
        snapshot += 1
        time.sleep(0.1)
        database_value = snapshot
多线程消息队列通信

对队列的操作是线程安全的

from threading import Thread, Lock, current_thread
from queue import Queue

def worker(q, lock):
    """Consume items from the queue forever, reporting each one."""
    while True:
        # Blocks until an item is available.
        value = q.get()

        # The lock serializes print() calls so output lines don't interleave.
        with lock:
            print(f"in {current_thread().name} got {value}")

        # Tell the queue this item is fully processed; once every item
        # has been marked done, q.join() in the main thread unblocks.
        q.task_done()


if __name__ == '__main__':
    q = Queue()
    num_threads = 10
    lock = Lock()

    # Start the consumer threads before any work is queued.
    for i in range(num_threads):
        consumer = Thread(name=f"Thread{i+1}", target=worker, args=(q, lock))
        consumer.daemon = True  # dies when the main thread dies
        consumer.start()

    # Enqueue the work items.
    for item in range(20):
        q.put(item)

    # Block until every queued item has been fetched and marked done.
    q.join()

    print('main done')

"""
in Thread1 got 0
    in Thread2 got 1
    in Thread2 got 11
    in Thread2 got 12
    in Thread2 got 13
    in Thread2 got 14
    in Thread2 got 15
    in Thread2 got 16
    in Thread2 got 17
    in Thread2 got 18
    in Thread2 got 19
    in Thread8 got 5
    in Thread4 got 9
    in Thread1 got 10
    in Thread5 got 2
    in Thread6 got 3
    in Thread9 got 6
    in Thread7 got 4
    in Thread10 got 7
    in Thread3 got 8
    main done
"""

参考

Python多线程和多进程

Comments