Runtime/MainWorker: fix blocking race condition
authorEdouard Tisserant <edouard.tisserant@gmail.com>
Thu, 28 Sep 2023 18:14:57 +0200
changeset 3846 cf027bfe2653
parent 3845 d7f9b6af98ef
child 3847 832c257d5618
Runtime/MainWorker: fix blocking race condition
runtime/Worker.py
--- a/runtime/Worker.py	Thu Sep 28 18:00:21 2023 +0200
+++ b/runtime/Worker.py	Thu Sep 28 18:14:57 2023 +0200
@@ -8,9 +8,7 @@
 # See COPYING.Runtime file for copyrights details.
 
 
-from threading import Lock, Condition, Thread
-
-import _thread
+from threading import Lock, Condition, Thread, get_ident
 
 
 class job(object):
@@ -64,7 +62,7 @@
         """
         meant to be called by worker thread (blocking)
         """
-        self._threadID = _thread.get_ident()
+        self._threadID = get_ident()
         self.mutex.acquire()
         self.enabled = True
         if args or kwargs:
@@ -75,13 +73,10 @@
                 self.reraise(_job)
 
         while not self._finish:
-            self.todo.wait()
-            if self.job is not None:
-                self.job.do()
-                self.done.notify()
-            else:
-                break
-
+            self.todo.wait_for(self.job is not None)
+            self.job.do()
+            self.done.notify()
+            
         self.mutex.release()
 
     def interleave(self, waker, stopper, *args, **kwargs):
@@ -90,39 +85,42 @@
         additionaly, it creates a new thread to wait for new job.
         """
         self.feed = Condition(self.mutex)
-        self._threadID = _thread.get_ident()
+        self._threadID = get_ident()
         self.stopper = stopper
 
+        def do_pending_job():
+            self.mutex.acquire()
+            if self.job is not None:
+                self.job.do()
+                self.done.notify_all()
+            self.mutex.release()
+
         def wakerfeedingloop():
             self.mutex.acquire()
             self.enabled = True
+
+            # Handle first job
             if args or kwargs:
-                def first_job_todo():
-                    _job = job(*args, **kwargs)
-                    _job.do()
-                    if not _job.success:
-                        self.reraise(_job)
-                    self.mutex.acquire()
-                    self.feed.notify()
-                    self.mutex.release()
-                waker(first_job_todo)
-                self.feed.wait()
+                self.job = job(*args, **kwargs)
+                waker(do_pending_job)
+                self.done.wait_for(lambda: self.job.success is not None)
+                if not self.job.success:
+                    self.reraise(_job)
+                self.job = None
+
+            self.free.notify()
 
             while not self._finish:
-                self.todo.wait()
-                def job_todo():
-                    self.mutex.acquire()
-                    if self.job is not None:
-                        self.job.do()
-                        self.feed.notify()
-                        self.done.notify()
-                    self.mutex.release()
+                self.todo.wait_for(lambda: self.job is not None)
                 if self._finish:
                     break
-                waker(job_todo)
-                self.feed.wait()
+                waker(do_pending_job)
+                self.done.wait_for(lambda: self.job.success is not None)
+                self.job = None
+                self.free.notify()
 
             self.mutex.release()
+
         self.own_thread = Thread(target = wakerfeedingloop)
         self.own_thread.start()
 
@@ -135,7 +133,7 @@
         self.enabled = False
         self.job = None
         self.todo.notify()
-        self.done.notify()
+        self.done.notify_all()
         self.mutex.release()
         self.own_thread.join()
 
@@ -149,7 +147,7 @@
 
         _job = job(*args, **kwargs)
 
-        if self._threadID == _thread.get_ident():
+        if self._threadID == get_ident():
             # if caller is worker thread execute immediately
             _job.do()
         else:
@@ -159,13 +157,11 @@
                 self.mutex.release()
                 raise EOFError("Worker is disabled")
 
-            while self.job is not None:
-                self.free.wait()
+            self.free.wait_for(lambda: self.job is None)
 
             self.job = _job
             self.todo.notify()
-            self.done.wait()
-            self.job = None
+            self.done.wait_for(lambda: _job.success is not None)
             self.free.notify()
             self.mutex.release()