Replaced Scheduler's internal use of threading.Event with threading.Semaphore
to prevent race condition that caused timeouts.
This commit is contained in:
parent
203a8f9e70
commit
c34811f378
|
@ -1,5 +1,5 @@
|
||||||
from time import time, sleep
|
from time import time, sleep
|
||||||
from threading import Thread, Event
|
from threading import Thread, Semaphore
|
||||||
|
|
||||||
|
|
||||||
class Scheduler( object ):
|
class Scheduler( object ):
|
||||||
|
@ -14,10 +14,11 @@ class Scheduler( object ):
|
||||||
self.__messages = {} # map of thread to list of its incoming messages
|
self.__messages = {} # map of thread to list of its incoming messages
|
||||||
self.__thread = None # currently executing microthread (if any)
|
self.__thread = None # currently executing microthread (if any)
|
||||||
self.__done = False # whether it's time to exit
|
self.__done = False # whether it's time to exit
|
||||||
self.__no_longer_idle = Event()
|
self.__idle = Semaphore( 0 )
|
||||||
self.__last_error = None # used for unit tests
|
self.__last_error = None # used for unit tests
|
||||||
|
|
||||||
self.add( self.__idle_thread() )
|
self.add( self.__idle_thread() )
|
||||||
|
self.__idle.acquire() # don't count the idle thread
|
||||||
|
|
||||||
self.__scheduler_thread = Thread( target = self.run )
|
self.__scheduler_thread = Thread( target = self.run )
|
||||||
self.__scheduler_thread.setDaemon( True )
|
self.__scheduler_thread.setDaemon( True )
|
||||||
|
@ -72,16 +73,15 @@ class Scheduler( object ):
|
||||||
self.add( result )
|
self.add( result )
|
||||||
|
|
||||||
except StopIteration:
|
except StopIteration:
|
||||||
|
self.__idle.acquire( blocking = False )
|
||||||
self.__running.remove( thread )
|
self.__running.remove( thread )
|
||||||
self.__messages.pop( thread, None )
|
self.__messages.pop( thread, None )
|
||||||
|
|
||||||
def __idle_thread( self ):
|
def __idle_thread( self ):
|
||||||
while not self.__done:
|
while not self.__done:
|
||||||
# if the idle thread is the only one running, block until there's another running thread
|
# if the idle thread is the only one running, block until there's another running thread
|
||||||
if len( self.__running ) == 1:
|
self.__idle.acquire( blocking = True )
|
||||||
self.__no_longer_idle.wait()
|
self.__idle.release()
|
||||||
self.__no_longer_idle.clear()
|
|
||||||
|
|
||||||
yield None
|
yield None
|
||||||
|
|
||||||
# used for unit tests
|
# used for unit tests
|
||||||
|
@ -98,11 +98,12 @@ class Scheduler( object ):
|
||||||
sleep( self.IDLE_SLEEP_SECONDS )
|
sleep( self.IDLE_SLEEP_SECONDS )
|
||||||
|
|
||||||
def sleep( self, thread ):
|
def sleep( self, thread ):
|
||||||
|
self.__idle.acquire( blocking = False )
|
||||||
self.__sleeping.append( thread )
|
self.__sleeping.append( thread )
|
||||||
self.__running.remove( thread )
|
self.__running.remove( thread )
|
||||||
|
|
||||||
def add( self, thread, *args ):
|
def add( self, thread, *args ):
|
||||||
self.__no_longer_idle.set()
|
self.__idle.release()
|
||||||
|
|
||||||
if thread in self.__sleeping:
|
if thread in self.__sleeping:
|
||||||
self.__sleeping.remove( thread )
|
self.__sleeping.remove( thread )
|
||||||
|
@ -116,7 +117,7 @@ class Scheduler( object ):
|
||||||
|
|
||||||
def shutdown( self ):
|
def shutdown( self ):
|
||||||
self.__done = True
|
self.__done = True
|
||||||
self.__no_longer_idle.set()
|
self.__idle.release()
|
||||||
self.__scheduler_thread.join()
|
self.__scheduler_thread.join()
|
||||||
|
|
||||||
# currently executing microthread (if any)
|
# currently executing microthread (if any)
|
||||||
|
|
Reference in New Issue
Block a user