[ Avaa Bypassed ]




Upload:

Command:

www-data@18.118.226.34: ~ $
# -*- Mode:Python; indent-tabs-mode:nil; tab-width:4 -*-
#
# Copyright 2009 Michael Terry <mike@mterry.name>
#
# This file is part of duplicity.
#
# Duplicity is free software; you can redistribute it and/or modify it
# under the terms of the GNU General Public License as published by the
# Free Software Foundation; either version 2 of the License, or (at your
# option) any later version.
#
# Duplicity is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with duplicity; if not, write to the Free Software Foundation,
# Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA

import os
import subprocess
import atexit
import signal

import duplicity.backend
from duplicity import log
from duplicity import util


def ensure_dbus():
    u"""Make sure a D-Bus session bus exists and that os.environ points to it.

    GIO requires a dbus session bus which can start the gvfs daemons when
    required.  If DBUS_SESSION_BUS_ADDRESS is already present in the
    environment nothing is done.  Otherwise dbus-launch is run and every
    NAME=VALUE line it prints is copied into os.environ.  When the output
    reports DBUS_SESSION_BUS_PID, a SIGTERM for that pid is registered with
    atexit so the launched daemon is cleaned up at interpreter exit.
    """
    if u'DBUS_SESSION_BUS_ADDRESS' in os.environ:
        return

    # Read raw bytes and decode them exactly once.  Asking Popen for text
    # (universal_newlines=True) and then calling .decode() on the result fails
    # on Python 3, where the pipe already yields str (which has no decode()).
    p = subprocess.Popen([u'dbus-launch'], stdout=subprocess.PIPE)
    output = p.communicate()[0].decode("utf8", errors="replace")

    # splitlines() copes with any line-ending convention now that newline
    # translation is no longer performed by the pipe.
    for line in output.splitlines():
        parts = line.split(u'=', 1)
        if len(parts) != 2:
            continue  # not a NAME=VALUE line
        if parts[0] == u'DBUS_SESSION_BUS_PID':  # cleanup at end
            atexit.register(os.kill, int(parts[1]), signal.SIGTERM)
        os.environ[parts[0]] = parts[1]


class GIOBackend(duplicity.backend.Backend):
    u"""Backend for storing files at a location addressed by a GIO URL.

    This is something of a meta-backend: a single class serves several URL
    schemas.  URLs have the form schema://user@server/path.
    """
    def __init__(self, parsed_url):
        u"""Mount the location behind parsed_url and create its directory.

        parsed_url.url_string is turned into a Gio.File, the enclosing volume
        is mounted (blocking on a GLib main loop until the mount callback
        fires), and the target directory is created unless it already exists.
        Any GLib.GError from directory creation other than EXISTS propagates.
        """
        from gi.repository import Gio  # @UnresolvedImport  # pylint: disable=import-error
        from gi.repository import GLib  # @UnresolvedImport  # pylint: disable=import-error

        duplicity.backend.Backend.__init__(self, parsed_url)

        class DupMountOperation(Gio.MountOperation):
            u"""MountOperation that answers password prompts and questions
               on its own, using the owning backend for the password.
            """
            def __init__(self, backend):
                Gio.MountOperation.__init__(self)
                self.backend = backend
                handlers = (
                    (u'ask-password', self.ask_password_cb),
                    (u'ask-question', self.ask_question_cb),
                )
                for signal_name, handler in handlers:
                    self.connect(signal_name, handler)

            def ask_password_cb(self, *args, **kwargs):
                u"""Reply to a password request with the backend's password."""
                password = self.backend.get_password()
                self.set_password(password)
                self.reply(Gio.MountOperationResult.HANDLED)

            def ask_question_cb(self, *args, **kwargs):
                u"""Reply to any question by selecting choice 0."""
                # Always picking the first choice is admittedly naive, but
                # duplicity usually runs unattended (cron plus environment
                # variables), which leaves no easy way to answer questions.
                # Only a couple of gvfs backends ask anything: 'sftp' about
                # new hosts and 'afc' when the device is locked.  0 should be
                # a safe choice.
                self.set_choice(0)
                self.reply(Gio.MountOperationResult.HANDLED)

        ensure_dbus()

        self.remote_file = Gio.File.new_for_uri(parsed_url.url_string)

        # Make sure the location is mounted; run() blocks the program until
        # the completion callback quits the loop.
        mount_op = DupMountOperation(self)
        main_loop = GLib.MainLoop()
        self.remote_file.mount_enclosing_volume(
            Gio.MountMountFlags.NONE,
            mount_op,
            None,
            self.__done_with_mount,
            main_loop)
        main_loop.run()

        # Create the directory; an EXISTS error just means there is nothing
        # to do.
        try:
            self.remote_file.make_directory_with_parents(None)
        except GLib.GError as err:
            already_there = err.code == Gio.IOErrorEnum.EXISTS
            if not already_there:
                raise

    def __done_with_mount(self, fileobj, result, loop):
        u"""Completion callback for the mount request; always quits loop.

        A GLib.GError whose code is neither ALREADY_MOUNTED nor NOT_SUPPORTED
        is reported through log.FatalError as a connection failure.
        """
        from gi.repository import Gio  # @UnresolvedImport  # pylint: disable=import-error
        from gi.repository import GLib  # @UnresolvedImport  # pylint: disable=import-error
        try:
            fileobj.mount_enclosing_volume_finish(result)
        except GLib.GError as err:
            # NOT_SUPPORTED is tolerated because some schemas (e.g. file://)
            # validly don't support mounting.
            tolerated = (err.code == Gio.IOErrorEnum.ALREADY_MOUNTED or
                         err.code == Gio.IOErrorEnum.NOT_SUPPORTED)
            if not tolerated:
                message = _(u"Connection failed, please check your password: %s") % util.uexc(err)
                log.FatalError(message, log.ErrorCode.connection_failed)
        loop.quit()

    def __copy_progress(self, *args, **kwargs):
        u"""Progress callback handed to Gio copy; intentionally does nothing."""

    def __copy_file(self, source, target):
        u"""Copy the Gio file source onto target, overwriting target."""
        from gi.repository import Gio  # @UnresolvedImport  # pylint: disable=import-error
        # NOFOLLOW_SYMLINKS is deliberately not passed.  Some backends (e.g.
        # google-drive:) use symlinks internally for all files, and in the
        # normal course of events we only ever handle tarballs, not symlinks.
        flags = Gio.FileCopyFlags.OVERWRITE
        source.copy(target, flags, None, self.__copy_progress, None)

    def __child(self, filename):
        u"""Return the Gio file under remote_file whose display name is filename."""
        display_name = util.fsdecode(filename)
        return self.remote_file.get_child_for_display_name(display_name)

    def _error_code(self, operation, e):
        u"""Translate a GLib.GError into a log.ErrorCode value.

        Returns None when e is not a GLib.GError or its code is not one of
        the handled cases.
        """
        from gi.repository import Gio  # @UnresolvedImport  # pylint: disable=import-error
        from gi.repository import GLib  # @UnresolvedImport  # pylint: disable=import-error
        if not isinstance(e, GLib.GError):
            return None
        code = e.code
        if code == Gio.IOErrorEnum.FAILED and operation == u'delete':
            # Deleting a missing file sometimes yields only a generic
            # failure (notably the FTP does that).
            return log.ErrorCode.backend_not_found
        if code == Gio.IOErrorEnum.PERMISSION_DENIED:
            return log.ErrorCode.backend_permission_denied
        if code == Gio.IOErrorEnum.NOT_FOUND:
            return log.ErrorCode.backend_not_found
        if code == Gio.IOErrorEnum.NO_SPACE:
            return log.ErrorCode.backend_no_space
        return None

    def _put(self, source_path, remote_filename):
        u"""Copy the local file source_path.name to remote_filename."""
        from gi.repository import Gio  # @UnresolvedImport  # pylint: disable=import-error
        local = Gio.File.new_for_path(source_path.name)
        remote = self.__child(remote_filename)
        self.__copy_file(local, remote)

    def _get(self, filename, local_path):
        u"""Copy the remote file filename to the local file local_path.name."""
        from gi.repository import Gio  # @UnresolvedImport  # pylint: disable=import-error
        remote = self.__child(filename)
        local = Gio.File.new_for_path(local_path.name)
        self.__copy_file(remote, local)

    def _list(self):
        u"""Return the display names of the children of remote_file."""
        from gi.repository import Gio  # @UnresolvedImport  # pylint: disable=import-error
        # Display names are used instead of file names: some backends (e.g.
        # google-drive:) use filesystem-specific IDs as file names and expose
        # the "normal" name only as the display name, and the names get
        # parsed later on.
        children = self.remote_file.enumerate_children(
            Gio.FILE_ATTRIBUTE_STANDARD_DISPLAY_NAME,
            Gio.FileQueryInfoFlags.NONE,
            None)
        names = []
        while True:
            entry = children.next_file(None)
            if not entry:
                break
            names.append(entry.get_display_name())
        return names

    def _delete(self, filename):
        u"""Delete the remote file whose display name is filename."""
        self.__child(filename).delete(None)

    def _query(self, filename):
        u"""Return a dict with the u'size' of the remote file filename."""
        from gi.repository import Gio  # @UnresolvedImport  # pylint: disable=import-error
        remote = self.__child(filename)
        details = remote.query_info(Gio.FILE_ATTRIBUTE_STANDARD_SIZE,
                                    Gio.FileQueryInfoFlags.NONE, None)
        return {u'size': details.get_size()}


# Register GIOBackend with duplicity's backend registry under the u'gio' prefix.
duplicity.backend.register_backend_prefix(u'gio', GIOBackend)

Filemanager

Name Type Size Permission Actions
__pycache__ Folder 0755
pyrax_identity Folder 0755
__init__.py File 1.07 KB 0644
_boto_multi.py File 9.52 KB 0644
_boto_single.py File 14.07 KB 0644
_cf_cloudfiles.py File 3.81 KB 0644
_cf_pyrax.py File 5.07 KB 0644
adbackend.py File 16.97 KB 0644
azurebackend.py File 7.91 KB 0644
b2backend.py File 6.77 KB 0644
cfbackend.py File 1.11 KB 0644
dpbxbackend.py File 19.86 KB 0644
gdocsbackend.py File 9.12 KB 0644
giobackend.py File 8.25 KB 0644
hsibackend.py File 2.67 KB 0644
hubicbackend.py File 2.37 KB 0644
imapbackend.py File 9.83 KB 0644
jottacloudbackend.py File 5.53 KB 0644
lftpbackend.py File 9.79 KB 0644
localbackend.py File 2.65 KB 0644
mediafirebackend.py File 4.67 KB 0644
megabackend.py File 6.18 KB 0644
multibackend.py File 13.84 KB 0644
ncftpbackend.py File 5.56 KB 0644
onedrivebackend.py File 13.25 KB 0644
par2backend.py File 8.14 KB 0644
pcabackend.py File 8.96 KB 0644
pydrivebackend.py File 10.68 KB 0644
rclonebackend.py File 4.16 KB 0644
rsyncbackend.py File 6.44 KB 0644
s3_boto3_backend.py File 8.81 KB 0644
s3_boto_backend.py File 1.48 KB 0644
ssh_paramiko_backend.py File 18.33 KB 0644
ssh_pexpect_backend.py File 12.63 KB 0644
swiftbackend.py File 7.5 KB 0644
sxbackend.py File 2.3 KB 0644
tahoebackend.py File 2.61 KB 0644
webdavbackend.py File 19.73 KB 0644