'''
(c) 2008 Adam Pridgen adam@thecoverofnight.com, The Cover of Night, LLC

nessconnect_dmp.py

Nessconnect Data Manipulator and Parser

Script will parse a nessconnect file and let the person manipulate the
data as they see fit.  This script is meant to be run in an environment
like ipython.

This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
any later version.

This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
GNU General Public License for more details.

You should have received a copy of the GNU General Public License
along with this program.  If not, see <http://www.gnu.org/licenses/>.
'''

# NOTE: every print below is written as print(<one string>) so the module
# produces the same output whether it is run under Python 2 or Python 3.
print('''If used commercially, the tool name and web address must be
mentioned in the report along with the following projects.

This tool uses data from the following tools:
    nessconnect http://sourceforge.net/projects/nessconnect/
''')

import codecs
import sys
from xml.sax import make_parser, handler


def _parse_port(port):
    '''
    Split a nessconnect port label into (portName, portNum, portType).

        "general/tcp"   -> ('general', 0, 'tcp')
        "http (80/tcp)" -> ('http', '80', 'tcp')
        ""              -> ('', -1, '')

    The port number is the int 0 for "general" labels, the int -1 when no
    number could be found, and otherwise the string taken from the label.
    '''
    if 'general' in port:
        pieces = port.split('/')
        port_type = pieces[1] if len(pieces) > 1 else ''
        return pieces[0], 0, port_type
    if port == '':
        return '', -1, ''

    words = port.split()
    name = words[0] if words else ''
    if '(' not in port:
        # There is no "(number/type)" part to read a number from.
        return name, -1, ''

    detail = port.split('(')[1]
    pieces = detail.split('/')
    if len(pieces) > 1:
        # "80/tcp)" -> drop the closing parenthesis from the type.
        port_type = pieces[1].strip()[:-1]
    else:
        # NOTE(review): without a '/', the number keeps the trailing ')'
        # and the type is the parenthesised text.  This mirrors what the
        # original Description parser produced; confirm against real data.
        port_type = detail.strip()[:-1]
    return name, pieces[0], port_type


def _csv_quote(text):
    '''Wrap text in double quotes, doubling any embedded double quote.'''
    return u'"' + text.replace(u'"', u'""') + u'"'


def _first_result_named(results, plugin_name):
    '''Return the first result whose plugin name matches, or None.'''
    for result in results:
        if result.plugin.name == plugin_name:
            return result
    return None


class NessusFindingBlock:
    '''
    A group of findings: plugins keyed by name, the set of hosts they
    were reported for, and optionally the matching host descriptions.
    '''

    def __init__(self):
        self.results = {}       # plugin name -> Plugin
        self.hosts = set()      # host addresses seen in added results
        self.hostDetails = {}   # host address -> set of Description

    def addResult(self, result):
        '''Record the result's plugin and every host it was reported on.'''
        self.results[result.plugin.name] = result.plugin
        self.addHosts(result)

    def addHostDetails(self, desc):
        '''File a Description under the host it belongs to.'''
        self.hostDetails.setdefault(desc.host, set()).add(desc)

    def addHosts(self, result):
        '''Add the host of each description in result to the host set.'''
        for desc in result.descriptions:
            self.hosts.add(desc.host)


class PortInfo:
    '''Name, number and type parsed from a nessconnect port label.'''

    def __init__(self, port):
        self.portType = ''
        self.portNum = -1
        self.portName = ''
        self.parsePort(port)

    def parsePort(self, port):
        '''Fill portName/portNum/portType from a port label string.'''
        self.portName, self.portNum, self.portType = _parse_port(port)

    def __str__(self):
        return "%s (%s/%s)" % (self.portName, self.portNum, self.portType)


class HostInfo:
    '''What is known about one scanned host: ports, OS, netbios data.'''

    def __init__(self, host):
        self.address = host
        self.ports = {}         # port number string -> PortInfo
        self.OS = ''
        self.NetbiosName = ''
        self.NetbiosDesc = ''

    def addPortfromDesc(self, ness_desc):
        '''
        Register the port named by a Description.

        "general" and empty port labels do not name a real port and are
        skipped, as are labels no port number could be read from.
        '''
        label = ness_desc.port
        if label == '' or 'general' in label:
            return
        port = PortInfo(label)
        if port.portNum == -1:
            return
        self.ports[port.portNum] = port

    def setOSfromDesc(self, ness_desc):
        '''
        Set the OS from a description's text: the part after "system :"
        and before "Confidence".  When the "system :" marker is absent,
        everything before "Confidence" is used.
        '''
        summary = ness_desc.text.split("Confidence")[0]
        marker = "system :"
        if marker in summary:
            summary = summary.split(marker)[1]
        self.OS = summary.strip()

    def setOS(self, os):
        self.OS = os

    def portOpen(self, port):
        '''Return True when port (int or string) is a known open port.'''
        if isinstance(port, int):
            port = str(port)
        return port in self.ports

    def setNetbiosInfo(self, name, desc=''):
        self.NetbiosName = name
        self.NetbiosDesc = desc

    def __str__(self):
        # key=str keeps the sort well defined even if a non-string key
        # was placed in self.ports by outside code.
        port_lines = [str(self.ports[num])
                      for num in sorted(self.ports, key=str)]
        text = "Host IP Address: %s\n" % self.address
        text += "Host OS: %s\n" % self.OS
        text += "Open Ports:\n%s\n" % "\n".join(port_lines)
        return text


class Result:
    '''The attributes of a <result> element.'''

    def __init__(self, name, attrs):
        self.attrs = {"type": '', "plugin-id": ''}
        self.value = ''
        self.chars = ''
        self.name = name
        for key in attrs.keys():
            self.check_attr(key, attrs[key])

    def check_attr(self, name, value):
        '''Store value only when name is one of the known attributes.'''
        if name in self.attrs:
            self.attrs[name] = value

    def __str__(self):
        pairs = [key + ": " + value for key, value in self.attrs.items()]
        return "Result Information:\n%s" % "\n".join(pairs)


class Description:
    '''
    A host-specific <description> element: its attributes, the parsed
    port label, and the text collected from its character data.
    '''

    def __init__(self, name, attrs):
        self.attrs = {"date": '', "host": '', "enabled": '', "port": ''}
        self.text = ''
        self.name = name
        for key in attrs.keys():
            self.check_attr(key, attrs[key])
        self.date = self.attrs['date']
        self.host = self.attrs['host']
        self.enabled = self.attrs['enabled']
        self.port = self.attrs["port"]
        self.portName, self.portNum, self.portType = _parse_port(self.port)

    def check_attr(self, name, value):
        '''Store value only when name is one of the known attributes.'''
        if name in self.attrs:
            self.attrs[name] = value

    def __str__(self):
        rule = "+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++\n"
        pairs = [key + ":\t\t" + value for key, value in self.attrs.items()]
        text = rule
        text += "%s\n" % "\n".join(pairs)
        text += "\n%s\n" % self.text
        text += rule
        return text


class Results:
    '''The attributes of a <results> element.'''

    def __init__(self, name, attrs):
        self.attrs = {"disabled": '', "name": '', "host": '', "total": '',
                      "port": '', "value": '', "enabled": ''}
        self.name = name
        for key in attrs.keys():
            self.check_attr(key, attrs[key])

    def check_attr(self, name, value):
        '''Store value only when name is one of the known attributes.'''
        if name in self.attrs:
            self.attrs[name] = value

    def __str__(self):
        pairs = [key + ":" + value for key, value in self.attrs.items()]
        text = "Results Name: %s\n" % self.name
        text += "Results Info:\n%s\n" % "\n".join(pairs)
        return text


class Plugin:
    '''
    A <plugin> element: its attributes plus the summary, description and
    copyright text gathered from its child elements.
    '''

    def __init__(self, name, attrs):
        self.attrs = {"category": '', "name": '', "family": '',
                      "enabled": '', "bugtraq": '', "version": '',
                      "cve": '', "id": '', "risk": ''}
        self.summary = ''
        self.description = ''
        self.copyright = ''
        for key in attrs.keys():
            self.check_attr(key, attrs[key])
        self.category = self.attrs['category']
        # The plugin's own "name" attribute, not the element name, is kept.
        self.name = self.attrs['name']
        self.family = self.attrs['family']
        self.enabled = self.attrs['enabled']
        self.bugtraq = self.attrs['bugtraq']
        self.version = self.attrs['version']
        self.cve = self.attrs['cve']
        self.id = self.attrs['id']
        self.risk = self.attrs['risk']

    def check_attr(self, name, value):
        '''Store value only when name is one of the known attributes.'''
        if name in self.attrs:
            self.attrs[name] = value

    def __str__(self):
        pairs = [key + ":\t\t" + value for key, value in self.attrs.items()]
        text = "Plugin Name: %s\n" % self.name
        text += "Summary: %s\n" % self.summary
        text += "Description: \n%s\n" % self.description
        text += "Info:\n%s\n" % "\n".join(pairs)
        text += "Copyright: %s\n" % self.copyright
        return text


class NessconnectResult:
    '''
    Everything the parser collected for one result: the Result, its
    Plugin, and the per-host Description instances.
    '''

    def __init__(self):
        self.result = None
        self.value = ''
        self.descriptions = []
        self.hosts = {}         # host address -> Description
        self.text = ''
        self.summary = None
        self.plugin = None

    def addResult(self, name, attrs):
        if self.result is not None:
            print("Diagnostic problem, looks like there can be more than one result for the nessconnect entity")
        self.result = Result(name, attrs)

    def addDescription(self, name, attrs):
        self.descriptions.append(Description(name, attrs))

    def addPlugin(self, name, attrs):
        if self.plugin is not None:
            print("Diagnostic problem, looks like there can be more than one plugin for the nessconnect entity")
        self.plugin = Plugin(name, attrs)

    def addPluginCopyright(self, content):
        self.plugin.copyright += content

    def addPluginDesc(self, content):
        self.plugin.description += content

    def addPluginSummary(self, content):
        self.plugin.summary += content

    def addResultDescription(self, content):
        '''Append text to the most recent Description, if there is one.'''
        if self.descriptions:
            self.descriptions[-1].text += content

    def updateDescriptions(self):
        '''
        When the plugin has a name, copy it onto every Description and
        index the descriptions by host.
        '''
        if self.plugin.name:
            for desc in self.descriptions:
                desc.name = self.plugin.name
                self.hosts[desc.host] = desc

    def updateHosts(self):
        '''Index the descriptions by host.'''
        for desc in self.descriptions:
            self.hosts[desc.host] = desc

    def __str__(self):
        instances = "\n".join([str(desc) for desc in self.descriptions])
        text = "Nessconnect Result:\n%s\n" % str(self.result)
        text += "Summary: %s\n" % self.summary
        text += "Value: %s\n" % self.value
        text += "Plugin:\n%s\n" % str(self.plugin)
        text += "Description Instances\n%s\n" % instances
        return text


def build_host_kb(nparser_results):
    '''
    This function takes a nessconnect parsed data set and produces a
    knowledge base of scanned hosts: {ip_addr_str: HostInfo, ...}.

    The OS of each host comes from the "OS Identification" result and
    the open ports from the "Ports" result.  A missing result is
    reported on stdout and skipped.
    '''
    host_info = {}

    os_id_result = _first_result_named(nparser_results, u'OS Identification')
    if os_id_result is None:
        print('Could not find the "OS Identification" result')
    else:
        for desc in os_id_result.descriptions:
            host = HostInfo(desc.host)
            host.setOSfromDesc(desc)
            host_info[host.address] = host

    ports_result = _first_result_named(nparser_results, u'Ports')
    if ports_result is None:
        print('Could not find the "Ports" result')
        return host_info

    for desc in ports_result.descriptions:
        if desc.host not in host_info:
            # The host has ports but no OS identification entry.
            host = HostInfo(desc.host)
            host.setOS("Host OS Unknown")
            print("%s was not identified by the scanner" % host.address)
            host_info[desc.host] = host
        host_info[desc.host].addPortfromDesc(desc)
    return host_info


def get_ips_from_list(string):
    '''
    This function takes a new-line delimited list of IPs and puts them
    into a python set.  Only the first whitespace-separated word of each
    line is kept; blank lines are ignored.
    '''
    ips = set()
    for line in string.split('\n'):
        words = line.split()
        if words:
            ips.add(words[0])
    return ips


def findHostsbyDescKeyword(string, results):
    '''
    This function takes a string and parsed nessconnect results, searches
    each result element's plugin description (not the host specific
    description) for the keyword string, and returns the set of hosts
    whose results contain the word in the description.  Every matching
    plugin description is also printed.
    '''
    hosts = set()
    for result in results:
        if string in result.plugin.description:
            print(result.plugin.description)
            for desc in result.descriptions:
                hosts.add(desc.host)
    return hosts


def consolidatebyDescKeyword(string, results):
    '''
    This function takes a string and parsed nessconnect results, searches
    each result element's plugin description and plugin name for the
    keyword string, and returns a NessusFindingBlock of the results that
    contain the keyword.
    '''
    block = NessusFindingBlock()
    for result in results:
        plugin = result.plugin
        if string in plugin.description or string in plugin.name:
            block.addResult(result)
    return block


def searchDetailedDesc(string, results):
    '''
    This function takes a string and parsed nessconnect results, searches
    each result element's host descriptions for the keyword string, and
    returns a NessusFindingBlock of the results (and the matching host
    descriptions) that contain the keyword.
    '''
    block = NessusFindingBlock()
    for result in results:
        for desc in result.descriptions:
            if string in desc.text:
                block.addResult(result)
                block.addHostDetails(desc)
    return block


def dump_host_info(host_info):
    '''
    Dump host information from a dictionary of
    {ip_addr_str: HostInfo, ...}, in sorted key order.
    '''
    print("Dumping host information.")
    for host in sorted(host_info):
        print(host_info[host])
        print("+" * 30 + " \n")


def addHostsToKB(kb, nparser_results):
    '''
    Add a list of [Results, ... ] to an existing knowledge base from a
    nessconnect scan.  Hosts already present in kb are left untouched.
    '''
    new_hosts = build_host_kb(nparser_results)
    for ip in new_hosts:
        if ip not in kb:
            kb[ip] = new_hosts[ip]


class NessconnectParser(handler.ContentHandler):
    '''
    SAX content handler that turns a nessconnect file into a list of
    NessconnectResult objects, available as self.Results.
    '''

    def __init__(self, Results=None):
        '''
        Results, when given, is the list finished results are appended
        to; otherwise a new list is created.
        '''
        handler.ContentHandler.__init__(self)
        self.Results = [] if Results is None else Results
        self.current_result = None
        self.nessconn = None            # result currently being built
        self.current_elem_type = []     # stack of open element names

    def startElement(self, name, attrs):
        if self.nessconn is None:
            self.nessconn = NessconnectResult()
        stack = self.current_elem_type
        stack.append(name)

        if name == "plugin":
            self.nessconn.addPlugin(name, attrs)
        elif name == "result":
            self.nessconn.addResult(name, attrs)
        elif name == "description":
            if len(stack) < 2 or stack[-2] == 'plugin':
                # A plugin's own description carries no host attributes;
                # its text is collected in characters().
                return
            if stack[-2] == 'description':
                # A description nested in a description is not pushed
                # twice (behaviour kept from the original parser).
                stack.pop()
            self.nessconn.addDescription(name, attrs)

    def print_attrs(self, name, attrs):
        '''
        Debug helper: print each attribute whose value contains "User"
        or (case-insensitively) "group" past its first character.
        '''
        # NOTE(review): "> 0" skips a match at position 0; kept as the
        # original wrote it - confirm whether "> -1" was intended.
        for key in attrs.keys():
            value = attrs[key]
            if value.find("User") > 0 or value.lower().find("group") > 0:
                print("%s %s" % (key, value))

    def characters(self, content):
        stack = self.current_elem_type
        if not stack or self.nessconn is None:
            return
        top = stack[-1]
        parent = stack[-2] if len(stack) > 1 else None

        if top == "summary" and parent == 'plugin':
            self.nessconn.addPluginSummary(content)
        elif top == "description" and parent == 'plugin':
            self.nessconn.addPluginDesc(content)
        elif top == "copyright":
            self.nessconn.addPluginCopyright(content)
        elif top == "description":
            self.nessconn.addResultDescription(content)

    def endElement(self, name):
        closed = None
        if self.current_elem_type:
            closed = self.current_elem_type.pop()
        # NOTE(review): the original also tried to store a summary when
        # an element closed inside a result's description, but it read an
        # undefined name ("content") and could only raise NameError, so
        # that branch is omitted.
        if closed == 'result' and self.nessconn is not None:
            self.nessconn.updateDescriptions()
            self.Results.append(self.nessconn)
            self.nessconn = None

    def endDocument(self):
        print("Finished Parsing the file.")


def parseMultipleFiles(files):
    '''
    Parse multiple files.  files is a list [ filenames, ...]; each item
    is handed to the SAX parser's parse().  Returns one list with the
    NessconnectResult objects of every file, in order.
    '''
    allResults = []
    for source in files:
        sax_parser = make_parser()
        nparser = NessconnectParser()
        sax_parser.setContentHandler(nparser)
        sax_parser.parse(source)
        allResults.extend(nparser.Results)
    return allResults


def create_port_csv_summary(ports):
    '''
    Takes in a dictionary {portNumStr: PortInfo, ...} and returns a
    string surrounded by quotes with one entry per port, the entries
    delimited with carriage returns.  Each entry holds the port number,
    description, and type (e.g. udp, tcp).
    '''
    port_infos = [u"%s %s %s" % (num, info.portName, info.portType)
                  for num, info in ports.items()]
    return _csv_quote(u"\r".join(port_infos))


def create_vuln_csv_summary(summary):
    '''
    Takes in a list of [[Plugin, Description], ...] and returns a string
    surrounded by quotes with one entry per item, the entries delimited
    with carriage returns.  Each entry holds the vulnerability risk and
    the plugin name.
    '''
    desc_infos = [u" ".join([item[0].risk, item[0].name])
                  for item in summary]
    return _csv_quote(u"\r".join(desc_infos))


def create_host_vuln_csv_summary(ip, results):
    '''
    Takes in an IP and a list of NessconnectParser.Results and returns a
    list of [(Plugin, Description)] for the results that mention the IP.
    This allows a vulnerability knowledge base to be built around the
    host.
    '''
    return [(result.plugin, result.hosts[ip])
            for result in results if ip in result.hosts]


def updateResultsNetbios(hostInfo, netbios_infos):
    '''
    Takes in a dictionary of host_info {'ip': HostInfo} and a list of
    [[ip, netbios name, netbios desc, ...] ...].

    Returns the rows whose ip is not in hostInfo.  This function
    MODIFIES the hostInfo dictionary and updates the Netbios Name and
    Netbios Description of every host that is found.
    '''
    unscanned_hosts = []
    for row in netbios_infos:
        if row[0] not in hostInfo:
            print(u"Host not in the NessusScan Results:  %s %s %s"
                  % (row[0], row[1], row[2]))
            unscanned_hosts.append(row)
            continue
        hostInfo[row[0]].setNetbiosInfo(row[1], row[2])
    return unscanned_hosts


def create_hostinfo_csv_summary(host_info, nparser_results):
    '''
    Takes in a dictionary of host_info {'ip': HostInfo} and the parsed
    results, and returns a csv string with one row per host containing:
        Host IP Address
        Netbios Name
        Open Ports
        Operating System
        Vulnerability Count
        Identified Vulnerabilities (Risk Vulnerability Name)
        Netbios Desc
    '''
    rows = []
    for ip in host_info:
        info = host_info[ip]
        vuln_summary = create_host_vuln_csv_summary(ip, nparser_results)
        data = [info.address,
                info.NetbiosName,
                create_port_csv_summary(info.ports),
                _csv_quote(info.OS.replace(u"\n", u"\r")),
                u"%d" % len(vuln_summary),
                create_vuln_csv_summary(vuln_summary),
                info.NetbiosDesc]
        rows.append(u",".join(data) + u'\n')
    return u"".join(rows)


def create_csv(host_info, results, netbios_data=None):
    '''
    Takes in
        a dictionary of host_info {'ip': HostInfo}
        a list of Results [Results, ...]
        a list of list of strings [IPAddress, NetbiosName, NetbiosDesc]

    Returns a csv string containing a header row, the scan data, and the
    unscanned hosts from the netbios info:
        Host IP Address
        Netbios Name
        Open Ports
        Operating System
        Vulnerability Count
        Identified Vulnerabilities (Risk Vulnerability Name)
        Netbios Desc
    '''
    header = u"Host IP Address, Netbios Name, Open Ports, Operating System, Vulnerability Count, Identified Vulnerabilitys (Plugin Name - Risk-ID), Netbios Desc\n"

    unscanned_hosts = []
    if netbios_data:
        unscanned_hosts = updateResultsNetbios(host_info, netbios_data)

    # create csv's for the report
    scanned_host_summary = create_hostinfo_csv_summary(host_info, results)
    unscanned_rows = []
    for host in unscanned_hosts:
        data = [host[0], host[1], u'Host Scan Required',
                u"OS UNKNOWN (Not scanned)", u'0',
                u"Not Present During Scan", host[2]]
        unscanned_rows.append(u",".join(data) + u'\n')
    return header + scanned_host_summary + u"".join(unscanned_rows)


if __name__ == "__main__":
    if len(sys.argv) > 1:
        files = sys.argv[1:]
    else:
        print("You did not supply any nessconnect results")
        sys.exit()

    # Change nb_data to the file name
    nb_data = None
    netbios_data = None
    if nb_data:
        with codecs.open(nb_data, encoding='utf-8') as nb_file:
            # Drop the line ending so it does not leak into the last
            # column of a row.
            rows = [line.rstrip(u'\r\n').split(u',') for line in nb_file]
        # A usable row needs an ip, a netbios name and a description.
        netbios_data = [row for row in rows
                        if row[0] != u'' and len(row) > 2]

    results = parseMultipleFiles(files)
    host_info = build_host_kb(results)
    dump_host_info(host_info)
    csv = create_csv(host_info, results, netbios_data)

    # Out file can be changed to whatever
    with codecs.open("outfile.csv", 'w', encoding='utf-8') as out_file:
        out_file.write(csv)