mirror of
https://github.com/PartialVolume/shredos.x86_64.git
synced 2026-02-20 17:42:10 +00:00
330 lines
11 KiB
Plaintext
330 lines
11 KiB
Plaintext
|
|
#!/usr/bin/env python3
|
||
|
|
# SPDX-License-Identifier: GPL-2.0-or-later
|
||
|
|
#
|
||
|
|
# Enriches the input CycloneDX SBOM with vulnerability information from the NVD
|
||
|
|
# database.
|
||
|
|
#
|
||
|
|
# The NVD database is cloned using a mirror of it and the content is compared
|
||
|
|
# locally.
|
||
|
|
#
|
||
|
|
# Example usage:
|
||
|
|
# $ make show-info | utils/generate-cyclonedx | support/script/cve-check --nvd-path dl/buildroot-nvd/
|
||
|
|
from collections import defaultdict
|
||
|
|
from pathlib import Path
|
||
|
|
from typing import TypedDict
|
||
|
|
import argparse
|
||
|
|
import sys
|
||
|
|
import json
|
||
|
|
|
||
|
|
import cve as cvecheck
|
||
|
|
|
||
|
|
|
||
|
|
class Options(TypedDict, total=True):
|
||
|
|
include_resolved: bool
|
||
|
|
|
||
|
|
|
||
|
|
DESCRIPTION = """
|
||
|
|
Enriches the input CycloneDX SBOM with vulnerability information from the NVD
|
||
|
|
database.
|
||
|
|
|
||
|
|
The NVD database is cloned using a mirror of it and the content is compared
|
||
|
|
locally.
|
||
|
|
"""
|
||
|
|
|
||
|
|
|
||
|
|
brpath = Path(__file__).parent.parent.parent
|
||
|
|
|
||
|
|
|
||
|
|
def cve_api_get_lang_from_list(values, lang="en") -> (str | None):
|
||
|
|
for x in values:
|
||
|
|
if x.get("lang") == lang:
|
||
|
|
return x.get("value")
|
||
|
|
return None
|
||
|
|
|
||
|
|
|
||
|
|
def nvd_cve_weaknesses_to_cdx(weaknesses) -> list[int]:
|
||
|
|
"""
|
||
|
|
See the CycloneDX specification for 'cwes' [1]
|
||
|
|
|
||
|
|
[1] https://cyclonedx.org/docs/1.6/json/#vulnerabilities_items_cwes
|
||
|
|
"""
|
||
|
|
res = []
|
||
|
|
|
||
|
|
for node in weaknesses:
|
||
|
|
value = cve_api_get_lang_from_list(node.get("description", []))
|
||
|
|
if value is None:
|
||
|
|
continue
|
||
|
|
|
||
|
|
cwe = value.replace("CWE-", "")
|
||
|
|
|
||
|
|
if not cwe.isnumeric():
|
||
|
|
continue
|
||
|
|
res.append(int(cwe))
|
||
|
|
|
||
|
|
return res
|
||
|
|
|
||
|
|
|
||
|
|
def nvd_cve_cvss_to_cdx(metrics):
|
||
|
|
"""
|
||
|
|
See the CycloneDX specification for 'ratings' [1]
|
||
|
|
|
||
|
|
[1] https://cyclonedx.org/docs/1.6/json/#vulnerabilities_items_ratings
|
||
|
|
"""
|
||
|
|
|
||
|
|
KEY_METHOD_DICT = {
|
||
|
|
"cvssMetricV40": "CVSSv4",
|
||
|
|
"cvssMetricV31": "CVSSv31",
|
||
|
|
"cvssMetricV3": "CVSSv3",
|
||
|
|
"cvssMetricV2": "CVSSv2"
|
||
|
|
}
|
||
|
|
|
||
|
|
res = []
|
||
|
|
|
||
|
|
for key, values in metrics.items():
|
||
|
|
for value in values:
|
||
|
|
data = value.get("cvssData", {})
|
||
|
|
res.append({
|
||
|
|
"method": KEY_METHOD_DICT.get(key, "other"),
|
||
|
|
**({
|
||
|
|
"score": data["baseScore"],
|
||
|
|
} if "baseScore" in data else {}),
|
||
|
|
**({
|
||
|
|
"severity": data["baseSeverity"].lower(),
|
||
|
|
} if "baseSeverity" in data else {}),
|
||
|
|
**({
|
||
|
|
"vector": data["vectorString"],
|
||
|
|
} if "vectorString" in data else {}),
|
||
|
|
})
|
||
|
|
|
||
|
|
return res
|
||
|
|
|
||
|
|
|
||
|
|
def nvd_cve_references_to_cdx(references):
|
||
|
|
advisories = []
|
||
|
|
|
||
|
|
for ref in references:
|
||
|
|
if not {"url", "tags"}.issubset(ref):
|
||
|
|
continue
|
||
|
|
|
||
|
|
tags = ref["tags"]
|
||
|
|
if not isinstance(tags, list) or len(tags) == 0:
|
||
|
|
continue
|
||
|
|
|
||
|
|
advisories.append({
|
||
|
|
"title": next((t for t in tags if "Advisory" not in t), tags[0]),
|
||
|
|
"url": ref["url"]
|
||
|
|
})
|
||
|
|
|
||
|
|
return advisories
|
||
|
|
|
||
|
|
|
||
|
|
def nvd_cve_to_cdx_vulnerability(nvd_cve):
|
||
|
|
"""
|
||
|
|
Turns the CVE object fetched from the NVD API into a CycloneDX
|
||
|
|
vulnerability that fits the spec (see [1]).
|
||
|
|
|
||
|
|
[1] https://cyclonedx.org/docs/1.6/json/#vulnerabilities
|
||
|
|
"""
|
||
|
|
vulnerability = {
|
||
|
|
"bom-ref": nvd_cve["id"],
|
||
|
|
"id": nvd_cve["id"],
|
||
|
|
"description": cve_api_get_lang_from_list(nvd_cve.get("descriptions", [])) or "",
|
||
|
|
"source": {
|
||
|
|
"name": "NVD",
|
||
|
|
"url": "https://nvd.nist.gov/"
|
||
|
|
},
|
||
|
|
**({
|
||
|
|
"published": nvd_cve["published"],
|
||
|
|
} if "published" in nvd_cve else {}),
|
||
|
|
**({
|
||
|
|
"updated": nvd_cve["lastModified"],
|
||
|
|
} if "lastModified" in nvd_cve else {}),
|
||
|
|
**({
|
||
|
|
"cwes": nvd_cve_weaknesses_to_cdx(nvd_cve["weaknesses"]),
|
||
|
|
} if "weaknesses" in nvd_cve else {}),
|
||
|
|
**({
|
||
|
|
"ratings": nvd_cve_cvss_to_cdx(nvd_cve["metrics"]),
|
||
|
|
} if "metrics" in nvd_cve else {}),
|
||
|
|
**({
|
||
|
|
"advisories": nvd_cve_references_to_cdx(nvd_cve["references"]),
|
||
|
|
} if "references" in nvd_cve else {}),
|
||
|
|
}
|
||
|
|
|
||
|
|
return vulnerability
|
||
|
|
|
||
|
|
|
||
|
|
def vuln_append_or_update_affects_if_exists(vulnerabilities, vulnerability):
|
||
|
|
"""
|
||
|
|
Append 'vulnerability' passed as argument to the 'vulnerabilities' argument
|
||
|
|
if an entry with the same 'id' doesn't exist yet.
|
||
|
|
If the vulnerability already exists, the input reference is added to the
|
||
|
|
'affects' list of the existing entry.
|
||
|
|
|
||
|
|
Args:
|
||
|
|
vulnerabilities (list): The vulnerabilities array reference retrieved
|
||
|
|
from the input CycloneDX SBOM
|
||
|
|
vulnerability (dict): Vulnerability to add to the 'vulnerabilities' list.
|
||
|
|
"""
|
||
|
|
# Search if a vulnerability with the same identifier already exists in the
|
||
|
|
# SBOM vulnerability list.
|
||
|
|
matching_vuln = next(
|
||
|
|
(vuln for vuln in vulnerabilities if vuln.get("id") == vulnerability["id"]),
|
||
|
|
None
|
||
|
|
)
|
||
|
|
|
||
|
|
# bom-ref to the component is passed to the affects of the vulnerability
|
||
|
|
# passed as argument
|
||
|
|
bom_ref = next((a["ref"] for a in vulnerability.get("affects", [])), None)
|
||
|
|
|
||
|
|
if matching_vuln is not None:
|
||
|
|
# Remove the affect to not use it while updating matching vuln.
|
||
|
|
if "affects" in vulnerability:
|
||
|
|
del vulnerability["affects"]
|
||
|
|
|
||
|
|
if matching_vuln.get("analysis") is not None and "analysis" in vulnerability:
|
||
|
|
# We don't update vulnerabilities that already have an
|
||
|
|
# 'analysis'.
|
||
|
|
# Buildroot ignored vulnerabilities will already have
|
||
|
|
# an analysis and need to remain as such.
|
||
|
|
del vulnerability["analysis"]
|
||
|
|
|
||
|
|
affects = matching_vuln.setdefault("affects", [])
|
||
|
|
|
||
|
|
if bom_ref is not None:
|
||
|
|
ref = next((a["ref"] for a in affects if a["ref"] == bom_ref), None)
|
||
|
|
if ref is None:
|
||
|
|
# Add a 'ref' (bom reference) to the component if not
|
||
|
|
# already present in the 'affects' list.
|
||
|
|
affects.append({
|
||
|
|
"ref": bom_ref
|
||
|
|
})
|
||
|
|
|
||
|
|
# Update the metadata of the vulnerability with the one
|
||
|
|
# downloaded from the database.
|
||
|
|
matching_vuln.update(vulnerability)
|
||
|
|
else:
|
||
|
|
vulnerabilities.append(vulnerability)
|
||
|
|
|
||
|
|
|
||
|
|
def check_package_cve_affects(cve: cvecheck.CVE, cpe_product_pkgs, sbom, opt: Options):
|
||
|
|
vulnerabilities = sbom.setdefault("vulnerabilities", [])
|
||
|
|
|
||
|
|
for product in cve.affected_products:
|
||
|
|
for comp in cpe_product_pkgs.get(product, []):
|
||
|
|
cve_status = cve.affects(comp["name"], comp["version"], comp["cpe"])
|
||
|
|
|
||
|
|
if cve_status == cve.CVE_UNKNOWN:
|
||
|
|
continue
|
||
|
|
|
||
|
|
if cve_status == cve.CVE_DOESNT_AFFECT and not opt["include_resolved"]:
|
||
|
|
continue
|
||
|
|
|
||
|
|
vulnerability = nvd_cve_to_cdx_vulnerability(cve.nvd_cve)
|
||
|
|
|
||
|
|
vulnerability["analysis"] = {
|
||
|
|
"state": "exploitable" if cve_status == cve.CVE_AFFECTS else "resolved"
|
||
|
|
}
|
||
|
|
|
||
|
|
vulnerability["affects"] = [{
|
||
|
|
"ref": comp["bom-ref"]
|
||
|
|
}]
|
||
|
|
|
||
|
|
vuln_append_or_update_affects_if_exists(vulnerabilities, vulnerability)
|
||
|
|
|
||
|
|
|
||
|
|
def check_package_cves(nvd_path: Path, sbom, opt: Options):
|
||
|
|
"""
|
||
|
|
Iterate over every entry of the NVD API mirror. Each vulnerability is
|
||
|
|
compared to the set of components passed as argument in the 'sbom'.
|
||
|
|
The vulnerabilities set of that 'sbom' argument is enriched with analysis
|
||
|
|
of vulnerabilities that match that set of components.
|
||
|
|
|
||
|
|
Args:
|
||
|
|
nvd_path (Path): Path of the mirror of the NVD API.
|
||
|
|
sbom (dict): Input SBOM containing a set of vulnerabilities that will be enriched.
|
||
|
|
opt (Options): Options for the analysis.
|
||
|
|
"""
|
||
|
|
cpe_product_pkgs = defaultdict(list)
|
||
|
|
|
||
|
|
for comp in sbom.get("components", []):
|
||
|
|
if comp.get("cpe") and comp.get("version"):
|
||
|
|
cpe_product = cvecheck.CPE(comp["cpe"]).product
|
||
|
|
cpe_product_pkgs[cpe_product].append(comp)
|
||
|
|
|
||
|
|
for cve in cvecheck.CVE.read_nvd_dir(nvd_path):
|
||
|
|
check_package_cve_affects(cve, cpe_product_pkgs, sbom, opt)
|
||
|
|
|
||
|
|
|
||
|
|
def enrich_vulnerabilities(nvd_path: Path, sbom):
|
||
|
|
"""
|
||
|
|
Iterate over the vulnerabilities present in the 'sbom' passed as arguments
|
||
|
|
and enrich the vulnerability with content from the NVD API mirror.
|
||
|
|
|
||
|
|
Args:
|
||
|
|
nvd_path (Path): Path of the mirror of the NVD API.
|
||
|
|
sbom (dict): Input SBOM containing a set of vulnerabilities that will be enriched.
|
||
|
|
"""
|
||
|
|
vulnerabilities = sbom.setdefault("vulnerabilities", [])
|
||
|
|
|
||
|
|
for vuln in vulnerabilities:
|
||
|
|
vuln_id = vuln.get("id")
|
||
|
|
if vuln_id is None or not vuln_id.upper().startswith("CVE-"):
|
||
|
|
continue
|
||
|
|
|
||
|
|
cve = cvecheck.CVE.read_nvd_entry(nvd_path, vuln_id)
|
||
|
|
|
||
|
|
if cve is None:
|
||
|
|
print(f"Warning: '{vuln_id}' doesn't exist in NVD database.", file=sys.stderr)
|
||
|
|
continue
|
||
|
|
|
||
|
|
vulnerability = nvd_cve_to_cdx_vulnerability(cve.nvd_cve)
|
||
|
|
vuln_append_or_update_affects_if_exists(vulnerabilities, vulnerability)
|
||
|
|
|
||
|
|
|
||
|
|
def main():
|
||
|
|
parser = argparse.ArgumentParser(description=DESCRIPTION)
|
||
|
|
parser.add_argument("-i", "--in-file", nargs="?", type=argparse.FileType("r"),
|
||
|
|
default=(None if sys.stdin.isatty() else sys.stdin))
|
||
|
|
parser.add_argument("-o", "--out-file", nargs="?", type=argparse.FileType("w"),
|
||
|
|
default=sys.stdout)
|
||
|
|
parser.add_argument('--nvd-path', dest='nvd_path',
|
||
|
|
default=brpath / 'dl' / 'buildroot-nvd',
|
||
|
|
help='Path to the local NVD database',
|
||
|
|
type=lambda p: Path(p).expanduser().resolve())
|
||
|
|
parser.add_argument("--enrich-only", default=False, action='store_true',
|
||
|
|
help="Only update metadata for the vulnerabilities currently present " +
|
||
|
|
"in the input CycloneDX SBOM. Don't do an analysis.")
|
||
|
|
parser.add_argument("--include-resolved", default=False, action='store_true',
|
||
|
|
help="Add vulnerabilities already 'resolved' that don't affect a " +
|
||
|
|
"component to the output CycloneDX vulnerabilities analysis.")
|
||
|
|
parser.add_argument("--no-nvd-update", default=False, action='store_true',
|
||
|
|
help="Doesn't update the NVD database.")
|
||
|
|
|
||
|
|
args = parser.parse_args()
|
||
|
|
|
||
|
|
if args.in_file is None or args.nvd_path is None:
|
||
|
|
parser.print_help()
|
||
|
|
sys.exit(1)
|
||
|
|
|
||
|
|
sbom = json.load(args.in_file)
|
||
|
|
|
||
|
|
opt = Options(
|
||
|
|
include_resolved=args.include_resolved,
|
||
|
|
)
|
||
|
|
|
||
|
|
args.nvd_path.mkdir(parents=True, exist_ok=True)
|
||
|
|
if not args.no_nvd_update:
|
||
|
|
cvecheck.CVE.download_nvd(args.nvd_path)
|
||
|
|
|
||
|
|
if args.enrich_only:
|
||
|
|
enrich_vulnerabilities(args.nvd_path, sbom)
|
||
|
|
else:
|
||
|
|
check_package_cves(args.nvd_path, sbom, opt)
|
||
|
|
|
||
|
|
args.out_file.write(json.dumps(sbom, indent=2))
|
||
|
|
args.out_file.write('\n')
|
||
|
|
|
||
|
|
|
||
|
|
if __name__ == "__main__":
|
||
|
|
main()
|