May 16, 2017

[python][twisted][original][note] Is Twisted really so twisted? The essence of the Deferred object.

Purpose of this article

  • Understand the Deferred class
  • Know how the inlineCallbacks decorator works
  • See how these enable asynchronous code flow

Time of reading: 27 mins

Deferred Class

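The Deferred class is designed with a fluent interface (https://en.wikipedia.org/wiki/Fluent_interface#Python):
addCallback and addErrback return the Deferred itself, so calls can be chained.
This design is part of what makes @inlineCallbacks possible.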

Example code of Fluent interface:

class FluentPerson(object):
    def run(self):
        print("run\n")
        return self
    def jump(self):
        print("jump\n")
        return self
    def walk(self):
        print("walk\n")
        return self
         
if __name__ == "__main__":
    """prints:
        run
        jump
        walk
    """
    FluentPerson().run().jump().walk()


Deferred Object
Deferred type

4 major member functions
  • addCallback(self, callback, *args, **kw)
  • addErrback(self, errback, *args, **kw)
  • callback(self, result)
  • errback(self, fail=None)
Example
from twisted.internet.defer import (                                           
    Deferred)                                                                  
                                                                                 
                                                                                 
def callback(result, arg_1):                                                   
    """                                                                        
    :param result: the argument passed to Deferred.callback(arg)
    :param arg_1: the extra argument passed to Deferred.addCallback()
    """                                                                        
    print(result)                                                              
    print(arg_1)                                                               
                                                                                 
                                                                                 
def errcallback(result, arg_1):                                                
    """                                                                        
    :param result: the argument passed to Deferred.errback(arg), wrapped in a Failure instance
    :param arg_1: the extra argument passed to Deferred.addErrback()
    """                                                                        
    print(result)                                                              
    print(arg_1)
     
     
if __name__ == "__main__":                                                     
    """                                                                        
    Print:                                                                     
    42                                                                         
    arg_1                                                                      
    """                                                                        
    d = Deferred()                                                             
    d.addCallback(callback, "arg_1")                                           
    d.callback(42)                                                             
                                                                                 
    # A Deferred can be fired only once (by .callback or .errback), so the
    # errback demonstration uses a fresh instance.
    """
    Print:
    [Failure instance: Traceback (failure with no frames): <class 'twisted.python.failure.DefaultException'>: Test fail.
    ]
    arg_1                                                                      
    """                                                                        
    d = Deferred()                                                             
    d.addErrback(errcallback, "arg_1")                                                                               
    # Without an argument, errback() wraps the exception currently being
    # handled, so it must then be called from inside an except block.
    d.errback("Test fail.")

@inlineCallbacks decorator

This is where lightweight threads (green threads) come in.

Idea


The idea is to combine Python's yield (generators used as coroutines) with Deferred objects to write asynchronous code.

Something about python 'yield'

  • yield is an expression; it is evaluated at run time (not at compile time).
  • generator.send(None) starts a generator, exactly like calling next(generator).
  • A generator does not run at all until generator.send(None) or next(generator) is called.
  • The left-hand side of a yield expression receives the value the caller passes to generator.send(VALUE).

These four concepts are important for understanding how inlineCallbacks works
(and how it is implemented).
inlineCallbacks

Example
#!/usr/bin/env python
def returnVal():                                                                                                     
    print("step_2")                                                            
    return 42                                                                  
                                                                                 
                                                                                 
def yieldfunc():                                                               
    # The generator pauses at this yield after evaluating returnVal(). It
    # resumes, binding the sent value to result, when the caller calls .send(VALUE).
    result = yield returnVal()
    print(result)                                                              
                                                                                 
                                                                                 
if __name__ == "__main__":                                                     
    """                                                                        
    Print:                                                                     
        step_0                                                                 
        step_1                                                                 
        step_2                                                                 
        step_3                                                                 
        42                                                                     
        None                                                                   
        ---                                                                    
        step_2                                                                 
        42                                                                     
        None                                                                   
    """
    print("step_0")                                                            
    g = yieldfunc()                                                            
    print("step_1")                                                            
    # The body of yieldfunc, including returnVal(), does not run until
    # g.send(None) is called.
    result = g.send(None)                                                      
    print("step_3")                                                            
                                                                                 
    print(result)                                                              
                                                                                 
    try:                                                                       
        """                                                                    
        The final send() raises StopIteration when the generator finishes.
        A 'for' loop catches StopIteration for us.
        """                                                                    
        g.send(None)                                                           
    except StopIteration as e:                                                 
        pass                                                                   
                                                                                 
    print("-" * 3)                                                             
    # call it again using for loop, no need to catch StopIteration exception.  
    g = yieldfunc()                                                            
    for r in g:                                                                
        print(r)
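
The example above only ever sends None into the generator. As a small complementary sketch (plain Python, no Twisted; the names accumulator and demo are made up for illustration), the following shows send(VALUE) delivering a real value to the left-hand side of the yield expression, which is the mechanism inlineCallbacks relies on.

```python
def accumulator():
    total = 0
    while True:
        # The yield expression hands `total` to the caller and pauses here.
        # When the caller calls send(value), `value` becomes the result of
        # the yield expression and is bound to `received`.
        received = yield total
        total += received


def demo():
    gen = accumulator()
    outputs = [next(gen)]         # start the generator; it runs to the first yield
    outputs.append(gen.send(10))  # received = 10, the generator yields total = 10
    outputs.append(gen.send(5))   # received = 5, the generator yields total = 15
    gen.close()
    return outputs


if __name__ == "__main__":
    print(demo())
```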



The power of @inlineCallbacks

A generator function decorated with @inlineCallbacks returns a Deferred object.
This allows Deferred objects to be chained, in the spirit of the fluent interface design.
(If you remember how key_monad is designed, it is the same idea.)

Example:
#!/usr/bin/env python
from twisted.internet.defer import (                                            
    inlineCallbacks, returnValue)                                              
                                                                                 
                                                                                 
@inlineCallbacks                                                               
def begin():                                                                   
    result = yield 42                                                          
    returnValue(result)                                                        
                                                                                 
                                                                                 
if __name__ == "__main__":                                                     
    """                                                                        
    Print:                                                                     
        42                                                                                                           
    """                                                                        
    deferred = begin()                                                         
    print(deferred.result)

Above code Explained



So how do these add up?

Consider replacing
yield 42
with yielding another deferred object...

When .callback(VALUE) is called on the last Deferred in the chain, it triggers
a chain reaction (like a recursive call stack unwinding) that passes VALUE up
through the chained Deferred objects.

Now suppose the last Deferred's .callback() is called when epoll reports a result.

But wait.
You might ask how this becomes an asynchronous call. Shouldn't the chain reaction be a blocking call
until epoll returns the value? A bunch of blocking calls, layer by layer?


The magic comes from closures and yield.
https://github.com/twisted/twisted/blob/twisted-16.5.0/src/twisted/internet/defer.py#L1316

That is, each Deferred holds (through the callback registered on it) references
to its parent's generator and to its parent's Deferred.

When the child Deferred fires with a result, the parent generator's .send() is called
to deliver that result to the left-hand side of the yield expression.

The caller already holds the parent's Deferred, so its code carries on without blocking.
Only when epoll delivers the child's result is the parent Deferred's .callback() called.

If callback functions were registered on the parent's Deferred,
then once epoll delivers the child Deferred's result,
those callbacks are called with the result.
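
The mechanism described above can be sketched in plain Python 3. This is an illustration only, not Twisted's implementation: TinyDeferred and tiny_inline_callbacks are hypothetical stand-ins, and error handling is left out. The point is the closure named step, which keeps references to the parent's generator and the parent's Deferred and is registered as a callback on the child.

```python
import functools


class TinyDeferred:
    """Hypothetical, stripped-down stand-in for Twisted's Deferred."""

    def __init__(self):
        self.called = False
        self.result = None
        self.callbacks = []

    def addCallback(self, fn):
        if self.called:
            self.result = fn(self.result)  # already fired: run right away
        else:
            self.callbacks.append(fn)
        return self

    def callback(self, result):
        self.called = True
        self.result = result
        for fn in self.callbacks:
            self.result = fn(self.result)


def tiny_inline_callbacks(gen_func):
    @functools.wraps(gen_func)
    def wrapper(*args, **kwargs):
        gen = gen_func(*args, **kwargs)
        parent = TinyDeferred()  # handed back to the caller immediately

        def step(value):
            # Closure over `gen` and `parent`: resume the generator with value.
            try:
                yielded = gen.send(value)
            except StopIteration as stop:
                parent.callback(stop.value)  # generator finished: fire the parent
                return value
            if isinstance(yielded, TinyDeferred):
                # Child not ready: register step on it and return without blocking.
                yielded.addCallback(step)
            else:
                step(yielded)  # plain value: send it straight back in
            return value

        step(None)  # start the generator
        return parent

    return wrapper


@tiny_inline_callbacks
def begin(child):
    result = yield child
    return result + 1


def demo():
    child = TinyDeferred()
    parent = begin(child)    # returns at once; nothing has blocked
    before = parent.called   # the parent has not fired yet
    child.callback(41)       # what the event loop would do when data arrives
    return before, parent.called, parent.result


if __name__ == "__main__":
    print(demo())
```

Nothing waits inside begin(): the generator is simply parked at its yield until the child's callback calls step again.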

Another trick for checking whether the child's .callback() has been called is
to check whether the parent's Deferred has a ".result" attribute.
(Not recommended.)

Here is the example code for this chain reaction:
#!/usr/bin/env python
import time                                                                    
import threading                                                               
                                                                                 
from twisted.internet.defer import (                                           
    Deferred, inlineCallbacks, returnValue)                                    
                                                                                 
                                                                                 
@inlineCallbacks                                                               
def begin(defer):                                                              
    """
    The outermost Deferred.
    It chains onto the Deferred it is given. If that Deferred has no result
    yet, an unfired Deferred is simply returned to the caller.
    """
    result = yield defer                                                       
                                                                                 
    """                                                                        
    The result would be the chained defer instances' result.                   
    """                                                                        
    returnValue(result)                                                        
                                                                                 
 
def thread_run():                                                              
    """
    This thread function mimics epoll.
    When a result arrives from epoll, fire the innermost Deferred, which
    triggers a chain reaction all the way back to the outermost caller.
    """
    time.sleep(3)                                                              
    stand_alone_defer.callback(42)                                             
                                                                                 
                                                                                 
def collect(result):                                                           
    global called                                                              
    called = True                                                              
    # Return the result so the next callback in the chain receives it.
    return result
     
if __name__ == "__main__":
    """
    Print ("waiting..." repeats until the Deferred fires, about 3 seconds):
        waiting...
        waiting...
        ...
        Got outermost defer instance's result: 42
    """
    called = False

    stand_alone_defer = Deferred()
    stand_alone_defer.addCallback(collect)

    result = begin(stand_alone_defer)

    thread = threading.Thread(
        name="run deferred's callback", target=thread_run)
    thread.start()

    # begin() has already returned, so the main thread is free to keep working.
    while not called:
        print("waiting...")
        time.sleep(0.5)  # poll instead of flooding the output

    thread.join()

    print("Got outermost defer instance's result: {0}".format(result.result))
