runtime/Worker.py
changeset 3851 4e1906d119d5
parent 3849 c3f4e114af38
equal deleted inserted replaced
3850:722846bd6680 3851:4e1906d119d5
    64         """
    64         """
    65         self._threadID = get_ident()
    65         self._threadID = get_ident()
    66         self.mutex.acquire()
    66         self.mutex.acquire()
    67         self.enabled = True
    67         self.enabled = True
    68         if args or kwargs:
    68         if args or kwargs:
    69             _job = job(*args, **kwargs)
    69             self.job = job(*args, **kwargs)
    70             _job.do()
    70             self.job.do()
    71             # _job.success can't be None after do()
    71             # fail if first job fails
    72             if not _job.success:
    72             if not self.job.success:
    73                 self.reraise(_job)
    73                 self.reraise(self.job)
       
    74             self.job = None
       
    75 
       
    76         self.free.notify()
    74 
    77 
    75         while not self._finish:
    78         while not self._finish:
    76             self.todo.wait_for(lambda: self.job is not None)
    79             self.todo.wait_for(lambda: self.job is not None)
    77             self.job.do()
    80             self.job.do()
    78             self.done.notify()
    81             self.done.notify()
       
    82             self.job = None
       
    83             self.free.notify()
    79             
    84             
    80         self.mutex.release()
    85         self.mutex.release()
    81 
    86 
    82     def interleave(self, waker, stopper, *args, **kwargs):
    87     def interleave(self, waker, stopper, *args, **kwargs):
    83         """
    88         """
    88         self._threadID = get_ident()
    93         self._threadID = get_ident()
    89         self.stopper = stopper
    94         self.stopper = stopper
    90 
    95 
    91         def do_pending_job():
    96         def do_pending_job():
    92             self.mutex.acquire()
    97             self.mutex.acquire()
    93             if self.job is not None:
    98             self.job.do()
    94                 self.job.do()
    99             self.done.notify_all()
    95                 self.done.notify_all()
       
    96             self.mutex.release()
   100             self.mutex.release()
    97 
   101 
    98         def wakerfeedingloop():
   102         def wakerfeedingloop():
    99             self.mutex.acquire()
   103             self.mutex.acquire()
   100             self.enabled = True
   104             self.enabled = True
   102             # Handle first job
   106             # Handle first job
   103             if args or kwargs:
   107             if args or kwargs:
   104                 self.job = job(*args, **kwargs)
   108                 self.job = job(*args, **kwargs)
   105                 waker(do_pending_job)
   109                 waker(do_pending_job)
   106                 self.done.wait_for(lambda: self.job.success is not None)
   110                 self.done.wait_for(lambda: self.job.success is not None)
       
   111                 # fail if first job fails
   107                 if not self.job.success:
   112                 if not self.job.success:
   108                     self.reraise(_job)
   113                     self.reraise(self.job)
   109                 self.job = None
   114                 self.job = None
   110 
   115 
   111             self.free.notify()
   116             self.free.notify()
   112 
   117 
   113             while not self._finish:
   118             while not self._finish: