pydi.reqspool
/cygdrive/c/dev/workspace/pydi_open/src/pydi/reqspool.py

Copyright (C) 2008 Eithon Cadag
 
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
 
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
GNU General Public License for more details.
 
You should have received a copy of the GNU General Public License
along with this program.  If not, see <http://www.gnu.org/licenses/>.
 
 
 
reqspool.py - multi-threaded distribution and aggregation of arbitrary requests
 
Eithon Cadag
 
 
TODO: Implement prioritization?
 
Example usage:
 
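    import random
    import time

    from pydi.reqspool import (RequestExecutor, RequestProcessor,
                               RequestOrder, RequestThreadSpooler)

    # Declare some service that needs running (this one just returns a random
    # integer after some waiting).
    class WaitAndReturnRandomInt(RequestExecutor):
        def __init__(self, reqmsg):
            RequestExecutor.__init__(self, reqmsg)

        def execute_request(self):
            # Simulate 2-5 seconds of work, then return the result.
            time.sleep(random.randint(2, 5))
            return random.randint(0, 100)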
    
    # Declare some processor to handle the results of dispatched requests.
    class SortingProcessor(RequestProcessor):
        def __init__(self):
            self.evens = []
            self.odds = []
    
        def update(self,msg):
            content = msg.content
            if content % 2 == 0:
                self.evens.append(content)
            else:
                self.odds.append(content)
 
    # Generate some random requests to the service
    ros = []
    for x in xrange(20):
        ros.append(RequestOrder(WaitAndReturnRandomInt(None),None))
    
    # Declare our processor and the request spooler
    proc = SortingProcessor()
    rts = RequestThreadSpooler(proc)
    rts.start()
    
    # Submit the requests; results can be retrieved through the
    # processor methods or attributes, or via the rts.response() call,
    # which clears the response queue and returns the (raw) results
    for x in ros:
        rts.request(x)
 
Note: Each new thread spawned reserves 1984 KB of virtual memory, of which roughly 8-21 KB or more is actually used upon initialization.
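
If the per-thread reservation noted above is the thread stack (an assumption; the note does not say), the standard `threading.stack_size` call can shrink it for threads created afterwards. The sketch below is illustrative only: `run_with_small_stack` and the 256 KB figure are hypothetical, and some platforms may reject the call.

```python
import threading


def run_with_small_stack(target, stack_bytes=256 * 1024):
    """Run target in one thread created with a reduced stack size."""
    # stack_size() applies to threads created after the call and
    # returns the previous setting (0 means the platform default).
    previous = threading.stack_size(stack_bytes)
    try:
        worker = threading.Thread(target=target)
        worker.start()
    finally:
        # Restore the old setting for threads created later.
        threading.stack_size(previous)
    worker.join()


seen = []
run_with_small_stack(lambda: seen.append("ran"))
```

Whether this helps in practice depends on how deep the executors' call stacks get; a stack that is too small will crash the interpreter rather than raise.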

 
Modules
       
Queue
pydi.callback
inspect
threading
time
types

 
Classes
       
Queue.Queue
    TrackingQueue
exceptions.Exception(exceptions.BaseException)
    ActiveSpoolerInterruptException
    ClosedSpoolerException
pydi.callback.UpdateCallBackRegister
    RequestThread(threading.Thread, pydi.callback.UpdateCallBackRegister)
pydi.callback.UpdateObserver
    RequestProcessor
RequestExecutor
RequestOrder
    RequestOrderResponse
threading.Thread(threading._Verbose)
    RequestThread(threading.Thread, pydi.callback.UpdateCallBackRegister)
    RequestThreadSpooler

 
class ActiveSpoolerInterruptException(exceptions.Exception)
    
Method resolution order:
ActiveSpoolerInterruptException
exceptions.Exception
exceptions.BaseException
__builtin__.object

Methods defined here:
__init__(self, msg)

Data descriptors defined here:
__weakref__
list of weak references to the object (if defined)

Data and other attributes inherited from exceptions.Exception:
__new__ = <built-in method __new__ of type object at 0x6cb87ab0>
T.__new__(S, ...) -> a new object with type S, a subtype of T

Methods inherited from exceptions.BaseException:
__delattr__(...)
x.__delattr__('name') <==> del x.name
__getattribute__(...)
x.__getattribute__('name') <==> x.name
__getitem__(...)
x.__getitem__(y) <==> x[y]
__getslice__(...)
x.__getslice__(i, j) <==> x[i:j]
 
Use of negative indices is not supported.
__reduce__(...)
__repr__(...)
x.__repr__() <==> repr(x)
__setattr__(...)
x.__setattr__('name', value) <==> x.name = value
__setstate__(...)
__str__(...)
x.__str__() <==> str(x)

Data descriptors inherited from exceptions.BaseException:
__dict__
args
message
exception message

 
class ClosedSpoolerException(exceptions.Exception)
    
Method resolution order:
ClosedSpoolerException
exceptions.Exception
exceptions.BaseException
__builtin__.object

Methods defined here:
__init__(self, msg)

Data descriptors defined here:
__weakref__
list of weak references to the object (if defined)

Data and other attributes inherited from exceptions.Exception:
__new__ = <built-in method __new__ of type object at 0x6cb87ab0>
T.__new__(S, ...) -> a new object with type S, a subtype of T

Methods inherited from exceptions.BaseException:
__delattr__(...)
x.__delattr__('name') <==> del x.name
__getattribute__(...)
x.__getattribute__('name') <==> x.name
__getitem__(...)
x.__getitem__(y) <==> x[y]
__getslice__(...)
x.__getslice__(i, j) <==> x[i:j]
 
Use of negative indices is not supported.
__reduce__(...)
__repr__(...)
x.__repr__() <==> repr(x)
__setattr__(...)
x.__setattr__('name', value) <==> x.name = value
__setstate__(...)
__str__(...)
x.__str__() <==> str(x)

Data descriptors inherited from exceptions.BaseException:
__dict__
args
message
exception message

 
class RequestExecutor
    Abstract class that represents a request to be carried out; instances are submittable to a spooler.
 
  Methods defined here:
__init__(self, req)
execute_request(self)
Extending classes should implement this method; RequestThread calls it to carry out
the request. The method *must* return, or the spawning thread will hang.
set_request(self, req)
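
A minimal sketch of what an extending class might look like. The `RequestExecutor` defined here is a stand-in written for illustration so the snippet runs without pydi; the `req` attribute name and the `SquareExecutor` class are hypothetical and not taken from the module.

```python
class RequestExecutor(object):
    """Stand-in for pydi.reqspool.RequestExecutor (illustration only)."""

    def __init__(self, req):
        self.req = req  # attribute name is an assumption

    def set_request(self, req):
        self.req = req

    def execute_request(self):
        raise NotImplementedError("extending classes implement this")


class SquareExecutor(RequestExecutor):
    """Hypothetical executor: squares the number it was given."""

    def execute_request(self):
        # Return promptly; a method that never returns ties up its thread.
        return self.req * self.req


executor = SquareExecutor(7)
result = executor.execute_request()
executor.set_request(3)
second = executor.execute_request()
```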

 
class RequestOrder
    Base class for wrapping a request and any additional messages, such as for halting.
A request is composed of:
- an object: the object that actually executes the request (so it should be initialized
beforehand). To work with a RequestThread, the object must extend the RequestExecutor class.
- a message: any message pertinent to intermediaries handling the RequestOrder. This includes
halting messages to the managing thread.
 
  Methods defined here:
__init__(self, obj, str_msg)
get_content(self)
Returns the stored object in the request.
get_message(self)
set_content(self, c)
set_message(self, m)

Data descriptors defined here:
content
Returns the stored object in the request.
message

Data and other attributes defined here:
HALT = 'HALT'
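
The object/message pairing and the HALT sentinel could be sketched as follows. This `RequestOrder` is a stand-in that mirrors only the interface listed above; the private attribute names, the writable properties, and the `is_halt` helper are assumptions made for illustration.

```python
class RequestOrder(object):
    """Stand-in mirroring the documented interface (illustration only)."""

    HALT = 'HALT'

    def __init__(self, obj, str_msg):
        self._content = obj
        self._message = str_msg

    def get_content(self):
        return self._content

    def set_content(self, c):
        self._content = c

    def get_message(self):
        return self._message

    def set_message(self, m):
        self._message = m

    # Expose the accessors as the 'content' and 'message' descriptors.
    content = property(get_content, set_content)
    message = property(get_message, set_message)


def is_halt(order):
    """Hypothetical helper: does this order ask the receiver to stop?"""
    return order.message == RequestOrder.HALT


work = RequestOrder(object(), None)           # an ordinary request
stop = RequestOrder(None, RequestOrder.HALT)  # carries only the halt message
```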

 
class RequestOrderResponse(RequestOrder)
    Wrapper class to encapsulate responses to requests.
 
  Methods defined here:
__init__(self, obj, str_msg)

Methods inherited from RequestOrder:
get_content(self)
Returns the stored object in the request.
get_message(self)
set_content(self, c)
set_message(self, m)

Data descriptors inherited from RequestOrder:
content
Returns the stored object in the request.
message

Data and other attributes inherited from RequestOrder:
HALT = 'HALT'

 
class RequestProcessor(pydi.callback.UpdateObserver)
    Abstract class to handle callbacks from requests.
 
  Methods defined here:
update(self, msg)
msg is a RequestOrderResponse
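
Because `update` only reads the message it is handed, a processor can be exercised without any threads. The sketch below reuses the sorting logic from the module example; `FakeResponse` is a hypothetical stand-in that models just the `content` and `message` descriptors, and the pydi base class is omitted so the snippet runs on its own.

```python
from collections import namedtuple

# Hypothetical stand-in for a response object: only the two documented
# data descriptors are modelled.
FakeResponse = namedtuple("FakeResponse", ["content", "message"])


class SortingProcessor(object):
    """Same logic as the module example, minus the pydi base class."""

    def __init__(self):
        self.evens = []
        self.odds = []

    def update(self, msg):
        value = msg.content
        if value % 2 == 0:
            self.evens.append(value)
        else:
            self.odds.append(value)


proc = SortingProcessor()
for n in (4, 7, 10, 13):
    proc.update(FakeResponse(content=n, message=None))
```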

 
class RequestThread(threading.Thread, pydi.callback.UpdateCallBackRegister)
    Base class called by the spooler to execute requests.
 
 
Method resolution order:
RequestThread
threading.Thread
threading._Verbose
__builtin__.object
pydi.callback.UpdateCallBackRegister

Methods defined here:
__init__(self, id, req_queue, resp_queue, obs)
halt(self)
isIdle(self)
run(self)
Very simple native implementation of run; it just executes the requested orders
and stores the output in the response queue.

Methods inherited from threading.Thread:
__repr__(self)
getName(self)
isAlive(self)
isDaemon(self)
join(self, timeout=None)
setDaemon(self, daemonic)
setName(self, name)
start(self)

Data descriptors inherited from threading._Verbose:
__dict__
dictionary for instance variables (if defined)
__weakref__
list of weak references to the object (if defined)

Methods inherited from pydi.callback.UpdateCallBackRegister:
register = lockfnc(*args, **kwargs)

Data and other attributes inherited from pydi.callback.UpdateCallBackRegister:
processorLock = <thread.lock object at 0x7ff120e0>
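
The execute-and-store loop described for `run` might look roughly like the sketch below. It is not the module's implementation: the `Worker` class is hypothetical, requests are plain zero-argument callables rather than RequestOrder objects, and callbacks to an observer are left out.

```python
import threading

try:
    import queue            # Python 3
except ImportError:
    import Queue as queue   # Python 2, as used by the module

HALT = 'HALT'


class Worker(threading.Thread):
    """Hypothetical worker: runs jobs until it sees the HALT message."""

    def __init__(self, req_queue, resp_queue):
        threading.Thread.__init__(self)
        self.req_queue = req_queue
        self.resp_queue = resp_queue

    def run(self):
        while True:
            job = self.req_queue.get()   # blocks until work arrives
            if job == HALT:
                break
            self.resp_queue.put(job())   # store the output for later pickup


requests, responses = queue.Queue(), queue.Queue()
worker = Worker(requests, responses)
worker.start()
for n in (1, 2, 3):
    requests.put(lambda n=n: n * 10)
requests.put(HALT)
worker.join()
results = [responses.get() for _ in range(3)]
```

With a single worker the responses come back in submission order; with several workers the order is not guaranteed.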

 
class RequestThreadSpooler(threading.Thread)
    Class that handles thread management and task delegation.
TODO: Add in prioritization
 
 
Method resolution order:
RequestThreadSpooler
threading.Thread
threading._Verbose
__builtin__.object

Methods defined here:
__init__(self, proc, maxthreads=3)
clear_all_requests(self)
close(self)
Gracefully shuts down active and idle threads; this SHOULD be called before
exit, or expect Python to hang. The message to kill is RequestOrder.HALT.
halt_all(self)
request(self, reqmsg, oplock=None)
oplock is an optional lock that lets a main process control when the requests are executed
within the spooler.
response(self)
restart_all(self)
run(self)
spooler_state(self)
Retrieve the active, pending and loaded spooler requests; these
numbers are NOT guaranteed to be completely accurate, due to the nature
of how requests are placed on and removed from the queues by the delegating
threads.

Methods inherited from threading.Thread:
__repr__(self)
getName(self)
isAlive(self)
isDaemon(self)
join(self, timeout=None)
setDaemon(self, daemonic)
setName(self, name)
start(self)

Data descriptors inherited from threading._Verbose:
__dict__
dictionary for instance variables (if defined)
__weakref__
list of weak references to the object (if defined)
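
The reason `close` matters can be illustrated with a small pool: worker threads block on the request queue forever unless each one receives a halt message. The sketch below shows one way to arrange that, assuming one HALT per thread; `TinySpooler` is hypothetical and far simpler than RequestThreadSpooler (no processor, no oplock, no restart).

```python
import threading

try:
    import queue            # Python 3
except ImportError:
    import Queue as queue   # Python 2, as used by the module

HALT = 'HALT'


class TinySpooler(object):
    """Hypothetical pool: a fixed set of threads sharing one request queue."""

    def __init__(self, maxthreads=3):
        self.requests = queue.Queue()
        self.responses = queue.Queue()
        self.threads = [threading.Thread(target=self._work)
                        for _ in range(maxthreads)]
        for t in self.threads:
            t.start()

    def _work(self):
        while True:
            job = self.requests.get()
            if job == HALT:
                return
            self.responses.put(job())

    def request(self, job):
        self.requests.put(job)

    def close(self):
        # One HALT per thread; each thread consumes exactly one and exits.
        for _ in self.threads:
            self.requests.put(HALT)
        for t in self.threads:
            t.join()


spooler = TinySpooler()
try:
    for n in range(5):
        spooler.request(lambda n=n: n + 1)
finally:
    spooler.close()   # always shut down, even if submission fails

collected = sorted(spooler.responses.get() for _ in range(5))
```

Because the queue is first-in, first-out, the HALT messages are reached only after every earlier job has been taken, so pending work finishes before the threads exit.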

 
class TrackingQueue(Queue.Queue)
    Extended Queue class with internal counters to keep track of
the number of requests/jobs added over the life of the Queue.
 
  Methods defined here:
__init__(self)
get(self)
Takes a request from the Queue and increments a counter; uses the base Queue.Queue.get method.
number_submitted(self)
number_taken(self)
put(self, req)
Adds a request to the Queue and increments a counter; uses the base Queue.Queue.put method.
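
A counting queue of this kind can be sketched as below. `CountingQueue` is a hypothetical stand-in, not the module's TrackingQueue: the counter names and the lock are assumptions, and the `block`/`timeout` parameters are added here so that the inherited `put_nowait` and `get_nowait` keep working.

```python
import threading

try:
    import queue            # Python 3
except ImportError:
    import Queue as queue   # Python 2, as used by the module


class CountingQueue(queue.Queue):
    """Hypothetical queue that counts lifetime puts and gets."""

    def __init__(self):
        queue.Queue.__init__(self)
        self._submitted = 0
        self._taken = 0
        self._count_lock = threading.Lock()

    def put(self, req, block=True, timeout=None):
        queue.Queue.put(self, req, block, timeout)
        with self._count_lock:          # count only successful puts
            self._submitted += 1

    def get(self, block=True, timeout=None):
        item = queue.Queue.get(self, block, timeout)
        with self._count_lock:          # count only successful gets
            self._taken += 1
        return item

    def number_submitted(self):
        return self._submitted

    def number_taken(self):
        return self._taken


q = CountingQueue()
for job in ("a", "b", "c"):
    q.put(job)
first = q.get()
```

Unlike `qsize()`, the two counters never decrease, so their difference gives the number of items still waiting only when no other thread is mid-operation.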

Methods inherited from Queue.Queue:
empty(self)
Return True if the queue is empty, False otherwise (not reliable!).
full(self)
Return True if the queue is full, False otherwise (not reliable!).
get_nowait(self)
Remove and return an item from the queue without blocking.
 
Only get an item if one is immediately available. Otherwise
raise the Empty exception.
join(self)
Blocks until all items in the Queue have been gotten and processed.
 
The count of unfinished tasks goes up whenever an item is added to the
queue. The count goes down whenever a consumer thread calls task_done()
to indicate the item was retrieved and all work on it is complete.
 
When the count of unfinished tasks drops to zero, join() unblocks.
put_nowait(self, item)
Put an item into the queue without blocking.
 
Only enqueue the item if a free slot is immediately available.
Otherwise raise the Full exception.
qsize(self)
Return the approximate size of the queue (not reliable!).
task_done(self)
Indicate that a formerly enqueued task is complete.
 
Used by Queue consumer threads.  For each get() used to fetch a task,
a subsequent call to task_done() tells the queue that the processing
on the task is complete.
 
If a join() is currently blocking, it will resume when all items
have been processed (meaning that a task_done() call was received
for every item that had been put() into the queue).
 
Raises a ValueError if called more times than there were items
placed in the queue.
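
The `join()`/`task_done()` pairing described above can be seen with a plain standard-library queue. This is a minimal sketch; the `consumer` function and the use of `None` as a stop marker are illustrative choices, not part of pydi.

```python
import threading

try:
    import queue            # Python 3
except ImportError:
    import Queue as queue   # Python 2, as used by the module

jobs = queue.Queue()
done = []


def consumer():
    while True:
        item = jobs.get()
        try:
            if item is None:        # stop marker
                return
            done.append(item * 2)
        finally:
            jobs.task_done()        # exactly one call per get()


t = threading.Thread(target=consumer)
t.start()
for n in (1, 2, 3):
    jobs.put(n)
jobs.join()                 # unblocks once all three items are marked done
snapshot = list(done)

jobs.put(None)
t.join()

# One call too many is rejected with ValueError.
try:
    jobs.task_done()
    extra_call_rejected = False
except ValueError:
    extra_call_rejected = True
```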