python进程-进阶

作者: 编程应用  发布:2019-09-26

进程同步(multiprocessing.Lock、multiprocessing.Semaphore、multiprocessing.Event

  在计算机中,有一些硬件和软件,例如处理器、打印机等,都属于竞争类资源,当有需求时,很多进程都要争抢这些资源,而对于这类资源,就属于临界资源。当多进程共同处理某一个数据时,这个数据也就属于一个临界资源。操作系统对计算机内各种资源都使其在竞争中有序化,但是对于数据来说,尤其是用户动态产生的数据,当处理时就变成了临界资源,所以我们作为程序猿来说,需要对临界资源加以保护,否则就会出现数据混乱现象。这是在提高程序效率的优势下,带来的一个隐患。

什么是进程(process)?

程序并不能单独运行,只有将程序装载到内存中,系统为它分配资源才能运行,而这种执行的程序就称之为进程。程序和进程的区别就在于,程序是指令的集合,它是进程运行的静态描述文本;进程是程序的一次执行活动,属于动态概念。

multiprocessing.Lock

  当多个进程使用同一份数据资源的时候,就会引发数据安全或顺序混乱问题。

图片 1图片 2

from multiprocessing import Processimport randomimport timedef func:    print('我是%s'%addr)    time.sleep(random.random    print('谢谢!')if __name__ == '__main__':    l = ['四川的','湖南的','河南的','江苏的']    for addr in l:        p = Process(target=func,args=        p.start()    time.sleep(2)    print('nn我选%s'%random.choice# 关于抢占输出资源的事情,是指多进程并发执行时,并不是一个进程执行完任务后其他进程再执行。# 比如 此程序会输出:我是四川的  我是河南的 我是江苏的 谢谢!谢谢!我是湖南的 谢谢! 谢谢!# 而不是 : 我是四川的 谢谢! 我是河南的 谢谢! ...多进程关于抢占输出资源的事情

多进程关于抢占输出资源的事情图片 3图片 4

from multiprocessing import Processimport randomimport timefrom multiprocessing import Lockdef func(addr,lock):    lock.acquire()    print('我是%s'%addr)    time.sleep(random.random    print('谢谢!')    lock.release()if __name__ == '__main__':    lock = Lock()    l = ['四川的','湖南的','河南的','江苏的']    for addr in l:        p = Process(target=func,args=(addr,lock))        p.start()    time.sleep(4)    print('nn我选%s'%random.choice使用锁维护输出资源

使用锁维护输出资源

上面这种情况,使用了加锁的形式确保了程序的顺序执行,但是执行又变成了串行,降低了效率,但是不得不说,它确保了数据的安全性。

下面举例来说锁的重要性:模拟12306抢票问题。模拟银行账户的存取款问题。

图片 5图片 6

# 注意,文件中存储需要以{'c':1}这种形式,c的引号一定要带# 否则json识别不出来# 此代码的效果,并发执行,但是多进程同时读写同一个文件数据,造成数据混乱from multiprocessing import Process,Lockimport jsonimport timedef check:    with open('a.txt','r',encoding='utf-8') as f:        dic = json.load    print('第%s个人在查票,余票为%s' % (i, dic['c']))    paydef pay:    with open('a.txt','r',encoding='utf-8') as f:        dic = json.load    time.sleep(0.5)# 模拟网络延迟,当购买过程中也会有网络延迟    if dic['c']:        print('第%s个人买到票了 '%i)        dic['c'] -= 1    else:        print('第%s个人没买到票'%i)    with open('a.txt','w') as f:        json.dumpif __name__ == '__main__':    l = Lock()    for i in range(10):        p = Process(target=check,args=(i+1,l))        p.start()多个人同时抢票

多个人同时抢票

很明显,上述例子中,因为多进程同时对一个临界资源进行了读写操作,使文件内数据混乱,也造成了余票为1张,但是很多人都抢到票的假象。那就加锁来解决它吧

图片 7图片 8

from multiprocessing import Process,Lockimport jsonimport timedef check:    with open('a.txt','r',encoding='utf-8') as f:        dic = json.load    print('第%s个人在查票,余票为%s' % (i, dic['c']))    l.acquire()    pay# 为什么在这里加锁? 因为每个人都可以查票,读取数据,不会造成数据混乱,但是当买票的时候,就需要对临界资源的写入,所以对写操作加锁,使某一个进程在写文件时候,其他进程不能碰此文件。    l.release()def pay:    with open('a.txt','r',encoding='utf-8') as f:        dic = json.load    time.sleep(0.5)# 模拟网络延迟,当购买过程中也会有网络延迟    if dic['c']:        print('第%s个人买到票了 '%i)        dic['c'] -= 1    else:        print('第%s个人没买到票'%i)    with open('a.txt','w') as f:        json.dumpif __name__ == '__main__':    l = Lock()    for i in range(10):        p = Process(target=check,args=(i+1,l))        p.start()

加锁解决买票问题

关于银行存取款的问题。同一个账户,某个人一直存,某个人在同一时间一直取,如果不对数据进行保护起来,就会造成的一种数据混乱问题。

from multiprocessing import Process, Lock,Valuedef save_money:    for i in range(100):        time.sleep(0.05)        num.value += 1def draw_money:    for i in range(100):        time.sleep(0.05)        num.value -= 1if __name__ == '__main__':    num = Value('i',1000)# 多进程中共享数据,一个int类型的数据,1000    man = Process(target=save_money,args=    woman = Process(target=draw_money,args=    man.start()    woman.start()    time.sleep(6)    print(num.value)

from multiprocessing import Process, Lock,Valuedef save_money:    for i in range(100):        time.sleep(0.05)        l.acquire()        num.value += 1        l.release()def draw_money:    for i in range(100):        time.sleep(0.05)        l.acquire()# 在操作存取款的数据时,先将数据锁住,不允许其他人更改此数据        num.value -= 1        l.release()if __name__ == '__main__':    l = Lock()    num = Value('i',1000)# 多进程中共享数据,一个int类型的数据,1000    man = Process(target=save_money,args=    woman = Process(target=draw_money,args=    man.start()    woman.start()    time.sleep(6)    print(num.value)这样才对!!!

什么是线程(thread)?

线程是操作系统能够进行运算调度的最小单位。它被包含在进程中,是进程中的实际运作单位。一条线程指的是进程中一个单一顺序的控制流,一个进程中可以并发多个线程,每条线程并行执行不同的任务。

multiprocessing.Semaphore

上述讲的Lock,属于互斥锁,也就是一把钥匙配备一把锁,同时只允许锁住某一个数据。而信号量则是多把钥匙配备多把锁,也就是说同时允许锁住多个数据。

比如在一个粉红发廊,里边有5位服务人员,那么这个发廊最多就同时允许进入5位客人,当又有第6位客人来的时候,就需要在门外等待;当服务人员服务完某位客人后,才允许后续的人再进来一个,换句话说,这个发廊最多同时接待5位客人,多的客人必须等待。

信号量同步基于内部计数器,用户初始化一个计数器初值(比如上述例子中就初始化为5),每调用一次acquire(),计数器减1;每调用一次release(),计数器加1。当计数器为0时,acquire()调用被阻塞。这是迪科斯彻信号量概念P的Python实现。信号量同步机制适用于访问像服务器这样的有限资源。信号量与进程池的概念很像,但是要区分开,信号量涉及到加锁的概念

举个栗子:

from multiprocessing import Semaphorefrom multiprocessing import Processimport timeimport randomdef sing:    se.acquire()# 每次进来一位客人,信号量内部计数器减1    print('%s进入小黑屋'%i)    time.sleep(random.randint(1,3))    print('%s交钱走人'%i)    se.release()# 每次离开一位客人,信号量内部计数器加1if __name__ == '__main__':    se = Semaphore# 初始化5把钥匙配备5把锁    for i in range: # 模拟10个人要进入小黑屋子        p = Process(target=sing,args=        p.start()

进程与线程的区别?

线程共享内存空间,进程的内存是独立的。

同一个进程的线程之间可以直接交流,但两个进程相互通信必须通过一个中间代理。

创建一个新的线程很简单,创建一个新的进程需要对其父进程进行一次克隆。

一个线程可以控制和操作同一进程里的其他线程,但是进程只能操作子进程。

multiprocessing.Event

python中的事件机制,主要用于主进程控制其他进程的执行,事件主要提供了三个方法 set、wait、clear。

    e = Event()
    e.set() #将is_set()设为True
    e.clear() # 将is_set()设为False
    e.wait() #判断is_set的bool值,如果bool为True,则非阻塞,bool值为False,则阻塞
    e.is_set() # 标识
    事件是通过is_set()的bool值,去标识e.wait() 的阻塞状态
    当is_set()的bool值为False时,e.wait()是阻塞状态
    当is_set()的bool值为True时,e.wait()是非阻塞状态
    当使用set()时,是把is_set的bool变为True
    当使用clear()时,是把is_set的bool变为False

举个栗子:

from multiprocessing import Process, Eventimport timedef tra:    while 1: #红绿灯需要一直亮着,要么红灯,要么绿灯        if e.is_set(): #True代表绿灯了,表示可以过车            time.sleep#睡5秒,让车在这5秒的时间内通过            print('33[31m红灯亮33[0m')#绿灯亮5秒后提示红灯亮            e.clear()#把is_set设置为False        else:            time.sleep#此时代表红灯亮,应该红灯亮5秒.在此等5秒            print('33[32m绿灯亮33[0m')#红灯亮够5秒该绿灯亮了            e.set()#将is_set设置为Truedef Car:    e.wait()#车等在红绿灯,此时要看是红灯还剩绿灯,如果is_set = True 就可以过车    print('第%s辆车过去了' % i)if __name__ == '__main__':    e = Event()    triff_light = Process(target=tra,args=#信号灯的进程    triff_light.start()    for i in range:#描述50辆车的进程        if i % 3 == 0:            time.sleep(2)        car = Process(target=Car,args=(i+1,e,))        car.start()

Python GIL(Global Interpreter Lock)

无论开启多少个线程,有多少个CPU,python在执行的时候在同一时刻只允许一个线程允许。

生产者消费者模型

第一种:

from multiprocessing import Queue,Processdef producer(q,product):    for i in range(20):        info = product + '的娃娃%s号' % i        q.put    q.putdef consumer:    while 1:        info = q.get()        if info:            print('%s拿走了%s' % (name,info))        else:            breakif __name__ == '__main__':    q = Queue(20)    p_pro = Process(target=producer,args=(q,'炫彩'))    p_con = Process(target=consumer,args=(q,'corn'))    p_pro.start()    p_con.start()

第二种:

from multiprocessing import Queue,Processdef producer(q,product):    for i in range(20):        info = product + '的娃娃%s号' % str        q.putdef consumer(q,name,color):    while 1:        info = q.get()        if info:            print('%s,%s拿走了%s33[0m' % (color,name,info))        else:# 当消费者获得队列中数据时,如果获得的是None,就是获得到了生产者不再生产数据的标识            break# 此时消费者结束即可if __name__ == '__main__':    q = Queue()    p_pro1 = Process(target=producer,args=(q,'炫彩'))    p_pro2 = Process(target=producer,args=(q,'苍井井'))    p_pro3 = Process(target=producer,args=(q,'波多多'))    p_con1 = Process(target=consumer,args=(q,'alex','33[31m'))    p_con2 = Process(target=consumer,args=(q,'wusir','33[32m'))    p_l = [p_con1,p_con2,p_pro1,p_pro2,p_pro3]    [i.start() for i in p_l]    p_pro1.join()    p_pro2.join()    p_pro3.join()    q.put# 几个消费者就要接受几个结束标识    q.put

Python threading模块

进程间通信——队列和管道(multiprocess.Queue、multiprocess.Pipe)

进程间通信--IPC(Inter-Process Communication)

直接调用

  1. import threading,time

  2.  

  3. def run_num(num):

  4.     """

  5.     定义线程要运行的函数

  6.     :param num:

  7.     :return:

  8.     """

  9.     print("running on number:%s"%num)

  10.     time.sleep(3)

  11.  

  12. if __name__ == '__main__':

  13.     # 生成一个线程实例t1

  14.     t1 = threading.Thread(target=run_num,args=(1,))

  15.     # 生成一个线程实例t2

  16.     t2 = threading.Thread(target=run_num,args=(2,))

  17.     # 启动线程t1

  18.     t1.start()

  19.     # 启动线程t2

  20.     t2.start()

  21.     # 获取线程名

  22.     print(t1.getName())

  23.     print(t2.getName())

  24. 输出:

  25. running on number:1

  26. running on number:2

  27. Thread-1

  28. Thread-2

队列(multiprocess.Queue)

import queue # 不能进行多进程之间的数据传输

from multiprocessing import Queue 借助Queue解决生产者消费者模型,队列是安全的。
  q = Queue
  num : 队列的最大长度
  q.get()# 阻塞等待获取数据,如果有数据直接获取,如果没有数据,阻塞等待
  q.put()# 阻塞,如果可以继续往队列中放数据,就直接放,不能放就阻塞等待

  q.get_nowait()# 不阻塞,如果有数据直接获取,没有数据就报错
  q.put_nowait()# 不阻塞,如果可以继续往队列中放数据,就直接放,不能放就报错

from multiprocessing import JoinableQueue#可连接的队列

  JoinableQueue是继承Queue,所以可以使用Queue中的方法

  并且JoinableQueue又多了两个方法
  q.join()# 用于生产者。等待 q.task_done的返回结果,通过返回结果,生产者就能获得消费者当前消费了多少个数据
  q.task_done() # 用于消费者,是指每消费队列中一个数据,就给join返回一个标识。

继承式调用

  1. import threading,time

  2.  

  3. class MyThread(threading.Thread):

  4.     def __init__(self,num):

  5.         threading.Thread.__init__(self)

  1.         self.num = num

  2.     # 定义每个线程要运行的函数,函数名必须是run

  3.     def run(self):

  4.         print("running on number:%s"%self.num)

  1.         time.sleep(3)

  2.  

  3. if __name__ == '__main__':

  4.     t1 = MyThread(1)

  5.     t2 = MyThread(2)

  6.     t1.start()

  7.     t2.start()

  8. 输出:

  9. running on number:1

  10. running on number:2

管道(multiprocess.Pipe)

from multiprocessing import Pipe

con1,con2 = Pipe()

 管道是不安全的.

 管道是用于多进程之间通信的一种方式.

 如果在单进程中使用管道,那么就是con1收数据,就是con2发数据.

            如果是con1发数据,就是con2收数据

 如果在多进程中使用管道,那么就必须是父进程使用con1收,子进程就必须使用con2发

                  父进程使用con1发,子进程就必须使用con2收

                  父进程使用con2收,子进程就必须使用con1发

                  父进程使用con2发,子进程就必须使用con1收

在管道中有一个著名的错误叫做EOFError.是指,父进程如果关闭了发送端,子进程还继续接收数据,就会产生EOFError错误

进程间的共享内存(Value,Manager)

 from multiprocessing import Manager
 m = Manager()
 num = m.dict
 num = m.list

Join and Daemon

进程池

Join

Join的作用是阻塞主进程,无法执行join后面的程序。

多线程多join的情况下,依次执行各线程的join方法,前面一个线程执行结束才能执行后面一个线程。

无参数时,则等待该线程结束,才执行后续的程序。

设置参数后,则等待该线程设定的时间后就执行后面的主进程,而不管该线程是否结束。

  1. import threading,time

  2.  

  3. class MyThread(threading.Thread):

  4.     def __init__(self,num):

  5.         threading.Thread.__init__(self)

  1.         self.num = num

  2.     # 定义每个线程要运行的函数,函数名必须是run

  3.     def run(self):

  4.         print("running on number:%s"%self.num)

  1.         time.sleep(3)

  2.         print("thread:%s"%self.num)

  3.  

  4. if __name__ == '__main__':

  5.     t1 = MyThread(1)

  6.     t2 = MyThread(2)

  7.     t1.start()

  8.     t1.join()

  9.     t2.start()

  10.     t2.join()

  11. 输出:

  12. running on number:1

  13. thread:1

  14. running on number:2

  15. thread:2

设置参数效果如下:

  1. if __name__ == '__main__':

  2.     t1 = MyThread(1)

  3.     t2 = MyThread(2)

  4.     t1.start()

  5.     t1.join(2)

  6.     t2.start()

  7.     t2.join()

  8. 输出:

  9. running on number:1

  10. running on number:2

  11. thread:1

  12. thread:2

 含义:

   进程池:一个池子,里边有固定数量的进程。这些进程一直处于待命状态,一旦有任务来,马上就有进程去处理。

 因为在实际业务中,任务量是有多有少的,如果任务量特别的多,不可能要开对应那么多的进程数

 开启那么多进程首先就需要消耗大量的时间让操作系统来为你管理它。其次还需要消耗大量时间让
 cpu帮你调度它。

 进程池还会帮程序员去管理池中的进程。

Daemon

默认情况下,主线程在退出时会等待所有子线程的结束。如果希望主线程不等待子线程,而是在退出时自动结束所有的子线程,就需要设置子线程为后台线程(daemon)。方法是通过调用线程类的setDaemon()方法。

  1. import time,threading

  2.  

  3. def run(n):

  4.     print("%s".center(20,"*")%n)

  5.     time.sleep(2)

  6.     print("done".center(20,"*"))

  7.  

  8. def main():

  9.     for i in range(5):

  10.         t = threading.Thread(target=run,args=(i,))

  11.         t.start()

  12.         t.join(1)

  13.         print("starting thread",t.getName())

  14.  

  15. m = threading.Thread(target=main,args=())

  16. # 将main线程设置位Daemon线程,它作为程序主线程的守护线程,当主线程退出时,m线程也会退出,由m启动的其它子线程会同时退出,不管是否执行完成

  1. m.setDaemon(True)

  2. m.start()

  3. m.join(3)

  4. print("main thread done".center(20,"*"))

  5. 输出:

  6. *********0*********

  1. starting thread Thread-2

  2. *********1*********

  1. ********done********
  1. starting thread Thread-3

  2. *********2*********

  1. **main thread done**

 方法:

线程锁(互斥锁Mutex)

一个进程下可以启动多个线程,多个线程共享父进程的内存空间,也就意味着每个线程可以访问同一份数据,此时,如果2个线程同时要修改同一份数据就需要线程锁。

  1. import time,threading

  2.  

  3. def addNum():

  4.     # 在每个线程中都获取这个全局变量

  1.     global num

  2.     print("--get num:",num)

  3.     time.sleep(1)

  4.     # 对此公共变量进行-1操作

  5.     num -= 1

  6. # 设置一个共享变量

  7. num = 100

  8. thread_list = []

  9. for i in range(100):

  10.     t = threading.Thread(target=addNum)

  1.     t.start()

  2.     thread_list.append(t)

  3. # 等待所有线程执行完毕

  4. for t in thread_list:

  5.     t.join()

  6.  

  7. print("final num:",num)

加锁版本

Lock时阻塞其他线程对共享资源的访问,且同一线程只能acquire一次,如多于一次就出现了死锁,程序无法继续执行。

  1. import time,threading

  2.  

  3. def addNum():

  4.     # 在每个线程中都获取这个全局变量

  1.     global num

  2.     print("--get num:",num)

  3.     time.sleep(1)

  4.     # 修改数据前加锁

  5.     lock.acquire()

  6.     # 对此公共变量进行-1操作

  7.     num -= 1

  8.     # 修改后释放

  9.     lock.release()

  10. # 设置一个共享变量

  11. num = 100

  12. thread_list = []

  13. # 生成全局锁

  14. lock = threading.Lock()

  15. for i in range(100):

  16.     t = threading.Thread(target=addNum)

  1.     t.start()

  2.     thread_list.append(t)

  3. # 等待所有线程执行完毕

  4. for t in thread_list:

  5.     t.join()

  6.  

  7. print("final num:",num)

 1).map(func,iterable)

    func:进程池中的进程执行的任务函数
    iterable: 可迭代对象,是把可迭代对象中的每个元素依次传给任务函数当参数

GIL VS Lock

GIL保证同一时间只能有一个线程来执行。lock是用户级的lock,与GIL没有关系。

图片 9

 2).apply(func,args=同步执行任务

    func:进程池中的进程执行的任务函数
    args: 可迭代对象型的参数,是传给任务函数的参数

    同步处理任务时,不需要close和join

    同步处理任务时,进程池中的所有进程是普通进程(主进程需要等待其执行结束)

RLock(递归锁)

Rlock允许在同一线程中被多次acquire,线程对共享资源的释放需要把所有锁都release。即n次acquire,需要n次release。

  1. def run1():

  2.     print("grab the first part data")

  3.     lock.acquire()

  4.     global num

  5.     num += 1

  6.     lock.release()

  7.     return num

  8.  

  9. def run2():

  10.     print("grab the second part data")

  11.     lock.acquire()

  12.     global num2

  13.     num2 += 1

  14.     lock.release()

  15.     return num2

  16.  

  17. def run3():

  18.     lock.acquire()

  19.     res = run1()

  20.     print("between run1 and run2".center(50,"*"))

  1.     res2 = run2()

  2.     lock.release()

  3.     print(res,res2)

  4.  

  5. if __name__ == '__main__':

  6.     num,num2 = 0,0

  7.     lock = threading.RLock()

  8.     for i in range(10):

  9.         t = threading.Thread(target=run3)

  10.         t.start()

  11.  

  12. while threading.active_count() != 1:

  13.     print(threading.active_count())

  1. else:
  1.     print("all threads done".center(50,"*"))

  2.     print(num,num2)

这两种锁的主要区别是,RLock允许在同一线程中被多次acquire。而Lock却不允许这种情况。注意,如果使用RLock,那么acquire和release必须成对出现,即调用了n次acquire,必须调用n次的release才能真正释放所占用的锁。

 3).apply_async(func,args=(),callback=None)异步执行任务

    func:进程池中的进程执行的任务函数
    args: 可迭代对象型的参数,是传给任务函数的参数
    callback: 回调函数,就是说每当进程池中有进程处理完任务了,返回的结果可以交给回调函数,由回调函数进行进一步的处理,回调函数只有异步才有,同步是没有的

    异步处理任务时,进程池中的所有进程是守护进程(主进程代码执行完毕守护进程就结束)

    异步处理任务时,必须要加上close和join

Semaphore(信号量)

互斥锁同时只允许一个线程更改数据,而Semaphore是同时允许一定数量的线程更改数据,比如售票处有3个窗口,那最多只允许3个人同时买票,后面的人只能等前面任意窗口的人离开才能买票。

  1. import threading,time

  2.  

  3. def run(n):

  4.     semaphore.acquire()

  5.     time.sleep(1)

  6.     print("run the thread:%s"%n)

  7.     semaphore.release()

  8.  

  9. if __name__ == '__main__':

  10.     # 最多允许5个线程同时运行

  11.     semaphore = threading.BoundedSemaphore(5)

  12.     for i in range(20):

  13.         t = threading.Thread(target=run,args=(i,))

  14.         t.start()

  15.  

  16. while threading.active_count() != 1:

  17.     # print(threading.active_count())

  1.     pass

  2. else:

  1.     print("all threads done".center(50,"*"))

回调函数

  进程的任务函数的返回值,被当成回调函数的形参接收到,以此进行进一步的处理操作

回调函数是由主进程调用的,而不是子进程,子进程只负责把结果传递给回调函数

Timer(定时器)

Timer隔一定时间调用一个函数,如果想实现每隔一段时间就调用一个函数,就要在Timer调用的函数中,再次设置Timer。Timer是Thread的一个派生类。

  1. import threading

  2.  

  3. def hello():

  4.     print("hello,world!")

  5. # delay 5秒之后执行hello函数

  6. t = threading.Timer(5,hello)

  7. t.start()

Event

Python提供了Event对象用于线程间通信,它是有线程设置的信号标志,如果信号标志位为假,则线程等待指导信号被其他线程设置为真。Event对象实现了简单的线程通信机制,它提供了设置信号、清除信号、等待等用于实现线程间的通信。

1. 设置信号

使用Event的set()方法可以设置Event对象内部的信号标志为真。Event对象提供了isSet()方法来判断其内部信号标志的转态,当使用event对象的set()方法后,isSet()方法返回真。

1. 清除信号

使用Event的clear()方法可以清除Event对象内部的信号标志,即将其设为假,当使用Event的clear()方法后,isSet()方法返回假。

1. 等待

Event的wait()方法只有在内部信号为真的时候才会很快的执行并完成返回。当Event对象的内部信号标志为假时,则wait()方法一直等待其为真时才返回。

通过Event来实现两个或多个线程间的交互,下面以红绿灯为例,即启动一个线程做交通指挥灯,生成几个线程做车辆,车辆行驶按红停绿行的规则。

  1. import threading,time,random

  2.  

  3. def light():

  4.     if not event.isSet():

  5.         event.set()

  6.     count = 0

  7.     while True:

  8.         if count < 5:

  9.             print("33[42;1m--green light on--33[0m".center(50,"*"))

  10.         elif count < 8:

  11.             print("33[43;1m--yellow light on--33[0m".center(50,"*"))

  12.         elif count < 13:

  13.             if event.isSet():

  14.                 event.clear()

  15.             print("33[41;1m--red light on--33[0m".center(50,"*"))

  16.         else:

  17.             count = 0

  18.             event.set()

  19.         time.sleep(1)

  20.         count += 1

  21.  

  22.  

  23. def car(n):

  24.     while 1:

  25.         time.sleep(random.randrange(10))

  1.         if event.isSet():

  2.             print("car %s is running..."%n)

  3.         else:

  4.             print("car %s is waiting for the red light..."%n)

  5.  

  6. if __name__ == "__main__":

  1.     event = threading.Event()

  2.     Light = threading.Thread(target=light,)

  3.     Light.start()

  4.  

  5.     for i in range(3):

  6.         t = threading.Thread(target=car,args=(i,))

  7.         t.start()

queue队列

Python中队列是线程间最常用的交换数据的形式。Queue模块是提供队列操作的模块。

创建一个队列对象

  1. import queue

  2.  

  3. q = queue.Queue(maxsize = 10)

queue.Queue类是一个队列的同步实现。队列长度可以无限或者有限。可以通过Queue的构造函数的可选参数maxsize来设定队列长度。如果maxsize小于1表示队列长度无限。

将一个值放入队列中

  1. q.put("a")

调用队列对象的put()方法在队尾插入一个项目。put()有两个参数,第一个item为必需的,为插入项目的值;第二个block为可选参数,默认为1。如果队列当前为空且block为1,put()方法就使调用线程暂停,直到空出一个数据单元。如果block为0,put()方法将引发Full异常。

将一个值从队列中取出

  1. q.get()

调用队列对象的get()方法从队头删除并返回一个项目。可选参数为block,默认为True。如果队列为空且block为True,get()就使调用线程暂停,直到有项目可用。如果队列为空且block为False,队列将引发Empty异常。

Python Queue模块有三种队列及构造函数

  1. # 先进先出

  2. class queue.Queue(maxsize=0)

  3. # 先进后出

  4. class queue.LifoQueue(maxsize=0)

  5. # 优先级队列级别越低越先出

  6. class queue.PriorityQueue(maxsize=0)

常用方法

  1. q = queue.Queue()

  2. # 返回队列的大小

  3. q.qsize()

  4. # 如果队列为空,返回True,反之False

  1. q.empty()

  2. # 如果队列满了,返回True,反之False

  1. q.full()

  2. # 获取队列,timeout等待时间

  3. q.get([block[,timeout]])

  4. # 相当于q.get(False)

  5. q.get_nowait()

  6. # 等到队列为空再执行别的操作

  7. q.join()

生产者消费者模型

在开发编程中使用生产者和消费者模式能够解决绝大多数并发问题。该模式通过平衡生产线程和消费线程的工作能力来提高程序的整体处理数据的速度。

为什么要使用生产者和消费者模式

在线程世界里,生产者就是生产数据的线程,消费者就是消费数据的线程。在多线程开发当中,如果生产者处理速度很快,而消费者处理速度很慢,那么生产者就必须等待消费者处理完,才能继续生产数据。同样的道理,如果消费者的处理能力大于生产者,那么消费者就必须等待生产者。为了解决这个问题于是引入了生产者和消费者模式。

什么是生产者消费者模式

生产者消费者模式是通过一个容器来解决生产者和消费者的强耦合问题。生产者和消费彼此之间不直接通讯,而通过阻塞队列来进行通讯,所以生产者生产完数据之后不再等待消费者处理,直接扔给阻塞队列,消费者不找生产者要数据,而是直接从阻塞队列里取,阻塞队列就相当于一个缓冲区,平衡了生产者和消费者的处理能力。

最基本的生产者消费者模型的例子。

  1. import queue,threading,time

  2.  

  3. q = queue.Queue(maxsize=10)

  4.  

  5. def Producer():

  6.     count = 1

  7.     while True:

  8.         q.put("骨头%s"%count)

  9.         print("生产了骨头",count)

  10.         count += 1

  11.  

  12. def Consumer(name):

  13.     while q.qsize() > 0:

  14.         print("[%s] 取到[%s]并且吃了它..."%(name,q.get()))

  15.         time.sleep(1)

  16.  

  17. p = threading.Thread(target=Producer,)

  1. c1 = threading.Thread(target=Consumer,args=("旺财",))

  2. c2 = threading.Thread(target=Consumer,args=("来福",))

  3. p.start()

  4. c1.start()

  5. c2.start()

多线程multiprocessing

multiprocessing包是Python中的多进程管理包。与threading.Thread类似,它可以利用multiprocessing.Process对象来创建一个进程。该进程可以运行在Python程序内部编写的函数。该Process对象与Thread对象的用法相同,也有start()、run()、join()的方法。此外multiprocessing包中也有Lock、Event、Semaphore、Condition类(这些对象可以像多线程那样,通过参数传递各个进程),用以同步进程,其用法与threading包中的同名类一致。所以,multiprocessing的很大一部分与threading使用同一套API,只不过换到了多进程的情景。

注意:

在UNIX平台上,当某个进程结束以后,该进程需要被父进程调用wait,否则进程成为僵尸进程(Zombie)。所以,有必要对每个Process对象调用join()方法(实际上等同于wait)。对于多线程来说,由于只有一个进程,所以不存在此必要性。

multiprocessing提供了threading包中没有的IPC(比如Pipe和Queue),效率上更高。应优先考虑Pipe和Queue,避免使用Lock、Event、Semaphore、Condition等同步方式(因为它们占据的不是用户进程的资源)。

多进程应该避免共享资源。在多线程中,我们可以比较容易地共享资源,比如使用全局变量或传递参数。在多进程情况下,由于每个进程有自己独立的内存空间,以上方法并不合适。此时我们可以通过共享内存和Manager的方法来共享资源。但这样做提高了程序的复杂度,并因为同步的需要而降低了程序的效率。

Process.PID中保存有PID,如果进程还没有start(),则PID为None。

下面各个线程和进程都做同一件事——打印PID。但是,所有的任务在打印的时候都会向同一个标准输出(stdout)输出,这样输出的字符会混合在一起无法阅读。使用Lock同步,在一个任务输出完成之后,在允许另一个任务输出,可以避免多个任务同时向终端输出。

  1. import os,threading,multiprocessing
  1.  

  2. def info(sign,lock):

  3.     lock.acquire()

  4.     print(sign,os.getpid())

  5.     lock.release()

  6.  

  7. print("Main:",os.getpid())

  8.  

  9. if __name__ == "__main__":

  1.     record = []

  2.     lock = threading.Lock()

  3.     for i in range(5):

  4.         thread = threading.Thread(target=info,args=("thread",lock))

  5.         thread.start()

  6.         record.append(thread)

  7.  

  8.     for i in record:

  9.         i.join()

  10.     print("---------------")

  11.     record2 = []

  12.     lock2 = multiprocessing.Lock()

  13.     for i in range(5):

  14.         process = multiprocessing.Process(target=info,args=("process",lock2))

  15.         process.start()

  16.         record.append(process)

  17.  

  18.     for i in record2:

  19.         i.join()

所有的Thread的PID都与主程序相同,而每个Process都有一个不同的PID。

进程间通讯

不同进程间内存是不共享的,要想实现两个进程间的数据交换,可以使用Queue和 Pipe。

Pipe

Pipe可以是单向(half-duplex),也可以是双向(duplex)。通过mutiprocessing.Pipe(duplex=False)创建单向管道(默认为双向)。一个进程从pipe一端输入对象,然后被pipe另一端的进程接收,单向管道只允许管道一端的进程输入,而双向管道则允许从两端输入。

  1. from multiprocessing import Process,Pipe

  2.  

  3. def f(conn):

  4.     conn.send([42,None,"Hello"])

  5.     conn.close()

  6.  

  7. if __name__ == "__main__":

  1.     parent_conn,child_conn = Pipe()
  1.     p = Process(target=f,args=(child_conn,))

  2.     p.start()

  3.     print(parent_conn.recv())

  4.     p.join()

Pipe对象建立的时候,返回一个含有两个元素的表,每个元素代表Pipe的一端(Connection对象)。对Pipe的某一端调用send()方法来传送对象,在另一端使用recv()来接收。

Queue

使用方法跟threading里的queue差不多。

Queue与Pipe相类似,都是先进先出的结构。但Queue允许多进程放入,多个进程从队列取出对象。Queue使用multiprocessing.Queue(maxsize)创建,maxsize表示队列中可以存放对象的最大数量。

一些进程使用put()在Queue中放入字符串,这个字符串中包含PID和时间。另一个进程从Queue中取出,并打印自己的PID以及get()的字符串。

  1. from multiprocessing import Process,Queue

  2.  

  3. def f(q):

  4.     q.put([44,None,"Hello"])

  5.  

  6. if __name__ == "__main__":

  1.     q = Queue()

  2.     p = Process(target=f,args=(q,))

  1.     p.start()

  2.     print(q.get())

  3.     p.join()

共享变量Managers

Python中进程间共享数据,处理基本的Queue和Pipe外,还提供了更高层次的封装。使用multiprocessing.Managers可以简单地使用这些高级接口。

Manager()返回的manager对象控制了一个server进程,此进程包含的python对象可以被其他的进程通过proxies来访问,从而达到多进程数据通信且安全。

Manager支持的类型有list、dict、Namespace、Lock、RLock、Semaphore、BoundedSemaphore、Condition、Event、Queue、Value和Array。

  1. from multiprocessing import Manager,Process

  2. import os

  3.  

  4. def f(d,l):

  5.     d[os.getpid()] = os.getpid()

  1.     l.append(os.getpid())

  2.     print(l)

  3.  

  4. if __name__ == "__main__":

  1.     with Manager() as manager:

  2.         d = manager.dict()

  3.         l = manager.list(range(5))

  1.         p_list = []

  2.         for i in range(10):

  3.             p = Process(target=f,args=(d,l,))

  4.             p.start()

  5.             p_list.append(p)

  6.         for res in p_list:

  7.             res.join()

  8.  

  9.         print(d)

  10.         print(l)

进程同步

当多个进程需要访问共享资源的时候,Lock可以用来避免访问的冲突。

  1. from multiprocessing import Process,Lock

  2.  

  3. def f(l,i):

  4.     l.acquire()

  5.     try:

  6.         print("hello word",i)

  7.     finally:

  8.         l.release()

  9.  

  10. if __name__ == "__main__":

  1.     lock = Lock()

  2.     for i in range(10):

  3.         Process(target=f,args=(lock,i)).start()

进程池

进程池内部维护一个进程序列,当使用时,则去进程池中获取一个进程,如果进程池序列中没有可供使用的进程,那么程序就会等待,直到进程池中有可用的进程为止。

进程池中有两个方法:apply(阻塞)和apply_async(非阻塞)。

apply(func[,args[,kwds]])

apply用于传递不定参数,主进程会阻塞与函数。主进程的执行流程同单进程一致。

apply_async(func[,args[,kwds[,callback]]])

与apply用法一致,但它是非阻塞的且支持结果返回后进行回调。主进程循环运行过程中不等待apply_async的返回结果,在主进程结束后,即使子进程还未返回整个程序也会退出。虽然apply_async是非阻塞的,但其返回结果的get方法却是阻塞的,如使用result.get()会阻塞主进程。如果对返回结果不感兴趣,那么可以在主进程中使用pool.close与pool.join来防止主进程退出。注意join方法一定要close或terminate之后调用。

  1. from multiprocessing import Process,Pool,freeze_support

  2. import time,os

  3.  

  4. def Foo(i):

  5.     time.sleep(2)

  6.     print("in process",os.getpid())

  7.     return i + 100

  8.  

  9. def Bar(arg):

  10.     print("-->exec done:",arg,os.getpid())

  1.  

  2. if __name__ == "__main__":

  1.     # freeze_support()

  2.     # 允许进程池同时放入3个进程

  1.     pool = Pool(processes=3)

  2.     print("主进程",os.getpid())

  3.     for i in range(10):

  4.         # callback回调函数,每个进程结束的时候调用

  5.         pool.apply_async(func=Foo,args=(i,),callback=Bar)

  1.         # 串行

  2.         # pool.apply(func=Foo,args=(i,))

  3.     print('end')

  4.     pool.close()

  5.     # 进程池中进程执行完毕后再关闭,如果注释程序直接关闭

  6.     pool.join()

close()

关闭pool,使其不再接受新的任务。

terminate()

结束工作进程,不再处理未处理的任务。

join()

主进程阻塞等待子进程的退出,join方法要在close或terminate之后使用。

本文由金沙澳门官网送注册58发布于编程应用,转载请注明出处:python进程-进阶

关键词:

上一篇:没有了
下一篇:MVC中将数据从Controller传递到视图