[ Avaa Bypassed ]




Upload:

Command:

www-data@3.139.85.192: ~ $
# -*- coding: utf-8 -*-

# Copyright 2011 OpenStack Foundation.
# Copyright 2011 Justin Santa Barbara
#
#    Licensed under the Apache License, Version 2.0 (the "License"); you may
#    not use this file except in compliance with the License. You may obtain
#    a copy of the License at
#
#         http://www.apache.org/licenses/LICENSE-2.0
#
#    Unless required by applicable law or agreed to in writing, software
#    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
#    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
#    License for the specific language governing permissions and limitations
#    under the License.

import errno
import fcntl
import multiprocessing
import os
import shutil
import signal
import tempfile
import threading
import time

from fasteners import process_lock as pl
from fasteners import test


class BrokenLock(pl.InterProcessLock):
    def __init__(self, name, errno_code):
        super(BrokenLock, self).__init__(name)
        self.errno_code = errno_code

    def unlock(self):
        pass

    def trylock(self):
        err = IOError()
        err.errno = self.errno_code
        raise err


class ProcessLockTest(test.TestCase):
    def setUp(self):
        super(ProcessLockTest, self).setUp()
        self.lock_dir = tempfile.mkdtemp()
        self.tmp_dirs = [self.lock_dir]

    def tearDown(self):
        super(ProcessLockTest, self).tearDown()
        for a_dir in reversed(self.tmp_dirs):
            if os.path.exists(a_dir):
                shutil.rmtree(a_dir, ignore_errors=True)

    def test_lock_acquire_release_file_lock(self):
        lock_file = os.path.join(self.lock_dir, 'lock')
        lock = pl.InterProcessLock(lock_file)

        def try_lock():
            try:
                my_lock = pl.InterProcessLock(lock_file)
                my_lock.lockfile = open(lock_file, 'w')
                my_lock.trylock()
                my_lock.unlock()
                os._exit(1)
            except IOError:
                os._exit(0)

        def attempt_acquire(count):
            children = []
            for i in range(count):
                child = multiprocessing.Process(target=try_lock)
                child.start()
                children.append(child)
            exit_codes = []
            for child in children:
                child.join()
                exit_codes.append(child.exitcode)
            return sum(exit_codes)

        self.assertTrue(lock.acquire())
        try:
            acquired_children = attempt_acquire(10)
            self.assertEqual(0, acquired_children)
        finally:
            lock.release()

        acquired_children = attempt_acquire(5)
        self.assertNotEqual(0, acquired_children)

    def test_nested_synchronized_external_works(self):
        sentinel = object()

        @pl.interprocess_locked(os.path.join(self.lock_dir, 'test-lock-1'))
        def outer_lock():

            @pl.interprocess_locked(os.path.join(self.lock_dir, 'test-lock-2'))
            def inner_lock():
                return sentinel

            return inner_lock()

        self.assertEqual(sentinel, outer_lock())

    def _do_test_lock_externally(self, lock_dir):
        lock_path = os.path.join(lock_dir, "lock")

        def lock_files(handles_dir):
            with pl.InterProcessLock(lock_path):

                # Open some files we can use for locking
                handles = []
                for n in range(50):
                    path = os.path.join(handles_dir, ('file-%s' % n))
                    handles.append(open(path, 'w'))

                # Loop over all the handles and try locking the file
                # without blocking, keep a count of how many files we
                # were able to lock and then unlock. If the lock fails
                # we get an IOError and bail out with bad exit code
                count = 0
                for handle in handles:
                    try:
                        fcntl.flock(handle, fcntl.LOCK_EX | fcntl.LOCK_NB)
                        count += 1
                        fcntl.flock(handle, fcntl.LOCK_UN)
                    except IOError:
                        os._exit(2)
                    finally:
                        handle.close()

                # Check if we were able to open all files
                self.assertEqual(50, count)

        handles_dir = tempfile.mkdtemp()
        self.tmp_dirs.append(handles_dir)
        children = []
        for n in range(50):
            pid = os.fork()
            if pid:
                children.append(pid)
            else:
                try:
                    lock_files(handles_dir)
                finally:
                    os._exit(0)
        for child in children:
            (pid, status) = os.waitpid(child, 0)
            if pid:
                self.assertEqual(0, status)

    def test_lock_externally(self):
        self._do_test_lock_externally(self.lock_dir)

    def test_lock_externally_lock_dir_not_exist(self):
        os.rmdir(self.lock_dir)
        self._do_test_lock_externally(self.lock_dir)

    def test_lock_file_exists(self):
        lock_file = os.path.join(self.lock_dir, 'lock')

        @pl.interprocess_locked(lock_file)
        def foo():
            self.assertTrue(os.path.exists(lock_file))

        foo()

    def test_bad_acquire(self):
        lock_file = os.path.join(self.lock_dir, 'lock')
        lock = BrokenLock(lock_file, errno.EBUSY)
        self.assertRaises(threading.ThreadError, lock.acquire)

    def test_bad_release(self):
        lock_file = os.path.join(self.lock_dir, 'lock')
        lock = pl.InterProcessLock(lock_file)
        self.assertRaises(threading.ThreadError, lock.release)

    def test_interprocess_lock(self):
        lock_file = os.path.join(self.lock_dir, 'lock')

        pid = os.fork()
        if pid:
            # Make sure the child grabs the lock first
            start = time.time()
            while not os.path.exists(lock_file):
                if time.time() - start > 5:
                    self.fail('Timed out waiting for child to grab lock')
                time.sleep(0)
            lock1 = pl.InterProcessLock('foo')
            lock1.lockfile = open(lock_file, 'w')
            # NOTE(bnemec): There is a brief window between when the lock file
            # is created and when it actually becomes locked.  If we happen to
            # context switch in that window we may succeed in locking the
            # file. Keep retrying until we either get the expected exception
            # or timeout waiting.
            while time.time() - start < 5:
                try:
                    lock1.trylock()
                    lock1.unlock()
                    time.sleep(0)
                except IOError:
                    # This is what we expect to happen
                    break
            else:
                self.fail('Never caught expected lock exception')
            # We don't need to wait for the full sleep in the child here
            os.kill(pid, signal.SIGKILL)
        else:
            try:
                lock2 = pl.InterProcessLock('foo')
                lock2.lockfile = open(lock_file, 'w')
                have_lock = False
                while not have_lock:
                    try:
                        lock2.trylock()
                        have_lock = True
                    except IOError:
                        pass
            finally:
                # NOTE(bnemec): This is racy, but I don't want to add any
                # synchronization primitives that might mask a problem
                # with the one we're trying to test here.
                time.sleep(.5)
                os._exit(0)

    def test_non_destructive(self):
        lock_file = os.path.join(self.lock_dir, 'not-destroyed')
        with open(lock_file, 'w') as f:
            f.write('test')
        with pl.InterProcessLock(lock_file):
            with open(lock_file) as f:
                self.assertEqual(f.read(), 'test')

Filemanager

Name Type Size Permission Actions
__pycache__ Folder 0755
__init__.py File 0 B 0644
test_decorators.py File 2.49 KB 0644
test_helpers.py File 995 B 0644
test_lock.py File 13.3 KB 0644
test_process_lock.py File 7.86 KB 0644