Deferred based Tasks

I’m not sure what to call this, but it is basically the method we used last year in RoboCup 2009 for doing multiple things “simultaneously” or, more accurately, for keeping several operations outstanding on a single thread. The example uses predetermined wait actions, but imagine these actions are actually “turn head”, “walk forward 1 meter”, etc. A conversation last night led me to write this almost self-contained example*.

from twisted.internet.defer import Deferred

class Scheduler:
    """Toy single-threaded scheduler driven by a simulated clock.

    Every pending action is stored as an ``(end_time, deferred)`` pair in
    ``activities``.  ``loop`` jumps the clock straight to the earliest end
    time and fires the deferreds that are due at that moment.
    """

    def __init__(self):
        # Pending (end_time, deferred) pairs, in the order they were requested.
        self.activities = []
        # Simulated current time; only loop() advances it.
        self.clock = 0

    def wait(self, t):
        """ example action - a real world action would start an actual
        action, here we simply record a time when this action should end
        and the deferred we give to the user

        t is the duration, relative to the current clock. Returns the
        Deferred that loop() fires once the clock reaches clock + t.
        """
        d = Deferred()
        self.activities.append((self.clock + t, d))
        return d

    def print_activities(self):
        """ debugging aid: print the clock followed by every pending
        activity as "end_time, Class.method" """
        # The parenthesised single-argument form prints the same text on
        # Python 2 and Python 3.
        print("%.2f: %s" % (self.clock, ','.join(
            '%.2f, %s' % (et, d2str(d)) for et, d in self.activities)))

    def loop(self, until=20):
        """ main loop for test. The real loop (burst) actually checks
        for deferred completion, but here we record the time it will
        complete in activities and simply skip to this time directly

        until is the simulated time at which the loop stops starting new
        iterations (default 20, the value that used to be hard-coded).
        The loop also returns as soon as nothing is pending.
        """
        while self.clock < until:
            if not self.activities:
                # Nothing left to wait for; min() below would raise
                # ValueError on an empty sequence.
                break
            # self.print_activities()  # uncomment to trace the schedule
            self.clock = min(et for et, _ in self.activities)  # progress time
            # Only activities whose end time has been reached are due. The
            # comparison used to be ">=", which matched every pending
            # activity because the clock had just been set to the minimum.
            due = [(et, d) for et, d in self.activities if et <= self.clock]
            # Drop the due entries before firing them, in place so the list
            # object stays the same; callbacks may call wait() and append
            # new activities while we iterate over ``due``.
            self.activities[:] = [
                (et, d) for et, d in self.activities if et > self.clock]
            for _, d in due:
                d.callback(None)

# Module-level scheduler shared by the example tasks below and by the
# __main__ block, which runs its loop.
sched = Scheduler()

def d2str(d):
    """Return 'ClassName.method' for the second callback attached to d.

    Reads ``d.callbacks[1]``, so d must already have at least two
    callbacks, and that callback must be a Python 2 bound method
    (``im_class`` / ``im_func``).  For deferreds wired by ``Task.fire``
    entry 0 is ``_resetActive`` and entry 1 is the first follow-up task's
    ``fire``.
    """
    # Each callbacks entry is presumably Twisted's
    # ((callback, args, kwargs), (errback, args, kwargs)) pair -- confirm
    # against the installed Twisted version.
    callback_spec = d.callbacks[1][0]
    bound_method = callback_spec[0]
    owner_name = bound_method.im_class.__name__
    method_name = bound_method.im_func.func_name
    return '{0}.{1}'.format(owner_name, method_name)


class Task:
    """Base class for an action that runs asynchronously through a deferred.

    Subclasses implement ``_act`` to start the real action and return a
    deferred.  Other tasks registered with ``onDone`` are fired, in
    registration order, each time this task's deferred fires.
    """

    # Label used in the start/done trace lines; subclasses override it.
    _name = 'unnamed'

    def __init__(self):
        # True between fire() and the completion of the action's deferred.
        self._active = False
        # Tasks to fire when this task's action completes.
        self._done_tasks = []
        # Number of times act() has been called.
        self._act_count = 0

    def onDone(self, task):
        """ register task to be fired every time this task completes """
        self._done_tasks.append(task)

    def fire(self, _ignored_=None):
        """ do the action if we are not already doing it

        _ignored_ only exists so fire can be used directly as a deferred
        callback, which is called with the previous callback's result.
        Always returns None: fire is chained onto other tasks' deferreds,
        and in Twisted a callback that returns a Deferred pauses that chain.
        """
        if self._active:
            return
        self._active = True
        try:
            d = self.act()
        except Exception:
            # The action never started, so do not leave the task marked
            # active; otherwise every later fire() would be ignored.
            self._active = False
            raise
        # Clear the active flag first so follow-up tasks can re-fire us.
        d.addCallback(self._resetActive)
        for t in self._done_tasks:
            d.addCallback(t.fire)

    def act(self):
        """ count and trace the start, then delegate to _act and return
        whatever it returns """
        self._act_count += 1
        # The parenthesised single-argument form prints the same text on
        # Python 2 and Python 3.
        print("%s: %s start" % (self._name, self._act_count))
        return self._act()

    def _act(self):
        """ start the real action and return its deferred; subclasses
        must override this """
        raise NotImplementedError

    def _resetActive(self, _ignored_):
        """ deferred callback: trace completion and allow fire() again """
        print("%s: %s done" % (self._name, self._act_count))
        self._active = False


class Task1(Task):
    """Example task labelled 'task1'; its action is a 1.0 wait on sched."""

    _name = 'task1'

    def _act(self):
        # Stand-in for a real action: ask the shared scheduler for a wait.
        duration = 1.0
        return sched.wait(duration)

class Task2(Task):
    """Example task labelled 'task2'; its action is a 1.0 wait on sched."""

    _name = 'task2'

    def _act(self):
        # Stand-in for a real action: ask the shared scheduler for a wait.
        duration = 1.0
        return sched.wait(duration)

class Task3(Task):
    """Example task labelled 'task3'; its action is a 1.0 wait on sched."""

    _name = 'task3'

    def _act(self):
        # Stand-in for a real action: ask the shared scheduler for a wait.
        duration = 1.0
        return sched.wait(duration)

def test():
    """Create the three example tasks, wire their completions, start task1.

    Wiring: task1 -> task2, task2 -> task3 and task1, task3 -> task2.
    """
    first, second, third = Task1(), Task2(), Task3()
    # (finished task, task to fire next); the order is kept because each
    # task fires its followers in registration order.
    wiring = (
        (first, second),
        (second, third),
        (second, first),
        (third, second),
    )
    for finished, follower in wiring:
        finished.onDone(follower)
    first.fire()

# Script entry point: wire up and start the example tasks, then run the
# simulated scheduler loop that fires their deferreds.
if __name__ == '__main__':
    test()
    sched.loop()

* To be fully self-contained it would need to define Deferred itself, which is not hard since I’m using very few of its features. But just running apt-get install python-twisted or yum install python-twisted-core will get you Twisted, which is a very capable library worth knowing.

Leave a Reply