Файловый менеджер - Редактировать - /usr/lib/python3/dist-packages/uaclient/livepatch.py
Назад
import datetime import json import logging import re from functools import lru_cache from typing import List, Optional, Tuple from uaclient import ( event_logger, exceptions, messages, serviceclient, system, util, ) from uaclient.data_types import ( BoolDataValue, DataObject, Field, IncorrectTypeError, StringDataValue, data_list, ) from uaclient.files import state_files HTTP_PROXY_OPTION = "http-proxy" HTTPS_PROXY_OPTION = "https-proxy" LIVEPATCH_CMD = "/snap/bin/canonical-livepatch" LIVEPATCH_API_V1_KERNELS_SUPPORTED = "/v1/api/kernels/supported" event = event_logger.get_event_logger() class LivepatchPatchFixStatus(DataObject): fields = [ Field("name", StringDataValue, required=False, dict_key="Name"), Field("patched", BoolDataValue, required=False, dict_key="Patched"), ] def __init__( self, name: Optional[str], patched: Optional[bool], ): self.name = name self.patched = patched class LivepatchPatchStatus(DataObject): fields = [ Field("state", StringDataValue, required=False, dict_key="State"), Field( "fixes", data_list(LivepatchPatchFixStatus), required=False, dict_key="Fixes", ), ] def __init__( self, state: Optional[str], fixes: Optional[List[LivepatchPatchFixStatus]], ): self.state = state self.fixes = fixes class LivepatchStatusStatus(DataObject): fields = [ Field("kernel", StringDataValue, required=False, dict_key="Kernel"), Field( "livepatch", LivepatchPatchStatus, required=False, dict_key="Livepatch", ), Field( "supported", StringDataValue, required=False, dict_key="Supported", ), ] def __init__( self, kernel: Optional[str], livepatch: Optional[LivepatchPatchStatus], supported: Optional[str], ): self.kernel = kernel self.livepatch = livepatch self.supported = supported class LivepatchStatus(DataObject): fields = [ Field( "status", data_list(LivepatchStatusStatus), required=False, dict_key="Status", ), ] def __init__( self, status: Optional[List[LivepatchStatusStatus]], ): self.status = status def status() -> Optional[LivepatchStatusStatus]: if not is_livepatch_installed(): logging.debug("canonical-livepatch is not installed") return None try: out, _ = system.subp([LIVEPATCH_CMD, "status", "--format", "json"]) except exceptions.ProcessExecutionError: with util.disable_log_to_console(): logging.warning( "canonical-livepatch returned error when checking status" ) return None try: status_json = json.loads(out) except json.JSONDecodeError: with util.disable_log_to_console(): logging.warning( "canonical-livepatch status returned invalid json: {}".format( out ) ) return None try: status_root = LivepatchStatus.from_dict(status_json) except IncorrectTypeError: with util.disable_log_to_console(): logging.warning( "canonical-livepatch status returned unexpected " "structure: {}".format(out) ) return None if status_root.status is None or len(status_root.status) < 1: logging.debug("canonical-livepatch has no status") return None return status_root.status[0] class UALivepatchClient(serviceclient.UAServiceClient): cfg_url_base_attr = "livepatch_url" api_error_cls = exceptions.UrlError def is_kernel_supported( self, version: str, flavor: str, arch: str, codename: str ) -> Optional[bool]: """ :returns: True if supported False if unsupported None if API returns error or ambiguous response """ query_params = { "kernel-version": version, "flavour": flavor, "architecture": arch, "codename": codename, } headers = self.headers() try: result, _headers = self.request_url( LIVEPATCH_API_V1_KERNELS_SUPPORTED, query_params=query_params, headers=headers, ) except Exception as e: with util.disable_log_to_console(): logging.warning( "error while checking livepatch supported kernels API" ) logging.warning(e) return None if not isinstance(result, dict): logging.warning( "livepatch api returned something that isn't a dict" ) return None return bool(result.get("Supported", False)) def _on_supported_kernel_cli() -> Optional[bool]: lp_status = status() if lp_status is None: return None if lp_status.supported == "supported": return True if lp_status.supported == "unsupported": return False return None def _on_supported_kernel_cache( version: str, flavor: str, arch: str, codename: str ) -> Tuple[bool, Optional[bool]]: """Check local cache of kernel support :return: (is_cache_valid, result) """ try: cache_data = state_files.livepatch_support_cache.read() except Exception: cache_data = None if cache_data is not None: one_week_ago = datetime.datetime.now( datetime.timezone.utc ) - datetime.timedelta(days=7) if all( [ cache_data.cached_at > one_week_ago, # less than one week old cache_data.version == version, cache_data.flavor == flavor, cache_data.arch == arch, cache_data.codename == codename, ] ): if cache_data.supported is None: with util.disable_log_to_console(): logging.warning( "livepatch kernel support cache has None value" ) return (True, cache_data.supported) return (False, None) def _on_supported_kernel_api( version: str, flavor: str, arch: str, codename: str ) -> Optional[bool]: supported = UALivepatchClient().is_kernel_supported( version=version, flavor=flavor, arch=arch, codename=codename, ) # cache response before returning state_files.livepatch_support_cache.write( state_files.LivepatchSupportCacheData( version=version, flavor=flavor, arch=arch, codename=codename, supported=supported, cached_at=datetime.datetime.now(datetime.timezone.utc), ) ) if supported is None: with util.disable_log_to_console(): logging.warning( "livepatch kernel support API response was ambiguous" ) return supported @lru_cache(maxsize=None) def on_supported_kernel() -> Optional[bool]: """ Checks CLI, local cache, and API in that order for kernel support If all checks fail to return an authoritative answer, we return None """ # first check cli cli_says = _on_supported_kernel_cli() if cli_says is not None: logging.debug("using livepatch cli for support") return cli_says # gather required system info to query support kernel_info = system.get_kernel_info() if ( kernel_info.flavor is None or kernel_info.major is None or kernel_info.minor is None ): logging.warning( "unable to determine enough kernel information to " "check livepatch support" ) return None arch = util.standardize_arch_name(system.get_dpkg_arch()) codename = system.get_platform_info()["series"] lp_api_kernel_ver = "{major}.{minor}".format( major=kernel_info.major, minor=kernel_info.minor ) # second check cache is_cache_valid, cache_says = _on_supported_kernel_cache( lp_api_kernel_ver, kernel_info.flavor, arch, codename ) if is_cache_valid: logging.debug("using livepatch support cache") return cache_says # finally check api logging.debug("using livepatch support api") return _on_supported_kernel_api( lp_api_kernel_ver, kernel_info.flavor, arch, codename ) def unconfigure_livepatch_proxy( protocol_type: str, retry_sleeps: Optional[List[float]] = None ) -> None: """ Unset livepatch configuration settings for http and https proxies. :param protocol_type: String either http or https :param retry_sleeps: Optional list of sleep lengths to apply between retries. Specifying a list of [0.5, 1] tells subp to retry twice on failure; sleeping half a second before the first retry and 1 second before the second retry. """ if not is_livepatch_installed(): return system.subp( [LIVEPATCH_CMD, "config", "{}-proxy=".format(protocol_type)], retry_sleeps=retry_sleeps, ) def configure_livepatch_proxy( http_proxy: Optional[str] = None, https_proxy: Optional[str] = None, retry_sleeps: Optional[List[float]] = None, ) -> None: """ Configure livepatch to use http and https proxies. :param http_proxy: http proxy to be used by livepatch. If None, it will not be configured :param https_proxy: https proxy to be used by livepatch. If None, it will not be configured :@param retry_sleeps: Optional list of sleep lengths to apply between snap calls """ from uaclient.entitlements import LivepatchEntitlement if http_proxy or https_proxy: event.info( messages.SETTING_SERVICE_PROXY.format( service=LivepatchEntitlement.title ) ) if http_proxy: system.subp( [LIVEPATCH_CMD, "config", "http-proxy={}".format(http_proxy)], retry_sleeps=retry_sleeps, ) if https_proxy: system.subp( [LIVEPATCH_CMD, "config", "https-proxy={}".format(https_proxy)], retry_sleeps=retry_sleeps, ) def get_config_option_value(key: str) -> Optional[str]: """ Gets the config value from livepatch. :param key: can be any valid livepatch config option :return: the value of the livepatch config option, or None if not set """ out, _ = system.subp([LIVEPATCH_CMD, "config"]) match = re.search("^{}: (.*)$".format(key), out, re.MULTILINE) value = match.group(1) if match else None if value: # remove quotes if present value = re.sub(r"\"(.*)\"", r"\g<1>", value) return value.strip() if value else None def is_livepatch_installed() -> bool: return system.which(LIVEPATCH_CMD) is not None
| ver. 1.4 |
Github
|
.
| PHP 7.4.3-4ubuntu2.28 | Генерация страницы: 0 |
proxy
|
phpinfo
|
Настройка