1010cc时时彩标准版 > 三分时时彩1010CC > 1010cc时时彩标准版:python自动化开发学习,并发

原标题:1010cc时时彩标准版:python自动化开发学习,并发

浏览次数:157 时间:2019-09-23

线程的其他艺术:from threading import Thread,current_thread:currrent_thread().getName() 获取线程的名称current_thread().ident 获获得线程的idcurrent_thread() 当前线程的消息import threading #导入threading模块才干运用上边包车型客车作用:threading.enumerate() 当前正值周转线程对象的列表threading.active_count() 活动的线程import timeimport threadingfrom threading importThread,current_threaddef f1:time.sleepprint('子线程名称',current_thread().getNameprint('%s号线程义务' %n)if __name__ == '__main__':t1 =Thread(target=f1,args=t1.start()print('主线程名称',current_thread().getName###MainThreadprint('主线程ID',current_thread##2260print(current_thread#现阶段线程<_MainThread(MainThread, started 2260)>print(threading.enumerate#[<_MainThread(MainThread, started 2260)>, <Thread(Thread-1, started 2520)>]近年来正值周转线程对象的列表print(threading.active_count# 活动的线程#2线程池fromconcurrent.futuresimport ThreadPoolExecutor,ProcessPoolExecutorp =ThreadPoolExecutor() 暗中同意的线程个数是cpu个数 *5p = ProcessPoolExecutor() 暗中认可的历程个数是cup个数p.map(f1,可迭代的目的) 异步推行deff1: print(n1,n2)p.submitres=p.submit(f1,11,12)异步提交职分,里面无敌传参,不过形参地点必需对应实参接收res.result() 和get方法一致,若无结果,会等待,阻塞程序 shutdown()close join锁定线程池,等待线程池中颇具曾经交付的天职工总会体施行完结import timefrom threading import current_threadfrom concurrent.futures import ThreadPoolExecutor,ProcessPoolExecutordef f1:time.sleep# print('%s号子线程' %current_thread# printreturn if __name__ == '__main__':tp =ThreadPoolExecutor# tp = ProcessPoolExecutor# tp.map(f1,range#异步提交职分,参数一样是职务名称,可迭代对象res_list = []for i in range:res = tp.submit(f1,i,'藤椒')#submit是给线程池异步提交职务print#res.result()res_list.append# for r in res_list:#print(r.resulttp.shutdown()#主线程等待全部提交给线程池的职责,全体实行达成close joinfor r in res_list:print(r.resultprint队列先进先出queue.Queue() 先进先出队列q.qsize() 当前内容的尺寸q.full()查看队列是不是满了q.empty() 查看队列是还是不是空了q.put_nowait() 归入不等待,程序往下进行q.get_nowait() 获取不等待,程序往下进行import queue#先进先出队列q =queue.Queueq.putq.putprint('当前队列内容长度',q.qsize())q.putprint('查看队列是不是满了',q.full())try:q.put_nowait#报错queue.Fullexcept Exception:printprintprintprint('查看队列是不是为空',q.empty())printprint('查看队列是还是不是为空',q.emptytry:q.get_nowait()#queue.Emptyexcept Exception:print先进后出,可能后进先出,类似于栈fifo:firstinfirstoutqueue.LifoQueue() 先进后出后进先出队列import queueq = queue.LifoQueueq.putq.putq.putprintprintprint优先队列:queue.PriorityQueue()put的多少是多个元组,元组的第叁个参数是预先级数字,数字越下,优先级越高,越先被get到被去出来,第三个参数是put进去的值,吐过说优先级一样,那么直别忘了应该是同等的数据类型,字典不行 假使优先级数字同样,会比较第4个参数的成分的ascii表中的地方,假如数据类型不一致会报错. 假诺说值里面包车型大巴因素是数字类型,那么四个值的先行级一样偶尔候,相比较的是八个值得大小,小的事先被去出来.import queueq = queue.PriorityQueue# q.put()# q.put# q.put# q.put)q.put)#设若说值里面包车型地铁因素是数字类型,那么当三个值的刚开始阶段级一样期,比较的是八个值的分寸,小的先行被抽出来**#固然成分是字符串,那么依次比较每一种字母的ascii表中的地点,小的先行被抽取来# q.put((2,{'x':3}))# q.put)q.put())# q.put((2,'zalex'))# q.put((-1,'yubing'))# q.put((2,{'k':666})) #一经优先级数字一样,假诺数据类型差别会报错# q.put((2,)# q.put)# q.put)# q.put())print协程:轻量型线程. 生成器,Greenlet模块,Gevent模块协程:一个线程里面实现职责的出现:变成生成器,落成七个程序的来往切换,达成并发出成器出版协会程:import timedef f1():for i in range:time.sleepprint('f1>>',i)yielddef f2():g = f1()for i in range:time.sleepprint('f2>>', i)nextf1()f2()Greenlet模块如果导入模块报错,是从未有过加载第三方的模块,首先:在py解释器上找到Terminal输入:pipinstallgreenlet,要是提醒不是里面命令,找到python安装的岗位的不二秘诀,Scripts文件之中的pip,采取路线,增多到景况变量里面(小编的管理器--属性--高端系统设置--情况变量--Path新建增添),在应用管理员命令输入pipinstallgreenlet,最终重启解释器就完事啦,能够兴奋的使用.import timeimport greenletfrom greenlet import greenletdef f1:print('第贰回f1' s)g2.switchtime.sleepprint('第三次f1' s)g2.switch()def f2:print('第二遍f2' s)g1.switch()time.sleepprint('第壹遍f2' s)g1 = greenletg2 = greenletg1.switchGevent模块import geventfrom gevent import monkey;monkey.patch_all()import timeimport threadingdef f1():print# print(threading.current_thread().getName# gevent.sleeptime.sleepprintdef f2():# print(threading.current_thread().getNameprint# gevent.sleeptime.sleepprints = time.time()g1 = gevent.spawn #异步提交了f1任务g2 = gevent.spawn #异步提交了f2任务# g1.join()# g2.join()gevent.joinalle = time.time()print('施行时间:',e-s)print线程池的回调函数from concurrent.futures import ThreadPoolExecutor,ProcessPoolExecutordef f1:return n sdef f2:print('回调函数>>>',n.resultif __name__ == '__main__':tp = ThreadPoolExecutorres = tp.submit.add_done_callback# print(res.result线程的别的办法:from threading import Thread,current_thread:currrent_thread().getName() 获取线程的称呼current_thread().ident 得到到线程的idcurrent_thread() 当前线程的音讯import threading #导入threading模块手艺使用上边包车型地铁功能:threading.enumerate() 当前正在运作线程对象的列表threading.active_count() 活动的线程import timeimport threadingfrom threading importThread,current_threaddef f1:time.sleepprint('子线程名称',current_thread().getNameprint('%s号线程职责' %n)if __name__ == '__main__':t1 =Thread(target=f1,args=t1.start()print('主线程名称',current_thread().getName###MainThreadprint('主线程ID',current_thread##2260print(current_thread#现阶段线程<_MainThread(MainThread, started 2260)>print(threading.enumerate#[<_MainThread(MainThread, started 2260)>, <Thread(Thread-1, started 2520)>]眼下正在运作线程对象的列表print(threading.active_count# 活动的线程#2线程池fromconcurrent.futuresimport ThreadPoolExecutor,ProcessPoolExecutorp =ThreadPoolExecutor() 暗中认可的线程个数是cpu个数 *5p = ProcessPoolExecutor() 暗中认可的进程个数是cup个数p.map(f1,可迭代的靶子) 异步试行deff1: print(n1,n2)p.submitres=p.submit(f1,11,12)异步提交任务,里面无敌传参,不过形参地点必需对应实参接收res.result() 和get方法一样,若无结果,会等待,阻塞程序 shutdown()close join锁定线程池,等待线程池中具备曾经付出的天职工总会体施行完毕import timefrom threading import current_threadfrom concurrent.futures import ThreadPoolExecutor,ProcessPoolExecutordef f1:time.sleep# print('%s号子线程' %current_thread# printreturn if __name__ == '__main__':tp =ThreadPoolExecutor# tp = ProcessPoolExecutor# tp.map(f1,range#异步提交职分,参数同样是职分名称,可迭代对象res_list = []for i in range:res = tp.submit(f1,i,'藤椒')#submit是给线程池异步提交职务print#res.result()res_list.append# for r in res_list:#print(r.resulttp.shutdown()#主线程等待全部提交给线程池的职责,全部实行达成close joinfor r in res_list:print(r.resultprint队列先进先出queue.Queue() 先进先出队列q.qsize() 当前内容的尺寸q.full()查看队列是不是满了q.empty() 查看队列是不是空了q.put_nowait() 放入不等待,程序往下举行q.get_nowait() 获取不等待,程序往下举行import queue#先进先出队列q =queue.Queueq.putq.putprint('当前队列内容长度',q.qsize())q.putprint('查看队列是或不是满了',q.full())try:q.put_nowait#报错queue.Fullexcept Exception:printprintprintprint('查看队列是不是为空',q.empty())printprint('查看队列是或不是为空',q.emptytry:q.get_nowait()#queue.Emptyexcept Exception:print先进后出,或许后进先出,类似于栈fifo:firstinfirstoutqueue.LifoQueue() 先进后出后进先出队列import queueq = queue.LifoQueueq.putq.putq.putprintprintprint优先队列:queue.PriorityQueue()put的数据是二个元组,元组的第一个参数是预先级数字,数字越下,优先级越高,越先被get到被去出来,第贰个参数是put进去的值,吐过说优先级一样,那么直别忘了应该是一模一样的数据类型,字典不行 即使优先级数字同样,会相比第二个参数的要素的ascii表中的地点,要是数据类型差别会报错. 假设说值里面包车型客车成分是数字类型,那么四个值的先行级相同期,相比较的是三个值得大小,小的优先被去出来.import queueq = queue.PriorityQueue# q.put()# q.put# q.put# q.put)q.put)#倘诺说值里面包车型大巴因素是数字类型,那么当多个值的先行级相同一时候,相比较的是四个值的深浅,小的事先被抽出来**#举例成分是字符串,那么依次比较各样字母的ascii表中的地方,小的事先被抽出来# q.put((2,{'x':3}))# q.put)q.put())# q.put((2,'zalex'))# q.put((-1,'yubing'))# q.put((2,{'k':666})) #若果优先级数字一样,若是数据类型差异会报错# q.put((2,)# q.put)# q.put)# q.put())print协程:轻量型线程. 生成器,Greenlet模块,Gevent模块协程:三个线程里面完结职务的产出:产生生成器,达成多少个程序的来回来去切换,达成并发出成器出版协会程:import timedef f1():for i in range:time.sleepprint('f1>>',i)yielddef f2():g = f1()for i in range:time.sleepprint('f2>>', i)nextf1()f2()Greenlet模块如酚酞入模块报错,是没有加载第三方的模块,首先:在py解释器上找到Terminal输入:pipinstallgreenlet,要是提示不是里面命令,找到python安装的岗位的门径,Scripts文件之中的pip,采纳路线,增加到遭遇变量里面(笔者的计算机--属性--高等系统设置--蒙受变量--Path新建增多),在行使管理员命令输入pipinstallgreenlet,最后重启解释器就造成啦,能够高兴的使用.import timeimport greenletfrom greenlet import greenletdef f1:print('第三遍f1' s)g2.switchtime.sleepprint('第一次f1' s)g2.switch()def f2:print('第二次f2' s)g1.switch()time.sleepprint('第二遍f2' s)g1 = greenletg2 = greenletg1.switchGevent模块import geventfrom gevent import monkey;monkey.patch_all()import timeimport threadingdef f1():print# print(threading.current_thread().getName# gevent.sleeptime.sleepprintdef f2():# print(threading.current_thread().getNameprint# gevent.sleeptime.sleepprints = time.time()g1 = gevent.spawn #异步提交了f1职分g2 = gevent.spawn #异步提交了f2职务# g1.join()# g2.join()gevent.joinalle = time.time()print('推行时间:',e-s)print线程池的回调函数from concurrent.futures import ThreadPoolExecutor,ProcessPoolExecutordef f1:return n sdef f2:print('回调函数>>>',n.resultif __name__ == '__main__':tp = ThreadPoolExecutorres = tp.submit.add_done_callback# print(res.result

python自动化开拓学习 进度, 线程, 协程

 

 

Python(八)进程、线程、协程篇,python

本章内容:

  • 线程(线程锁、threading.伊夫nt、queue 队列、生产者花费者模型、自定义线程池)
  • 经过(数据分享、进度池)
  • 协程

1010cc时时彩标准版 1

线程

Threading用于提供线程相关的操作。线程是应用程序中央银行事的矮小单元,它被含有在经过之中,是进程中的实际运维单位。一条线程指的是经过中三个纯粹顺序的调节流,三个经过中可以并发五个线程,每条线程并行施行区别的任务。

threading 模块建构在 _thread 模块之上。thread 模块以低级、原始的章程来拍卖和决定线程,而 threading 模块通过对 thread 举行二次封装,提供了更有利的 api 来拍卖线程。

import threading
import time

def worker(num):
    time.sleep(1)
    print(num)
    return

for i in range(10):
    t = threading.Thread(target=worker, args=(i,), name="t.%d" % i)
    t.start()

# 继承式调用

import threading
import time


class MyThread(threading.Thread):
    def __init__(self,num):
        threading.Thread.__init__(self)
        self.num = num

    def run(self):    #定义每个线程要运行的函数

        print("running on number:%s" %self.num)

        time.sleep(2)

if __name__ == '__main__':

    t1 = MyThread(1)
    t2 = MyThread(2)
    t1.start()
    t2.start()

 

thread方法:

  • t.start() : 激活线程
  • t.getName() : 获取线程的名目
  • t.setName() : 设置线程的名称 
  • t.name : 获取或安装线程的名称
  • t.is_alive() : 判别线程是还是不是为激活状态
  • t.isAlive() :判别线程是不是为激活状态
  • t.setDaemon() 设置为后台线程或前台线程(私下认可:False);通过二个布尔值设置线程是不是为护理线程,必需在举办start()方法在此之前才得以应用。如若是后台线程,主线程推行进度中,后台线程也在举办,主线程实施完成后,后台线程不论成功与否,均结束;倘使是前台线程,主线程实行进程中,前台线程也在开展,主线程实践达成后,等待前台线程也实行到位后,程序停止
  • t.isDaemon() : 判别是还是不是为守护线程
  • t.ident :获取线程的标记符。线程标志符是一个非零整数,独有在调用了start()方法之后该属性才使得,不然它只回去None
  • t.join() :各种施行各类线程,实施完结后继续往下进行,该办法使得多线程变得肤浅
  • t.run() :线程被cpu调整后活动推行线程对象的run方法

 

 线程锁

threading.RLock & threading.Lock

我们选用线程对数据开展操作的时候,借使多少个线程同一时常间修改有些数据,恐怕会产出不足预期的结果,为了保险数据的准确性,引进了锁的定义。

import threading
import time

num = 0

lock = threading.RLock()    # 实例化锁类

def work():
    lock.acquire()  # 加锁
    global num
    num  = 1
    time.sleep(1)
    print(num)
    lock.release()  # 解锁

for i in range(10):
    t = threading.Thread(target=work)
    t.start()

  

threading.RLock和threading.Lock 的区别

景逸SUVLock允许在同一线程中被数十一次acquire。而Lock却不容许这种场所。 假使利用大切诺基Lock,那么acquire和release必需成对出现,即调用了n次acquire,必得调用n次的release能力真正释放所占据的锁。

import threading

lock = threading.Lock()
lock.acquire()
lock.acquire()  # 产生死锁
lock.release()
lock.release()

import threading

rlock = threading.RLock()
rlock.acquire()
rlock.acquire()      # 在同一线程内,程序不会堵塞。
rlock.release()
rlock.release()
print("end.")

  

threading.Event

Event是线程间通讯最间的编写制定之一:三个线程发送叁个event非数字信号,其余的线程则等待那个时限信号。用于主线程调节其余线程的进行。 伊芙nts 管理二个flag,那些flag能够应用set()设置成True恐怕利用clear()重新初始化为False,wait()则用于阻塞,在flag为True以前。flag私下认可为False。

  • Event.wait([timeout]) : 堵塞线程,直到Event对象内部标志位被设为True或超时(假设提供了参数timeout)
  • Event.set() :将标记位设为Ture
  • Event.clear() : 将标记伴设为False
  • Event.isSet() :推断标记位是还是不是为Ture
import threading

def do(event):
    print('start')
    event.wait()
    print('execute')

event_obj = threading.Event()
for i in range(10):
    t = threading.Thread(target=do, args=(event_obj,))
    t.start()

event_obj.clear()
inp = input('input:')
if inp == 'true':
    event_obj.set()

  当线程实行的时候,倘诺flag为False,则线程会阻塞,当flag为True的时候,线程不会阻塞。它提供了本土和长途的并发性。

 

threading.Condition

Python提供的Condition对象提供了对复杂线程同步难题的支撑。Condition被称作条件变量,除了提供与Lock类似的acquire和release方法外,还提供了wait和notify方法。线程首先acquire八个标准化变量,然后决断一些规格。假若条件不满足则wait;假如基准满意,进行局地甩卖改造法规后,通过notify方法公告任何线程,别的处于wait状态的线程接到公告后会重新判断规范。不断的重新这一进度,从而化解复杂的同台难题。

在天下无双的规划风格里,利用condition变量用锁去通许访谈一些分享状态,线程在获取到它想获得的事态前,会屡屡调用wait()。修改情状的线程在她们状态退换时调用 notify() or notify_all(),用这种措施,线程会尽只怕的收获到想要的三个等待者状态。

import threading
import time

def consumer(cond):
    with cond:
        print("consumer before wait")
        cond.wait()
        print("consumer after wait")

def producer(cond):
    with cond:
        print("producer before notifyAll")
        cond.notifyAll()
        print("producer after notifyAll")

condition = threading.Condition()
c1 = threading.Thread(name="c1", target=consumer, args=(condition,))
c2 = threading.Thread(name="c2", target=consumer, args=(condition,))

p = threading.Thread(name="p", target=producer, args=(condition,))

c1.start()
time.sleep(2)
c2.start()
time.sleep(2)
p.start()

# consumer()线程要等待producer()设置了Condition之后才能继续。

  

queue 队列

适用于二十多线程编制程序的先进先出数据结构,能够用来安全的传递三十二线程音讯。

queue 方法:

  • q = queue.Queue(maxsize=0) # 构造一个进步显出队列,maxsize钦命队列长度,为0 时,表示队列长度无界定。
  • q.join()   # 等到行列为kong的时候,在实践其余操作
  • q.qsize()   # 再次来到队列的大小 (不可靠赖)
  • q.empty()    # 当队列为空的时候,重回True 不然再次回到False (不可相信赖)
  • q.full()     # 当队列满的时候,重回True,不然重返False (不可信)
  • q.put(item, block=True, timeout=None) # 将item放入Queue尾部,item必得存在,可以参数block默感觉True,表示当队列满时,会等待队列给出可用地点,为False时为非阻塞,此时假使队列已满,会抓住queue.Full 卓殊。 可选参数timeout,表示 会阻塞设置的年华,过后,借使队列不恐怕给出归入item的职分,则抓住 queue.Full 分外
  • q.get(block=True, timeout=None) # 移除并回到队列底部的三个值,可选参数block默以为True,表示收获值的时候,就算队列为空,则阻塞,为False时,不打断,若此时队列为空,则吸引queue.Empty非凡。 可选参数timeout,表示会阻塞设置的时候,过后,假诺队列为空,则引发Empty万分。
  • q.put_nowait(item) # 等效于 put(item,block=False)
  • q.get_nowait()     # 等效于 get(item,block=False)

劳动者花费者模型

import queue
import threading

que = queue.Queue(10)

def s(i):
    que.put(i)
    # print("size:", que.qsize())

def x(i):
    g = que.get(i)
    print("get:", g)

for i in range(1, 13):
    t = threading.Thread(target=s, args=(i,))
    t.start()

for i in range(1, 11):
    t = threading.Thread(target=x, args=(i,))
    t.start()

print("size:", que.qsize())

# 输出结果:
get: 1
get: 2
get: 3
get: 4
get: 5
get: 6
get: 7
get: 8
get: 9
get: 10
size: 2

 

自定义线程池:

1010cc时时彩标准版 2# 自定义线程池(一) import queue import threading import time class TreadPool: def __init__(self, max_num=20): self.queue = queue.Queue(max_num) for i in range(max_num): self.queue.put(threading.Thread) def get_thread(self): return self.queue.get() def add_thread(self): self.queue.put(threading.Thread) def func(pool, n): time.sleep(1) print(n) pool.add_thread() p = TreadPool(10) for i in range(1, 100): thread = p.get_thread() t = thread(target=func, args=(p, i,)) t.start() 自定义线程池(一) 1010cc时时彩标准版 3# 线程池(二) import queue import threading import contextlib import time StopEvent = object() class Threadpool: def __init__(self, max_num=10): self.q = queue.Queue() self.max_num = max_num self.terminal = False self.generate_list = [] # 以创立线程列表 self.free_list = [] # 以创设的线程空闲列表 def run(self, func, args, callback=None): """ 线程池实施一个职务 :param func: 义务函数 :param args: 职分函数所需参数 :param callback: 职务实施倒闭或成功后施行的回调函数,回调函数有多少个参数1、职分函数执生势况;2、职分函数重返值(默感到None,即:不进行回调函数) :return: 若是线程池已经终止,则赶回True不然None """ if len(self.free_list) == 0 and len(self.generate_list) < self.max_num: self.generate_thread() w = (func, args, callback,) self.q.put(w) def generate_thread(self): """ 成立两个线程 """ t = threading.Thread(target=self.call) t.start() def call(self): """ 循环去获得职务函数并实施职务函数 """ current_thread = threading.currentThread # 当前线程 self.generate_list.append(current_thread) event = self.q.get() while event != StopEvent: func, arguments, callback = event try: result = func(*arguments) status = True except Exception as e: status = False result = e if callback is not None: try: callback(status, result) except Exception as e: pass if self.terminal: event = StopEvent else: with self.worker_state(self.free_list, current_thread): event = self.q.get() # self.free_list.append(current_thread) # event = self.q.get() # self.free_list.remove(current_thread) else: self.generate_list.remove(current_thread) def close(self): """ 推行完全数的天职后,全体线程甘休 """ num = len(self.generate_list) while num: self.q.put(Stop伊芙nt) num -= 1 def terminate(self): """ 无论是不是还大概有任务,终止线程 """ self.terminal = True while self.generate_list: self.q.put(StopEvent) self.q.empty() # 清空队列 @contextlib.contextmanager # with上下文管理 def worker_state(self, frelist, val): """ 用于记录线程中正在等候的线程数 """ frelist.append(val) try: yield finally: frelist.remove(val) def work(i): time.sleep(1) print(i) pool = Threadpool() for item in range(50): pool.run(func=work, args=(item,)) pool.close() # pool.terminate() 自定义线程池(二)

 

进程
# 进程
from multiprocessing import Process

def work(name):
    print("Hello, %s" % name)

if __name__ == "__main__":
    p = Process(target=work, args=("nick",))
    p.start()
    p.join()

  注意:由于经过之间的数量必要各自全部一份,所以创设进度须求的老大大的支出。

 

数码分享

今是昨非进度间内部存储器是不分享的,要想达成七个进度间的数据调换,可以用以下办法:

Shared memory

数量能够用Value或Array存储在一个分享内部存款和储蓄器地图里,如下:

from multiprocessing import Process, Value, Array

def f(n, a):
    n.value = 3.1415927
    for i in range(len(a)):
        a[i] = -a[i]

if __name__ == '__main__':
    num = Value('d', 0.0)
    arr = Array('i', range(10))

    p = Process(target=f, args=(num, arr))
    p.start()
    p.join()

    print(num.value)
    print(arr[:])


# 输出:
3.1415927
[0, -1, -2, -3, -4, -5, -6, -7, -8, -9]

开创num和arr时,“d”和“i”参数由Array模块利用的typecodes创设:“d”表示多个双精度的浮点数,“i”表示二个有标识的大背头,那几个分享对象将被线程安全的拍卖。

1010cc时时彩标准版 4‘c’: ctypes.c_char     ‘u’: ctypes.c_wchar    ‘b’: ctypes.c_byte     ‘B’: ctypes.c_ubyte ‘h’: ctypes.c_short    ‘H’: ctypes.c_ushort   ‘i’: ctypes.c_int      ‘I’: ctypes.c_uint ‘l’: ctypes.c_long,    ‘L’: ctypes.c_ulong    ‘f’: ctypes.c_float    ‘d’: ctypes.c_double 类型对应表

from multiprocessing import Process,Array
temp = Array('i', [11,22,33,44])

def Foo(i):
    temp[i] = 100 i
    for item in temp:
        print i,'----->',item

for i in range(2):
    p = Process(target=Foo,args=(i,))
    p.start()

Server process

由Manager()重回的manager提供list, dict, Namespace, Lock, RubiconLock, Semaphore, BoundedSemaphore, Condition, 伊夫nt, Barrier, Queue, Value and Array类型的支撑。

from multiprocessing import Process, Manager

def f(d, l):
    d[1] = '1'
    d['2'] = 2
    d[0.25] = None
    l.reverse()

if __name__ == '__main__':
    with Manager() as manager:
        d = manager.dict()
        l = manager.list(range(10))

        p = Process(target=f, args=(d, l))
        p.start()
        p.join()

        print(d)
        print(l)

# 输出结果:
{0.25: None, 1: '1', '2': 2}
[9, 8, 7, 6, 5, 4, 3, 2, 1, 0]

Server process manager比 shared memory 更加灵敏,因为它能够帮衬猖狂的对象类型。其他,多个单身的manager能够经过进度在网络上分化的管理器之间分享,可是她比shared memory要慢。

# manage.dict()共享数据
from multiprocessing import Process,Manager

manage = Manager()
dic = manage.dict()

def Foo(i):
    dic[i] = 100 i
    print dic.values()

for i in range(2):
    p = Process(target=Foo,args=(i,))
    p.start()
    p.join()

当创立进度时(非使用时),分享数据会被获得子进度中,当进度中实行完结后,再赋值给原值。

1010cc时时彩标准版 5#!/usr/bin/env python # -*- coding:utf-8 -*- from multiprocessing import Process, Array, RLock def Foo(lock,temp,i): """ 将第0个数加100 """ lock.acquire() temp[0] = 100 i for item in temp: print i,'----->',item lock.release() lock = RLock() temp = Array('i', [11, 22, 33, 44]) for i in range(20): p = Process(target=Foo,args=(lock,temp,i,)) p.start() 进度锁实例

 

1. 线程的其他艺术

前言

  在过去单核CPU也得以实施多职分,操作系统轮流让各样任务交替执行,义务1实行0.01秒,切换职分2,职务2举办0.01秒,在切换成职分3,那样反复实施下去,表面上看每一种职责都是轮流施行的,可是出于CPU速度太快,让大家认为全体任务是在同时实行同一。真正的并行实践多义务只可以在多核CPU上,可是由于任务数量远远多于CPU的着力数据,所以,操作系统也会活动把多任务轮流动调查整到各个中央上运维。

  多职分的兑现存两种办法:

  1. 多进度情势
  2. 二十四线程格局
  3. 多进度 三十二线程情势

   要促成多职分,一般会计统计一筹划成Master-Worker方式,Master担当分配任务,Worker负担实行任务,由此,多职务情状下,平时是二个Master四个Worker。假使要用八线程完毕,就是主线程是Master,别的线程是Worker;假若要用多进度完毕,正是主进程是Master,其余进度时Worker。

  多进度的优势:稳固性高,因为多个子经过挂了,不会影响主进度和别的子进度(假诺主进度挂了就都挂了,然则Master一般只担负分配职责,挂掉的可能率低),有名的Apache最初已是用多进度形式。

  多进程的老毛病:成立进程的代价太大,在Unix/Linux系统下,用fork调用幸而,在windows下开垦则极度高大,别的操作系统能同期运行的经过数有限,在内部存款和储蓄器和CPU的范围下,假使有几千个经过同期运营,操作系统连调治都会成为难题。

  多线程的优势:八线程情势平时比多进程快些,但也块不到哪去,在windows下,八线程的作用比多进程高,所以微软的IIS服务器暗中认可使用二十多线程格局。

  四线程的劣势:致命鲜明是其他一个线程挂掉可能间接变成整个进程崩溃,因为多线程会分享进程的内部存款和储蓄器,由于三十二线程存在牢固性难点,所以IIS的安定不比Apache。

算算密集型 VS IO密集型

  总计密集型:职分特点是要开展大气的预计,消耗CPU财富,譬如总结圆周率,对录像举办高清解码等等,全体依赖CPU运算本事,这种总计密集型职务即便也得以用多任务完毕,不过职责越来越多,花在切花任务的岁月就越多,CPU试行职务的频率就越低,所以,要快速的选拔CPU,总括密集型职分同有时候举办的数码应当等于CPU的为主数。

  IO密集型:涉及到网路,磁盘IO的天职都是IO密集型任务,那类使命的风味是CPU消耗很少,任务的繁多小时都是在守候IO操作实现(因为IO的快慢远远小于CPU和内部存款和储蓄器的快慢)。对于IO密集型职责,职务更加的多,CPU效能越高,但也可以有个限度。常见的大部职责都是IO密集型任务,举个例子WEB应用。

说起底总括:

  • IO密集型任务,用多线程
  • 计量密集型职分,用多进度

 


 

十三、并发编制程序之二十二十四线程

进程池

进程池内部维护七个进度类别,当使用时,则去进度池中赢得三个进度,纵然经过池连串中尚无可供使用的进进度,那么程序就能够等待,直到进度池中有可用进程甘休。

方法:

  • apply(func[, args[, kwds]]) :使用arg和kwds参数调用func函数,结果回到前会一向不通,由于那个原因,apply_async()更符合出现实行,其他,func函数仅被pool中的二个进度运转。

  • apply_async(func[, args[, kwds[, callback[, error_callback]]]]) : apply()方法的二个变体,会回来多少个结出对象。假如callback被钦定,那么callback能够接收贰个参数然后被调用,当结果准备好回调时会调用callback,调用退步时,则用error_callback替换callback。 Callbacks应被随即成功,不然处理结果的线程会被堵塞。

  • close() : 阻止更加多的天职交给到pool,待职责成功后,职业进度会退出。

  • terminate() : 不管职分是不是到位,立刻停止职业进度。在对pool对象进度垃圾回收的时候,会应声调用terminate()。

  • join() : wait专门的学问线程的脱离,在调用join()前,必得调用close() or terminate()。那样是因为被终止的进度供给被父进度调用wait(join等价与wait),不然进程会产生活死人进度

进程池中有五个法子:

  • apply
  • apply_async
from multiprocessing import Pool
import time
def myFun(i):
    time.sleep(2)
    return i 100

def end_call(arg):
    print("end_call",arg)

p = Pool(5)

# print(p.map(myFun,range(10)))

for i in range(10):
    p.apply_async(func=myFun,args=(i,),callback=end_call)

print("end")
p.close()
p.join()

1010cc时时彩标准版 6from multiprocessing import Pool, TimeoutError import time import os def f(x): return x*x if __name__ == '__main__': # 创建4个进程 with Pool(processes=4) as pool: # 打印 "[0, 1, 4,..., 81]" print(pool.map(f, range(10))) # 使用大肆顺序输出同样的数字, for i in pool.imap_unordered(f, range(10)): print(i) # 异步实践"f(20)" res = pool.apply_async(f, (20,)) # 只运维三个经过 print(res.get(timeout=1)) # 输出 "400" # 异步试行 "os.getpid()" res = pool.apply_async(os.getpid, ()) # 只运维多少个历程 print(res.get(timeout=1)) # 输出进度的 PID # 运转多少个异步奉行大概会动用多少个进度 multiple_results = [pool.apply_async(os.getpid, ()) for i in range(4)] print([res.get(timeout=1) for res in multiple_results]) # 是二个经过睡10秒 res = pool.apply_async(time.sleep, (10,)) try: print(res.get(timeout=1)) except TimeoutError: print("开采二个multiprocessing.TimeoutError非凡") print("近期,池中还也可以有其余的做事") # 退出with块中已经终止的池 print("Now the pool is closed and no longer available") 官方示例

 

协程

  协程又叫微线程,从本事的角度来说,“协程正是您可以暂停实践的函数”。假诺你把它了解成“就如生成器一样”,那么您就想对了。 线程和进程的操作是由程序触发系统接口,最终的实践者是系统;协程的操作则是程序猿。

  协程存在的含义:对于二十多线程应用,CPU通过切成片的方法来切换线程间的实施,线程切换时索要耗费时间(保存景况,下一次三番五次)。协程,则只行使一个线程,在三个线程中分明有些代码块施行顺序。

  协程的适用场景:当程序中留存大气无需CPU的操作时(IO),适用于协程。

# 安装
pip install gevent

# 导入模块
import gevent

 

greenlet

# greenlet
from greenlet import greenlet

def test1():
    print(11)
    gr2.switch()
    print(22)
    gr2.switch()

def test2():
    print(33)
    gr1.switch()
    print(44)

gr1 = greenlet(test1)
gr2 = greenlet(test2)
gr1.switch()

# 输出结果:
11
33
22
44

  

gevent

# gevent
import gevent

def foo():
    print("Running in foo")
    gevent.sleep(0)
    print("Explicit context switch to foo angin")

def bar():
    print("Explicit context to bar")
    gevent.sleep(0)
    print("Implicit context swich back to bar")

gevent.joinall([
    gevent.spawn(foo),
    gevent.spawn(bar),
])

# 输出结果:
Running in foo
Explicit context to bar
Explicit context switch to foo angin
Implicit context swich back to bar

 

1010cc时时彩标准版 7# 蒙受IO自动切换 from gevent import monkey monkey.patch_all() import gevent import requests def f(url): print("FET: %s" % url) resp = requests.get(url) data = len(resp.text) print(url, data) gevent.joinall([ gevent.spawn(f, ''), gevent.spawn(f, ''), gevent.spawn(f, ''), ]) 境遇IO操作自动切换

 

本章内容: 线程(线程锁、threading.Event、queue 队列、 生产者成本者模型、自定义线程池 ) 进程(...

import threadingimport timefrom threading import Thread,current_threaddef f1:    time.sleep(1)    print('子线程名称', current_thread().getName # 获取线程名    print('%s号线程任务'%n)if __name__ == '__main__':    t1 = Thread(target=f1,args=(1,))    t1.start()    print('主线程名称',current_thread().getName # 获取线程名    print('主线程ID',current_thread # 获取线程id    print(current_thread # 当前线程对象    print(threading.enumerate # 当前正在运行的线程对象的一个列表 [<_MainThread(MainThread, started 6708)>, <Thread(Thread-1, started 7848)>]    print(threading.active_count # 当前正在运行的线程数

线程

  线程是一点都不大的推行单元,而经过由最少二个线程组成。Python的标准库提供了四个模块,_thread和threading, _thread是起码模块,threading是高级模块,内部对_thread进行了打包。

大多数动静下,大家只须要动用threading那个高等模块。

  创设线程有三种格局(其实本质是平等的):

  1. 最常用的主意

1010cc时时彩标准版 8 最常用简易的创设线程

#运行三个线程就是把一个函数字传送入并创立Thread实例,然后调用start()方法,初阶试行。
import threading
def f1(arg):
print(arg)

t = threading.Thread(target=func, args=(123,)) # 创造线程。Thread是个类
t.start() # 运转线程。start()会调用了run()方法实践了f1函数。
# 输出结果

最常用简短的创始线程

1010cc时时彩标准版 9 压实形象版创立线程

import time, threading

def func(arg):
print("线程%s 正在运作中..." % threading.current_thread().name)
time.sleep(1)
print("%s>>>%s" % (threading.current_thread().name, arg))
time.sleep(1)
print("线程%s 关闭" % threading.current_thread().name)

print("线程%s 正在运维中..." % threading.current_thread().name)
t = threading.Thread(target=func, args=('hello world',)) # 创造三个子线程
t.start() # 运行线程
t.join() # 等待全数子线程施行完职分后,在向下运作程序
print("线程%s 关闭" % threading.current_thread().name)
# 输出结果:
线程MainThread 正在周转中...
线程Thread-1 正在运维中...
Thread-1>>>hello world
线程Thread-1 关闭
线程MainThread 关闭
"""
由于其他进度暗许就能够运维多个线程,大家誉为主线程,主线程又有啥不可运维新的线程,Python的threading 模块
有个 current_thread()函数,它长久重临当前线程的实例。主线程实例的名字叫MainThread,子线程的名字可以在
创建时钦点,即 t = threading.Thread(target=func, args=('hello world',), name='childthread')中的name字段,
名字只是是在打字与印刷时用来彰显的,未有另外意义,假若不点名名字,python就能自行给线程命名称叫Thread-1,Thread-2...
"""

提升形象版创制线程

  2. 自定义七个类,承袭threading.Thread,並且自身重写一个run方法

1010cc时时彩标准版 10 自定义类创立线程

class MyThread(threading.Thread): # 自定义一个MyThread类,承袭了父类threading.Thread
def __init__(self, func, args):
self.func = func # 函数
self.args = args # 函数参数
super(MyThread, self).__init__() # 实行了父类的构造方法
def run(self): # 自定义多个run方法
self.func(self.args)

def f2(arg):
print(arg)

obj = MyThread(f2, 123) # 创造贰个目的,传入f2函数,和参数
obj.start() # start会自动调用MyThread类中的run方法
# 输出结果

自定义类创制线程


 

2. 线程队列

 锁

    二十四线程和多进程最大的不如在于,多进度中,同一个变量各自有一份拷贝寄存在每个进度中.相互不影响,而八线程中,全体变量都由拥有线程分享,所以任何二个变量都得以被其余三个线程修改,从而形成了线程之间分享数据最大的危殆在于三十二线程同期修改一个变量时,出现谬误。故引进了线程锁的概念,当三个线程同有时候实行acquire()时,只有多少个线程能成功的获取锁,然后继续实践代码,其余线程只可以等待直到获得锁停止。获得锁的线程用完后自然要释放锁,不然正在等待锁的经过将永恒等待下去,进而成为了死线程。所以提出选用try...finally语句来确定保证锁一定会被假释。锁的益处是有限支撑了数量的正确性,坏处在于阻止了四线程并发推行,包罗锁的代码只好以单线程形式运维,成效就大大减少了。其次,由于能够存在三个锁,分歧的线程持有不相同的锁,并准备拿走对方全体的锁,大概会招致死锁,导致四个线程全体挂起,既不实践,也力不从心计数,只可以靠操作系统强制结束。

  GIL 全局锁

  Python的线程尽管是实在的线程,但是解释器在实施代码时,有一个GIL锁(Global Interpreter Lock),任何Python线程推行前,必需先获得GIL锁,然后每奉行100条字节码,解释器就能自动释放GIL锁,让别的线程有机缘执行。这些GIL锁实际上把装有线程的实行代码都给上了锁,所以八线程在Python中只可以交替推行,就算九二十一个线程跑在100核CPU上,也只可以采纳到1核。GIL是历史遗留为,在Python中大家得以选取八线程,可是并不是期望有效行使多核,假使须求求通过二十八线程利用多核,这只好通过C扩展来促成,也就那样就失去了Python轻巧易用的特征,就算不可能使用八线程达成多核职责,可是能够通过多进度达成多核任务,多个Python进程各自有相互独立的GIL锁,互不影响。

   (1)线程锁   Lock RLock

是因为线程之间是开展自由调治,何况各类线程大概只举办n条试行之后,当多个线程同不时候修改同八个条数据时可能会现出脏数据,所以,出现了线程锁:同临时刻同意一个线程推行操作。 

threading.Lock() # 单锁

threading.RLock() # 多个锁,帮助二个锁嵌套三个锁,一般相当多应用,辅助Lock

acquire() # 加锁

release() # 解锁

1010cc时时彩标准版 11 线程锁示例

import threading
import time
NUM = 10
def func(arg):
global NUM
arg.acquire() # 上锁
NUM -= 1
time.sleep(1)
print('当前线程为:%s, NUM值为:%s' % (threading.current_thread().name, NUM))
arg.release() # 解锁

lock = threading.RLock() # 定义锁
for i in range(10):
t = threading.Thread(target=func, args=(lock,))
t.start()

# 输出结果
当下线程为:Thread-1, NUM值为:9
此时此刻线程为:Thread-2, NUM值为:8
近些日子线程为:Thread-3, NUM值为:7
这段时间线程为:Thread-4, NUM值为:6
脚下线程为:Thread-5, NUM值为:5
此时此刻线程为:Thread-6, NUM值为:4
日前线程为:Thread-7, NUM值为:3
眼前线程为:Thread-8, NUM值为:2
脚下线程为:Thread-9, NUM值为:1
现阶段线程为:Thread-10, NUM值为:0

线程锁示例

  (2)信号量 Semaphore

互斥锁(线程锁)相同的时间只同意一个线程改换数据,而Semaphore是同不时间允许一定数量的线程更换数据,比方饭铺有3个席位,那最多允许3个人同期用餐,后面包车型大巴人只可以等待空位才得以进食。

1010cc时时彩标准版 12 实信号量示例

import threading, time

NUM = 10
def func(l):
global NUM
l.acquire() # 上锁
NUM -= 1
time.sleep(2)
print("当前线程为:%s,NUM值为:%s" % (threading.current_thread().name, NUM))
l.release() # 解锁

lock = threading.BoundedSemaphore(5) # 允许5个线程同不时间操作

for i in range(10):
t = threading.Thread(target=func, args=(lock,))
t.start()

# 输出结果
眼前线程为:Thread-3,NUM值为:5
脚下线程为:Thread-1,NUM值为:5
现阶段线程为:Thread-2,NUM值为:5
眼下线程为:Thread-5,NUM值为:2
当下线程为:Thread-4,NUM值为:2
时下线程为:Thread-8,NUM值为:0
现阶段线程为:Thread-6,NUM值为:0
眼下线程为:Thread-7,NUM值为:0
当下线程为:Thread-10,NUM值为:0
时下线程为:Thread-9,NUM值为:0

信号量示例

  (3)事件  event

  Python线程的日子用来主线程调节别的线程的执行,事件主要提供了多个点子: set, wait, clear.

事件处理的编写制定:

  • wait : 全局定义了二个“Flag”,假诺"Flag"为False, 那么当程序实行event.wait 方法时就能够堵塞,借使Flag为True,那么event.wait就不在阻塞。
  • clear : 将Flag 设置为False
  • set : 将Flag 设置为True

1010cc时时彩标准版 13 事件示例

import threading

def func(i,e):
print(i)
e.wait() # 表示等待检查实验,假设是红灯则停,绿灯则一而再运转
print(i 100)

event = threading.Event()

for i in range(10):
t = threading.Thread(target=func, args=(i, event,))
t.start()

event.clear() # 设置为红灯
inp = input('>>>')
if inp == "1": # 客户输入1
event.set() # 设置为堵塞同行

# 输出结果
1
3
5
7
9
>>>1
102
103
104
108
107
# 冬季是因为线程的调解时间不均等

事件示例

  (4)条件  Condition

 使线程等待,唯有满意某条件时,才放走N个线程。

1010cc时时彩标准版 14 条件示例

import threading

def func(i, con):
print(threading.current_thread().name) # 输出线程名字

con.acquire() # 上锁
con.wait() # 阻塞等待
print(i 100)
con.release() # 解锁

if __name__ == '__main__':
c = threading.Condition()
for i in range(10):
t = threading.Thread(target=func, args=(i, c,))
t.start()

while True:
inp = input('>>>')
if inp == 'q':
break
c.acquire()
c.notify(int(inp)) # 当满足条件时,释放N个线程
c.release()

# 输出结果

Thread-1
Thread-2
Thread-3
Thread-4
Thread-5
Thread-6
Thread-7
Thread-8
Thread-9
Thread-10
>>>1 # 释放二个线程
>>>2 # 释放八个线程
102
>>>6 # 释放七个线程
105
103
108
>>>4 # 释放多个线程,不过一共唯有13个,最终只剩余1个在堵塞了
>>>q # 结束循环
Process finished with exit code 0

基准示例

1010cc时时彩标准版 15 示例二 

import threading
def condition_func():

ret = False
inp = input('>>>')
if inp == '1':
ret = True

return ret

def run(n):
con.acquire()
con.wait_for(condition_func)
print("run the thread: %s" % n)
con.release()

if __name__ == '__main__':

con = threading.Condition()
for i in range(10):
t = threading.Thread(target=run, args=(i,))
t.start()

# 输出结果
>>>1
run the thread: 0
>>>1
run the thread: 1
>>>1
run the thread: 2
>>>1
run the thread: 3
>>>1
run the thread: 4
>>>1
run the thread: 5
>>>1
run the thread: 6
>>>1
run the thread: 7
>>>1
run the thread: 8
>>>1
run the thread: 9

示例二

  (5)定时器 Timer

停车计时器,内定n秒后试行某操作。

1010cc时时彩标准版 16 计时器示例

from threading import Timer

def hello():
print("hello world")

t = Timer(1, hello)
t.start() # 1秒后,将执行hello函数

# 输出结果
hello wolrd

停车计时器示例

 

线程池

  在动用二十四线程管理任务时亦非线程越多越好,在切换线程的时候,须求切换上下文情形,还是会导致CPU的多量费用。为了缓和那一个难点,线程池应时而生。预先创造号三个相比优化的数量的线程,让过来的天职立刻能够使用,就产生了线程池。在Python中从不放置较好的线程池模块,须要和煦编辑只怕应用第三方模块。

1010cc时时彩标准版 17 容易的线程池

#!/usr/bin/env python
# -*- coding:utf-8 -*-
# Author:Liu Jiang

import queue
import time
import threading

class MyThreadPool:
def __init__(self, maxsize=5):
self.maxsize = maxsize
self._q = queue.Queue(maxsize)
for i in range(maxsize):
self._q.put(threading.Thread)

def get_thread(self):
return self._q.get()

def add_thread(self):
self._q.put(threading.Thread)

def task(i, pool):
print(i)
time.sleep(1)
pool.add_thread()

pool = MyThreadPool(5)
for i in range(100):
t = pool.get_thread()
obj = t(target=task, args=(i,pool))
obj.start()

简短的线程池

1010cc时时彩标准版 18 优化的线程池

#!/usr/bin/env python
# -*- coding:utf-8 -*-

import queue
import threading
import contextlib
import time

StopEvent = object() # 成立空对象

class ThreadPool(object):

def __init__(self, max_num, max_task_num = None):
if max_task_num:
self.q = queue.Queue(max_task_num)
else:
self.q = queue.Queue()
self.max_num = max_num
self.cancel = False
self.terminal = False
self.generate_list = []
self.free_list = []

def run(self, func, args, callback=None):
"""
线程池实行二个职责
:param func: 任务函数
:param args: 职责函数所需参数
:param callback: 职责奉行倒闭或成功后施行的回调函数,回调函数有八个参数1、职分函数执市场价格况;2、任务函数再次来到值(默感觉None,即:不实行回调函数)
:return: 假使线程池已经告一段落,则赶回True不然None
"""
if self.cancel:
return
if len(self.free_list) == 0 and len(self.generate_list) < self.max_num:
self.generate_thread()
w = (func, args, callback,)
self.q.put(w)

def generate_thread(self):
"""
创设一个线程
"""
t = threading.Thread(target=self.call)
t.start()

def call(self):
"""
巡回去获取职务函数并进行职责函数
"""
current_thread = threading.currentThread
self.generate_list.append(current_thread)

event = self.q.get()
while event != StopEvent:

func, arguments, callback = event
try:
result = func(*arguments)
success = True
except Exception as e:
success = False
result = None

if callback is not None:
try:
callback(success, result)
except Exception as e:
pass

with self.worker_state(self.free_list, current_thread):
if self.terminal:
event = StopEvent
else:
event = self.q.get()
else:

self.generate_list.remove(current_thread)

def close(self):
"""
进行完全体的职分后,全数线程甘休
"""
self.cancel = True
full_size = len(self.generate_list)
while full_size:
self.q.put(StopEvent)
full_size -= 1

def terminate(self):
"""
无论是是不是还会有任务,终止线程
"""
self.terminal = True

while self.generate_list:
self.q.put(StopEvent)

self.q.empty()

@contextlib.contextmanager
def worker_state(self, state_list, worker_thread):
"""
用以记录线程中正在等候的线程数
"""
state_list.append(worker_thread)
try:
yield
finally:
state_list.remove(worker_thread)

# How to use
pool = ThreadPool(5)

def callback(status, result):
# status, execute action status
# result, execute action return value
pass

def action(i):
print(i)

for i in range(30):
ret = pool.run(action, (i,), callback)

time.sleep(5)
print(len(pool.generate_list), len(pool.free_list))
print(len(pool.generate_list), len(pool.free_list))
# pool.close()
# pool.terminate()

优化的线程池

 

理论:

线程队列中三种队列情势,所运用的章程同样,都有put,put_nowait(),get_nowait,full 等那一个方法.就只传一组先进先出的代码

队列

  队列都是在经过的内部存款和储蓄器中存放的,如若经过甘休了,队列也就消失了。

队列的归类:

  1. 先进先出 : queue.Queue
  2. 后进先出 : queue.LifoQueue
  3. 权重队列 : queue.PriorityQueue
  4. 双向队列 : queue.deque

(1)先进先出队列

  故名思议,正是先放入队列的数额,会被事先抽出。

1
2
3
4
5
6
7
8
9
10
11
# 队列中的主要函数说明
# put(item,block=True,timeout=None) 放数据,默认阻塞,设置条件:是否阻塞,超时时间
# get(block=True,timeout=None)取数据,默认阻塞,设置条件:是否阻塞,超时时间
# put_nowait(item)  相当于put(item,False)
# get_nowait()  相当于get(False)
# qsize() 队列中的真实个数,返回队列的大小
# full()  如果队列满了,返回True,反之False
# maxsize 最大支持的个数,与full()的大小相对应
# empty() 检查队列是否为空,空则输出True,否则为False
# join()  阻塞,等到队列为空,再执行别的操作
# task_down()  在完成一项任务后,向任务已完成队列发送一个信号  

1010cc时时彩标准版 19 先进先出队列示例

import queue
q = queue.Queue(5)
q.put(1)
q.put(2)
print("抽取数据:%s" % q.get())
q.task_done()
print("抽取数据:%s" % q.get())
q.task_done()
q.join()
print("检查队列是或不是为空:%s" % q.empty())
print("队列中的个数:%s" % q.qsize())
print("队列中得以存放的最大个数:%s" % q.maxsize)

# 输出结果
收取数据:1
抽取数据:2
检查队列是或不是为空:True
队列中的个数:0
队列中得以寄放的最大个数:5

先进先出队列示例

(2)后进先出队列

1010cc时时彩标准版 20 后进先骑行列示例

import queue

q = queue.LifoQueue()
q.put(1)
q.put(2)
print(q.get())
q.task_done()
print(q.get())
q.task_done()
q.join() # 要是不使用join(),则不必要利用task_done()

# 输出结果
1

后进先出队列示例

(3)权重队列

   每一个放入队列的内容都有三个优先级,根据优先级提取,权重值越小优先级越高,权重可认为负数。

1010cc时时彩标准版 21 权重队列示例

import queue

q = queue.PriorityQueue()
q.put((1, 'test1'))
q.put((0, 'test2'))
q.put((-1, 'test3'))
q.put((6, 'test4'))
q.put((2, 'test5'))
print(q.get())

# 输出结果
(-1, ‘test3’)
# 假如有优先级相等的,则依据放入顺序收取,先放先出在一样优先级的景况下

权重队列示例

(4)双向队列

1
2
3
4
append() 是向队列最右端添加
appendleft() 是向队列最左端添加
pop()是从队列右向左取值
popleft()是从队列左向右取值

1010cc时时彩标准版 22 双向队列示例

import queue
q = queue.deque()
q.append(1)
q.append(2)
q.appendleft(3)
q.pop() # 取骑行列中的一个
q.popleft() # 从队列的左边收取三个
# 输出结果
3
# 解释:
队列内容= 3 1 2

双向队列示例

1010cc时时彩标准版 23 轻易的生产花费者模型示例

# 举个例子包子铺形式
import queue
import threading
import time

q = queue.Queue()

def productor(arg):
"""
大师傅制作的包子
"""
while True:
q.put(str(arg) '厨神做的包子')

def consumer(arg):
"""
买主消耗包子
"""
while True:
print(arg, q.get())
time.sleep(2)

# 300用户循环消费包子
for i in range(300):
t = threading.Thread(target=consumer, args=(i,))
t.start()

# 3个出师循环创立包子
for j in range(3):
t = threading.Thread(target=productor, args=(j,))
t.start()

# 输出结果
0大厨做的馒头
1010cc时时彩标准版,2厨子做的馒头
1厨师做的馒头
0厨子做的包子
2厨师做的包子
1厨师做的包子
0大厨做的包子
2厨神做的馒头
2厨子做的馒头
0大厨做的馒头
1厨师做的馒头

粗略的生产开支者模型示例

 


 

连接:

import queue

进程

  Unix/Liunx系统提供了三个fork()系统调用,它那一个的卓殊,普通的函数调用,调用一遍,重返三回,但是fork()调用叁次回到四遍,因为操作系统自动把当下进程(称为父进度)复制了一份(称为子进度),

然后分别在父进程和子进度内重返。子进度永久再次回到0,而父进度再次来到子进度的ID。那样做的理由是:贰个父进程能够fork出多个子进度,所以,父进程要记录下种种子进程的ID,而子进度只须求调用getppid()

就能够得到父进度的ID。Python的os模块封装了大面积的系统调用,当中蕴涵fork,可以在Python程序中轻轻便松创设子进程。

  每种进程至少有一个线程,线程是非常的小的实行单元。

1010cc时时彩标准版 24 简单的进度示例

import os

print('Process (%s) start...' % os.getpid())
# Only works on Unix/Linux/Mac:
pid = os.fork()
if pid == 0:
print('I am child process (%s) and my parent is %s.' % (os.getpid(), os.getppid()))
else:
print('I (%s) just created a child process (%s).' % (os.getpid(), pid))
# 输出结果
Process (876) start...
I (876) just created a child process (877).
I am child process (877) and my parent is 876.

简言之的长河示例

鉴于windows未有fork调用,所以地点代码无法在windows上运维。有了fork调用后,叁个历程在接到新任务时就能够复制出三个子进程来拍卖新职分,常见的Apache正是由父进度监听端口,每当有新的http诉求后,

就fork出三个子经过来拍卖新的http供给。由于Python是跨平台的,自然也会有三个跨平台的多进度协理,multiprocessing模块就出现了。multiprocessing模块提供了一个Process类来表示叁个经过对象。

1010cc时时彩标准版 25 示例一

from multiprocessing import Process
import os

def run_proc(name):
print("Run child process %s (%s)" % (name, os.getpid()))

if __name__ == '__main__':
print('Parent process %s.' % os.getpid())
p = Process(target=run_proc, args=('test',)) # 创立子进程时,只须要出入二个施行函数和函数的参数,创立一个Process实例,运行即可。
print("Child process will start")
p.start() # 运维进度
p.join() # 等待子进度甘休后在持续向下运转,平时用于进度间的共同
print("Child process end.")

# 输出结果
Parent process 1232.
Child process will start
Run child process test (7400)
Child process end..

示例一

1010cc时时彩标准版 26 示例二

from multiprocessing import Process
from multiprocessing import queues
import multiprocessing

def foo(i, arg):
arg.put(i)
print('say hi', i, arg.qsize())

if __name__ == '__main__':
li = queues.Queue(20, ctx=multiprocessing)
for i in range(10):
p = Process(target=foo, args=(i, li,))
p.start()

# 输出结果
say hi 0 2
say hi 1 2
say hi 2 4
say hi 3 4
say hi 4 5
say hi 5 7
say hi 6 8
say hi 7 8
say hi 8 9
say hi 9 10

示例二

  注意:由于经过之间的数据需求各自具有一份,所以制造进度须要充足大的支出。

  进度池:假设供给运转大气的子进度,可以经过进度池的办法批量创立子进程。

1010cc时时彩标准版 27 进度池示例

from multiprocessing import Pool
import os, time, random

def long_time_task(name):
print('运转子进度 %s (%s)...' % (name, os.getpid()))
start = time.time()
time.sleep(random.random() * 3)
end = time.time()
print("子进程%s运行%0.2f 秒." % (name, (end - start)))

if __name__ == '__main__':
print("父进程%s." % os.getpid())
p = Pool(4)
for i in range(5):
p.apply_async(long_time_task, args=(i,))
print("等待全部子进度结束中。。。")
p.close()
p.join() # 等待全数子进程甘休后才继续向下运营
print("全部子进程早就终结")
"""
调用join()在此之前必得先调用close(),调用close()之后,就无法再三再四增多新的Process了。
请留心输出结果,子进度0,1,2,3是随即实践的,而子进度4要等待日前某块头进程达成后才
施行,那是因为Pool的进度数设置为了4,约等于最多并且执行4个进程。由于Pool的暗许大小是
CPU的核数,如若您是8核CPU,要高达上述意义,你必要至少运维9个子进度。
"""

# 输出结果
父进程10740.
等候全体子进程甘休中。。。
运维子进度 0 (10256)...
运行子进度 1 (1172)...
运维子进程 2 (10612)...
运转子进度 3 (10292)...
子进程1运行0.11 秒.
运营子进程 4 (1172)...
子进程2运行1.09 秒.
子进程4运行1.38 秒.
子进程0运行1.79 秒.
子进程3运行2.99 秒.
全体子进度已经完成

经过池示例

  子进度:比很多时候,子进程并非自个儿,而是三个外表进程,大家创设了子进度之后,还亟需调整子进度的输入和输出。subprocess模块能够让大家非常平价地运转三个子历程,然后决定其输入和输出。

  进度间通讯是由此Queue,Pipes等完成的。

  进程数据分享:私下认可进度之间不恐怕共享数据,我们能够通过二种特殊的艺术让进度间分享数据。

(1)方法一:Array

1010cc时时彩标准版 28 Array示例

from multiprocessing import Process
from multiprocessing import Array

def foo(i, arg):
arg[i] = i 100
for item in arg:
print(i,"--->", item)
print('==========')

if __name__ == '__main__':
li = Array('i', 4)
for i in range(4):
p = Process(target=foo, args=(i, li, ))
p.start()

# 输出结果
---> 100
---> 0
---> 0

1、线程

先进先出队列:queue.Queue

---> 0

---> 100
---> 0
---> 102

1.1 什么是线程

    进度只是用来把财富聚焦到三只(进度只是二个财富单位,也许说财富聚焦),而线程才是cpu上的施行单位。

二十三十二线程(即多少个调整线程)的概念是,在三个进度中存在多少个调整线程,八个调整线程分享该进程的地点空间,相当于一个车间内有多条流水生产线,都共用二个车间的能源。

q = queue.Queueq.put(1)q.put(2)print('当前队列内容长度',q.qsizeq.put(3)print('查看队列是否满了',q.fulltry:    q.put_nowait(4)  except Exception:    print('队列满了')printprintprintprint('查看队列是否为空',q.emptytry:    q.get_nowait()  except Exception:    print('队列空了')

---> 0

---> 100
---> 101
---> 102

1.2 特点

#1 同一进度内的多少个线程是分享该进度的财富

#2 成立新的线程开支要远远低于开启新的经过

先进后出 / 后进先出队列:queue.LifoQueue

---> 0

---> 100
---> 101
---> 102

1.3 开启线程的章程

先行级队列:

---> 103

Array示例

 对于Array数组来讲,括号内的"i"表示它在那之中的要素都以int型,代表的是项目。列表内的元素得以预先钦命,也足以钦定列表的长短。总来讲之,Array在实例化时务必钦定数组的数据类型户数组的轻重缓急。

1
2
3
4
5
6
7
类型对应表
'c': ctypes.c_char,  'u': ctypes.c_wchar,
'b': ctypes.c_byte,  'B': ctypes.c_ubyte,
'h': ctypes.c_short, 'H': ctypes.c_ushort,
'i': ctypes.c_int,   'I': ctypes.c_uint,
'l': ctypes.c_long,  'L': ctypes.c_ulong,
'f': ctypes.c_float, 'd': ctypes.c_double

(2)方法二: manage.dict()

1010cc时时彩标准版 29 manage.dict()示例

from multiprocessing import Process
from multiprocessing import Manager

def Foo(i, dic):
dic[i] = 100 i
print(dic.values())

if __name__ == '__main__':
manage = Manager()
dic = manage.dict()
for i in range(10):
p = Process(target=Foo, args=(i,dic))
p.start()
p.join()

# 输出结果
[100]
[100, 101]
[100, 101, 102]
[100, 101, 102, 103]
[100, 101, 102, 103, 104]
[100, 101, 102, 103, 104, 105]
[100, 101, 102, 103, 104, 105, 106]
[100, 101, 102, 103, 104, 105, 106, 107]
[100, 101, 102, 103, 104, 105, 106, 107, 108]
[100, 101, 102, 103, 104, 105, 106, 107, 108, 109]

manage.dict()示例

(3)方法三:queues.Queue

1010cc时时彩标准版 30 queue.Queue示例

import multiprocessing
from multiprocessing import Process
from multiprocessing import queues

def foo(i,arg):
arg.put(i)
print('The Process is ', i, "and the queue's size is ", arg.qsize())

if __name__ == "__main__":
li = queues.Queue(20, ctx=multiprocessing)
for i in range(10):
p = Process(target=foo, args=(i,li,))
p.start()
# 输出结果
The Process is 0 and the queue's size is 2
The Process is 1 and the queue's size is 3
The Process is 2 and the queue's size is 3
The Process is 3 and the queue's size is 4
The Process is 4 and the queue's size is 6
The Process is 5 and the queue's size is 6
The Process is 6 and the queue's size is 8
The Process is 7 and the queue's size is 8
The Process is 8 and the queue's size is 9
The Process is 9 and the queue's size is 10

queue.Queue示例

 

进程锁

  和三十二线程的线程锁类似,在multiprocessing里也可以有同名的锁类,Lock, PRADOLock, Event, Condition, Semaphore, 用法也是平等的。

1010cc时时彩标准版 31 进程锁示例

from multiprocessing import Process
from multiprocessing import Array
from multiprocessing import RLock, Lock, Event, Condition, Semaphore
import time

def foo(i,lis,lc):
lc.acquire()
lis[0] = lis[0] - 1
time.sleep(1)
print('say hi',lis[0])
lc.release()

if __name__ == "__main__":
# li = []
li = Array('i', 1)
li[0] = 10
lock = RLock()
for i in range(10):
p = Process(target=foo,args=(i,li,lock))
p.start()
# 输出结果
say hi 9
say hi 8
say hi 7
say hi 6
say hi 5
say hi 4
say hi 3
say hi 2
say hi 1
say hi 0

进度锁示例

 

进程池

  python内置了一个进度池,无需像线程池那样需求自定义,只要加载就可以:from multiprocessing import Pool.

进度池内部维护了贰个进程体系,当使用时,则去进度池中得到二个经过,假使经过池种类中并未有可供使用的进度,那么程序就能等待,直到进程池中有可用进度甘休。

进度池中主要方法:

  • apply     从进程池里取二个经过并进行
  • apply_async   apply的异步版本
  • terminate   立即关闭进度池
  • join   主进度等待全数子进度实践到位,必得在close或terminate之后
  • close  等待全部进度截止后,才关闭进度池

1010cc时时彩标准版 32 进度池示例

#!/usr/bin/env python
# -*- coding: utf-8 -*-
from multiprocessing import Pool
import time

def f1(args):
time.sleep(1)
print(args)

if __name__ == '__main__':
p = Pool(5)
for i in range(30):
p.apply_async(func=f1, args=(i,))
p.close() # 等子进度实践完成后关闭线程池
# time.sleep(2)
# p.terminate() # 立时关闭线程池
p.join() # 进程池中经超过实际践完结后在关闭,假如注释的话,那么程序直接关门

# 输出结果
1
3
...
29

经过池示例


 

1.3.1 方法一

线程:

from threading import Thread
from multiprocessing import Process

def work(n):
    print('%s is running' %n)

if __name__ == '__main__':
    t=Thread(target=work,args=(1,))
    # t=Process(target=work,args=(1,))     t.start()
    print('主线程')

 1010cc时时彩标准版 33

进程:

 1010cc时时彩标准版 34

queue.priorityQueue

 协程

  协程又叫做“微线程”,德文名“Coroutine”。线程和进程的操作时由程序触发系统接口,最后的施行者是系统,协程的操作则是技士。

协程存在的意义:对于二十多线程应用,CPU通过切条的秘籍来切换线程间的实施,线程切换时索要耗费时间(保存境况,下一次一而再)。协程则运用一个线程,在多少个线程中鲜明某些代码块实施各类。

协程的采纳景况:当程序中存在大批量没有供给CPU的操作时(IO操作),适用于协程。

协程的亮点:

  1. 二个线程实行! 和多线程相比较最大的优势正是协程相当高的进行功效。因为子程序切换不是线程切换,而是由程序本身调控,由此,未有线程切换的开销,和三十二线程比,线程数越多,协程的习性优势越鲜明。
  2. 不要求锁机制!因为唯有二个线程,也不设有同期写变量的争执,在协程中决定分享能源不加锁,只须求看清状态就好了,所以进行功用比二十四线程高比比较多。

协程如何接纳多核CPU呢?最简易的主意是多进度 协程。

协程是通过第三方模块来促成的,主如果greenlet和gevent。本质上,gevent是对greenlet的高档封装,因而多用于gevent。

1010cc时时彩标准版 35 greenlet示例

from greenlet import greenlet

def test1():
print(12)
gr2.switch()
print(34)
gr2.switch()

def test2():
print(56)
gr1.switch()
print(78)

gr1 = greenlet(test1)
gr2 = greenlet(test2)
gr1.switch()

# 实际上,greenlet正是透过switch方法在分化的天职间开展切换

greenlet示例

1010cc时时彩标准版 36 gevent示例

from gevent import monkey; monkey.patch_all()
import gevent
import requests

def f(url):
print('GET: %s' % url)
resp = requests.get(url)
data = resp.text
print('%d bytes received from %s.' % (len(data), url))

gevent.joinall([
gevent.spawn(f, ''),
gevent.spawn(f, ''),
gevent.spawn(f, ''),
])

# 通过joinall将义务f和它的参数进行联合调整,完毕单线程中的协程。

gevent示例

 

1.3.2 方法二

from threading import Thread
from multiprocessing import Process

class MyThread(Thread):
    def __init__(self,n):
        super().__init__()
        self.n=n
    def run(self):
        print('%s is running' % self.n)

if __name__ == '__main__':
    t=MyThread(2)
    t.start()
    print('主线程')

 1010cc时时彩标准版 37

  优先级队列中一旦第三个参数一样,前面包车型地铁可比艺术为上边包车型客车描述.

2 进程和线程的界别

  假诺说值里面包车型地铁因素是数字类型,那么当八个值的先行级相同期,相比的是四个值的大小,小的优先被抽出来.假使成分是字符串,那么依次相比每一个字母的ascii表中的地方,小的优

2.1 进度和线程的分别

1、线程分享创建它的进度的地址空间;过程有投机的地方空间。

2、线程可以一贯访谈进程的数据,子进度独立复制父进度的数据。

3、线程能够与和煦进程中的其余线程沟通,进程必得利用进度间的通信能力与其余进度沟通。

4、新的线程很轻便成立,新的进度须求复制父进度。

5、线程可以决定本进度中的别的线程,进程只好调节子进度

6、更换主线程大概影响进程中别的线程的一坐一起,改换父进度不影响子进度

 

先被抽取来.借使put的数据是二个元组,元组的率先个参数是事先级数字,数字越小优先级越高,越先被get到被抽取来,第贰个参数是put进去的值,假如说优先级一样,那么值别忘了应

2.2 "子"线程与主线程的pid都是均等的

from threading import Thread
from multiprocessing import Process
import os

def work():
    print('%s is running' %os.getpid())

if __name__ == '__main__':
    t=Thread(target=work)
    t=Process(target=work)
    t.start()
    print('主线程',os.getpid())

 1010cc时时彩标准版 38

该是同样的数据类型,字典不行

2.3 同一进程内的多少个线程共享该进度的能源

from threading import Thread
from multiprocessing import Process
import os
n=100

def work():
    global n
    n=0
if __name__ == '__main__':
    t=Thread(target=work)
    # t=Process(target=work)     t.start()
    t.join()
    print('主线程',n)

 1010cc时时彩标准版 39

1010cc时时彩标准版 40

 

3. 线程池

本文由1010cc时时彩标准版发布于三分时时彩1010CC,转载请注明出处:1010cc时时彩标准版:python自动化开发学习,并发

关键词:

上一篇:没有了

下一篇:没有了