|
| 1 | +# Copyright 2020, 2025, Microsoft Corporation |
| 2 | +# |
| 3 | +# SPDX-License-Identifier: LGPL-2.1-only |
| 4 | +# |
| 5 | + |
| 6 | +from collections import defaultdict |
| 7 | +import typing |
| 8 | + |
| 9 | +from .. import policyrep |
| 10 | +from ..terulequery import TERuleQuery |
| 11 | + |
| 12 | +from .checkermodule import CheckerModule |
| 13 | +from .descriptors import ConfigSetDescriptor |
| 14 | + |
| 15 | +EXEMPT_WRITE: typing.Final[str] = "exempt_write_domain" |
| 16 | +EXEMPT_LOAD: typing.Final[str] = "exempt_load_domain" |
| 17 | +EXEMPT_FILE: typing.Final[str] = "exempt_file" |
| 18 | + |
| 19 | +__all__: typing.Final[tuple[str, ...]] = ("ReadOnlyKernelModules",) |
| 20 | + |
| 21 | + |
| 22 | +class ReadOnlyKernelModules(CheckerModule): |
| 23 | + |
| 24 | + """Checker module for asserting all kernel modules are read-only.""" |
| 25 | + |
| 26 | + check_type = "ro_kmods" |
| 27 | + check_config = frozenset((EXEMPT_WRITE, EXEMPT_LOAD, EXEMPT_FILE)) |
| 28 | + |
| 29 | + exempt_write_domain = ConfigSetDescriptor[policyrep.Type]( |
| 30 | + "lookup_type_or_attr", strict=False, expand=True) |
| 31 | + exempt_file = ConfigSetDescriptor[policyrep.Type]( |
| 32 | + "lookup_type_or_attr", strict=False, expand=True) |
| 33 | + exempt_load_domain = ConfigSetDescriptor[policyrep.Type]( |
| 34 | + "lookup_type_or_attr", strict=False, expand=True) |
| 35 | + |
| 36 | + def __init__(self, policy: policyrep.SELinuxPolicy, checkname: str, |
| 37 | + config: dict[str, str]) -> None: |
| 38 | + |
| 39 | + super().__init__(policy, checkname, config) |
| 40 | + self.exempt_write_domain = config.get(EXEMPT_WRITE) |
| 41 | + self.exempt_file = config.get(EXEMPT_FILE) |
| 42 | + self.exempt_load_domain = config.get(EXEMPT_LOAD) |
| 43 | + |
| 44 | + def _collect_kernel_mods(self) -> defaultdict[policyrep.Type, set[policyrep.AVRule]]: |
| 45 | + self.log.debug("Collecting list of kernel module types.") |
| 46 | + self.log.debug(f"{self.exempt_load_domain=}") |
| 47 | + query = TERuleQuery(self.policy, |
| 48 | + ruletype=("allow",), |
| 49 | + tclass=("system",), |
| 50 | + perms=("module_load",)) |
| 51 | + |
| 52 | + collected = defaultdict[policyrep.Type, set[policyrep.AVRule]](set) |
| 53 | + for rule in query.results(): |
| 54 | + sources = set(rule.source.expand()) - self.exempt_load_domain |
| 55 | + targets = set(rule.target.expand()) - self.exempt_file |
| 56 | + |
| 57 | + # remove self rules |
| 58 | + targets -= sources |
| 59 | + |
| 60 | + # ignore rule if source or target is an empty attr |
| 61 | + if not sources or not targets: |
| 62 | + self.log.debug(f"Ignoring empty module_load rule: {rule}") |
| 63 | + continue |
| 64 | + |
| 65 | + for t in targets: |
| 66 | + self.log.debug(f"Determined {t} is a kernel module by: {rule}") |
| 67 | + assert isinstance(rule, policyrep.AVRule), \ |
| 68 | + f"Expected AVRule, got {type(rule)}, this is an SETools bug." |
| 69 | + collected[t].add(rule) |
| 70 | + |
| 71 | + return collected |
| 72 | + |
| 73 | + def run(self) -> list[policyrep.Type]: |
| 74 | + self.log.info("Checking kernel modules are read-only.") |
| 75 | + |
| 76 | + query = TERuleQuery(self.policy, |
| 77 | + ruletype=("allow",), |
| 78 | + tclass=("file",), |
| 79 | + perms=("write", "append")) |
| 80 | + kmods = self._collect_kernel_mods() |
| 81 | + failures = defaultdict(set) |
| 82 | + |
| 83 | + for kmod_type in kmods.keys(): |
| 84 | + self.log.debug(f"Checking if kernel module type {kmod_type} is writable.") |
| 85 | + |
| 86 | + query.target = kmod_type |
| 87 | + for rule in sorted(query.results()): |
| 88 | + if set(rule.source.expand()) - self.exempt_write_domain: |
| 89 | + failures[kmod_type].add(rule) |
| 90 | + |
| 91 | + for kmod_type in sorted(failures.keys()): |
| 92 | + self.output.write("\n------------\n\n") |
| 93 | + self.output.write(f"Kernel module type {kmod_type} is writable.\n\n") |
| 94 | + self.output.write("Module load rules:\n") |
| 95 | + for rule in sorted(kmods[kmod_type]): |
| 96 | + self.output.write(f" * {rule}\n") |
| 97 | + |
| 98 | + self.output.write("\nWrite rules:\n") |
| 99 | + for rule in sorted(failures[kmod_type]): |
| 100 | + self.log_fail(str(rule)) |
| 101 | + |
| 102 | + self.log.debug(f"{len(failures)} failure(s)") |
| 103 | + return sorted(failures.keys()) |
0 commit comments