Source code for covertutils.handlers.multi.multihandler

from covertutils.handlers import BufferingHandler
from covertutils.orchestration import Orchestrator
from covertutils.bridges import SimpleBridge

from time import sleep
from functools import wraps

try :
	from queue import Queue
except ImportError:
	from Queue import Queue

def handlerCallbackHook( instance, on_chunk_function, orch_id ) :
	"""
	Build a replacement for a handler's chunk callback that also notifies `instance`.

	The returned wrapper expects exactly two positional arguments, `(stream, message)`.
	On every call it:

	1. builds the aggregated stream name ``"<orch_id>:<stream>"``,
	2. calls ``instance.onMessage( pseudo_stream, message )`` when `message` is truthy,
	   otherwise ``instance.onChunk( pseudo_stream, message )``,
	3. calls the original `on_chunk_function` with the untouched arguments.

	:param instance: Object exposing `onMessage( stream, message )` and `onChunk( stream, message )`.
	:param on_chunk_function: The original callback that is being hooked.
	:param orch_id: Identifier prepended to the stream name reported to `instance`.
	:return: The wrapping callable (carrying the metadata of `on_chunk_function`).
	"""
	@wraps(on_chunk_function)
	def wrapper( *args, **kwargs ) :
		# Raises ValueError unless called with exactly (stream, message) positionally.
		stream, message = args
		pseudo_stream = "%s:%s" % (orch_id, stream)

		if message :
			instance.onMessage( pseudo_stream, message )
		else :
			instance.onChunk( pseudo_stream, message )

		# Hand back whatever the original callback produced, rather than the
		# callback object itself.
		return on_chunk_function( *args, **kwargs )

	return wrapper
class MultiHandler( BufferingHandler ) :
	"""
	A class that aggregates multiple :class:`BaseHandler` parented objects, to support parallel session handling.

	It supports the standard :meth:`onMessage` API of the original :class:`BaseHandler` objects,
	as well as methods for dispatching `messages` en-masse.

	Every aggregated handler is registered under the identity of its orchestrator (`orch_id`).
	Streams of the aggregated handlers are exposed through this object under the
	pseudo-stream name ``"<orch_id>:<stream>"``.

	The `handlers` attribute is a dict of the form
	``{ orch_id : { 'streams' : [stream, ...], 'handler' : handler_object } }``.
	"""

	class __NullOrchestrator(Orchestrator) :
		"""
		Placeholder orchestrator passed to the parent constructor.

		Its methods are guarded by failing assertions, as they are not expected to be called.
		"""

		def readyMessage( self, message, stream ) :
			assert False, "readyMessage() of the placeholder orchestrator is not meant to run"	# This is dummy, dead code
			return "%s:%s" % (stream, message)

		def depositChunk( self, chunk ) :
			assert False, "depositChunk() of the placeholder orchestrator is not meant to run"	# This is dummy, dead code
			stream, message = chunk.split(':',1)
			return stream, message


	def start(self) :
		"""Does nothing. Overridden so that this aggregate object runs no processing of its own."""
		pass


	def nullSend( self, message, stream ) :
		"""Accepts a `message` and a `stream` and discards them."""
		pass


	def __init__( self, handlers, **kw ) :
		"""
		:param handlers: Iterable of handler objects to aggregate. Each one is registered with :meth:`addHandler`.
		:param kw: Keyword arguments forwarded unchanged to the parent constructor.
		:raises TypeError: if `handlers` is not iterable.
		"""
		# Materialise first, so a non-iterable argument fails before any setup is done.
		handlers = list(handlers)

		# The parent constructor needs transport functions and an orchestrator.
		# This object has no channel of its own, so placeholders are provided.
		def send_internal(raw) :
			assert False, "MultiHandler has no transport of its own"	# This is dummy, dead code

		def recv_internal() :
			assert False, "MultiHandler has no transport of its own"	# This is dummy, dead code
			return None

		orch = MultiHandler.__NullOrchestrator("", 0)
		super(MultiHandler, self).__init__(recv_internal, send_internal, orch, **kw)

		self.handlers = {}
		for handler in handlers :
			self.addHandler(handler)


	def resolveStream( self, stream_alias ) :
		"""
		Split a pseudo-stream name into the handler it belongs to and the real stream name.

		:param str stream_alias: A name of the form ``"<orch_id>:<stream>"``. Only the first ``':'`` is used as separator.
		:return: A `(handler, stream)` tuple.
		:raises ValueError: if `stream_alias` contains no ``':'``.
		:raises KeyError: if no handler is registered under the `orch_id` part.
		"""
		orch_id, stream = stream_alias.split(':',1)
		handler = self.handlers[orch_id]['handler']
		return handler, stream


	def preferred_send( self, message, stream ) :
		"""
		Send `message` through the handler that owns the given pseudo-stream.

		:param message: The message to send.
		:param str stream: A pseudo-stream name, as accepted by :meth:`resolveStream`.
		"""
		handler, stream = self.resolveStream( stream )
		handler.preferred_send( message, stream )


	def queueSend( self, message, stream ) :
		"""Does nothing. Queued sending is not implemented for the aggregate object."""
		pass


	def dispatch( self, orch_ids, stream, message ) :
		"""
		Send `message` to the `stream` of every handler listed in `orch_ids`.

		:param orch_ids: Iterable of orchestrator identities of registered handlers.
		:param stream: The (real, not pseudo) stream name to use on each handler.
		:param message: The message to send.
		:raises KeyError: if an identity in `orch_ids` is not registered.
		"""
		for orch_id in orch_ids :
			self.handlers[orch_id]['handler'].preferred_send( message, stream )


	def dispatchTo( self, orch_id, stream, message ) :
		"""Send `message` to the `stream` of the single handler registered as `orch_id`."""
		self.dispatch( [orch_id], stream, message )


	def dispatchAll( self, message, stream = 'control' ) :
		"""
		Send `message` to every registered handler that has `stream` in its stream list.

		Handlers that do not list the `stream` are silently skipped.

		:param message: The message to send.
		:param str stream: The (real, not pseudo) stream name. Defaults to ``'control'``.
		"""
		# TODO: Make it look for hard_streams when stream = None
		for entry in self.handlers.values() :
			if stream in entry['streams'] :
				entry['handler'].preferred_send( message, stream )


	def getHandler(self, orch_id) :
		"""
		:return: The handler registered under `orch_id`.
		:raises KeyError: if `orch_id` is not registered.
		"""
		return self.handlers[orch_id]['handler']


	def getAllHandlers(self) :
		"""
		:return: A list with all the registered handler objects.
		"""
		return [entry['handler'] for entry in self.handlers.values()]


	def addStream( self, stream ) :
		"""Register `stream` for every handler that is currently aggregated."""
		for orch_id in self.getOrchestratorIDs() :
			self.__add_stream(orch_id, stream)


	def __add_stream(self, orch_id, stream) :
		"""
		Record `stream` in the stream list of handler `orch_id` and add the matching
		pseudo-stream to this object's own orchestrator.
		"""
		self.handlers[orch_id]['streams'].append(stream)
		# Name under which the stream is visible through this aggregate object.
		pseudo_stream = "%s:%s" % (orch_id, stream)
		self.getOrchestrator().addStream(pseudo_stream)


	def addHandler(self, handler) :
		"""
		Register a handler, keyed by the identity of its orchestrator.

		The handler is passed through `BufferingHandler.bufferize_handler_obj`, its `onChunk`
		callback is hooked so that incoming data is also reported to this object, and all of
		its orchestrator's streams are registered as pseudo-streams.

		Registering a handler whose orchestrator identity is already present replaces the older entry.

		:param handler: The handler object to aggregate.
		"""
		orch_id = handler.getOrchestrator().getIdentity()

		buffered_handler = BufferingHandler.bufferize_handler_obj(handler)
		self.handlers[orch_id] = {
			'streams' : [],
			'handler' : buffered_handler,
		}
		# Route everything the handler receives to this object's callbacks as well.
		buffered_handler.onChunk = handlerCallbackHook( self, buffered_handler.onChunk, orch_id )

		for stream in handler.getOrchestrator().getStreams() :
			self.__add_stream(orch_id, stream)


	def getOrchestratorIDs(self) :
		"""
		:return: A list with the orchestrator identities of all registered handlers.
		"""
		# A real list on both Python 2 and Python 3, instead of a live dict view.
		return list(self.handlers.keys())