diff --git a/errata/migrations/0010_synopsis_textfield.py b/errata/migrations/0010_synopsis_textfield.py new file mode 100644 index 00000000..eae13347 --- /dev/null +++ b/errata/migrations/0010_synopsis_textfield.py @@ -0,0 +1,18 @@ +# Generated by Django 4.2.28 on 2026-05-02 03:49 + +from django.db import migrations, models + + +class Migration(migrations.Migration): + + dependencies = [ + ('errata', '0009_backfill_cached_counts'), + ] + + operations = [ + migrations.AlterField( + model_name='erratum', + name='synopsis', + field=models.TextField(), + ), + ] diff --git a/errata/models.py b/errata/models.py index ba4baec7..035a2456 100644 --- a/errata/models.py +++ b/errata/models.py @@ -33,7 +33,7 @@ class Erratum(models.Model): name = models.CharField(max_length=255, unique=True) e_type = models.CharField(max_length=255) issue_date = models.DateTimeField() - synopsis = models.CharField(max_length=255) + synopsis = models.TextField() affected_packages = models.ManyToManyField(Package, blank=True, related_name='affected_by_erratum') fixed_packages = models.ManyToManyField(Package, blank=True, related_name='provides_fix_in_erratum') from operatingsystems.models import OSRelease diff --git a/reports/tests/test_parsing.py b/reports/tests/test_parsing.py index f9c94098..e5644a92 100644 --- a/reports/tests/test_parsing.py +++ b/reports/tests/test_parsing.py @@ -18,7 +18,7 @@ from packages.models import Package from reports.utils import ( - _get_package_type, _get_repo_type, parse_packages, parse_repos, + get_package_type, get_repo_type, parse_packages, parse_repos, process_repo_text, ) from repos.models import Repository @@ -122,32 +122,32 @@ def test_parse_repos_strips_quotes(self): CACHES={'default': {'BACKEND': 'django.core.cache.backends.locmem.LocMemCache'}} ) class GetPackageTypeTests(TestCase): - """Tests for _get_package_type() function.""" + """Tests for get_package_type() function.""" - def test_get_package_type_deb(self): + def testget_package_type_deb(self): """Test DEB package type detection.""" - self.assertEqual(_get_package_type('deb'), Package.DEB) - self.assertEqual(_get_package_type('DEB'), Package.DEB) - self.assertEqual(_get_package_type('Deb'), Package.DEB) + self.assertEqual(get_package_type('deb'), Package.DEB) + self.assertEqual(get_package_type('DEB'), Package.DEB) + self.assertEqual(get_package_type('Deb'), Package.DEB) - def test_get_package_type_rpm(self): + def testget_package_type_rpm(self): """Test RPM package type detection.""" - self.assertEqual(_get_package_type('rpm'), Package.RPM) - self.assertEqual(_get_package_type('RPM'), Package.RPM) + self.assertEqual(get_package_type('rpm'), Package.RPM) + self.assertEqual(get_package_type('RPM'), Package.RPM) - def test_get_package_type_arch(self): + def testget_package_type_arch(self): """Test Arch package type detection.""" - self.assertEqual(_get_package_type('arch'), Package.ARCH) + self.assertEqual(get_package_type('arch'), Package.ARCH) - def test_get_package_type_gentoo(self): + def testget_package_type_gentoo(self): """Test Gentoo package type detection.""" - self.assertEqual(_get_package_type('gentoo'), Package.GENTOO) + self.assertEqual(get_package_type('gentoo'), Package.GENTOO) - def test_get_package_type_unknown(self): + def testget_package_type_unknown(self): """Test unknown package type returns UNKNOWN.""" - self.assertEqual(_get_package_type(''), Package.UNKNOWN) - self.assertEqual(_get_package_type('invalid'), Package.UNKNOWN) - self.assertEqual(_get_package_type(None), Package.UNKNOWN) + self.assertEqual(get_package_type(''), Package.UNKNOWN) + self.assertEqual(get_package_type('invalid'), Package.UNKNOWN) + self.assertEqual(get_package_type(None), Package.UNKNOWN) @override_settings( @@ -155,30 +155,30 @@ def test_get_package_type_unknown(self): CACHES={'default': {'BACKEND': 'django.core.cache.backends.locmem.LocMemCache'}} ) class GetRepoTypeTests(TestCase): - """Tests for _get_repo_type() function.""" + """Tests for get_repo_type() function.""" - def test_get_repo_type_deb(self): + def testget_repo_type_deb(self): """Test DEB repo type detection.""" - self.assertEqual(_get_repo_type('deb'), Repository.DEB) - self.assertEqual(_get_repo_type('DEB'), Repository.DEB) + self.assertEqual(get_repo_type('deb'), Repository.DEB) + self.assertEqual(get_repo_type('DEB'), Repository.DEB) - def test_get_repo_type_rpm(self): + def testget_repo_type_rpm(self): """Test RPM repo type detection.""" - self.assertEqual(_get_repo_type('rpm'), Repository.RPM) - self.assertEqual(_get_repo_type('RPM'), Repository.RPM) + self.assertEqual(get_repo_type('rpm'), Repository.RPM) + self.assertEqual(get_repo_type('RPM'), Repository.RPM) - def test_get_repo_type_arch(self): + def testget_repo_type_arch(self): """Test Arch repo type detection.""" - self.assertEqual(_get_repo_type('arch'), Repository.ARCH) + self.assertEqual(get_repo_type('arch'), Repository.ARCH) - def test_get_repo_type_gentoo(self): + def testget_repo_type_gentoo(self): """Test Gentoo repo type detection.""" - self.assertEqual(_get_repo_type('gentoo'), Repository.GENTOO) + self.assertEqual(get_repo_type('gentoo'), Repository.GENTOO) - def test_get_repo_type_unknown(self): + def testget_repo_type_unknown(self): """Test unknown repo type returns None.""" - self.assertIsNone(_get_repo_type('')) - self.assertIsNone(_get_repo_type('invalid')) + self.assertIsNone(get_repo_type('')) + self.assertIsNone(get_repo_type('invalid')) @override_settings( diff --git a/reports/utils.py b/reports/utils.py index 4fea67d3..45c26d98 100644 --- a/reports/utils.py +++ b/reports/utils.py @@ -34,7 +34,9 @@ from patchman.signals import pbar_start, pbar_update from repos.models import Mirror, MirrorPackage, Repository from repos.utils import get_or_create_repo -from util.logging import debug_message, error_message, info_message +from util.logging import ( + debug_message, error_message, info_message, warning_message, +) def process_repos(report, host): @@ -216,7 +218,7 @@ def parse_repos(repos_string): return repos -def _get_repo_type(type_str): +def get_repo_type(type_str): """ Convert repo type string to Repository constant """ type_str = type_str.lower() @@ -242,6 +244,9 @@ def process_repo(r_type, r_name, r_id, r_priority, urls, arch): for r_url in urls: if r_type == Repository.GENTOO and r_url.startswith('rsync'): r_url = 'https://api.gentoo.org/mirrors/distfiles.xml' + if not r_url.startswith(('http://', 'https://')): + warning_message(text=f'Skipping non-http(s) mirror URL: {r_url}') + continue try: mirror = Mirror.objects.get(url=r_url.strip('/')) except Mirror.DoesNotExist: @@ -259,7 +264,8 @@ def process_repo(r_type, r_name, r_id, r_priority, urls, arch): repository.repo_id = r_id for url in unknown: - Mirror.objects.create(repo=repository, url=url.rstrip('/')) + if url.startswith(('http://', 'https://')): + Mirror.objects.create(repo=repository, url=url.rstrip('/')) for mirror in Mirror.objects.filter(repo=repository).values('url'): mirror_url = mirror.get('url') @@ -364,7 +370,7 @@ def parse_packages(packages_string): return packages -def _get_package_type(type_str): +def get_package_type(type_str): """ Convert package type string to Package constant """ type_str = type_str.lower() if type_str else '' @@ -398,7 +404,7 @@ def process_package_text(pkg): rel = pkg[3] if pkg[3] else '' arch = pkg[4] if pkg[4] else 'unknown' - p_type = _get_package_type(pkg[5]) + p_type = get_package_type(pkg[5]) p_category = pkg[6] if p_type == Package.GENTOO and len(pkg) > 6 else None p_repo = pkg[7] if p_type == Package.GENTOO and len(pkg) > 7 else None @@ -413,7 +419,7 @@ def process_package_json(pkg): ver = pkg.get('version', '') rel = pkg.get('release', '') arch = pkg.get('arch', 'unknown') - p_type = _get_package_type(pkg.get('type', '')) + p_type = get_package_type(pkg.get('type', '')) p_category = pkg.get('category') if p_type == Package.GENTOO else None p_repo = pkg.get('repo') if p_type == Package.GENTOO else None @@ -453,7 +459,7 @@ def process_packages_json(packages_json, host): def process_repo_json(repo, arch): """ Processes a single JSON repo dict and converts to a repo object """ - r_type = _get_repo_type(repo.get('type', '')) + r_type = get_repo_type(repo.get('type', '')) if r_type is None: return None, 0 diff --git a/repos/utils.py b/repos/utils.py index 0d81eb25..87bc2192 100644 --- a/repos/utils.py +++ b/repos/utils.py @@ -197,6 +197,9 @@ def check_for_mirrorlists(repo): Creates MAX_MIRRORS mirrors from list if so. """ for mirror in repo.mirror_set.all(): + if not mirror.url.startswith(('http://', 'https://')): + warning_message(text=f'Skipping non-http(s) mirror URL: {mirror.url}') + continue mirror_urls = get_mirrorlist_urls(mirror.url) if mirror_urls: mirror.mirrorlist = True diff --git a/util/__init__.py b/util/__init__.py index c0632d09..e823efea 100644 --- a/util/__init__.py +++ b/util/__init__.py @@ -121,6 +121,8 @@ def get_url(url, headers=None, params=None, session=None): error_message(text=f'Too many redirects - {url}') except ConnectionError: error_message(text=f'Connection error - {url}') + except requests.exceptions.InvalidSchema: + error_message(text=f'Unsupported URL scheme - {url}') return response @@ -194,16 +196,18 @@ def unzstd(contents): """ unzstd contents in memory and return the data """ try: - zstddata = zstd.ZstdDecompressor().stream_reader(contents).read() - return zstddata - except zstd.ZstdError as e: + if hasattr(zstd, 'decompress'): + return zstd.decompress(contents) + return zstd.ZstdDecompressor().stream_reader(contents).read() + except (zstd.ZstdError, Exception) as e: error_message(text=f'zstd: {e}') def extract(data, fmt): """ Extract the contents based on mimetype or file ending. Return the unmodified data if neither mimetype nor file ending matches, otherwise - return the extracted contents. + return the extracted contents. Falls back to unmodified data if + decompression fails (e.g. requests already decompressed the content). """ try: mime = magic.from_buffer(data, mime=True) @@ -212,14 +216,19 @@ def extract(data, fmt): m = magic.open(magic.MAGIC_MIME) m.load() mime = m.buffer(data).split(';')[0] + if mime.startswith('text/'): + return data + extracted = None if mime == 'application/zstd' or fmt.endswith('zst'): - return unzstd(data) - if mime == 'application/x-xz' or fmt.endswith('xz'): - return unxz(data) + extracted = unzstd(data) + elif mime == 'application/x-xz' or fmt.endswith('xz'): + extracted = unxz(data) elif mime == 'application/x-bzip2' or fmt.endswith('bz2'): - return bunzip2(data) + extracted = bunzip2(data) elif mime == 'application/gzip' or fmt.endswith('gz'): - return gunzip(data) + extracted = gunzip(data) + if extracted is not None: + return extracted return data diff --git a/util/context_processors.py b/util/context_processors.py index 97bbcf2c..0e9dc6b2 100644 --- a/util/context_processors.py +++ b/util/context_processors.py @@ -29,7 +29,7 @@ from util import get_setting_of_type -def _get_git_ref(): +def get_git_ref(): """Get current git ref if in a git repo.""" git_dir = Path(__file__).parent.parent / '.git' if not git_dir.exists(): @@ -49,7 +49,7 @@ def _get_git_ref(): return None -def _get_version(): +def get_version(): """Get version from package metadata or VERSION.txt.""" # Try importlib.metadata first (for installed packages) try: @@ -66,8 +66,8 @@ def _get_version(): # Cache version info at module load time (once per process) -_PATCHMAN_VERSION = _get_version() -_PATCHMAN_GIT_REF = _get_git_ref() +_PATCHMAN_VERSION = get_version() +_PATCHMAN_GIT_REF = get_git_ref() if _PATCHMAN_GIT_REF: _PATCHMAN_VERSION_DISPLAY = f'v{_PATCHMAN_VERSION} ({_PATCHMAN_GIT_REF})' else: diff --git a/util/filterspecs.py b/util/filterspecs.py index eac0f747..b2f160c0 100644 --- a/util/filterspecs.py +++ b/util/filterspecs.py @@ -15,6 +15,7 @@ # You should have received a copy of the GNU General Public License # along with Patchman. If not, see +from html import escape from operator import itemgetter from django.db.models.query import QuerySet @@ -70,7 +71,7 @@ def output(self, qs): style = 'list-group-item-success' qs[self.name] = k output += f'{v}\n' + output += f'"list-group-item {style}">{escape(str(v))}\n' output += '' return output