异步回调对象(Deferred)

对于异步编程来说,能否进行异步回调是非常关键的,如果没有办法获取到异步调用的结构,那么整个程序就会失控,不仅仅是维护困难,性能也会受到很大的影响。异步编程也就没有意义了。

Deferred对象就是twisted提供给我们专门用来管理异步回调的,有了defered,即可对任务的执行过程进行监控管理,防止程序在云效过程中 ,由于等待某个任务的完成而陷入阻塞停滞,可以有效的提高程序整体的运行效率。

Deferred实现过程中并没有使用到reactor,所以可以不在事件循环中使用Defred。

基本使用

#引入Deferred
from twisted.internet import defer
import time

def func1(num):
    # 创建一个defer对象,
    deferd =defer.Deferred()
    # 定义的任务执行具体内容 --假设这是未来在reactor中执行的任务
    num += 1
    print("num  计算完成")
    # 将结果放到defer的callback中 (固定写法)
    deferd.callback(num)
    print("func1 结束")
    return deferd

def func1_callBack(num):
    time.sleep(3)
    print("得到函数1的执行结果: num={0}".format(num))
    print(f"得到函数1的执行结果: num={num}")

if __name__ == '__main__':
    defered = func1(10)
    print("异步调用开始了")
    #通过调用回调函数处理结果。
    defered.addCallback(func1_callBack)

Deferred 结合reactor 使用

通过reactor 模拟任务耗时过长。

#引入defer
from twisted.internet import defer
from twisted.internet import reactor

def func1(num):
    # 创建一个Defred 对象,
    deferd =defer.Deferred()
    # 定义的任务执行具体内容 模拟真实任务的耗时(实际工作无意义)
    call = reactor.callLater(3,deferd.callback,num+1) # 异步定时调用一次
    print("func1 结束")
    return deferd

def func1_callBack(num):
    print("得到函数1的执行结果: num={0}".format(num))

if __name__ == '__main__':
    defered = func1(10)
    print("异步调用开始了")
    defered.addCallback(func1_callBack)
    reactor.run()

回调异常

可以通过addErrback 注册异常回调处理函数。

#引入defer
from twisted.internet import defer
from twisted.internet import reactor

def func1(num):
    # 创建一个Defred 对象,
    deferd =defer.Deferred()
    #判断入参,若入参非整形,则模拟抛出异常
    if isinstance(num, int):
        call = reactor.callLater(2, deferd.callback, num+1) 
    else:
        call = reactor.callLater(2, deferd.errback, ValueError('请输入整形数')) 
    print("func1 结束")
    return deferd

def func1_callBack(num):
    print("得到函数1的执行结果: num={0}".format(num))

def func1_errBack(fail):
    print("异常回调.[{0}] {1}".format(
        fail.type.__name__,
        fail.value)
        )

if __name__ == '__main__':
    defered = func1('ddd')
    print("异步调用开始了")
    #正常调用
    defered.addCallback(func1_callBack)    
    #错误调用
    defered.addErrback(func1_errBack)
    #可以同时调用
    #defered.addCallback(func1_callBack).addErrback(func1_errBack)
    print('添加完成了')
    reactor.run()

Deferred链式调用

Deferred 对象使用来管理回调函数的,该对象定义了成功回调链和失败回调链(一个对象有一个callback/errback回调链,该链按照添加的顺序依次排列,列,即按顺序回调),Deferred 只能被激活一次
Deferred 对象一提供了四个方法让我们可以向实例对象中添加callback回调链/errback回调链:

  • addCallback(callback):添加一个成功回调链
  • addErrback(errback): 添加一个失败回调链
  • addCallbacks(callback,errbak):添加一对回调链分别是成功回调和错误回调。
  • addBoth(anyback):在成功回调和失败回调链都添加一个anyback。

全正常的回调链路

每次回调都是正常的。

#引入defer
from twisted.internet import defer
from twisted.internet import reactor
import time

def func1(num):
    # 创建一个Defred 对象,
    deferd =defer.Deferred()
    #判断入参,若入参非整形,则模拟抛出异常
    if isinstance(num, int):
        call = reactor.callLater(2, deferd.callback, num+1) 
    else:
        call = reactor.callLater(2, deferd.errback, ValueError('请输入整形数')) 
    print("func1 结束")
    return deferd

# 模拟成功回调1
def st1(num):
    print("步骤1: num = {0}".format(num))
    num += 1
    return num

def st2(num):
    print("步骤2: num = {0}".format(num))
    num += 1
    return num
def st3(num):
    print("步骤3: num = {0}".format(num))
    num += 1
    return num  

if __name__ == '__main__':
    defered = func1(10)
    print("异步调用开始了")
    #正常调用链式添加三个
    defered.addCallback(st1).addCallback(st2).addCallback(st3)     
    print('添加完成了')
    reactor.run()

成功与失败链交互调用。

#引入defer
from twisted.internet import defer
from twisted.internet import reactor

def func1(num):
    # 创建一个Defred 对象,
    deferd =defer.Deferred()
    #判断入参,若入参非整形,则模拟抛出异常
    if  num < 20:
        call = reactor.callLater(2, deferd.callback, num+5) 
    else:
        call = reactor.callLater(2, deferd.errback, ValueError(num)) 
    print("func1 结束")
    return deferd

# 模拟成功回调1 当数值大于11时
def st1(num):
    print("步骤1: num = {0}".format(num))
    if num < 20 :
        #返回非异常
        return num
    else: 
        #抛出异常
        raise ValueError(num)

# 模拟成功回调2 (本函数应当被跳过调用)
def st2(num):
    print("步骤2: num = {0}".format(num))
    if num > 11 :
        #返回非异常
        return num
    else:
        #抛出异常
        raise ValueError(num)

# 模拟成功回调3 当数值大于11时
def st3(num):
    print("步骤3: num = {0}".format(num))
    num -= 5
    if num > 10 :
        #返回非异常
        return num
    else:  
        #抛出异常
        raise ValueError(num)

# 第一个错误处理
def eb1(fail):
    print(f"错误1 :{fail.type.__name__} : {fail.value} 太大了,调小一下")
    # 返回一个正常值
    res = int(int(str(fail.value)) / 2)
    if res > 5 :
        #返回非异常
        return res
    else:
        #抛出异常
        raise ValueError(res)

# 第二个错误处理    
def eb2(fail):
    print(f"错误2 :{fail.type.__name__} : {fail.value} 太小了,调大一下")
    # 返回错误
    res = int(str(fail.value)) + 5
    if res < 10 :
        #返回非异常
        return res
    else:
        #抛出异常
        raise ValueError(res)
    
# 第三个错误处理
def eb3(fail):
    print(f"错误3 :{fail.type.__name__} : {fail.value} 没救了")

    
if __name__ == '__main__':
    defered = func1(17)
    print("异步调用开始了")
    # 将所有函数按照一定顺序添加到链中。
    defered.addCallback(st1).addCallback(st2).addErrback(eb1).addCallback(st3).addErrback(eb2).addErrback(eb3)       
    print('添加完成了')
    reactor.run()

下图为调用结果,用来方便理解调用过程,(注意:异常需要是抛出)在上述的例子中每一次成功回调的正常返回都是下一次正常回调的输入,每一次异常发生,都是下一次错误回调的输入。并且无论正常还是错误回调,都可以产生正常或异常的结果。
twistedreferred.png

原子性回调链

Deferedlist

当我们需要多份数据全部返回后才能继续操作,例如取用户的不同信息。遇到这种情况时就需要可以使用DeferredList对象
Deferredlist可以在列表中所有的deferred对象就绪后执行指定的回调函数,从本质上说它返回了一个新的deferred对象。
Deferredlist可以保证列表中每个回调链读调用一遍。

语法:

from twisted.internet.defer import Deferredlist
DeferredList(deferrlist, fireOnOneCallback=0, fireOnOneErrback=1, consumeErrors=1).addCallback(deferrlist_suc).addErrback(deferrlist_suc)
#deferlist  等待的defer列表
#fireOnOnecallback 是否列表中任一defered对象callback之后,deferredlist就回调callback
#fireOnOneErrback 是否列表中任一defered对象errback之后,deferredlist就回调errback
#consumeErrors 是否列表中任一defered对象执行过程中出现错误后,如果deferr对象未指定errback,deferredlist就回调errback

实际样例(consumeErrors使用仍存疑)

#引入defer
from twisted.internet import defer

# 模拟回调1 
def st1(res):
    print("步骤1: res = {0}".format(res))
    return "我是步骤1的结果"

# 模拟回调2
def st2(res):
    print("步骤2: res = {0}".format(res))
    if isinstance(res,str):
       return "我是步骤2的结果"
    else:
       raise res
    
# 模拟回调3 
def st3(res):
    print("步骤3: res = {0}".format(res))
    return "我是步骤3的结果"

def func(res):
    print("结果: ",res)

def func_e(res):
    print("错误结果: ",res)
    
if __name__ == '__main__':
    # 创建两个Defred 对象,创建一个列表,并添加回调内容。
    d1 =defer.Deferred()
    d1.addCallback(st1)

    d2 =defer.Deferred()
    d2.addCallback(st2)
    #d2.addErrback(st2)
    
    d3 =defer.Deferred()
    d3.addCallback(st3)
    
    # 声明一个回调列表,返回了一个deferred对象。
    # 第一种情况都不配置, (都为false)  当列表中出现异常回掉的时候,仍执行所有的列表,但是并不会抛出异常结果,且执行的callback
    dl = defer.DeferredList([d1,d2,d3])

    # 第二种情况 配置fireOnOnecallback为true 当第一个执行回调callback成功时立即回调列表指定的callback 回调入参为第一个成功回调的返回值及所在列表位置。
    #dl = defer.DeferredList([d1,d2,d3],fireOnOneCallback=True)        

    # 第三种情况 配置fireOnOneErrback为true 当第一次执行回调出现错误时立即回调列表指定的errback 回调入参为第一个失败错误信息。
    #dl = defer.DeferredList([d1,d2,d3], fireOnOneErrback=True) 

    # 第四种情况 consumeErrors=True  当第一次执行回调出现错误时(抑制err产生),仍执行所有的列表,但是并不会抛出异常结果,且执行的callback
    #dl = defer.DeferredList([d1,d2,d3], consumeErrors=True) 
    
    #模拟进行回调。
    d1.callback(1)  
    d2.errback(Exception("错误2"))
    d3.callback(3)

gatherResults

gatherResults与deferredlist 类似,也是等待多个defered,不同的是,gatherResults只要有一个回调链异常就会触发整体的异常,可以保证列表中所有的对象都正常的回调了才会成功回调。整个会掉链都成功,即整个回调链执行完成,无异常抛出出,才会返回整体的结果,否则执行整体的errback。

#引入defer
from twisted.internet import defer

# 模拟回调1 
def st1(res):
    print("步骤1: res = {0}".format(res))
    return "我是步骤1的结果"

# 模拟回调2
def st2(res):
    print("步骤2: res = {0}".format(res))
    #if isinstance(res,int):
    #   return "我是步骤2的结果"
    #else:
    #   raise res
    return "我是步骤2的结果"
    
# 模拟回调3 
def st3(res):
    print("步骤3: res = {0}".format(res))
    return "我是步骤3的结果"

def func(res):
    print("结果: ",res)

def func_e(res):
    print("错误结果: ",res)
    
if __name__ == '__main__':
    # 创建两个Defred 对象,创建一个列表,并添加回调内容。
    d1 =defer.Deferred()
    d1.addCallback(st1)

    d2 =defer.Deferred()
    #d2.addCallback(st2)
    d2.addErrback(st2)
    
    d3 =defer.Deferred()
    d3.addCallback(st3)
    
    # 声明一个回调列表,返回了一个deferred对象。
    # 只有三个都成功,即整个回调链执行完成,无异常爆出,才会返回整体的结果,否则执行整体的errback。
    dl = defer.gatherResults([d1,d2,d3])

    #模拟进行一次正常回调。
    d1.callback(1)  
    #d2.callback(2)
    d2.errback(Exception("错误2"))
    d3.callback(3)
    
    # 对整体数据判断
    dl.addCallback(func).addErrback(func_e)

内联回调函数

inlineCallbacks 是一个装饰器,他总是装饰生成器函数,如那些使用yield的函数,inlineCallbacks 唯一目的是将一个个生成器转换为一系列的异步回调,每个回调被yield分隔,yield的返回值会传到下一个回调。

该修饰器为我们节省了大量的代码。、

一个简单的示例:

from twisted.internet import defer

def func1(num):
    d = defer.Deferred()
    d.callback("fun1执行了 , {0}".format(num))
    return d

@defer.inlineCallbacks
def main():
    # 此处为我们省去了addcallback的过程,直接输出了结果
    res =  yield func1(1)
    print(res)

if __name__ ==  "__main__":
    main()

实现同步非阻塞

deferToThread 使用线程实现;
twisted.deferToThread 返回一个deferred对象,把回调函数在另一个线程中去处理,主要用于io操作。
扩展 callinThread 和 callfromThread.

"""
去线程里执行 异步任务
"""
from  twisted.internet  import defer,reactor
from twisted.internet.threads import deferToThread
import time

def sleep_run(num):
    print("执行开始了")
    time.sleep(3)
    print("sleep 结束了")
    return num+1
    
def callback(res):
    print(res)

if __name__ == "__main__":
    #sleep_run = functools.partial(sleep_run, num=1)    
    d = deferToThread(sleep_run,1)
    d.addCallback(callback)
    #time.sleep(10)
    print("主线程结束了")
    reactor.run()

Q.E.D.