Source code for covertutils.datamanipulation.chunker

from covertutils.exceptions import *

from os import urandom


[docs]class Chunker : """ The Chunker class is used to initialize chunk and de-chunk messages. """ __has_more_tag = '\x00'
[docs] def __init__( self, chunk_length, dechunk_length = None, reverse = False ) : """ :param int chunk_length: This parameter defines the size of the output chunks, containing tagging. :param int dechunk_length: This parameter defines the size of the input chunks, containing tagging. :param bool reverse: If `True` the `chunk_length` and `dechunk_length` are swapped. Useful when setting up 2 instances that have to match. """ self.in_length = dechunk_length if reverse : self.out_length, self.in_length = self.in_length, chunk_length self.tag_length = 1 self.out_length = chunk_length - self.tag_length self.__message = ''
[docs] def chunkMessage( self, payload ) : """ :param str payload: The raw data to be chunked in bytes. :rtype: list :return: A list of chunks containing the chunked `payload`. """ chunk_size = self.out_length chunks = [] for i in xrange(0, len( payload ), chunk_size) : chunk = payload[i:i + chunk_size] tag = self.__has_more_tag last_iteration = i + chunk_size >= len(payload) if last_iteration : padding_length = chunk_size - len(chunk) padding = urandom( padding_length ) tag = chr( len(chunk) ) # print "%s - Length %d, padding %d" % (chunk.encode('hex'), len(chunk), len(padding)) chunk = chunk + padding tagged_chunk = self.__tagChunk( chunk, tag ) chunks.append( tagged_chunk ) return chunks
[docs] def deChunkMessage( self, chunk, ret_chunk = False ) : """ :param str chunk: A part of a chunked message to be assembled. :rtype: tuple :return: The method return a tuple of (status, message). If status is `False` the provided chunk isn't the last part of the message and the message contains an empty string. Else, the assembled message can be found in `message`. """ tag, chunk = self.__dissectTag( chunk ) # print tag is_last = tag != self.__has_more_tag if tag == '' : raise InvalidChunkException("Empty tag supplied to the deChunk method") if is_last : data_left = ord( tag ) chunk = chunk [: data_left] self.__message += chunk if is_last : ret = self.__message self.reset() return True, ret if ret_chunk : return False, chunk return None, None
def __dissectTags( self, chunks ) : return [ self.__dissectTag( chunk ) for chunk in chunks ] def __tagChunk( self, chunk, tag ) : return tag + chunk def __dissectTag( self, chunk ) : return chunk [:self.tag_length], chunk[self.tag_length: ]
[docs] def reset( self ) : """ Resets all partially assembled messages. """ self.__message = ''
[docs] def chunkMessageToStr( self, payload ) : return ''.join(chunkMessage( payload ))