三默网为您带来有关“【python】多任务threading & multiprocess & gevent”的文章内容,供您阅读参考。
【python】多任务threading & multiprocess & gevent
2023-01-20 12:35:23
0. 多任务介绍
- 多个任务一起执行
- 并行:真的多任务,两核各自完成任务
- 并发:多任务交替执行,交替的足够快(时间片轮转)
1. 多线程
- 线程:程序运行起来之后,一定有一个执行代码的东西称之为线程
- 示例及说明
import time
import threading # 引入线程
def sing():
for i in range(5):
print('唱')
time.sleep(1)
def dance():
for i in range(5):
print('跳')
time.sleep(1)
def main():
t1 = threading.Thread(target=sing) # target参数为目标函数
t2 = threading.Thread(target=dance)
t1.start() # 启动线程
t2.start()
thread_list = threading.enumerate() # 所有线程列表
print(thread_list)
len_thread_list = len(thread_list) # 线程数
print(len_thread_list)
if __name__ == '__main__':
main()
- 详细说明
- 线程的执行顺序是没有顺序的,随机,可以通过延时的方式让某个线程先执行
- 循环查看当前的线程的方式是:分别执行延时
- 当调用Thread的时候,不会创建线程,当调用Thread创建出来的实例对象的start方法时才会创建线程并让线程运行
- 如果创建Thread时执行的函数运行结束则该子线程结束
- 主线程结束,则程序结束
- 多线程之间是共享全局变量的:在一个函数中对全局变量进行修改的时候,如果修改了执行(即让全局变量指向了一个新的地方),那么必须使用global,如果是修改了指向空间的数据,则不用global
- 创建线程是指定传递的参数:args参数:
t=threading.Thread(target=sing,args=(g_nums,))
# 其中args是用来传参数的 - 多线程共享全局变量的问题:资源竞争,例如两个线程一起写入某一个文件
- 同步概念:协同步调,按预定的先后顺序进行运行
- 互斥锁解决资源竞争问题:某个线程要更改共享数据时,先将其锁定,其他线程不能更改,直到该线程释放资源,将资源的状态变成非锁定,其他线程才能再次锁定资源,保证每次只有一个线程进入写操作,从而保证多线程情况下数据的正确性
# 创建锁 默认没有上锁 mutex=threading.Lock() # 锁定 如果上锁之前已经上锁,会阻塞在这,直到锁借来位置,之前没上锁,则上锁成功 mutex.acquire() # 释放 mutex.release()
- 死锁:多把锁共用的时候容易出现死锁,可以使用ctrl+C退出
- 添加超时时间
- 程序设计时尽量避免(银行家算法)
- 继承Thread类完成创建线程(创建类线程)
class MyThread(threading.Thread): def run(self): pass if __name__=="__main__": t = MyThread() # 自动调用run函数,有其他函数应该使其在run中调用来让其运行 t.start()
- 多线程版udp聊天器
from socket import socket def recv_msg(udp_socket): while True: # 接收数据 recv_data=udp_socket.recvfrom(1024) print(recv_data) def send_msg(udp_socket,dest_ip,dest_port): while True: # 发送数据 send_data=input("请输入要发送的数据:") udp_socket.sendto(send_data.encode('utf-8'),(dest_ip,dest_port)) def main(): udp_socket=socket(AF_INETT,SOCK_DGTAM) udp_socket.bind(('',7890)) dest_id=input("请输入对方的ip:") dest_port=input("请输入对方的端口:") t_recv=threading.Thread(target=recv_msg, args=(udp_socket,)) t_send=threading.Thread(target=recv_msg, args=(udp_socket,dest_ip,dest_port)) t_recv.start() t_send.start() if __name__=="__main__": main()
2. 多进程
- 代码+运行中的程序用到的资源叫做进程,是操作系统分配资源的基本单元,一个程序可以启动多个进程,进程拥有资源
- 进程的状态:新建、就绪、等待(阻塞)、运行、死亡
- 创建进程简单示例
import multiprocessing,time def test1(num): while True: print("1num------------",num) time.sleep(1) def test2(num): while True: print("2num------------",num) time.sleep(1) def main(): p1 = multiprocessing.Process(target=test1, args=(1,)) # test1为调用函数名,args为要传递的参数 p2 = multiprocessing.Process(target=test2, args=(2,)) p1.start() # 启动进程 p2.start() if __name__=="__main__": main()
- 详细说明
- 查看进程命令:
ps -aux
- 杀死进程:
kill PID
- 进程耗费的资源大,进程数不见得越多越好
- 在进程中,能在代码***享就尽量共享-》写时拷贝
- 代码-》进程=》线程-》执行程序
- 查看进程命令:
- 进程与线程对比
- 都能完成多任务,线程轻量
- 进程是资源分配的最小单元,线程是操作系统调度(执行)的最小单元
- 线程不能独立执行,必须依存在进程中
- 线程执行开销小,但不利于资源的管理和保护
- 进程间通信:通过队列方式(Queue)->先进先出-》解耦
from multiprocessing import Queue q = Queue(3) # 初始化一个Queue对象,最多接收三条put消息 q.put() # 放入消息 q.full() # 是否满了 q.empty() # 是否空了 q.qsize() # 消息数 q.get() # 取数据 q.put_nowait() # 异常方式通知阻塞(数据满了) q.get_nowait() # 异常方式通知空闲(没有数据)
- 示例:
import multiprocessing def down_from_web(q): data = [11, 22, 33, 44] for temp in data: q.put(temp) print("--下载并存入队列--") def analysis_data(q): # 从队列中获取数据 waitting = list() while True: data = q.get() waitting.append(data) if q.empty(): break print(waitting) def main(): # 创建队列 q = multiprocessing.Queue() p1 = multiprocessing.Process(target=down_from_web, args=(q,)) p2 = multiprocessing.Process(target=analysis_data, args=(q,)) p1.start() p2.start() if __name__ == "__main__": main()
- 示例:
- 进程池Pool:重复使用进程池中的进程
- 进程池示例
import random,os def worker(msg): t_start = time.time() print('%s开始执行,进程号为%d' % (msg, os.getpid())) time.sleep(random.random() * 2) t_stop = time.time() print(msg, '执行完毕,耗时%0.2f' % (t_stop - t_start)) po = multiprocessing.Pool(3) for i in range(0, 10): # apply_async(要调用的目标,(传递给目标的参数元祖,)) # 每次循环将会用空闲出来的子进程去调用目标 po.apply_async(worker, (i,)) print("---start---") po.close() # 关闭进程池,不再接收新的请求 po.join() # 等待po中所有子进程执行完成,必须放在close之后 print("---stop---")
- 进程池示例
- 案例: 多任务文件夹copy
import multiprocessing import os def copy_file(queue, file_name, old_folder, new_folder): # 完成文件的复制 with open(old_folder + "/" + file_name, 'rb') as old_f: cont = old_f.read() with open(new_folder + "/" + file_name, 'wb') as new_f: new_f.write(cont) # 文件拷贝完,通知主进程 queue.put(file_name) def main(): # 1.获取要拷贝的文件夹的名字 old_folder = input("请输入要copy文件夹的名字") # 2.创建一个新的文件夹 try: new_folder = old_folder + "[复件]" os.mkdir(new_folder) except: pass # 3.获取文件夹的所有的待copy的文件的名字 listdir() file_names = os.listdir(old_folder) # 4.创建进程池 po = multiprocessing.Pool(5) # 创建一个队列 queue = multiprocessing.Manager().Queue() # 5.向进程池中添加copy文件的任务 for file_name in file_names: po.apply_async(copy_file, args=(queue, file_name, old_folder, new_folder)) po.close() # po.join() # 进度条 all_num = len(file_names) while True: file_name = queue.get() if file_name in file_names: file_names.remove(file_name) copy_rate = (all_num - len(file_names)) * 100 / all_num print("\r%.2f...(%s)" % (copy_rate, file_name) + " " * 50, end='') if copy_rate >= 100: break print() # 复制源文件加中的文件,到新文件夹总的文件去 if __name__ == "__main__": main()
3. 协程
- python 通过yield提供了对协程的基本支持,但是不完全
- 生成器和迭代器都是一种生成的方式(保存代码),节省内存
- 迭代器:从集合的第一个元素开始访问,直到所有的元素被访问结束,迭代器只能往前,不会后退
- 可迭代对象:
- 判断某个东西是否可以迭代:
from collections import Iterable isinstance(a,Iterable) # 判断a是否可迭代
- 自定义可迭代类:判断是否可迭代—>调用iter函数得到
__iter__
方法的返回值-》__iter__
方法的返回值是一个迭代器from collections import Iterable, Iterator class Classmate(object): def __init__(self): self.names = list() self.current_num = 0 def add(self, name): self.names.append(name) def __iter__(self): return self def __next__(self): if self.current_num < len(self.names): ret = self.names[self.current_num] self.current_num += 1 return ret else: raise StopIteration classmate = Classmate() classmate.add('张三') classmate.add('王二') classmate.add('赵四') print("判断是否是可迭代的对象:", isinstance(classmate, Iterable)) classmate_iter = iter(classmate) print("判断是否是迭代器:", isinstance(classmate_iter, Iterator)) # for循环检测是否可迭代—>调用__next__方法 # 迭代器一定是可迭代对象,可迭代对象不一定是迭代器
- 判断某个东西是否可以迭代:
- python2中:xrange():返回一个生成list的方式,range():返回生成的列表
- python3中:range():返回一个生成list的方式,迭代器
- 斐波拉契数列:0,1,1,2,3,5,8,13,21,34…
class Fibonacci(object): def __init__(self,all_num): self.all_num=all_num self.current_num=0 self.a=0 self.b=1 def __iter__(self): return self def __next__(self): if self.current_num<self.all_num: ret=self.a self.a,self.b=self.b,self.a++self.b self.current_num+=1 return ret else: raise StopIteration fibo=Fibonacci(10) for num in fibo: print(num)
- list/tuple等也能接收迭代器做参数(转成列表|元祖)
- 生成器:是一种特殊的迭代器(没有next和iter方法,却是迭代器),示例:
lst=[x*2 for x in range(10)]
- 用生成器实现斐波拉契数列
def create_num(all_num): a, b = 0, 1 current_num = 0 while current_num < all_num: yield a # 如果一个函数中有yield语句,则不再是函数,而是一个生成器的模板 # yield导致yield后面的值返回给num并暂停, # 然后下次执行的时候是从yield下面一句开始执行, # 直到再次遇到yield暂停,如此循环 a, b = b, a + b current_num += 1 return 'ok..' # 正常不会调用 # 如果在调用的时候发现函数中有yield,此时不是调用函数,而是创建一个生成器对象 obj = create_num(10) while True: try: ret = next(obj) print(ret) except Exception as ret: print(ret.value) # 捕获异常的方式获取函数最终返回值 break # 多个调用生成器生成的对象相互独立互不影响
- 使用send唤醒(启动生成器)
-
ret = yield a
# 第一次调用next执行到yield a,第二次调用的时候从赋值开始,如果调用方式为ret=obj.send('saf')
,则是以saf作为yield a的值赋给ret ,打印出来的是ret -
obj.send(n)
可以调用从n开始的下一个值 - 第一次用send启动的时候:
obj.send(None)
-
- 使用yield实现多任务
import time def task_1(): while True: print("------1-----") time.sleep(0.1) yield def task_2(): while True: print("------2-----") time.sleep(0.1) yield def main(): t1 = task_1() t2 = task_2() while True: next(t1) next(t2) if __name__ == "__main__": main()
- 用生成器实现斐波拉契数列
- 第三方gevent为python提供了完善的协程支持
- 详细说明
-
gevent是第三方库,通过greenlet实现协程
-
基本思想:当一个greenlet遇到IO操作时,比如访问网络,就自动切换到其他的greenlet,等到IO操作完成,再在适当的时候切换回来继续执行。由于IO操作非常耗时,经常使程序处于等待状态,有了gevent为我们自动切换协程,就保证总有greenlet在运行,而不是等待IO
-
使用greenlet完成多任务
- 安装:
pip install greenlet
- 示例
from greenlet import greenlet import time def task_1(): while True: print("------1-----") gr2.switch() # 手动切换到gr2 time.sleep(0.1) def task_2(): while True: print("------2-----") gr1.switch() # 手动切换到gr1 time.sleep(0.1) gr1 = greenlet(task_1) gr2 = greenlet(task_2) # 切换到gr1 gr1.switch() # 最终效果为gr1和gr2交替执行
- 安装:
-
使用gevent(遇到延时就切换(gevent模块的延时才行))
- 安装:
pip install gevent
- 示例:
import gevent def f(n): for i in range(n): print(gevent.getcurrent(), i) # gevent.getcurrent()表示当前对象 gevent.sleep(0.1) # 如果没有该句,则会按顺序执行,遇到延时,才会切换任务 # g1 = gevent.spawn(f, 5) # g2 = gevent.spawn(f, 5) # g3 = gevent.spawn(f, 5) # g1.join()# 等待g1完成 # g2.join() # g3.join() # 上面的代替写法 gevent.joinall([ gevent.spawn(f, 5), gevent.spawn(f, 5) ]) f(10)
- 有耗时需求,想用time.sleep()做延时
from gevent import monkey monkey.patch_all() # 当前代码读取到某个地方去,自动将time.sleep()改成相应的gevent.sleep()
- 案例:图片下载器
from gevent import monkey import gevent import urllib.request monkey.patch_all() def my_download(name, url): resp = urllib.request.urlopen(url) data = resp.read() with open(name, 'wb') as f: f.write(data) def main(): gevent.joinall([ gevent.spawn(my_download, '1.jpg', 'http://www.baidu.com'), gevent.spawn(my_download, '2.jpg', 'http://www.baidu.com') ]) if __name__ == "__main__": main()
- 安装:
-
- 进程、线程、协程对比
- 进程是资源分配的单位
- 线程是操作系统调度的单位
- 协程切换任务资源很小,效率高(利用线程等待的时间)
- 效率低到高:进程 < 线程 < 协程,协程就像调用函数一样简单(协程一定是并发)
- 协程依赖线程,线程依赖进程
- 详细说明