Skip to content
This repository has been archived by the owner on Nov 28, 2023. It is now read-only.

Commit

Permalink
Merge pull request #451 from BlBana/master
Browse files Browse the repository at this point in the history
modified test cases
  • Loading branch information
FeeiCN authored Aug 14, 2017
2 parents fb21d1a + d22acd7 commit 316c891
Show file tree
Hide file tree
Showing 4 changed files with 32 additions and 27 deletions.
37 changes: 22 additions & 15 deletions cobra/cve_parse.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,14 +13,19 @@
import datetime
import os
import requests
import urllib
import threading
import gzip
import xml.etree.cElementTree as eT
import multiprocessing
from .config import project_directory, Config, config_path
from .log import logger
from .dependencies import Dependencies

try:
from urllib import urlretrieve # Python2
except ImportError:
from urllib.request import urlretrieve # Python3

try:
from configparser import ConfigParser
except ImportError:
Expand Down Expand Up @@ -158,7 +163,8 @@ def rule_xml(self):
rule_path = project_directory + '/rules/CVI-999'
tree.write(rule_path + str(self.year)[1:] + '.xml')
endtime = datetime.datetime.now()
logger.info('CVE-999' + str(self.year)[1:] + '.xml Rule update succeeds, times:%ds' % (endtime - starttime).seconds)
logger.info(
'CVE-999' + str(self.year)[1:] + '.xml Rule update succeeds, times:%ds' % (endtime - starttime).seconds)

def pretty(self, e, level=0):
"""
Expand Down Expand Up @@ -234,7 +240,7 @@ def log_result(self):
for cve_child in self._scan_result[module_]:
cve_id = cve_child
level = self._scan_result[module_][cve_id]
logger.warning('Find the module ' + module_ + ' have ' + cve_id +',level: ' +level)
logger.warning('Find the module ' + module_ + ' have ' + cve_id + ',level: ' + level)
count = len(self._scan_result[module_])
logger.warning('The ' + module_ + ' module have ' + str(count) + ' CVE Vul(s)')

Expand All @@ -247,13 +253,13 @@ def rule_parse():
gz_files = download_rule_gz()
un_gz(gz_files)
pool = multiprocessing.Pool()
for year in range(2002, datetime.datetime.now().year+1):
for year in range(2002, datetime.datetime.now().year + 1):
cve_xml = "../rules/%d.xml" % year
pool.apply_async(rule_single, args=(cve_xml, year))
pool.close()
pool.join()
for year in range(2002, datetime.datetime.now().year+1):
os.remove(project_directory+"/rules/%d.xml" % year)
for year in range(2002, datetime.datetime.now().year + 1):
os.remove(project_directory + "/rules/%d.xml" % year)
logger.info("The rule update success, start scan cve vuls")
return True
else:
Expand All @@ -264,18 +270,19 @@ def download_rule_gz():
threads = []
files = []
start_time = datetime.datetime.now()
for year in range(2002, datetime.datetime.now().year+1):
for year in range(2002, datetime.datetime.now().year + 1):
url = "https://static.nvd.nist.gov/feeds/xml/cve/2.0/nvdcve-2.0-" + str(year) + ".xml.gz"
logger.info("start download " + str(year) + ".xml.gz")
thread = threading.Thread(target=urllib.urlretrieve, args=(url, project_directory+"/rules/"+str(year)+".xml.gz"))
thread = threading.Thread(target=urlretrieve,
args=(url, project_directory + "/rules/" + str(year) + ".xml.gz"))
thread.start()
threads.append(thread)
logger.info('CVE-' + str(year) + " is download success")
files.append(project_directory+"/rules/" + str(year) + ".xml.gz")
files.append(project_directory + "/rules/" + str(year) + ".xml.gz")
for t in threads:
t.join()
end_time = datetime.datetime.now()
logger.info("All CVE xml file already download success, use time:%ds" % (end_time-start_time).seconds)
logger.info("All CVE xml file already download success, use time:%ds" % (end_time - start_time).seconds)
return files


Expand All @@ -286,11 +293,11 @@ def un_gz(gz_files):
for gz_file in gz_files:
f_name = gz_file.replace(".gz", "")
g_file = gzip.GzipFile(gz_file)
open(f_name, "w+").write(g_file.read())
open(f_name, "wb+").write(g_file.read())
g_file.close()
os.remove(gz_file)
end_time = datetime.datetime.now()
logger.info("Decompress success, use time:%ds" % (end_time-start_time).seconds)
logger.info("Decompress success, use time:%ds" % (end_time - start_time).seconds)
return True


Expand All @@ -300,9 +307,9 @@ def rule_single(target_directory, year):

def is_update():
url = "https://static.nvd.nist.gov/feeds/xml/cve/2.0/nvdcve-2.0-modified.meta"
r = requests.get(url)
r = requests.get(url, verify=False)
index = r.text.find('sha256:')
sha256_now = r.text[index+7:].strip()
sha256_now = r.text[index + 7:].strip()
sha256_local = Config(level1='cve', level2='modified').value
if sha256_local != sha256_now:
logger.info("The CVE Rule already update, start update local rule")
Expand All @@ -314,7 +321,7 @@ def is_update():
config.write(fi)
fi.close()
except IOError as e:
logger.warning(e.message)
logger.warning(e)
logger.info("The sha256 been update")
return True
return False
Expand Down
2 changes: 2 additions & 0 deletions cobra/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -213,6 +213,7 @@ def md5(content):
:param content:
:return:
"""
content = content.encode('utf8')
return hashlib.md5(content).hexdigest()


Expand Down Expand Up @@ -243,6 +244,7 @@ def path_to_short(path, max_length=36):
return path
paths = path.split('/')
paths = filter(None, paths)
paths = list(paths)
tmp_path = ''
for i in range(0, len(paths)):
logger.debug((i, str(paths[i]), str(paths[len(paths) - i - 1])))
Expand Down
2 changes: 1 addition & 1 deletion tests/test_cve_parse.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
from cobra.cve_parse import *
from cobra.cve_parse import CveParse, project_directory
try:
from configparser import ConfigParser
from configparser import ConfigParser, NoSectionError
except ImportError:
from ConfigParser import ConfigParser, NoSectionError

Expand Down
18 changes: 7 additions & 11 deletions tests/test_detection.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,8 @@ def test_get_dict():
extension = ['php', 'js', 'java']
type_num = {}
type_num = detection.get_dict(extension, type_num)
assert isinstance(extension, type(type_num.keys()))
print(type(type_num))
assert type_num['php']['blank'] == 0


def test_project_information():
Expand All @@ -61,37 +62,32 @@ def test_project_information():

def test_count_py_line():
count = Detection.count_py_line(examples_path+'/cloc.py')
type_ = count.keys()
type_count = ['count_blank', 'count_code', 'count_pound']
assert type_ == type_count
assert count['count_code'] == 5


def test_count_php_line():
count = Detection.count_php_line(examples_path+'/cloc.php')
type_ = count.keys()
type_count = ['count_blank', 'count_code', 'count_pound']
assert type_ == type_count
assert count['count_code'] == 2


def test_count_java_line():
count = Detection.count_java_line(examples_path+'/cloc.java')
type_ = count.keys()
type_count = ['count_blank', 'count_code', 'count_pound']
assert type_ == type_count
assert count['count_code'] == 1


def test_count_html_line():
count = Detection.count_html_line(examples_path+'/cloc.html')
type_ = count.keys()
type_count = ['count_blank', 'count_code', 'count_pound']
assert type_ == type_count
assert count['count_code'] == 9


def test_count_data_line():
count = Detection.count_data_line(examples_path+'/param_xml.xml')
type_ = count.keys()
type_count = ['count_blank', 'count_code', 'count_pound']
assert type_ == type_count
assert count['count_code'] == 81


def test_countnum():
Expand Down

0 comments on commit 316c891

Please sign in to comment.