Source code for vespa.agent_av

# -*- coding: utf-8 -*-
#
# Module name: agent_av.py
# Version:     1.0
# Created:     29/04/2014 by Aurélien Wailly <aurelien.wailly@orange.com>
#
# Copyright (C) 2010-2014 Orange
#
# This file is part of VESPA.
#
# VESPA is free software: you can redistribute it and/or modify
# it under the terms of the GNU Lesser General Public License as published by
# the Free Software Foundation version 2.1.
#
# VESPA is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with VESPA.  If not, see <http://www.gnu.org/licenses/>.
"""
Agent representation
"""
import socket
from .log_pipe import debug1
from .agent import Agent

EOT_FLAG = "EndOfTransmission"
LIST_ITEM_SEPARATOR = ':'
LIST_SEPARATOR = '\r'


[docs]class Agent_AV(Agent): """Create an Agent able to communicate with the ClamAV backend (need a driver). :return: The Agent instance to offer the ClamAV support :rtype: Node """ def __init__(self, name, host, port, master, vm): super(Agent_AV, self).__init__(name, host, port, master, run=False) self.have_backend = True self.is_backend_reachable = True self.backend = vm
[docs] def send(self, msg): """Overload the internal send to capture and send messages to the backend :param str msg: The massage to process and to send :return: The backend response :rtype: str """ command = msg.split("|")[0] # Preprocessing if command == 'import_list': predefined_list = [("lala", "lolo")] for item, status in predefined_list: msg += item + "#" elif command == 'register_handler': msg += '%s#%s#' % (self.host, self.port) # Normal socket connection if self.is_backend_reachable: data = super(Agent_AV, self).send(msg) # Hypervisor sysrq else: if command == 'clean_image': # conn_local = libvirt.open("qemu+ssh://" + self.agent_hy.host # + "/system") # dom = self.agent_hy.__get_dom_name(self.vm, conn_local) # dom.sendKey(8, 10, [ 0x12, 0x42 ], 2, 0) args = (8, 10, [0x12, 0x42], 2, 0) data = self.sendRemote( self.agent_hy, "send_key|%s#%s" % (self.backend, args)) else: data = super(Agent_AV, self).send(msg) # data = [ "Unsupported sysrq method" ] # self.sendRemote(master, data) return data
[docs] def dump_analyzed_file_list(self): """Gather list of files analyzed bi the ClamAV antivirus :return: The list of analyzed files :rtype: list """ agent_vm = self # self.findAgent('agent_av') raw_msg = agent_vm.send("dump_list|") command = raw_msg.split('|')[0] title_list = raw_msg.split('|')[1] file_list_raw = raw_msg.split('|')[2] file_list = [] file_list.append( (title_list.split(LIST_ITEM_SEPARATOR)[0], title_list.split(LIST_ITEM_SEPARATOR)[1])) # It can be an error if LIST_SEPARATOR not in file_list_raw: return [file_list_raw] # Otherwise, it can be a list for scan_result in file_list_raw.split(LIST_SEPARATOR): # It can be an error, last line is messy if ':' not in scan_result: continue name = ":".join( scan_result.split(LIST_ITEM_SEPARATOR)[ 0:2]).strip() status = scan_result.split(LIST_ITEM_SEPARATOR)[2].strip() file_list.append((name, status)) return file_list
[docs] def isolate_warning(self, vm): """Set up the agent for interactions with the hypervisor :param str vm: The tuple (name, host, port) describing the backend """ self.is_backend_reachable = False self.agent_hy = eval(vm)
[docs] def connect_warning(self): """Set up the agent for interactions with the VM """ self.is_backend_reachable = True self.agent_hy = False