[ Avaa Bypassed ]




Upload:

Command:

www-data@3.139.85.192: ~ $
"""Tests for certbot.plugins.standalone."""
import socket
# https://github.com/python/typeshed/blob/master/stdlib/2and3/socket.pyi
from socket import errno as socket_errors  # type: ignore
import unittest

import josepy as jose
import mock
import six

import OpenSSL.crypto  # pylint: disable=unused-import

from acme import challenges
from acme import standalone as acme_standalone  # pylint: disable=unused-import
from acme.magic_typing import Dict, Tuple, Set  # pylint: disable=unused-import, no-name-in-module

from certbot import achallenges
from certbot import errors

from certbot.tests import acme_util
from certbot.tests import util as test_util


class ServerManagerTest(unittest.TestCase):
    """Tests for certbot.plugins.standalone.ServerManager."""

    def setUp(self):
        from certbot.plugins.standalone import ServerManager
        self.certs = {}  # type: Dict[bytes, Tuple[OpenSSL.crypto.PKey, OpenSSL.crypto.X509]]
        # An empty set, so the runtime value agrees with the declared Set type.
        self.http_01_resources = set() \
        # type: Set[acme_standalone.HTTP01RequestHandler.HTTP01Resource]
        self.mgr = ServerManager(self.certs, self.http_01_resources)

    def test_init(self):
        """The manager keeps the very objects it was constructed with."""
        self.assertIs(self.mgr.certs, self.certs)
        self.assertIs(self.mgr.http_01_resources, self.http_01_resources)

    def _test_run_stop(self, challenge_type):
        """Run a server on an OS-chosen port, then stop it again.

        :param challenge_type: challenge class handed to ``ServerManager.run``
        """
        server = self.mgr.run(port=0, challenge_type=challenge_type)
        port = server.getsocknames()[0][1]  # pylint: disable=no-member
        self.assertEqual(self.mgr.running(), {port: server})
        self.mgr.stop(port=port)
        self.assertEqual(self.mgr.running(), {})

    def test_run_stop_http_01(self):
        """Run/stop round trip for the HTTP-01 challenge type."""
        self._test_run_stop(challenges.HTTP01)

    def test_run_idempotent(self):
        """Running twice on the same port returns the same server object."""
        server = self.mgr.run(port=0, challenge_type=challenges.HTTP01)
        port = server.getsocknames()[0][1]  # pylint: disable=no-member
        server2 = self.mgr.run(port=port, challenge_type=challenges.HTTP01)
        self.assertEqual(self.mgr.running(), {port: server})
        self.assertIs(server, server2)
        self.mgr.stop(port)
        self.assertEqual(self.mgr.running(), {})

    def test_run_bind_error(self):
        """Running on an already-occupied port raises StandaloneBindError."""
        some_server = socket.socket(socket.AF_INET6)
        # Registered immediately so the socket is closed even when a later
        # bind or assertion fails.
        self.addCleanup(some_server.close)
        some_server.bind(("", 0))
        port = some_server.getsockname()[1]

        maybe_another_server = socket.socket()
        self.addCleanup(maybe_another_server.close)
        try:
            maybe_another_server.bind(("", port))
        except socket.error:
            # Deliberately ignored: this second bind is best-effort.
            # Presumably it fails where the IPv6 socket above already
            # covers IPv4 as well -- TODO confirm.
            pass

        self.assertRaises(
            errors.StandaloneBindError, self.mgr.run, port,
            challenge_type=challenges.HTTP01)
        self.assertEqual(self.mgr.running(), {})


def get_open_port():
    """Gets an open port number from the OS.

    Binds a temporary IPv4 TCP socket to port 0, reads back the port that
    was assigned, and closes the socket again before returning. The port
    is not held open after this function returns.

    :returns: port number the temporary socket was bound to
    :rtype: int

    """
    open_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM, 0)
    try:
        open_socket.bind(("", 0))
        return open_socket.getsockname()[1]
    finally:
        # Close on every path so a failing bind()/getsockname() does not
        # leak the socket.
        open_socket.close()


class AuthenticatorTest(unittest.TestCase):
    """Tests for certbot.plugins.standalone.Authenticator."""

    def setUp(self):
        from certbot.plugins.standalone import Authenticator

        self.config = mock.MagicMock(http01_port=get_open_port())
        self.auth = Authenticator(self.config, name="standalone")
        # Swap in a mock so each test decides what run()/running()/stop() do.
        self.auth.servers = mock.MagicMock()

    def test_more_info(self):
        """more_info() returns a string."""
        self.assertIsInstance(self.auth.more_info(), six.string_types)

    def test_get_chall_pref(self):
        """HTTP-01 is the only preferred challenge type."""
        self.assertEqual(self.auth.get_chall_pref(domain=None),
                         [challenges.HTTP01])

    def test_perform(self):
        """perform() returns one response per annotated challenge."""
        achalls = self._get_achalls()
        response = self.auth.perform(achalls)

        expected = [achall.response(achall.account_key) for achall in achalls]
        self.assertEqual(response, expected)

    @test_util.patch_get_utility()
    def test_perform_eaddrinuse_retry(self, mock_get_utility):
        """After EADDRINUSE, answering yes to the prompt lets perform() succeed."""
        mock_utility = mock_get_utility()
        errno = socket_errors.EADDRINUSE
        error = errors.StandaloneBindError(mock.MagicMock(errno=errno), -1)
        # The first run() call raises; the following calls return mock servers.
        self.auth.servers.run.side_effect = [error] + 2 * [mock.MagicMock()]
        mock_yesno = mock_utility.yesno
        mock_yesno.return_value = True

        self.test_perform()
        self._assert_correct_yesno_call(mock_yesno)

    @test_util.patch_get_utility()
    def test_perform_eaddrinuse_no_retry(self, mock_get_utility):
        """After EADDRINUSE, answering no to the prompt raises PluginError."""
        mock_utility = mock_get_utility()
        mock_yesno = mock_utility.yesno
        mock_yesno.return_value = False

        errno = socket_errors.EADDRINUSE
        self.assertRaises(errors.PluginError, self._fail_perform, errno)
        self._assert_correct_yesno_call(mock_yesno)

    def _assert_correct_yesno_call(self, mock_yesno):
        """Check the last yesno prompt mentions "in use" and has a falsy default.

        :param mock_yesno: mock standing in for the display utility's ``yesno``
        """
        yesno_args, yesno_kwargs = mock_yesno.call_args
        self.assertIn("in use", yesno_args[0])
        self.assertFalse(yesno_kwargs.get("default", True))

    def test_perform_eacces(self):
        """A bind failure with EACCES surfaces as PluginError."""
        errno = socket_errors.EACCES
        self.assertRaises(errors.PluginError, self._fail_perform, errno)

    def test_perform_unexpected_socket_error(self):
        """A bind failure with ENOTCONN surfaces as StandaloneBindError."""
        errno = socket_errors.ENOTCONN
        self.assertRaises(
            errors.StandaloneBindError, self._fail_perform, errno)

    def _fail_perform(self, errno):
        """Call perform() with servers.run rigged to raise a bind error.

        :param errno: errno value placed on the mocked socket error
        """
        error = errors.StandaloneBindError(mock.MagicMock(errno=errno), -1)
        self.auth.servers.run.side_effect = error
        self.auth.perform(self._get_achalls())

    @classmethod
    def _get_achalls(cls):
        """Build the list of annotated challenges used by the perform tests.

        :returns: list holding a single HTTP-01 annotated challenge
        """
        domain = b'localhost'
        key = jose.JWK.load(test_util.load_vector('rsa512_key.pem'))
        http_01 = achallenges.KeyAuthorizationAnnotatedChallenge(
            challb=acme_util.HTTP01_P, domain=domain, account_key=key)

        return [http_01]

    def test_cleanup(self):
        """cleanup() removes served challenges and stops servers as expected."""
        self.auth.servers.running.return_value = {
            1: "server1",
            2: "server2",
        }
        self.auth.served["server1"].add("chall1")
        self.auth.served["server2"].update(["chall2", "chall3"])

        # server1 has nothing left to serve, so only port 1 is stopped.
        self.auth.cleanup(["chall1"])
        self.assertEqual(self.auth.served, {
            "server1": set(), "server2": {"chall2", "chall3"}})
        self.auth.servers.stop.assert_called_once_with(1)

        # server2 still serves chall3, so no additional stop() call is made.
        self.auth.servers.running.return_value = {
            2: "server2",
        }
        self.auth.cleanup(["chall2"])
        self.assertEqual(self.auth.served, {
            "server1": set(), "server2": {"chall3"}})
        self.assertEqual(1, self.auth.servers.stop.call_count)

        # server2's last challenge is removed, so port 2 is stopped too.
        self.auth.cleanup(["chall3"])
        self.assertEqual(self.auth.served, {
            "server1": set(), "server2": set()})
        self.auth.servers.stop.assert_called_with(2)


# Run this module's tests when it is executed directly as a script.
if __name__ == "__main__":
    unittest.main()  # pragma: no cover

Filemanager

Name Type Size Permission Actions
__pycache__ Folder 0755
__init__.py File 30 B 0644
common.py File 18.68 KB 0644
common_test.py File 14.89 KB 0644
disco.py File 9.96 KB 0644
disco_test.py File 11.34 KB 0644
dns_common.py File 11.69 KB 0644
dns_common_lexicon.py File 5.41 KB 0644
dns_common_lexicon_test.py File 651 B 0644
dns_common_test.py File 8.18 KB 0644
dns_test_common.py File 1.65 KB 0644
dns_test_common_lexicon.py File 5.48 KB 0644
enhancements.py File 5.58 KB 0644
enhancements_test.py File 2.36 KB 0644
manual.py File 7.88 KB 0644
manual_test.py File 5.6 KB 0644
null.py File 1.27 KB 0644
null_test.py File 624 B 0644
selection.py File 13.55 KB 0644
selection_test.py File 7.78 KB 0644
standalone.py File 7.72 KB 0644
standalone_test.py File 6.64 KB 0644
storage.py File 4.14 KB 0644
storage_test.py File 5.47 KB 0644
util.py File 1.69 KB 0644
util_test.py File 1.81 KB 0644
webroot.py File 12.08 KB 0644
webroot_test.py File 13.11 KB 0644