python--(进程相关(一))

python--(进程相关(一))一.进程的创建import timeimport os#os.getpid() 获取自己进程的id号#os.getppid() 获取自己进程的父进程id号from multiprocessing import Processdef func():print(aaa)time.sleep(1)prin...

python--(进程相关(一))

一.进程的创建

import time
import os

#os.getpid() 获取自己进程的id号
#os.getppid() 获取自己进程的父进程id号

from multiprocessing import Process
def func():
    print("aaa")
    time.sleep(1)
    print("子进程>>>",os.getpid())
    print("该子进程的父进程>>>",os.getppid())
    print(12345)

if __name__ == "__main__":
    p = Process(target=func,)
    p.start()
    print("*" * 10)
    print("主进程>>>",os.getpid())
    print("父进程>>>",os.getppid())

给要执行的函数传参数
import time
from multiprocessing import Process
def func(x,y):
    print(x)
    time.sleep(1)
    print(y)

if __name__ == "__main__":
    p = Process(target=func,args=("姑娘","来玩啊"))#这是func需要接受的参数的传输方式
    p.start()
    print("父进程执行结束")

二.join方法

import time
from multiprocessing import Process

验证join方法
global_num = 100
def func1():
    time.sleep(2)
    global global_num
    global_num = 0
    print("子进程全局变量>>>",global_num)
if __name__ == "__main__":
    p1 = Process(target=func1,)
    p1.start()
    print("子进程执行")
    time.sleep(3)
    p1.join()#阻塞住,等待你的p1子进程执行sing结束,主进程的程序才能从这里继续往下执行
    print("主进程的全局变量>>>",global_num)

验证了一下并发的执行时间
import time
from multiprocessing import Process

def func1(n):
    time.sleep(n)
    print("func1",n)
def func2(n):
    time.sleep(n)
    print("func2",n)
def func3(n):
    time.sleep(n)
    print("func3",n)
if __name__ == "__main__":
    p1 = Process(target=func1,args=(1,))
    p2 = Process(target=func2,args=(2,))
    p3 = Process(target=func3,args=(3,))

    p1.start()
    p2.start()
    p3.start()

for循环在创建进程中的应用
import time
from multiprocessing import Process
def func1(n):
    time.sleep(1)
    print(n)

if __name__ == "__main__":
    pro_list = []
    for i in range(10):
        p1 = Process(target=func1,args=(i,))
        p1.start()
        pro_list.append(p1)
        # p1.join()

    # for p in pro_list:
    #     # p.join()
    p1.join()
    print("主进程结束")
View Code

 

  僵尸进程和孤儿进程

import time
import os
from multiprocessing import Process

def func1():
    time.sleep(30)
    print(os.getpid())
    print('子进程')

if __name__ == '__main__':
    p1 = Process(target=func1,)
    p1.start()
    # p1.join()
    # time.sleep(2)
    # print(p1.pid)
    print('主进程的ID',os.getpid())
    print('主进程结束')
View Code


三.创建进程的两种方式

import time
from multiprocessing import Process
import os
# import test01
# def func1(n):
#     # time.sleep(1)
#     print(n)
#
# def func2(n):
#     # time.sleep(1)
#     print(n)
#
# def func3(n):
#     # time.sleep(1)
#     print(n)
#
# def func4(n):
#     # time.sleep(1)
#     print(n)
#
# if __name__ == '__main__':
#     p1 = Process(target=func1,args=(1,))
#     p2 = Process(target=func2,args=(2,))
#     p3= Process(target=func3,args=(3,))
#     p4 = Process(target=func4,args=(4,))
#     p1.start() # run()
#     p2.start()
#     p3.start()
#     p4.start()
#     # time.sleep(0.5)
#     print('主进程结束')

    # 之前同步执行的
    # func1(1)
    # func2(2)
    # func3(3)
    # func4(4)

创建进程的第一种方式:
    # p1 = Process(target=func1, args=(1,))
    # p1.start()
创建进行的第二种方式:
    #自己定义一个类,继承Process类,必须写一个run方法,想传参数,自行写init方法,然后执行super父类的init方法

# class MyProcess(Process):
#     def __init__(self,n,name):
#         super().__init__()
#         self.n = n
#         self.name = name
#
#     def run(self):
#         # print(1+1)
#         # print(123)
#         print('子进程的进程ID',os.getpid())
#         print('你看看n>>',self.n)
#
# if __name__ == '__main__':
#     p1 = MyProcess(100,name='子进程1')
#     p1.start() #给操作系统发送创建进程的指令,子进程创建好之后,要被执行,执行的时候就会执行run方法
#     print('p1.name',p1.name)
#     print('p1.pid',p1.pid)
#     print('主进程结束')
View Code

 

四.进程的其他方法terminate is_alive.py

import time
from multiprocessing import Process
def func1():
    time.sleep(2)
    print()
    print("子进程")
if __name__ == "__main__":
    p1 = Process(target=func1,)
    p1.start()
    p1.terminate() #给操作系统发了一个关闭p1子进程的信号,关闭进程
    time.sleep(1)
    print("进程是否还活着:",p1.is_alive())#是返回True,否返回False
    print(p1.pid)
    print("主进程结束")
View Code

 

五.守护进程

#守护的子进程跟着主进程走

import time
import os
from multiprocessing import Process
def func():
    time.sleep(5)
    print('子进程', os.getpid())

if __name__ == '__main__':
    p1 = Process(target=func)
    p1.daemon = True   # 设置守护进程, 当主进程结束时全部子进程立即结束
    p1 .start()
    # time.sleep(5.5)
    print('主进程结束')
View Code

 

六.验证进程之间是空间隔离的

import time
from multiprocessing import Process
#进程之间是空间隔离的,不共享资源
global_num = 100
def func1():
    global global_num
    global_num = 0
    print("子进程全局变量>>>",global_num)
if __name__ == "__main__":
    p1 = Process(target=func1,)
    p1.start()
    time.sleep(1)
    print("主进程的全局变量>>>",global_num)
View Code

 

七.子进程中不能使用input

from multiprocessing import Process

def func1():
    s = input('>>>')

if __name__ == '__main__':

    p1 = Process(target=func1,)
    p1.start()

    # a = input('>>>:')
    print('主进程结束')

##报错
View Code


 

八.进程锁

  ticket_lock = Lock()#创建锁  .acquire()#加锁,  .release()#解锁

同步锁的作用:#加锁可以保证多个进程修改同一块数据时,同一时间只能有一个任务可以进行修改,即串行的修改,没错,速度是慢了,但牺牲了速度却保证了数据安全。 # 虽然可以用文件共享数据实现进程间通信,但问题是:
# 1.效率低(共享数据基于文件,而文件是硬盘上的数据) # 2.需要自己加锁处理
import json
import time
import random
from multiprocessing import Process,Lock

def get_ticket(i,ticket_lock):
    print("我们都到齐了,大家预备!!123")
    time.sleep(1)
    #所有代码 异步执行,到这里等待,同时再去抢下面的代码执行
    ticket_lock.acquire()
    #这里有个门,只有一个人能够抢到这个钥匙,加锁
    with open("ticket","r") as f:
        last_ticket_info = json.load(f)
        #将文件数据load为字典类型的数据
    last_ticket = last_ticket_info["count"]
    print(last_ticket)
    #查看一下余票的信息
    if last_ticket > 0:
        #如果看到余票大于零,说明你可以抢到票
        time.sleep(random.random())
        #模拟网络延迟时间
        last_ticket = last_ticket - 1
        last_ticket_info["count"] = last_ticket
        with open("ticket","w") as f:
            #将修改后的参数写回文件
            json.dump(last_ticket_info,f)
        print("%s号抢到了,丫nb!" % i)
    else:
        print("%s号傻逼,没票了,明年再来" % i)
        ticket_lock.release()
if __name__ ==  "__main__":
    ticket_lock = Lock()
    #创建一个进程锁
    for i in range(10):
        p = Process(target=get_ticket, args=(i, ticket_lock))
        p.start()
进程锁模拟购票系统

 

九.信号量

Semaphore()
互斥锁同时只允许一个线程更改数据,而信号量Semaphore是同时允许一定数量的线程更改数据 。
假设商场里有4个迷你唱吧,所以同时可以进去4个人,如果来了第五个人就要在外面等待,等到有人出来才能再进去玩。
实现:
信号量同步基于内部计数器,每调用一次acquire(),计数器减1;每调用一次release(),计数器加1.当计数器为0时,acquire()调用被阻塞。这是迪科斯彻(Dijkstra)信号量概念P()和V()的Python实现。信号量同步机制适用于访问像服务器这样的有限资源。
信号量与进程池的概念很像,但是要区分开,信号量涉及到加锁的概念
import time
import random
from multiprocessing import Process,Semaphore

def dbj(i,s):
    s.acquire()
    print('%s号男主人公来洗脚'%i)
    print('-------------')
    time.sleep(random.randrange(3,6))
    # print(time.time())
    s.release()

if __name__ == '__main__':
    s = Semaphore(4) #创建一个计数器,每次acquire就减1,直到减到0,那么上面的任务只有4个在同时异步的执行,后面的进程需要等待.
    for i in range(10):
        p1 = Process(target=dbj,args=(i,s,))
        p1.start()
View Code

十.事件

e = Event()#  e.set()#将e改为True  e.clear()   # 将e改为False
python线程的事件用于主线程控制其他线程的执行,事件主要提供了三个方法 set、wait、clear。

    事件处理的机制:全局定义了一个“Flag”,如果“Flag”值为 False,那么当程序执行 event.wait 方法时就会阻塞,如果“Flag”值为True,那么event.wait 方法时便不再阻塞。
from multiprocessing import Process, Event
e = Event() #False True

print(e.is_set())
e.set() #将e事件的状态改为True
print("在这里等待")
e.clar()   #将e事件的状态改为False
print("111")
e.wait()
print("是真的吗")
View Code
import time
from multiprocessing import Process,Event

#模拟红绿灯执行状态的函数
def traffic_lights(e):
    while 1:
        print("红灯啦")
        time.sleep(5)
        e.set() #将e改为True
        print("绿灯了")
        time.sleep(3)
        e.clear()   #将e改为False
def car(i,e):
    if not e.is_set():  #新来的车看到的是红灯
        print("我们在等待....")
        e.wait()
        print("走你")
    else:
        print("可以走了!!!")
if __name__ == "__main__":
    e = Event()
    hld = Process(target=traffic_lights, args=(e,))
    hld.start()
    while 1:
        time.sleep(0.5)
        #创建10个车
        for i in range(3):
            p1 = Process(target=car,args=(i,e,))
            p1.start()
Even模拟红绿灯

 

十一.队列

# 遵循先进先出的原则   q = Queue(3)    创建3个队列 q.put()发送数据  q.get()接受数据
q = Queue([maxsize])  #创建共享的进程队列

q.get( [ block [ ,timeout ] ] )  #返回q中的一个项目。如果q为空,此方法将阻塞,直到队列中有项目可用为止。block用于控制阻塞行为,默认为True. 如果设置为False,将引发Queue.Empty异常(定义在Queue模块中)。timeout是可选超时时间,用在阻塞模式中。如果在制定的时间间隔内没有项目变为可用,将引发Queue.Empty异常。

q.get_nowait( ) #和q.get(False)方法,一样

q.put(item [, block [,timeout ] ] ) #将item放入队列。如果队列已满,此方法将阻塞至有空间可用为止。block控制阻塞行为,默认为True。如果设置为False,将引发Queue.Empty异常(定义在Queue库模块中)。timeout指定在阻塞模式中等待可用空间的时间长短。超时后将引发Queue.Full异常。

q.qsize() #返回队列中目前项目的正确数量。此函数的结果并不可靠,因为在返回结果和在稍后程序中使用结果之间,队列中可能添加或删除了项目。在某些系统上,此方法可能引发NotImplementedError异常。

q.empty() #如果调用此方法时 q为空,返回True。如果其他进程或线程正在往队列中添加项目,结果是不可靠的。也就是说,在返回和使用结果之间,队列中可能已经加入新的项目。

.full() #如果q已满,返回为True. 由于线程的存在,结果也可能是不可靠的(参考q.empty()方法)。。

q.close() #关闭队列,防止队列中加入更多数据。调用此方法时,后台线程将继续写入那些已入队列但尚未写入的数据,但将在此方法完成时马上关闭。如果q被垃圾收集,将自动调用此方法。关闭队列不会在队列使用者中生成任何类型的数据结束信号或异常。例如,如果某个使用者正被阻塞在get()操作上,关闭生产者中的队列不会导致get()方法返回错误。

q.cancel_join_thread()  #不会再进程退出时自动连接后台线程。这可以防止join_thread()方法阻塞。

q.join_thread() #连接队列的后台线程。此方法用于在调用q.close()方法后,等待所有队列项被消耗。默认情况下,此方法由不是q的原始创建者的所有进程调用。调用q.cancel_join_thread()方法可以禁止这种行为。
队列的相关方法
 
from multiprocessing import Process,Queue
#先进先出
q = Queue(3)
q.put(1)
q.put(2)
# print(q.full()) #q.full()队列满了返回True,不满返回False
q.put(3)
# print('>>>>',q.full())

q.get_nowait()= ()  #不会阻塞住,相当于空队列

# try:
#     q.get(False)  # queue.Empty
#     q.get_nowait() #queue.Empty
# except:
#     print('队列目前是空的')

# while 1:
#     try:
#         q.get(False)  #queue.Empty
#     except:
#         print('队列目前是空的')
View Code

  队列实现进程的通信

import time
from multiprocessing import Process,Queue

def girl(q):
    print('来自boy的信息',q.get())
    print('来自校领导的凝视',q.get())
def boy(q):
    q.put('约吗')

if __name__ == '__main__':
    q = Queue(5)
    boy_p = Process(target=boy,args=(q,))
    girl_p = Process(target=girl,args=(q,))
    boy_p.start()
    girl_p.start()
    time.sleep(1)
    q.put('好好工作,别乱搞')
队列实现进程的通信


十二.生产者消费者模式

#生产者消费者模型总结

    #程序中有两类角色
        一类负责生产数据(生产者)
        一类负责处理数据(消费者)
        
    #引入生产者消费者模型为了解决的问题是:
        平衡生产者与消费者之间的工作能力,从而提高程序整体处理数据的速度
        
    #如何实现:
        生产者<-->队列<——>消费者
    #生产者消费者模型实现类程序的解耦和

 

import time
from multiprocessing import Process,Queue
def producer(q):
    for i in range(1,11):
        time.sleep(1)
        print('生产了包子%s号' % i)
        q.put(i)
    q.put(None)  #针对第三个版本的消费者,往队列里面加了一个结束信号
#版本1
# def consumer(q):
#     while 1:
#         time.sleep(2)
#         s = q.get()
#         print('消费者吃了%s包子' % s)

#版本2
# def consumer(q):
#     while 1:
#         time.sleep(0.5)
#         try:
#             s = q.get(False)
#             print('消费者吃了%s包子' % s)
#         except:
#             break

def consumer(q):
    while 1:
        time.sleep(2)
        s = q.get()
        if s == None:
            break
        else:
            print('消费者吃了%s包子' % s)
生产者消费者模型
生产者消费者模型
import time
from multiprocessing import Process,Queue

def producer(q):
    for i in range(1,11):
        time.sleep(1)
        print('生产了包子%s号' % i)
        q.put(i)

def consumer(q):
    while 1:
        time.sleep(2)
        s = q.get()
        if s == None:
            break
        else:
            print('消费者吃了%s包子' % s)

if __name__ == '__main__':
    #通过队列来模拟缓冲区,大小设置为20
    q = Queue(20)
    #生产者进程
    pro_p = Process(target=producer,args=(q,))
    pro_p.start()
    #消费者进程
    con_p = Process(target=consumer,args=(q,))
    con_p.start()
    pro_p.join()

    q.put(None)
生产者消费者模型主进程发送结束信号
 1 #生产者消费者模型
 2 import time
 3 from multiprocessing import Process,Queue,JoinableQueue
 4 
 5 def producer(q):
 6     for i in range(1,11):
 7         time.sleep(0.5)
 8         print('生产了包子%s号' % i)
 9         q.put(i)
10     q.join()
11     print('在这里等你')
12 def consumer(q):
13     while 1:
14         time.sleep(1)
15         s = q.get()
16         print('消费者吃了%s包子' % s)
17         q.task_done()  #给q对象发送一个任务结束的信号
18 
19 if __name__ == '__main__':
20     #通过队列来模拟缓冲区,大小设置为20
21     q = JoinableQueue(20)
22     #生产者进程
23     pro_p = Process(target=producer,args=(q,))
24     pro_p.start()
25     #消费者进程
26     con_p = Process(target=consumer,args=(q,))
27     con_p.daemon = True #
28     con_p.start()
29     pro_p.join()
30     print('主进程结束')
JoinableQueue的生产者消费者模型

 

 

 

 


 

 


 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

本文标题为:python--(进程相关(一))

基础教程推荐