Skip to content
2 changes: 2 additions & 0 deletions vulnerabilities/importers/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
from vulnerabilities.importers import github_osv
from vulnerabilities.importers import istio
from vulnerabilities.importers import mozilla
from vulnerabilities.pipelines.v2_importers import liferay_importer
from vulnerabilities.importers import openssl
from vulnerabilities.importers import oss_fuzz
from vulnerabilities.importers import postgresql
Expand Down Expand Up @@ -115,5 +116,6 @@
ubuntu_usn.UbuntuUSNImporter,
fireeye.FireyeImporter,
oss_fuzz.OSSFuzzImporter,
liferay_importer.LiferayImporterPipeline,
]
)
203 changes: 203 additions & 0 deletions vulnerabilities/pipelines/v2_importers/liferay_importer.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,203 @@
#
# Copyright (c) nexB Inc. and others. All rights reserved.
# VulnerableCode is a trademark of nexB Inc.
# SPDX-License-Identifier: Apache-2.0
# See http://www.apache.org/licenses/LICENSE-2.0 for the license text.
# See https://github.com/aboutcode-org/vulnerablecode for support or download.
# See https://aboutcode.org for more information about nexB OSS projects.
#

import logging
import re
from typing import Iterable
from urllib.parse import urljoin

import requests
from bs4 import BeautifulSoup
from packageurl import PackageURL
from univers.version_range import MavenVersionRange

from vulnerabilities.importer import AdvisoryData
from vulnerabilities.importer import AffectedPackageV2
from vulnerabilities.importer import ReferenceV2
from vulnerabilities.importer import VulnerabilitySeverity
from vulnerabilities.pipelines import VulnerableCodeBaseImporterPipelineV2
from vulnerabilities.severity_systems import CVSSV31

logger = logging.getLogger(__name__)


class LiferayImporterPipeline(VulnerableCodeBaseImporterPipelineV2):
    """
    Collect Liferay security advisories by scraping the liferay.dev
    "known vulnerabilities" pages.

    The crawl has three levels: the index page links to per-release category
    pages, each category page links to individual CVE pages, and each CVE page
    is parsed into one ``AdvisoryData``.
    """

    spdx_license_expression = "Apache-2.0"
    license_url = "https://www.apache.org/licenses/LICENSE-2.0"
    pipeline_id = "liferay_importer_v2"
    importer_name = "Liferay Importer"

    # Patterns are compiled once here because they are reused for every page.
    _CVE_RE = re.compile(r"(CVE-\d{4}-\d{4,})")
    _CVSS31_RE = re.compile(r"\(CVSS:3\.1/(.*?)\)")
    _SCORE_RE = re.compile(r"\d+(?:\.\d+)?")
    # Two or more dot-separated numeric components, so that four-part
    # versions such as "7.4.3.112" are captured whole instead of truncated.
    _VERSION_RE = re.compile(r"\d+(?:\.\d+)+")
    # Real heading tags only; a bare startswith("h") would also accept
    # <html>, <head>, <header> and <hr>.
    _HEADING_TAG_RE = re.compile(r"h[1-6]")

    @classmethod
    def steps(cls):
        """Return the pipeline steps: a single collect-and-store pass."""
        return (cls.collect_and_store_advisories,)

    def advisories_count(self) -> int:
        """
        Return a constant placeholder count.

        Advisories are only discovered while crawling in
        ``collect_advisories``, so no real count is computed here.
        """
        return 1

    def collect_advisories(self) -> Iterable[AdvisoryData]:
        """
        Yield one ``AdvisoryData`` per CVE page reachable from the index page.

        Nothing is yielded when the index page cannot be fetched; the failure
        is logged instead of raised.
        """
        base_url = "https://liferay.dev/portal/security/known-vulnerabilities"

        soup = self._fetch_soup(base_url, "Liferay main page")
        if soup is None:
            return

        release_links = {
            urljoin(base_url, anchor["href"])
            for anchor in soup.find_all("a", href=True)
            if "/categories/" in anchor["href"] and "known-vulnerabilities" in anchor["href"]
        }

        # The same CVE page is usually linked from several release pages;
        # share one "seen" set so each advisory is fetched and yielded once.
        seen_vuln_urls = set()
        # Sorted so the crawl order does not depend on set iteration order.
        for release_url in sorted(release_links):
            yield from self.process_release_page(release_url, seen_vuln_urls)

    def process_release_page(self, release_url, seen_vuln_urls=None):
        """
        Yield advisories for every CVE page linked from ``release_url``.

        ``seen_vuln_urls`` is an optional set of query-less CVE page URLs that
        have already produced an advisory. Pages found in it are skipped, and
        it is updated in place. When omitted, de-duplication only applies
        within this one release page.
        """
        soup = self._fetch_soup(release_url, f"release page {release_url}")
        if soup is None:
            return

        if seen_vuln_urls is None:
            seen_vuln_urls = set()

        vuln_links = {
            urljoin(release_url, anchor["href"])
            for anchor in soup.find_all("a", href=True)
            if "/asset_publisher/" in anchor["href"] and "cve-" in anchor["href"].lower()
        }

        for vuln_url in sorted(vuln_links):
            # Links to the same page may differ only by their query string.
            dedupe_key = vuln_url.split("?")[0]
            if dedupe_key in seen_vuln_urls:
                continue
            advisories = list(self.process_vulnerability_page(vuln_url))
            # Only mark the page as seen once it produced data, so a failed
            # fetch can still be retried through another release page.
            if advisories:
                seen_vuln_urls.add(dedupe_key)
            yield from advisories

    def process_vulnerability_page(self, vuln_url):
        """
        Fetch one CVE page and yield a single ``AdvisoryData`` for it.

        Nothing is yielded when the page cannot be fetched or when no CVE id
        is found in its title or body text.
        """
        soup = self._fetch_soup(vuln_url, f"vulnerability page {vuln_url}")
        if soup is None:
            return

        title = soup.find("h1")
        title_text = self._clean_text(title) if title is not None else ""

        # Prefer the id from the title; fall back to the first id on the page.
        cve_match = self._CVE_RE.search(title_text) or self._CVE_RE.search(soup.get_text())
        if not cve_match:
            return
        cve_id = cve_match.group(1)

        description_elem = self._section_after_heading(soup, "Description")
        description = self._clean_text(description_elem) if description_elem is not None else ""

        severities = []
        severity_elem = self._section_after_heading(soup, "Severity")
        if severity_elem is not None:
            severity = self._parse_severity(self._clean_text(severity_elem))
            if severity is not None:
                severities.append(severity)

        affected_packages = []
        affected_elem = self._section_after_heading(soup, "Affected Version")
        if affected_elem is not None:
            if affected_elem.name == "ul":
                candidates = [self._clean_text(item) for item in affected_elem.find_all("li")]
            else:
                # Not a list: treat every text line of the element as a candidate.
                candidates = [line.strip() for line in affected_elem.get_text("\n").split("\n")]
            for candidate in candidates:
                package = self.parse_version_text(candidate)
                if package is not None:
                    affected_packages.append(package)

        # Drop the query string so the stored reference URL is stable.
        vuln_url = vuln_url.split("?")[0]

        yield AdvisoryData(
            advisory_id=cve_id,
            aliases=[cve_id],
            summary=description,
            affected_packages=affected_packages,
            references_v2=[ReferenceV2(url=vuln_url)],
            url=vuln_url,
            severities=severities,
        )

    def parse_version_text(self, text):
        """
        Return an ``AffectedPackageV2`` for the first version number in ``text``.

        Return None when ``text`` is empty, contains no dotted version number,
        or the version cannot be turned into a version range. The package name
        is "liferay-dxp" when ``text`` mentions "DXP" and "liferay-portal"
        otherwise.
        """
        if not text:
            return None

        # NOTE: only the first version in the text is used; wording such as
        # "X through Y" is not expanded into a range.
        version_match = self._VERSION_RE.search(text)
        if not version_match:
            return None
        version = version_match.group(0)

        name = "liferay-dxp" if "DXP" in text else "liferay-portal"
        purl = PackageURL(type="generic", name=name)

        try:
            affected_range = MavenVersionRange.from_versions([version])
            return AffectedPackageV2(
                package=purl,
                affected_version_range=affected_range,
            )
        except Exception as error:
            # Best effort: one unparsable entry must not drop the whole advisory.
            logger.error("Failed to parse version %s: %s", version, error)
            return None

    def _fetch_soup(self, url, description):
        """
        Return the parsed HTML of ``url``, or None if the request fails.

        Failures are logged as "Failed to fetch <description>: <error>".
        """
        try:
            # NOTE(review): no timeout is set, so a stalled connection blocks
            # indefinitely; consider adding one together with the tests that
            # mock requests.get.
            response = requests.get(url)
            response.raise_for_status()
        except requests.RequestException as error:
            logger.error("Failed to fetch %s: %s", description, error)
            return None
        return BeautifulSoup(response.content, "lxml")

    @classmethod
    def _section_after_heading(cls, soup, label):
        """
        Return the element following the first h1-h6 heading whose text
        matches the regex ``label``, or None when there is no such heading.
        """
        for text_node in soup.find_all(string=re.compile(label)):
            parent = text_node.parent
            if parent is None or not parent.name:
                continue
            # Skip matches in links, paragraphs, etc. and keep looking.
            if not cls._HEADING_TAG_RE.fullmatch(parent.name):
                continue
            sibling = parent.find_next_sibling()
            if sibling is not None:
                return sibling
        return None

    @classmethod
    def _parse_severity(cls, text):
        """
        Return a CVSS v3.1 ``VulnerabilitySeverity`` parsed from text such as
        "4.8 (CVSS:3.1/AV:N/...)", or None when no CVSS 3.1 vector is present.
        """
        vector_match = cls._CVSS31_RE.search(text)
        if not vector_match:
            return None
        # The score is expected at the very start of the text.
        score_match = cls._SCORE_RE.match(text)
        score = score_match.group(0) if score_match else None
        return VulnerabilitySeverity(
            system=CVSSV31,
            value=score,
            scoring_elements=f"CVSS:3.1/{vector_match.group(1)}",
        )

    @staticmethod
    def _clean_text(element):
        """
        Return the text of ``element`` with whitespace runs collapsed.

        ``get_text(strip=True)`` is avoided because it joins the pieces around
        inline tags without any separator, which fuses adjacent words.
        """
        return " ".join(element.get_text().split())
82 changes: 82 additions & 0 deletions vulnerabilities/tests/test_liferay.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
#
# Copyright (c) nexB Inc. and others. All rights reserved.
# VulnerableCode is a trademark of nexB Inc.
# SPDX-License-Identifier: Apache-2.0
# See http://www.apache.org/licenses/LICENSE-2.0 for the license text.
# See https://github.com/aboutcode-org/vulnerablecode for support or download.
# See https://aboutcode.org for more information about nexB OSS projects.
#

import os
from unittest import TestCase
from unittest.mock import patch, MagicMock

from vulnerabilities.pipelines.v2_importers.liferay_importer import LiferayImporterPipeline
from vulnerabilities.importer import AdvisoryData

BASE_DIR = os.path.dirname(os.path.abspath(__file__))
TEST_DATA = os.path.join(BASE_DIR, "test_data")


class TestLiferayImporterPipeline(TestCase):
    """Run the Liferay importer crawl against canned HTML pages."""

    @patch("vulnerabilities.pipelines.v2_importers.liferay_importer.requests.get")
    def test_collect_advisories(self, mock_get):
        """
        Crawl index -> release -> CVE page and check every parsed field of
        the single advisory produced.
        """
        importer = LiferayImporterPipeline()

        base_url = "https://liferay.dev/portal/security/known-vulnerabilities"
        vuln_url = f"{base_url}/-/asset_publisher/jekt/content/cve-2023-1234"

        # Index page: links to one release category page.
        mock_main_page = MagicMock()
        mock_main_page.content = b"""
        <html>
        <body>
        <a href="/portal/security/known-vulnerabilities/-/categories/12345">Liferay Portal 7.4</a>
        </body>
        </html>
        """

        # Release page: links to one CVE page.
        mock_release_page = MagicMock()
        mock_release_page.content = b"""
        <html>
        <body>
        <a href="/portal/security/known-vulnerabilities/-/asset_publisher/jekt/content/cve-2023-1234">CVE-2023-1234 Title</a>
        </body>
        </html>
        """

        # CVE page: title, description, severity and affected versions.
        mock_vuln_page = MagicMock()
        mock_vuln_page.content = b"""
        <html>
        <body>
        <h1>CVE-2023-1234 Title</h1>
        <h3>Description</h3>
        <p>This is a test vulnerability description.</p>
        <h3>Severity</h3>
        <p>4.8 (CVSS:3.1/AV:N/AC:L/PR:H/UI:R/S:C/C:L/I:L/A:N)</p>
        <h3>Affected Version(s)</h3>
        <ul>
        <li>Liferay Portal 7.4.0</li>
        </ul>
        </body>
        </html>
        """

        def side_effect(url, *args, **kwargs):
            # Extra arguments (e.g. a timeout) are accepted so the test does
            # not depend on the exact way requests.get is called.
            if url == base_url:
                return mock_main_page
            if "categories" in url:
                return mock_release_page
            if "asset_publisher" in url:
                return mock_vuln_page
            # Fail loudly rather than feed an empty mock page to the parser.
            raise AssertionError(f"Unexpected URL requested: {url}")

        mock_get.side_effect = side_effect

        advisories = list(importer.collect_advisories())

        # One request per crawl level, and a single advisory out.
        self.assertEqual(mock_get.call_count, 3)
        self.assertEqual(len(advisories), 1)
        advisory = advisories[0]
        self.assertIsInstance(advisory, AdvisoryData)

        self.assertEqual(advisory.advisory_id, "CVE-2023-1234")
        self.assertEqual(advisory.aliases, ["CVE-2023-1234"])
        self.assertEqual(advisory.summary, "This is a test vulnerability description.")
        self.assertEqual(advisory.url, vuln_url)

        self.assertEqual(len(advisory.severities), 1)
        severity = advisory.severities[0]
        self.assertEqual(severity.value, "4.8")
        self.assertEqual(
            severity.scoring_elements,
            "CVSS:3.1/AV:N/AC:L/PR:H/UI:R/S:C/C:L/I:L/A:N",
        )

        self.assertEqual(len(advisory.affected_packages), 1)
        package = advisory.affected_packages[0].package
        self.assertEqual(package.type, "generic")
        self.assertEqual(package.name, "liferay-portal")