new file mode 100755
@@ -0,0 +1,92 @@
+#!/usr/bin/env python3
+
+from nvd_api_v2 import NVD_API
+
+
+class CPE_ID:
+ @staticmethod
+ def matches(cpe1, cpe2):
+ """Check if two CPE IDs match each other"""
+ cpe1_elems = cpe1.split(":")
+ cpe2_elems = cpe2.split(":")
+
+ remains = filter(lambda x: x[0] not in ["*", "-"] and x[1] not in ["*", "-"] and x[0] != x[1],
+ zip(cpe1_elems, cpe2_elems))
+ return len(list(remains)) == 0
+
+ @staticmethod
+ def product(cpe):
+ return cpe.split(':')[4]
+
+ @staticmethod
+ def version(cpe):
+ return cpe.split(':')[5]
+
+ @staticmethod
+ def no_version(cpe):
+ return ":".join(cpe.split(":")[:5])
+
+
+class CPE_API(NVD_API):
+ def __init__(self, nvd_path):
+ NVD_API.__init__(self, nvd_path, 'CPEs', 'nvdcpe')
+ self.cpes = list()
+ self.cpes_without_version = dict()
+
+ def init_db(self):
+ cursor = self.connection.cursor()
+
+ cursor.execute('CREATE TABLE IF NOT EXISTS products ( \
+ id TEXT UNIQUE, \
+ name TEXT)')
+
+ cursor.close()
+
+ def save_to_db(self, start_index, total_results, content):
+ cpe_ids_dropped = list()
+ products = list()
+
+ for product in content['products']:
+ if product['cpe']['deprecated']:
+ cpe_ids_dropped.append((product['cpe']['cpeNameId'],))
+ continue
+
+ cpe = product['cpe']
+
+ products.append([cpe['cpeNameId'], cpe['cpeName']])
+
+ cursor = self.connection.cursor()
+
+ # Drop all CPEs that are deprecated, status might have changed
+ cursor.executemany('DELETE FROM products WHERE id = ?', cpe_ids_dropped)
+ cursor.executemany('INSERT OR REPLACE INTO products VALUES (?, ?)', products)
+
+ print("[%07d/%07d]" % (start_index, total_results))
+
+ return True
+
+ def load_ids(self):
+ self.check_for_updates()
+
+ self.connection = self.open_db()
+ cursor = self.connection.cursor()
+
+ ids = list()
+ for row in cursor.execute('SELECT name FROM products'):
+ ids.append(row[0])
+
+ cursor.close()
+ self.connection.close()
+
+ self.cpes = ids
+ return ids
+
+ def generate_partials(self):
+ self.cpes_without_version = dict()
+ for cpe in self.cpes:
+ self.cpes_without_version[CPE_ID.no_version(cpe)] = cpe
+
+ def find_partial(self, cpe_id):
+ cpe_id_without_version = CPE_ID.no_version(cpe_id)
+ if cpe_id_without_version in self.cpes_without_version.keys():
+ return self.cpes_without_version[cpe_id_without_version]
@@ -20,24 +20,7 @@
import distutils.version
import operator
from nvd_api_v2 import NVD_API
-
-
-# Check if two CPE IDs match each other
-def cpe_matches(cpe1, cpe2):
- cpe1_elems = cpe1.split(":")
- cpe2_elems = cpe2.split(":")
-
- remains = filter(lambda x: x[0] not in ["*", "-"] and x[1] not in ["*", "-"] and x[0] != x[1],
- zip(cpe1_elems, cpe2_elems))
- return len(list(remains)) == 0
-
-
-def cpe_product(cpe):
- return cpe.split(':')[4]
-
-
-def cpe_version(cpe):
- return cpe.split(':')[5]
+from cpe import CPE_ID
class CVE:
@@ -71,7 +54,7 @@ class CVE:
@property
def affected_product(self):
"""Name of the affected product"""
- return cpe_product(self.match_criteria)
+ return CPE_ID.product(self.match_criteria)
def affects(self, name, version, cve_ignore_list, cpeid=None):
"""
@@ -93,9 +76,9 @@ class CVE:
# version, as they might be different due to
# <pkg>_CPE_ID_VERSION
else:
- pkg_version = distutils.version.LooseVersion(cpe_version(cpeid))
+ pkg_version = distutils.version.LooseVersion(CPE_ID.version(cpeid))
- if not cpe_matches(self.match_criteria, cpeid):
+ if not CPE_ID.matches(self.match_criteria, cpeid):
return self.CVE_DOESNT_AFFECT
if not self.v_start and not self.v_end:
return self.CVE_AFFECTS
@@ -189,8 +172,8 @@ class CVE_API(NVD_API):
def extract_cpe_match_data(self, cpe_match):
"""Map CPE match information to database fields."""
- product = cpe_product(cpe_match['criteria'])
- version = cpe_version(cpe_match['criteria'])
+ product = CPE_ID.product(cpe_match['criteria'])
+ version = CPE_ID.version(cpe_match['criteria'])
# ignore when product is '-', which means N/A
if product == '-':
return
@@ -28,11 +28,8 @@ import re
import subprocess
import json
import sys
-import time
-import gzip
-import xml.etree.ElementTree
-import requests
-from cve import CVE_API, cpe_product
+from cpe import CPE_API, CPE_ID
+from cve import CVE_API
brpath = os.path.normpath(os.path.join(os.path.dirname(__file__), "..", ".."))
@@ -41,7 +38,6 @@ from getdeveloperlib import parse_developers # noqa: E402
INFRA_RE = re.compile(r"\$\(eval \$\(([a-z-]*)-package\)\)")
URL_RE = re.compile(r"\s*https?://\S*\s*$")
-CPEDB_URL = "https://static.nvd.nist.gov/feeds/xml/cpe/dictionary/official-cpe-dictionary_v2.3.xml.gz"
RM_API_STATUS_ERROR = 1
RM_API_STATUS_FOUND_BY_DISTRO = 2
@@ -634,7 +630,7 @@ def check_package_cves(nvd_path, packages):
pkg.status['cve'] = ("na", "no version information available")
continue
if pkg.cpeid:
- product = cpe_product(pkg.cpeid)
+ product = CPE_ID.product(pkg.cpeid)
cpe_product_pkgs[product].append(pkg)
else:
cpe_product_pkgs[pkg.name].append(pkg)
@@ -652,37 +648,8 @@ def check_package_cves(nvd_path, packages):
def check_package_cpes(nvd_path, packages):
- class CpeXmlParser:
- cpes = []
-
- def start(self, tag, attrib):
- if tag == "{http://scap.nist.gov/schema/cpe-extension/2.3}cpe23-item":
- self.cpes.append(attrib['name'])
-
- def close(self):
- return self.cpes
-
- print("CPE: Setting up NIST dictionary")
- if not os.path.exists(os.path.join(nvd_path, "cpe")):
- os.makedirs(os.path.join(nvd_path, "cpe"))
-
- cpe_dict_local = os.path.join(nvd_path, "cpe", os.path.basename(CPEDB_URL))
- if not os.path.exists(cpe_dict_local) or os.stat(cpe_dict_local).st_mtime < time.time() - 86400:
- print("CPE: Fetching xml manifest from [" + CPEDB_URL + "]")
- cpe_dict = requests.get(CPEDB_URL)
- open(cpe_dict_local, "wb").write(cpe_dict.content)
-
- print("CPE: Unzipping xml manifest...")
- nist_cpe_file = gzip.GzipFile(fileobj=open(cpe_dict_local, 'rb'))
-
- parser = xml.etree.ElementTree.XMLParser(target=CpeXmlParser())
- while True:
- c = nist_cpe_file.read(1024*1024)
- if not c:
- break
- parser.feed(c)
- cpes = parser.close()
-
+ cpe_api = CPE_API(nvd_path)
+ cpes = cpe_api.load_ids()
for p in packages:
if not p.cpeid:
continue