Iohannes
发布于 2025-02-12 / 5 阅读 / 0 评论 / 0 点赞

多线程同步和通信

线程锁(互斥锁)

底层实现

线程锁的底层实现依赖于操作系统的互斥锁机制。在Python中,threading.Lock对象通过调用操作系统提供的互斥锁接口来实现线程同步

核心逻辑

  • 获取锁:当一个线程调用lock.acquire()时,它会尝试获取锁。如果锁已经被其他线程占用,则当前线程会被阻塞,直到锁被释放。
  • 释放锁:当线程完成操作后,调用lock.release()释放锁,允许其他等待的线程获取锁。

性能特点

  • 线程锁的性能开销相对较小,适用于简单的线程同步场景。
  • 在高并发场景下,锁的竞争可能会导致性能下降,尤其是在锁的持有时间较长时。

适用场景

  • 适用于需要保护共享资源的场景,例如对共享变量或数据结构的访问。
  • 适合并发量较小的场景,实现起来简单且能满足基本的同步需求。

信号量

底层实现

信号量的底层实现通常基于操作系统提供的信号量机制。信号量是一个计数器,用于控制多个线程对共享资源的访问

核心逻辑

  • 获取信号量:当线程调用semaphore.acquire()时,信号量的计数值减1。如果计数值小于0,则线程会被阻塞,直到其他线程释放信号量。
  • 释放信号量:当线程完成操作后,调用semaphore.release(),信号量的计数值加1,并唤醒一个等待的线程

性能特点

  • 信号量的操作相对简单,适用于高频次的同步操作。
  • 在资源池管理场景中,信号量的性能表现较好。

适用场景

  • 适用于限制同时访问某个资源的线程数量,如数据库连接池。
  • 适用于生产者-消费者模型,控制生产者和消费者的速率

threading.Event()

底层实现

threading.Event是基于条件变量实现的。它内部维护一个布尔标志,用于线程间的同步

核心逻辑

  • 设置事件:调用event.set()将事件标志设置为True,表示事件已发生。
  • 清除事件:调用event.clear()将事件标志设置为False,表示事件未发生。
  • 等待事件:线程调用event.wait()时,如果事件标志为False,线程会阻塞,直到事件标志被设置为True

性能特点

  • threading.Event()的性能开销较低,适用于简单的线程同步。
  • 在复杂的同步需求中,其性能可能不如信号量或条件变量。

适用场景

  • 适用于等待特定条件的场景,例如线程需要等待某个事件的发生。
  • 适用于需要多个线程在满足特定条件后协同工作的场景

队列

底层实现

Python中的queue.Queue是基于线程安全的队列实现,通常使用锁和条件变量来保证线程安全

核心逻辑

  • 入队:调用queue.put(item)将元素放入队列。如果队列已满且设置了阻塞,则线程会等待,直到队列有空间。
  • 出队:调用queue.get()从队列中取出元素。如果队列为空且设置了阻塞,则线程会等待,直到队列中有元素

性能特点

  • 队列的性能取决于队列的大小和操作频率。
  • 在高并发场景下,队列的线程安全机制会带来一定的性能开销。

适用场景

  • 适用于生产者-消费者模型,确保生产者和消费者之间的数据传递。
  • 适用于任务调度场景,例如后台任务的异步执行。

使用选择

  • 线程锁:适用于简单的线程同步,性能开销小,但在高并发场景下可能需要更精细的控制。
  • 信号量:适用于资源池管理和生产者-消费者模型,性能较好,尤其在高频次同步操作中。
  • threading.Event():适用于简单的条件等待和线程协同工作,性能开销低。
  • 队列:适用于生产者-消费者模型和任务调度,适合需要线程安全的数据传递。

线程安全变量类

import threading
import time


class ThreadVar:
    def __init__(self, value):
        self._value = value
        self._lock = threading.Lock()

    @property
    def value(self):
        rtn = None
        with self._lock:
            rtn = self._value
        return rtn

    @value.setter
    def value(self, new_value):
        with self._lock:
            self._value = new_value


def _call_fun(var):
    # 修改 ThreadVar 对象的 value 属性
    var.value = 10


if __name__ == '__main__':
    var = ThreadVar(3)
    t = threading.Thread(target=_call_fun, args=(var,))
    t.start()  # 启动线程

    while True:
        time.sleep(1)
        print(var.value)  # 打印 ThreadVar 的 value 属性

评论