[ Avaa Bypassed ]




Upload:

Command:

www-data@18.218.241.211: ~ $
import datetime
import json
import logging
import re
from functools import lru_cache
from typing import List, Optional, Tuple

from uaclient import (
    event_logger,
    exceptions,
    messages,
    serviceclient,
    system,
    util,
)
from uaclient.data_types import (
    BoolDataValue,
    DataObject,
    Field,
    IncorrectTypeError,
    StringDataValue,
    data_list,
)
from uaclient.files import state_files

HTTP_PROXY_OPTION = "http-proxy"
HTTPS_PROXY_OPTION = "https-proxy"

LIVEPATCH_CMD = "/snap/bin/canonical-livepatch"

LIVEPATCH_API_V1_KERNELS_SUPPORTED = "/v1/api/kernels/supported"

event = event_logger.get_event_logger()


class LivepatchPatchFixStatus(DataObject):
    fields = [
        Field("name", StringDataValue, required=False, dict_key="Name"),
        Field("patched", BoolDataValue, required=False, dict_key="Patched"),
    ]

    def __init__(
        self,
        name: Optional[str],
        patched: Optional[bool],
    ):
        self.name = name
        self.patched = patched


class LivepatchPatchStatus(DataObject):
    fields = [
        Field("state", StringDataValue, required=False, dict_key="State"),
        Field(
            "fixes",
            data_list(LivepatchPatchFixStatus),
            required=False,
            dict_key="Fixes",
        ),
    ]

    def __init__(
        self,
        state: Optional[str],
        fixes: Optional[List[LivepatchPatchFixStatus]],
    ):
        self.state = state
        self.fixes = fixes


class LivepatchStatusStatus(DataObject):
    fields = [
        Field("kernel", StringDataValue, required=False, dict_key="Kernel"),
        Field(
            "livepatch",
            LivepatchPatchStatus,
            required=False,
            dict_key="Livepatch",
        ),
        Field(
            "supported",
            StringDataValue,
            required=False,
            dict_key="Supported",
        ),
    ]

    def __init__(
        self,
        kernel: Optional[str],
        livepatch: Optional[LivepatchPatchStatus],
        supported: Optional[str],
    ):
        self.kernel = kernel
        self.livepatch = livepatch
        self.supported = supported


class LivepatchStatus(DataObject):
    fields = [
        Field(
            "status",
            data_list(LivepatchStatusStatus),
            required=False,
            dict_key="Status",
        ),
    ]

    def __init__(
        self,
        status: Optional[List[LivepatchStatusStatus]],
    ):
        self.status = status


def status() -> Optional[LivepatchStatusStatus]:
    if not is_livepatch_installed():
        logging.debug("canonical-livepatch is not installed")
        return None

    try:
        out, _ = system.subp([LIVEPATCH_CMD, "status", "--format", "json"])
    except exceptions.ProcessExecutionError:
        with util.disable_log_to_console():
            logging.warning(
                "canonical-livepatch returned error when checking status"
            )
        return None

    try:
        status_json = json.loads(out)
    except json.JSONDecodeError:
        with util.disable_log_to_console():
            logging.warning(
                "canonical-livepatch status returned invalid json: {}".format(
                    out
                )
            )
        return None

    try:
        status_root = LivepatchStatus.from_dict(status_json)
    except IncorrectTypeError:
        with util.disable_log_to_console():
            logging.warning(
                "canonical-livepatch status returned unexpected "
                "structure: {}".format(out)
            )
        return None

    if status_root.status is None or len(status_root.status) < 1:
        logging.debug("canonical-livepatch has no status")
        return None

    return status_root.status[0]


class UALivepatchClient(serviceclient.UAServiceClient):

    cfg_url_base_attr = "livepatch_url"
    api_error_cls = exceptions.UrlError

    def is_kernel_supported(
        self, version: str, flavor: str, arch: str, codename: str
    ) -> Optional[bool]:
        """
        :returns: True if supported
                  False if unsupported
                  None if API returns error or ambiguous response
        """
        query_params = {
            "kernel-version": version,
            "flavour": flavor,
            "architecture": arch,
            "codename": codename,
        }
        headers = self.headers()
        try:
            result, _headers = self.request_url(
                LIVEPATCH_API_V1_KERNELS_SUPPORTED,
                query_params=query_params,
                headers=headers,
            )
        except Exception as e:
            with util.disable_log_to_console():
                logging.warning(
                    "error while checking livepatch supported kernels API"
                )
                logging.warning(e)
            return None

        if not isinstance(result, dict):
            logging.warning(
                "livepatch api returned something that isn't a dict"
            )
            return None

        return bool(result.get("Supported", False))


def _on_supported_kernel_cli() -> Optional[bool]:
    lp_status = status()
    if lp_status is None:
        return None
    if lp_status.supported == "supported":
        return True
    if lp_status.supported == "unsupported":
        return False
    return None


def _on_supported_kernel_cache(
    version: str, flavor: str, arch: str, codename: str
) -> Tuple[bool, Optional[bool]]:
    """Check local cache of kernel support

    :return: (is_cache_valid, result)
    """
    try:
        cache_data = state_files.livepatch_support_cache.read()
    except Exception:
        cache_data = None

    if cache_data is not None:
        one_week_ago = datetime.datetime.now(
            datetime.timezone.utc
        ) - datetime.timedelta(days=7)
        if all(
            [
                cache_data.cached_at > one_week_ago,  # less than one week old
                cache_data.version == version,
                cache_data.flavor == flavor,
                cache_data.arch == arch,
                cache_data.codename == codename,
            ]
        ):
            if cache_data.supported is None:
                with util.disable_log_to_console():
                    logging.warning(
                        "livepatch kernel support cache has None value"
                    )
            return (True, cache_data.supported)
    return (False, None)


def _on_supported_kernel_api(
    version: str, flavor: str, arch: str, codename: str
) -> Optional[bool]:
    supported = UALivepatchClient().is_kernel_supported(
        version=version,
        flavor=flavor,
        arch=arch,
        codename=codename,
    )

    # cache response before returning
    state_files.livepatch_support_cache.write(
        state_files.LivepatchSupportCacheData(
            version=version,
            flavor=flavor,
            arch=arch,
            codename=codename,
            supported=supported,
            cached_at=datetime.datetime.now(datetime.timezone.utc),
        )
    )

    if supported is None:
        with util.disable_log_to_console():
            logging.warning(
                "livepatch kernel support API response was ambiguous"
            )
    return supported


@lru_cache(maxsize=None)
def on_supported_kernel() -> Optional[bool]:
    """
    Checks CLI, local cache, and API in that order for kernel support
    If all checks fail to return an authoritative answer, we return None
    """

    # first check cli
    cli_says = _on_supported_kernel_cli()
    if cli_says is not None:
        logging.debug("using livepatch cli for support")
        return cli_says

    # gather required system info to query support
    kernel_info = system.get_kernel_info()
    if (
        kernel_info.flavor is None
        or kernel_info.major is None
        or kernel_info.minor is None
    ):
        logging.warning(
            "unable to determine enough kernel information to "
            "check livepatch support"
        )
        return None

    arch = util.standardize_arch_name(system.get_dpkg_arch())
    codename = system.get_platform_info()["series"]

    lp_api_kernel_ver = "{major}.{minor}".format(
        major=kernel_info.major, minor=kernel_info.minor
    )

    # second check cache
    is_cache_valid, cache_says = _on_supported_kernel_cache(
        lp_api_kernel_ver, kernel_info.flavor, arch, codename
    )
    if is_cache_valid:
        logging.debug("using livepatch support cache")
        return cache_says

    # finally check api
    logging.debug("using livepatch support api")
    return _on_supported_kernel_api(
        lp_api_kernel_ver, kernel_info.flavor, arch, codename
    )


def unconfigure_livepatch_proxy(
    protocol_type: str, retry_sleeps: Optional[List[float]] = None
) -> None:
    """
    Unset livepatch configuration settings for http and https proxies.

    :param protocol_type: String either http or https
    :param retry_sleeps: Optional list of sleep lengths to apply between
        retries. Specifying a list of [0.5, 1] tells subp to retry twice
        on failure; sleeping half a second before the first retry and 1 second
        before the second retry.
    """
    if not is_livepatch_installed():
        return
    system.subp(
        [LIVEPATCH_CMD, "config", "{}-proxy=".format(protocol_type)],
        retry_sleeps=retry_sleeps,
    )


def configure_livepatch_proxy(
    http_proxy: Optional[str] = None,
    https_proxy: Optional[str] = None,
    retry_sleeps: Optional[List[float]] = None,
) -> None:
    """
    Configure livepatch to use http and https proxies.

    :param http_proxy: http proxy to be used by livepatch. If None, it will
                       not be configured
    :param https_proxy: https proxy to be used by livepatch. If None, it will
                        not be configured
    :@param retry_sleeps: Optional list of sleep lengths to apply between
                               snap calls
    """
    from uaclient.entitlements import LivepatchEntitlement

    if http_proxy or https_proxy:
        event.info(
            messages.SETTING_SERVICE_PROXY.format(
                service=LivepatchEntitlement.title
            )
        )

    if http_proxy:
        system.subp(
            [LIVEPATCH_CMD, "config", "http-proxy={}".format(http_proxy)],
            retry_sleeps=retry_sleeps,
        )

    if https_proxy:
        system.subp(
            [LIVEPATCH_CMD, "config", "https-proxy={}".format(https_proxy)],
            retry_sleeps=retry_sleeps,
        )


def get_config_option_value(key: str) -> Optional[str]:
    """
    Gets the config value from livepatch.
    :param key: can be any valid livepatch config option
    :return: the value of the livepatch config option, or None if not set
    """
    out, _ = system.subp([LIVEPATCH_CMD, "config"])
    match = re.search("^{}: (.*)$".format(key), out, re.MULTILINE)
    value = match.group(1) if match else None
    if value:
        # remove quotes if present
        value = re.sub(r"\"(.*)\"", r"\g<1>", value)
    return value.strip() if value else None


def is_livepatch_installed() -> bool:
    return system.which(LIVEPATCH_CMD) is not None

Filemanager

Name Type Size Permission Actions
__pycache__ Folder 0755
api Folder 0755
clouds Folder 0755
daemon Folder 0755
entitlements Folder 0755
files Folder 0755
jobs Folder 0755
__init__.py File 0 B 0644
actions.py File 8.19 KB 0644
apt.py File 25.74 KB 0644
apt_news.py File 6.33 KB 0644
cli.py File 64.22 KB 0644
config.py File 24.5 KB 0644
contract.py File 27.47 KB 0644
contract_data_types.py File 9.38 KB 0644
data_types.py File 10.3 KB 0644
defaults.py File 2.46 KB 0644
event_logger.py File 7.75 KB 0644
exceptions.py File 13.56 KB 0644
gpg.py File 813 B 0644
livepatch.py File 11.03 KB 0644
lock.py File 3.58 KB 0644
log.py File 1.89 KB 0644
messages.py File 38.47 KB 0644
pip.py File 756 B 0644
security.py File 48.75 KB 0644
security_status.py File 24.19 KB 0644
serviceclient.py File 6.22 KB 0644
snap.py File 4.06 KB 0644
status.py File 25.73 KB 0644
system.py File 17.09 KB 0644
types.py File 308 B 0644
util.py File 20.3 KB 0644
version.py File 2.81 KB 0644
yaml.py File 642 B 0644