三默网为您带来有关“【python】多任务threading & multiprocess & gevent”的文章内容,供您阅读参考。

【python】多任务threading & multiprocess & gevent

2023-01-20 12:35:23

0. 多任务介绍

  • 多个任务一起执行
  • 并行:真的多任务,两核各自完成任务
  • 并发:多任务交替执行,交替的足够快(时间片轮转)

1. 多线程

  • 线程:程序运行起来之后,一定有一个执行代码的东西称之为线程
  • 示例及说明
import time
import threading  # 引入线程

def sing():
    for i in range(5):
        print('唱')
        time.sleep(1)

def dance():
    for i in range(5):
        print('跳')
        time.sleep(1)

def main():
    t1 = threading.Thread(target=sing)  # target参数为目标函数
    t2 = threading.Thread(target=dance)
    t1.start()  # 启动线程
    t2.start()
    thread_list = threading.enumerate()  # 所有线程列表
    print(thread_list)
    len_thread_list = len(thread_list)  # 线程数
    print(len_thread_list)

if __name__ == '__main__':
    main()
  • 详细说明
    • 线程的执行顺序是没有顺序的,随机,可以通过延时的方式让某个线程先执行
    • 循环查看当前的线程的方式是:分别执行延时
    • 当调用Thread的时候,不会创建线程,当调用Thread创建出来的实例对象的start方法时才会创建线程并让线程运行
    • 如果创建Thread时执行的函数运行结束则该子线程结束
    • 主线程结束,则程序结束
    • 多线程之间是共享全局变量的:在一个函数中对全局变量进行修改的时候,如果修改了执行(即让全局变量指向了一个新的地方),那么必须使用global,如果是修改了指向空间的数据,则不用global
    • 创建线程是指定传递的参数:args参数:t=threading.Thread(target=sing,args=(g_nums,))# 其中args是用来传参数的
    • 多线程共享全局变量的问题:资源竞争,例如两个线程一起写入某一个文件
    • 同步概念:协同步调,按预定的先后顺序进行运行
    • 互斥锁解决资源竞争问题:某个线程要更改共享数据时,先将其锁定,其他线程不能更改,直到该线程释放资源,将资源的状态变成非锁定,其他线程才能再次锁定资源,保证每次只有一个线程进入写操作,从而保证多线程情况下数据的正确性
      # 创建锁 默认没有上锁
      mutex=threading.Lock()
      # 锁定 如果上锁之前已经上锁,会阻塞在这,直到锁借来位置,之前没上锁,则上锁成功
      mutex.acquire()
      # 释放
      mutex.release()
      
    • 死锁:多把锁共用的时候容易出现死锁,可以使用ctrl+C退出
      • 添加超时时间
      • 程序设计时尽量避免(银行家算法)
  • 继承Thread类完成创建线程(创建类线程)
    class MyThread(threading.Thread):
    	def run(self):
    		pass
    if __name__=="__main__":
    	t = MyThread() # 自动调用run函数,有其他函数应该使其在run中调用来让其运行
    	t.start()
    
  • 多线程版udp聊天器
    from socket import socket
    def recv_msg(udp_socket):
    	while True:
    		# 接收数据
    		recv_data=udp_socket.recvfrom(1024)
    		print(recv_data)
    def send_msg(udp_socket,dest_ip,dest_port):
    	while True:
    		# 发送数据
    		send_data=input("请输入要发送的数据:")
    		udp_socket.sendto(send_data.encode('utf-8'),(dest_ip,dest_port))
    
    def main():
    	udp_socket=socket(AF_INETT,SOCK_DGTAM)
    	udp_socket.bind(('',7890))
    	dest_id=input("请输入对方的ip:")
    	dest_port=input("请输入对方的端口:")
    	t_recv=threading.Thread(target=recv_msg, args=(udp_socket,))
    	t_send=threading.Thread(target=recv_msg, args=(udp_socket,dest_ip,dest_port))
    	t_recv.start()
    	t_send.start()
    
    if __name__=="__main__":
    	main()
    

2. 多进程

  • 代码+运行中的程序用到的资源叫做进程,是操作系统分配资源的基本单元,一个程序可以启动多个进程,进程拥有资源
  • 进程的状态:新建、就绪、等待(阻塞)、运行、死亡
  • 创建进程简单示例
    import multiprocessing,time
    def test1(num):
    	while True:
    		print("1num------------",num)
    		time.sleep(1)
    def test2(num):
    	while True:
    		print("2num------------",num)
    		time.sleep(1)
    def main():
    	p1 = multiprocessing.Process(target=test1, args=(1,)) # test1为调用函数名,args为要传递的参数
    	p2 = multiprocessing.Process(target=test2, args=(2,))
    	p1.start() # 启动进程
    	p2.start()
    
    	if __name__=="__main__":
    		main()
    
  • 详细说明
    • 查看进程命令: ps -aux
    • 杀死进程:kill PID
    • 进程耗费的资源大,进程数不见得越多越好
    • 在进程中,能在代码***享就尽量共享-》写时拷贝
    • 代码-》进程=》线程-》执行程序
  • 进程与线程对比
    • 都能完成多任务,线程轻量
    • 进程是资源分配的最小单元,线程是操作系统调度(执行)的最小单元
    • 线程不能独立执行,必须依存在进程中
    • 线程执行开销小,但不利于资源的管理和保护
  • 进程间通信:通过队列方式(Queue)->先进先出-》解耦
    from multiprocessing import Queue
    q = Queue(3) # 初始化一个Queue对象,最多接收三条put消息
    q.put() # 放入消息
    q.full() # 是否满了
    q.empty() # 是否空了
    q.qsize() # 消息数
    q.get() # 取数据
    q.put_nowait() # 异常方式通知阻塞(数据满了)
    q.get_nowait() # 异常方式通知空闲(没有数据)
    
    • 示例:
      import multiprocessing
      def down_from_web(q):
          data = [11, 22, 33, 44]
          for temp in data:
              q.put(temp)
          print("--下载并存入队列--")
      def analysis_data(q):
          # 从队列中获取数据
          waitting = list()
          while True:
              data = q.get()
              waitting.append(data)
              if q.empty():
                  break
          print(waitting)
      def main():
          # 创建队列
          q = multiprocessing.Queue()
          p1 = multiprocessing.Process(target=down_from_web, args=(q,))
          p2 = multiprocessing.Process(target=analysis_data, args=(q,))
          p1.start()
          p2.start()
      if __name__ == "__main__":
          main()
      
  • 进程池Pool:重复使用进程池中的进程
    • 进程池示例
      import random,os
      def worker(msg):
          t_start = time.time()
          print('%s开始执行,进程号为%d' % (msg, os.getpid()))
          time.sleep(random.random() * 2)
          t_stop = time.time()
          print(msg, '执行完毕,耗时%0.2f' % (t_stop - t_start))
      po = multiprocessing.Pool(3)
      for i in range(0, 10):
          # apply_async(要调用的目标,(传递给目标的参数元祖,))
          # 每次循环将会用空闲出来的子进程去调用目标
          po.apply_async(worker, (i,))
      print("---start---")
      po.close()  # 关闭进程池,不再接收新的请求
      po.join()  # 等待po中所有子进程执行完成,必须放在close之后
      print("---stop---")
      
  • 案例: 多任务文件夹copy
    import multiprocessing
    import os
    def copy_file(queue, file_name, old_folder, new_folder):
        # 完成文件的复制
        with open(old_folder + "/" + file_name, 'rb') as old_f:
            cont = old_f.read()
        with open(new_folder + "/" + file_name, 'wb') as new_f:
            new_f.write(cont)
        # 文件拷贝完,通知主进程
        queue.put(file_name)
    def main():
        # 1.获取要拷贝的文件夹的名字
        old_folder = input("请输入要copy文件夹的名字")
        # 2.创建一个新的文件夹
        try:
            new_folder = old_folder + "[复件]"
            os.mkdir(new_folder)
        except:
            pass
        # 3.获取文件夹的所有的待copy的文件的名字 listdir()
        file_names = os.listdir(old_folder)
        # 4.创建进程池
        po = multiprocessing.Pool(5)
        # 创建一个队列
        queue = multiprocessing.Manager().Queue()
        # 5.向进程池中添加copy文件的任务
        for file_name in file_names:
            po.apply_async(copy_file, args=(queue, file_name, old_folder, new_folder))
        po.close()
        # po.join()
        # 进度条
        all_num = len(file_names)
        while True:
            file_name = queue.get()
            if file_name in file_names:
                file_names.remove(file_name)
            copy_rate = (all_num - len(file_names)) * 100 / all_num
            print("\r%.2f...(%s)" % (copy_rate, file_name) + " " * 50, end='')
            if copy_rate >= 100:
                break
        print()
    # 复制源文件加中的文件,到新文件夹总的文件去
    if __name__ == "__main__":
        main()
    

3. 协程

  • python 通过yield提供了对协程的基本支持,但是不完全
    • 生成器和迭代器都是一种生成的方式(保存代码),节省内存
    • 迭代器:从集合的第一个元素开始访问,直到所有的元素被访问结束,迭代器只能往前,不会后退
    • 可迭代对象:
      • 判断某个东西是否可以迭代:
        	from collections import Iterable
        	isinstance(a,Iterable) # 判断a是否可迭代
        
      • 自定义可迭代类:判断是否可迭代—>调用iter函数得到__iter__方法的返回值-》__iter__方法的返回值是一个迭代器
        from collections import Iterable, Iterator
        class Classmate(object):
        def __init__(self):
            self.names = list()
            self.current_num = 0
        
        def add(self, name):
            self.names.append(name)
        
        def __iter__(self):
            return self
        
        def __next__(self):
            if self.current_num < len(self.names):
                ret = self.names[self.current_num]
                self.current_num += 1
                return ret
            else:
                raise StopIteration
        classmate = Classmate()
        classmate.add('张三')
        classmate.add('王二')
        classmate.add('赵四')
        print("判断是否是可迭代的对象:", isinstance(classmate, Iterable))
        classmate_iter = iter(classmate)
        print("判断是否是迭代器:", isinstance(classmate_iter, Iterator))
        # for循环检测是否可迭代—>调用__next__方法
        # 迭代器一定是可迭代对象,可迭代对象不一定是迭代器
        
    • python2中:xrange():返回一个生成list的方式,range():返回生成的列表
    • python3中:range():返回一个生成list的方式,迭代器
    • 斐波拉契数列:0,1,1,2,3,5,8,13,21,34…
      class Fibonacci(object):
      	def __init__(self,all_num):
      		self.all_num=all_num
      		self.current_num=0
      		self.a=0
      		self.b=1
      	def __iter__(self):
      		return self
      	def __next__(self):
      		if self.current_num<self.all_num:
      			ret=self.a
      			self.a,self.b=self.b,self.a++self.b
      			self.current_num+=1
      			return ret
      		else:
      			raise StopIteration
      fibo=Fibonacci(10)
      for num in fibo:
      	print(num)
      
    • list/tuple等也能接收迭代器做参数(转成列表|元祖)
    • 生成器:是一种特殊的迭代器(没有next和iter方法,却是迭代器),示例:lst=[x*2 for x in range(10)]
      • 用生成器实现斐波拉契数列
        def create_num(all_num):
            a, b = 0, 1
            current_num = 0
            while current_num < all_num:
                yield a  # 如果一个函数中有yield语句,则不再是函数,而是一个生成器的模板
                # yield导致yield后面的值返回给num并暂停,
                # 然后下次执行的时候是从yield下面一句开始执行,
                # 直到再次遇到yield暂停,如此循环
                a, b = b, a + b
                current_num += 1
            return 'ok..'  # 正常不会调用
        # 如果在调用的时候发现函数中有yield,此时不是调用函数,而是创建一个生成器对象
        obj = create_num(10)
        while True:
            try:
                ret = next(obj)
                print(ret)
            except Exception as ret:
                print(ret.value)  # 捕获异常的方式获取函数最终返回值
                break
        # 多个调用生成器生成的对象相互独立互不影响
        
      • 使用send唤醒(启动生成器)
        • ret = yield a # 第一次调用next执行到yield a,第二次调用的时候从赋值开始,如果调用方式为ret=obj.send('saf'),则是以saf作为yield a的值赋给ret ,打印出来的是ret
        • obj.send(n)可以调用从n开始的下一个值
        • 第一次用send启动的时候:obj.send(None)
      • 使用yield实现多任务
        import time
        def task_1():
            while True:
                print("------1-----")
                time.sleep(0.1)
                yield
        def task_2():
            while True:
                print("------2-----")
                time.sleep(0.1)
                yield
        def main():
            t1 = task_1()
            t2 = task_2()
            while True:
                next(t1)
                next(t2)
        if __name__ == "__main__":
            main()
        
  • 第三方gevent为python提供了完善的协程支持
    • 详细说明
      • gevent是第三方库,通过greenlet实现协程

      • 基本思想:当一个greenlet遇到IO操作时,比如访问网络,就自动切换到其他的greenlet,等到IO操作完成,再在适当的时候切换回来继续执行。由于IO操作非常耗时,经常使程序处于等待状态,有了gevent为我们自动切换协程,就保证总有greenlet在运行,而不是等待IO

      • 使用greenlet完成多任务

        • 安装:pip install greenlet
        • 示例
          from greenlet import greenlet
          import time
          def task_1():
              while True:
                  print("------1-----")
                  gr2.switch() # 手动切换到gr2
                  time.sleep(0.1)
          def task_2():
              while True:
                  print("------2-----")
                  gr1.switch() # 手动切换到gr1
                  time.sleep(0.1)
          gr1 = greenlet(task_1)
          gr2 = greenlet(task_2)
          # 切换到gr1
          gr1.switch() # 最终效果为gr1和gr2交替执行
          
      • 使用gevent(遇到延时就切换(gevent模块的延时才行))

        • 安装:pip install gevent
        • 示例:
          import gevent
          def f(n):
              for i in range(n):
                  print(gevent.getcurrent(), i)  # gevent.getcurrent()表示当前对象
                  gevent.sleep(0.1) # 如果没有该句,则会按顺序执行,遇到延时,才会切换任务
              # g1 = gevent.spawn(f, 5)
              # g2 = gevent.spawn(f, 5)
              # g3 = gevent.spawn(f, 5)
              # g1.join()# 等待g1完成
              # g2.join()
              # g3.join()
              # 上面的代替写法
              gevent.joinall([
                  gevent.spawn(f, 5),
                  gevent.spawn(f, 5)
              ])
          f(10)
          
        • 有耗时需求,想用time.sleep()做延时
          from gevent import monkey
          monkey.patch_all() # 当前代码读取到某个地方去,自动将time.sleep()改成相应的gevent.sleep()
          
        • 案例:图片下载器
          from gevent import monkey
          import gevent
          import urllib.request
          monkey.patch_all()
          def my_download(name, url):
              resp = urllib.request.urlopen(url)
              data = resp.read()
              with open(name, 'wb') as f:
                  f.write(data)
          def main():
              gevent.joinall([
                  gevent.spawn(my_download, '1.jpg', 'http://www.baidu.com'),
                  gevent.spawn(my_download, '2.jpg', 'http://www.baidu.com')
              ])
          if __name__ == "__main__":
              main()
          
    • 进程、线程、协程对比
      • 进程是资源分配的单位
      • 线程是操作系统调度的单位
      • 协程切换任务资源很小,效率高(利用线程等待的时间)
      • 效率低到高:进程 < 线程 < 协程,协程就像调用函数一样简单(协程一定是并发)
      • 协程依赖线程,线程依赖进程