Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 18 additions & 0 deletions errata/migrations/0010_synopsis_textfield.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
# Generated by Django 4.2.28 on 2026-05-02 03:49

from django.db import migrations, models


class Migration(migrations.Migration):

dependencies = [
('errata', '0009_backfill_cached_counts'),
]

operations = [
migrations.AlterField(
model_name='erratum',
name='synopsis',
field=models.TextField(),
),
]
2 changes: 1 addition & 1 deletion errata/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ class Erratum(models.Model):
name = models.CharField(max_length=255, unique=True)
e_type = models.CharField(max_length=255)
issue_date = models.DateTimeField()
synopsis = models.CharField(max_length=255)
synopsis = models.TextField()
affected_packages = models.ManyToManyField(Package, blank=True, related_name='affected_by_erratum')
fixed_packages = models.ManyToManyField(Package, blank=True, related_name='provides_fix_in_erratum')
from operatingsystems.models import OSRelease
Expand Down
62 changes: 31 additions & 31 deletions reports/tests/test_parsing.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@

from packages.models import Package
from reports.utils import (
_get_package_type, _get_repo_type, parse_packages, parse_repos,
get_package_type, get_repo_type, parse_packages, parse_repos,
process_repo_text,
)
from repos.models import Repository
Expand Down Expand Up @@ -122,63 +122,63 @@ def test_parse_repos_strips_quotes(self):
CACHES={'default': {'BACKEND': 'django.core.cache.backends.locmem.LocMemCache'}}
)
class GetPackageTypeTests(TestCase):
"""Tests for _get_package_type() function."""
"""Tests for get_package_type() function."""

def test_get_package_type_deb(self):
def testget_package_type_deb(self):
"""Test DEB package type detection."""
self.assertEqual(_get_package_type('deb'), Package.DEB)
self.assertEqual(_get_package_type('DEB'), Package.DEB)
self.assertEqual(_get_package_type('Deb'), Package.DEB)
self.assertEqual(get_package_type('deb'), Package.DEB)
self.assertEqual(get_package_type('DEB'), Package.DEB)
self.assertEqual(get_package_type('Deb'), Package.DEB)

def test_get_package_type_rpm(self):
def testget_package_type_rpm(self):
"""Test RPM package type detection."""
self.assertEqual(_get_package_type('rpm'), Package.RPM)
self.assertEqual(_get_package_type('RPM'), Package.RPM)
self.assertEqual(get_package_type('rpm'), Package.RPM)
self.assertEqual(get_package_type('RPM'), Package.RPM)

def test_get_package_type_arch(self):
def testget_package_type_arch(self):
"""Test Arch package type detection."""
self.assertEqual(_get_package_type('arch'), Package.ARCH)
self.assertEqual(get_package_type('arch'), Package.ARCH)

def test_get_package_type_gentoo(self):
def testget_package_type_gentoo(self):
"""Test Gentoo package type detection."""
self.assertEqual(_get_package_type('gentoo'), Package.GENTOO)
self.assertEqual(get_package_type('gentoo'), Package.GENTOO)

def test_get_package_type_unknown(self):
def testget_package_type_unknown(self):
"""Test unknown package type returns UNKNOWN."""
self.assertEqual(_get_package_type(''), Package.UNKNOWN)
self.assertEqual(_get_package_type('invalid'), Package.UNKNOWN)
self.assertEqual(_get_package_type(None), Package.UNKNOWN)
self.assertEqual(get_package_type(''), Package.UNKNOWN)
self.assertEqual(get_package_type('invalid'), Package.UNKNOWN)
self.assertEqual(get_package_type(None), Package.UNKNOWN)


@override_settings(
CELERY_TASK_ALWAYS_EAGER=True,
CACHES={'default': {'BACKEND': 'django.core.cache.backends.locmem.LocMemCache'}}
)
class GetRepoTypeTests(TestCase):
"""Tests for _get_repo_type() function."""
"""Tests for get_repo_type() function."""

def test_get_repo_type_deb(self):
def testget_repo_type_deb(self):
"""Test DEB repo type detection."""
self.assertEqual(_get_repo_type('deb'), Repository.DEB)
self.assertEqual(_get_repo_type('DEB'), Repository.DEB)
self.assertEqual(get_repo_type('deb'), Repository.DEB)
self.assertEqual(get_repo_type('DEB'), Repository.DEB)

def test_get_repo_type_rpm(self):
def testget_repo_type_rpm(self):
"""Test RPM repo type detection."""
self.assertEqual(_get_repo_type('rpm'), Repository.RPM)
self.assertEqual(_get_repo_type('RPM'), Repository.RPM)
self.assertEqual(get_repo_type('rpm'), Repository.RPM)
self.assertEqual(get_repo_type('RPM'), Repository.RPM)

def test_get_repo_type_arch(self):
def testget_repo_type_arch(self):
"""Test Arch repo type detection."""
self.assertEqual(_get_repo_type('arch'), Repository.ARCH)
self.assertEqual(get_repo_type('arch'), Repository.ARCH)

def test_get_repo_type_gentoo(self):
def testget_repo_type_gentoo(self):
"""Test Gentoo repo type detection."""
self.assertEqual(_get_repo_type('gentoo'), Repository.GENTOO)
self.assertEqual(get_repo_type('gentoo'), Repository.GENTOO)

def test_get_repo_type_unknown(self):
def testget_repo_type_unknown(self):
"""Test unknown repo type returns None."""
self.assertIsNone(_get_repo_type(''))
self.assertIsNone(_get_repo_type('invalid'))
self.assertIsNone(get_repo_type(''))
self.assertIsNone(get_repo_type('invalid'))


@override_settings(
Expand Down
20 changes: 13 additions & 7 deletions reports/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,9 @@
from patchman.signals import pbar_start, pbar_update
from repos.models import Mirror, MirrorPackage, Repository
from repos.utils import get_or_create_repo
from util.logging import debug_message, error_message, info_message
from util.logging import (
debug_message, error_message, info_message, warning_message,
)


def process_repos(report, host):
Expand Down Expand Up @@ -216,7 +218,7 @@ def parse_repos(repos_string):
return repos


def _get_repo_type(type_str):
def get_repo_type(type_str):
""" Convert repo type string to Repository constant
"""
type_str = type_str.lower()
Expand All @@ -242,6 +244,9 @@ def process_repo(r_type, r_name, r_id, r_priority, urls, arch):
for r_url in urls:
if r_type == Repository.GENTOO and r_url.startswith('rsync'):
r_url = 'https://api.gentoo.org/mirrors/distfiles.xml'
if not r_url.startswith(('http://', 'https://')):
warning_message(text=f'Skipping non-http(s) mirror URL: {r_url}')
continue
try:
mirror = Mirror.objects.get(url=r_url.strip('/'))
except Mirror.DoesNotExist:
Expand All @@ -259,7 +264,8 @@ def process_repo(r_type, r_name, r_id, r_priority, urls, arch):
repository.repo_id = r_id

for url in unknown:
Mirror.objects.create(repo=repository, url=url.rstrip('/'))
if url.startswith(('http://', 'https://')):
Mirror.objects.create(repo=repository, url=url.rstrip('/'))

for mirror in Mirror.objects.filter(repo=repository).values('url'):
mirror_url = mirror.get('url')
Expand Down Expand Up @@ -364,7 +370,7 @@ def parse_packages(packages_string):
return packages


def _get_package_type(type_str):
def get_package_type(type_str):
""" Convert package type string to Package constant
"""
type_str = type_str.lower() if type_str else ''
Expand Down Expand Up @@ -398,7 +404,7 @@ def process_package_text(pkg):
rel = pkg[3] if pkg[3] else ''
arch = pkg[4] if pkg[4] else 'unknown'

p_type = _get_package_type(pkg[5])
p_type = get_package_type(pkg[5])
p_category = pkg[6] if p_type == Package.GENTOO and len(pkg) > 6 else None
p_repo = pkg[7] if p_type == Package.GENTOO and len(pkg) > 7 else None

Expand All @@ -413,7 +419,7 @@ def process_package_json(pkg):
ver = pkg.get('version', '')
rel = pkg.get('release', '')
arch = pkg.get('arch', 'unknown')
p_type = _get_package_type(pkg.get('type', ''))
p_type = get_package_type(pkg.get('type', ''))
p_category = pkg.get('category') if p_type == Package.GENTOO else None
p_repo = pkg.get('repo') if p_type == Package.GENTOO else None

Expand Down Expand Up @@ -453,7 +459,7 @@ def process_packages_json(packages_json, host):
def process_repo_json(repo, arch):
""" Processes a single JSON repo dict and converts to a repo object
"""
r_type = _get_repo_type(repo.get('type', ''))
r_type = get_repo_type(repo.get('type', ''))
if r_type is None:
return None, 0

Expand Down
3 changes: 3 additions & 0 deletions repos/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -197,6 +197,9 @@ def check_for_mirrorlists(repo):
Creates MAX_MIRRORS mirrors from list if so.
"""
for mirror in repo.mirror_set.all():
if not mirror.url.startswith(('http://', 'https://')):
warning_message(text=f'Skipping non-http(s) mirror URL: {mirror.url}')
continue
mirror_urls = get_mirrorlist_urls(mirror.url)
if mirror_urls:
mirror.mirrorlist = True
Expand Down
27 changes: 18 additions & 9 deletions util/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,8 @@ def get_url(url, headers=None, params=None, session=None):
error_message(text=f'Too many redirects - {url}')
except ConnectionError:
error_message(text=f'Connection error - {url}')
except requests.exceptions.InvalidSchema:
error_message(text=f'Unsupported URL scheme - {url}')
return response


Expand Down Expand Up @@ -194,16 +196,18 @@ def unzstd(contents):
""" unzstd contents in memory and return the data
"""
try:
zstddata = zstd.ZstdDecompressor().stream_reader(contents).read()
return zstddata
except zstd.ZstdError as e:
if hasattr(zstd, 'decompress'):
return zstd.decompress(contents)
return zstd.ZstdDecompressor().stream_reader(contents).read()
except (zstd.ZstdError, Exception) as e:
error_message(text=f'zstd: {e}')


def extract(data, fmt):
""" Extract the contents based on mimetype or file ending. Return the
unmodified data if neither mimetype nor file ending matches, otherwise
return the extracted contents.
return the extracted contents. Falls back to unmodified data if
decompression fails (e.g. requests already decompressed the content).
"""
try:
mime = magic.from_buffer(data, mime=True)
Expand All @@ -212,14 +216,19 @@ def extract(data, fmt):
m = magic.open(magic.MAGIC_MIME)
m.load()
mime = m.buffer(data).split(';')[0]
if mime.startswith('text/'):
return data
extracted = None
if mime == 'application/zstd' or fmt.endswith('zst'):
return unzstd(data)
if mime == 'application/x-xz' or fmt.endswith('xz'):
return unxz(data)
extracted = unzstd(data)
elif mime == 'application/x-xz' or fmt.endswith('xz'):
extracted = unxz(data)
elif mime == 'application/x-bzip2' or fmt.endswith('bz2'):
return bunzip2(data)
extracted = bunzip2(data)
elif mime == 'application/gzip' or fmt.endswith('gz'):
return gunzip(data)
extracted = gunzip(data)
if extracted is not None:
return extracted
return data


Expand Down
8 changes: 4 additions & 4 deletions util/context_processors.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@
from util import get_setting_of_type


def _get_git_ref():
def get_git_ref():
"""Get current git ref if in a git repo."""
git_dir = Path(__file__).parent.parent / '.git'
if not git_dir.exists():
Expand All @@ -49,7 +49,7 @@ def _get_git_ref():
return None


def _get_version():
def get_version():
"""Get version from package metadata or VERSION.txt."""
# Try importlib.metadata first (for installed packages)
try:
Expand All @@ -66,8 +66,8 @@ def _get_version():


# Cache version info at module load time (once per process)
_PATCHMAN_VERSION = _get_version()
_PATCHMAN_GIT_REF = _get_git_ref()
_PATCHMAN_VERSION = get_version()
_PATCHMAN_GIT_REF = get_git_ref()
if _PATCHMAN_GIT_REF:
_PATCHMAN_VERSION_DISPLAY = f'v{_PATCHMAN_VERSION} ({_PATCHMAN_GIT_REF})'
else:
Expand Down
3 changes: 2 additions & 1 deletion util/filterspecs.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
# You should have received a copy of the GNU General Public License
# along with Patchman. If not, see <http://www.gnu.org/licenses/>

from html import escape
from operator import itemgetter

from django.db.models.query import QuerySet
Expand Down Expand Up @@ -70,7 +71,7 @@ def output(self, qs):
style = 'list-group-item-success'
qs[self.name] = k
output += f'<a href="{get_query_string(qs)}" class='
output += f'"list-group-item {style}">{v}</a>\n'
output += f'"list-group-item {style}">{escape(str(v))}</a>\n'
output += '</div></div></div>'
return output

Expand Down
Loading