Source code for covertutils.handlers.basehandler

from abc import ABCMeta, abstractmethod
# from time import sleep
from threading import Thread

from covertutils.helpers import defaultArgMerging



[docs]class BaseHandler : """ Subclassing this class and overriding its methods automatically creates a threaded handler. """ __metaclass__ = ABCMeta Defaults = { 'start' : True, }
[docs] def __init__( self, recv, send, orchestrator, **kw ) : """ :param `function` recv: A **blocking** function that returns every time a chunk is received. The return type must be raw data, directly fetched from the channel. :param `function` send: A function that takes raw data as argument and sends it across the channel. :param `orchestration.SimpleOrchestrator` orchestrator: An Object that is used to translate raw data to `(stream, message)` tuples. """ # print (kw) arguments = defaultArgMerging( BaseHandler.Defaults, kw ) # print (arguments) self.receive_function = recv self.send_function = send self.orchestrator = orchestrator self.preferred_send = self.sendAdHoc self.to_send_list = [] self.to_send_raw = [] self.__protocolThread = Thread( target = self.__protocolThreadFunction ) self.__protocolThread.daemon = True self.on = True if arguments['start'] : self.start()
[docs] def start( self ) : ''' Starts the thread that consumes data and enables the `on*` callback methods. ''' self.__protocolThread.start()
[docs] def stop( self ) : ''' Stops the handler thread making the data consumer to return (if not blocked). ''' self.on = False
[docs] def queueSend( self, message, stream = None ) : """ :param str message: The message that will be stored for sending upon request. :param str stream: The stream where the message will be sent. """ if stream == None : stream = self.orchestrator.getDefaultStream() self.to_send_list.append( (message, stream) )
[docs] def readifyQueue( self ) : if self.to_send_list : message, stream = self.to_send_list.pop(0) chunks = self.orchestrator.readyMessage( message, stream ) self.to_send_raw.extend( chunks ) return True return False
def __consume( self, stream, message ) : """ :param str stream: The stream that the message is a send. :param str message: The message in plaintext. Empty string if not fully-assembled. :rtype: None """ if stream == None : self.onNotRecognised() return self.onChunk( stream, message ) if message : self.onMessage( stream, message )
[docs] @abstractmethod def onChunk( self, stream, message ) : """ **AbstractMethod** This method runs whenever a new recognised chunk is consumed. :param str stream: The recognised stream that this chunk belongs. :param str message: The message that is contained in this chunk. Empty string if the chunk is not the last of a reassembled message. This method will run even to for chunks that will trigger the `onMessage()` method. To stop that you need to add the above code in the beginning. .. code:: python if message != '' : # meaning that the message is assembled, so onMessage() will run return """ pass
[docs] @abstractmethod def onMessage( self, stream, message ) : """ **AbstractMethod** This method runs whenever a new message is assembled. :param str stream: The recognised stream that this chunk belongs. :param str message: The message that is contained in this chunk. """ pass
[docs] @abstractmethod def onNotRecognised( self ) : """ **AbstractMethod** This method runs whenever a chunk is not recognised. :rtype: None """ pass
[docs] def sendAdHoc( self, message, stream = None, assert_len = 0 ) : """ This method uses the object's `SimpleOrchestrator` instance to send raw data to the other side, throught the specified `Stream`. If `stream` is `None`, the default Orchestrator's stream will be used. :param str message: The `message` send to the other side. :param str stream: The `stream` name that will tag the data. :param int assert_len: Do not send if the chunked message exceeds `assert_len` chunks. :rtype: bool `True` is returned when the message is sent, `False` otherwise. """ if stream == None : stream = self.orchestrator.getDefaultStream() chunks = self.orchestrator.readyMessage( message, stream ) if assert_len != 0 : if len(chunks) > assert_len : return False for chunk in chunks : self.send_function( chunk ) return True
def __protocolThreadFunction( self ) : while self.on : raw_data = self.receive_function() stream, message = self.orchestrator.depositChunk( raw_data ) message_consumer = Thread( target = self.__consume, args = ( stream, message ) ) message_consumer.daemon = True message_consumer.start() # self.__consume( stream, message )
[docs] def getOrchestrator( self ) : """ :rtype: `Orchestrator` :return: Returns the Orchestrator object used to create this `Handler` instance. """ return self.orchestrator
[docs] def reset( self ) : self.orchestrator.reset() self.to_send_list = [] self.to_send_raw = []
[docs] def addStream( self, stream ) : self.orchestrator.addStream( stream ) return stream