必看:简析Python中的四种队列

Python官方文档 — Python 3.12.4 documentation
管道Pipe用于在两个进程间传递消息,而队列能够在多个生产者和消费者之间通信。

Pipe

  • 管道可以是全双工(默认),自然可以单工,这取决于duplex参数。
  • 管道k可以发送send()和接收消息recv()
  • 管道主要用于具有亲缘关系的进程之间的通信(如父子进程或兄弟进程)。
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    from multiprocessing import Process, Pipe

    def f(conn):
    conn.send([42, None, 'hello'])
    conn.close()

    if __name__ == '__main__':
    parent_conn, child_conn = Pipe()
    p = Process(target=f, args=(child_conn,))
    p.start()
    print(parent_conn.recv()) # prints "[42, None, 'hello']"
    p.join()
    parent_conn, child_conn = Pipe()返回的两个连接对象 Pipe(), 表示管道的两端,通过将两个链接对象作为参数传入函数来启动进程 p = Process(target=f, args=(child_conn,)),使得两个进程可以通信。每个连接对象都有 send()recv() 方法。不能两个进程同时从一个管道的一端写,但是可以从不同端写。

SimpleQueue

put()get()

Queue

  • put()get()
  • qsize()empty()full()方法可能因多线程/多进程语义而不完全可靠。
  • 没有task_done,只要get就会删除

JoinableQueue

  • put()get()
  • qsize()empty()full()方法可能因多线程/多进程语义而不完全可靠。
  • task_done()join()方法:用于向队列告知处理过,并进行清理
    只有task_done才删除

Python标准库中包含了四种队列,分别是queue.Queue / asyncio.Queue / multiprocessing.Queue / collections.deque

multiprocessing.Queue是基于消息传递(message passing)机制实现的,而不是像某些其他队列实现那样基于共享内存

当APSheduler两个任务同时触发,它会使用多线程还是多进程执行?

调度器类型

  • BlockingScheduler:通常在主程序中运行,会阻塞当前线程,直到调度器被明确关闭。它主要用于简单的应用场景,如脚本。
  • BackgroundScheduler:在后台线程中运行,不会阻塞主线程。它适用于需要同时执行其他任务的应用程序。
  • AsyncIOScheduler:在asyncio事件循环中运行,适用于异步编程场景。

执行器类型

  • ThreadPoolExecutor:使用线程池来执行任务,适用于I/O密集型任务。
  • ProcessPoolExecutor:使用进程池来执行任务,适用于CPU密集型任务,因为Python的全局解释器锁(GIL)会限制多线程在执行CPU密集型任务时的并行性。

关于任务执行模型

  1. 多线程执行
    • 当配置为使用ThreadPoolExecutor时,APScheduler将使用多线程来执行任务。这意味着如果两个任务几乎同时触发,它们可能会在不同的线程中并行执行(取决于线程池的大小和任务的执行时间)。
  2. 多进程执行
    • 当配置为使用ProcessPoolExecutor时,APScheduler将使用多进程来执行任务。这允许任务在物理上并行执行,特别是在CPU密集型任务中,可以绕过Python的GIL限制。

multiprocessing

concurrent.futures

concurrent.futures是对多进程和多线程的抽象。它是一个Python库,提供了一个高层次的接口,用于异步执行callable对象。这个库的主要模块是Executor,它有两个子类:ThreadPoolExecutor和ProcessPoolExecutor。ThreadPoolExecutor用于实现多线程,而ProcessPoolExecutor用于实现多进程。这两个子类都继承了相同的接口,包括submit,map,shutdown等方法

multiprocessing.shared_memory共享内存

反射

反射是指在运行时检查、修改或为对象新增属性和方法,以及通过名称访问对象的特性、方法以及其他成员的能力。Python的内置反射机制包括getattrsetattrdelattrhasattr等函数,以及dirvars函数。这些工具让Python程序能够动态地访问和操作对象的属性和方法。例如,getattr(obj, name)函数可以获取对象obj的属性或方法namesetattr(obj, name, value)函数可以设置对象obj的属性name的值为valuedelattr(obj, name)函数可以删除对象obj的属性namehasattr(obj, name)函数可以检查对象obj是否具有属性或方法name。这些函数可以让我们在运行时动态地访问和修改对象的属性和方法,从而提高了编程的智能化和适应性

三类进程启动方式

  1. Spawn

    • 当使用spawn方法时,父进程会启动一个新的Python解释器实例作为子进程。这意味着子进程会从一个干净的状态开始,仅继承运行Process对象的run()方法所必需的资源。
    • 不必要的文件描述符和句柄不会被子进程继承,这有助于避免资源泄漏和提高安全性。
    • spawn方法相对较慢,因为它需要重新加载和初始化整个Python环境。
    • 这是Windows和macOS平台上的默认设置,并且在多线程环境中更安全。
  2. Fork

    • 在POSIX系统上,fork方法使用os.fork()来复制当前进程,创建一个几乎完全相同的子进程。
    • 子进程继承了父进程的所有资源,包括打开的文件描述符和内存映射。
    • fork在多线程环境下可能引发问题,因为os.fork()在某些系统上不是线程安全的,可能导致数据不一致。
    • 自Python 3.12版本开始,如果检测到多线程环境,os.fork()将发出DeprecationWarning警告,建议使用其他启动方法。
    • fork是当前大多数POSIX系统的默认设置,但计划在Python 3.14中改变这一默认值。
  3. Forkserver

    • forkserver方法首先在程序启动时创建一个服务器进程。随后,当需要新进程时,父进程向服务器请求创建新进程。
    • 服务器进程是单线程的,因此可以安全地使用os.fork(),除非系统库或预加载的导入无意中创建了线程。
    • fork相比,forkserver不会继承不必要的资源,提供了更好的性能和隔离性。
    • 只有在支持通过Unix管道传递文件描述符的POSIX平台上可用,如Linux。

在 Unix 上通过 spawnforkserver 方式启动多进程会同时启动一个 资源追踪 进程,负责追踪当前程序的进程产生的、并且不再被使用的命名系统资源(如命名信号量以及 SharedMemory 对象)。当所有进程退出后,资源追踪会负责释放这些仍被追踪的的对象。通常情况下是不会有这种对象的,但是假如一个子进程被某个信号杀死,就可能存在这一类资源的“泄露”情况。(泄露的信号量以及共享内存不会被释放,直到下一次系统重启,对于这两类资源来说,这是一个比较大的问题,因为操作系统允许的命名信号量的数量是有限的,而共享内存也会占据主内存的一片空间)

进程开始方法设置方法

1
2
3
4
5
6
7
# 直接设置统一的进程开始方法
mp.set_start_method('spawn')

#通过上下文切换不同的进程开始方法
ctx = mp.get_context('spawn')
q = ctx.Queue()
p = ctx.Process(target=foo, args=(q,))

Queue对象只能使用继承(inheritance)的方式共享。这是因为Queue本身基于unix的Pipe对象实现,而Pipe对象的共享需要通过继承。因此,在一个典型的应用实现模型当中,应该是父进程创建Queue,然后创建子进程共享该Queue,由父进程和子进程分别读写。

clickhouse logger级别修改:

/etc/clickhouse-server/config.xml