[ Avaa Bypassed ]




Upload:

Command:

www-data@18.218.241.211: ~ $
import bisect
import copy
import pyatspi
import time
from gi.repository import GLib

from . import cmdnames
from . import chnames
from . import debug
from . import keybindings
from . import messages
from . import input_event
from . import orca_state
from . import settings_manager

_settingsManager = settings_manager.getManager()

# define 'live' property types
LIVE_OFF       = -1
LIVE_NONE      = 0
LIVE_POLITE    = 1
LIVE_ASSERTIVE = 2
LIVE_RUDE      = 3

# Seconds a message is held in the queue before it is discarded
MSG_KEEPALIVE_TIME = 45  # in seconds

# The number of messages that are cached and can later be reviewed via 
# LiveRegionManager.reviewLiveAnnouncement.
CACHE_SIZE = 9  # corresponds to one of nine key bindings

class PriorityQueue:
    """ This class represents a thread **UNSAFE** priority queue where priority
    is determined by the given integer priority.  The entries are also   
    maintained in chronological order. 

    TODO: experiment with Queue.Queue to make thread safe
    """
    def __init__(self):
        self.queue = []

    def enqueue(self, data, priority, obj):
        """ Add a new element to the queue according to 1) priority and
        2) timestamp. """
        bisect.insort_left(self.queue, (priority, time.time(), data, obj))
       
    def dequeue(self):
        """get the highest priority element from the queue.  """
        return self.queue.pop(0)

    def clear(self):
        """ Clear the queue """
        self.queue = []

    def purgeByKeepAlive(self):
        """ Purge items from the queue that are older than the keepalive 
        time """
        currenttime = time.time()
        myfilter = lambda item: item[1] + MSG_KEEPALIVE_TIME > currenttime
        self.queue = list(filter(myfilter, self.queue))

    def purgeByPriority(self, priority):
        """ Purge items from the queue that have a lower than or equal priority
        than the given argument """
        myfilter = lambda item: item[0] > priority
        self.queue = list(filter(myfilter, self.queue))

    def __len__(self):
        """ Return the length of the queue """
        return len(self.queue)


class LiveRegionManager:
    def __init__(self, script):
        self._script = script
        # message priority queue
        self.msg_queue = PriorityQueue()

        self.inputEventHandlers = self._getInputEventHandlers()
        self.keyBindings = self._getKeyBindings()

        # This is temporary.
        self.functions = [self.advancePoliteness,
                          self.setLivePolitenessOff,
                          self.toggleMonitoring,
                          self.reviewLiveAnnouncement]

        # Message cache.  Used to store up to 9 previous messages so user can
        # review if desired.
        self.msg_cache = []

        # User overrides for politeness settings.
        self._politenessOverrides = None
        self._restoreOverrides = None

        # last live obj to be announced
        self.lastliveobj = None

        # Used to track whether a user wants to monitor all live regions
        # Not to be confused with the global Gecko.liveRegionsOn which 
        # completely turns off live region support.  This one is based on
        # a user control by changing politeness levels to LIVE_OFF or back
        # to the bookmark or markup politeness value.
        self.monitoring = True

        # Set up politeness level overrides and subscribe to bookmarks
        # for load and save user events.
        # We are initialized after bookmarks so call the load handler once
        # to get initialized.
        #
        self.bookmarkLoadHandler()
        script.bookmarks.addSaveObserver(self.bookmarkSaveHandler)
        script.bookmarks.addLoadObserver(self.bookmarkLoadHandler)

    def _getInputEventHandlers(self):
        handlers = {}

        handlers["advanceLivePoliteness"] = \
            input_event.InputEventHandler(
                self.advancePoliteness,
                cmdnames.LIVE_REGIONS_ADVANCE_POLITENESS)

        handlers["setLivePolitenessOff"] = \
            input_event.InputEventHandler(
                self.setLivePolitenessOff,
                cmdnames.LIVE_REGIONS_SET_POLITENESS_OFF)

        handlers["monitorLiveRegions"] = \
            input_event.InputEventHandler(
                self.toggleMonitoring,
                cmdnames.LIVE_REGIONS_MONITOR)

        handlers["reviewLiveAnnouncement"] = \
            input_event.InputEventHandler(
                self.reviewLiveAnnouncement,
                cmdnames.LIVE_REGIONS_REVIEW)

        return handlers

    def _getKeyBindings(self):
        keyBindings = keybindings.KeyBindings()

        keyBindings.add(
            keybindings.KeyBinding(
                "backslash",
                keybindings.defaultModifierMask,
                keybindings.NO_MODIFIER_MASK,
                self.inputEventHandlers.get("advanceLivePoliteness")))

        keyBindings.add(
            keybindings.KeyBinding(
                "backslash",
                keybindings.defaultModifierMask,
                keybindings.SHIFT_MODIFIER_MASK,
                self.inputEventHandlers.get("setLivePolitenessOff")))

        keyBindings.add(
            keybindings.KeyBinding(
                "backslash",
                keybindings.defaultModifierMask,
                keybindings.ORCA_SHIFT_MODIFIER_MASK,
                self.inputEventHandlers.get("monitorLiveRegions")))

        for key in ["F1", "F2", "F3", "F4", "F5", "F6", "F7", "F8", "F9"]:
            keyBindings.add(
                keybindings.KeyBinding(
                    key,
                    keybindings.defaultModifierMask,
                    keybindings.ORCA_MODIFIER_MASK,
                    self.inputEventHandlers.get("reviewLiveAnnouncement")))

        return keyBindings

    def reset(self):
        # First we will purge our politeness override dictionary of LIVE_NONE
        # objects that are not registered for this page
        newpoliteness = {}
        currenturi = self._script.bookmarks.getURIKey()
        for key, value in self._politenessOverrides.items():
            if key[0] == currenturi or value != LIVE_NONE:
                newpoliteness[key] = value
        self._politenessOverrides = newpoliteness

    def bookmarkSaveHandler(self):
        """Bookmark save callback"""
        self._script.bookmarks.saveBookmarksToDisk(self._politenessOverrides,
                                                    filename='politeness')

    def bookmarkLoadHandler(self):
        """Bookmark load callback"""
        # readBookmarksFromDisk() returns None on error. Just initialize to an
        # empty dictionary if this is the case.
        self._politenessOverrides = \
        self._script.bookmarks.readBookmarksFromDisk(filename='politeness') \
        or {}

    def handleEvent(self, event):
        """Main live region event handler"""
        politeness = self._getLiveType(event.source)
        if politeness == LIVE_OFF:
            return
        if politeness == LIVE_NONE:
            # All the 'registered' LIVE_NONE objects will be set to off
            # if not monitoring.  We will ignore LIVE_NONE objects that 
            # arrive after the user switches off monitoring.
            if not self.monitoring:
                return
        elif politeness == LIVE_POLITE:
            # Nothing to do for now
            pass
        elif politeness ==  LIVE_ASSERTIVE:
            self.msg_queue.purgeByPriority(LIVE_POLITE)
        elif politeness == LIVE_RUDE:
            self.msg_queue.purgeByPriority(LIVE_ASSERTIVE)

        message = self._getMessage(event)
        if message:
            if len(self.msg_queue) == 0:
                GLib.timeout_add(100, self.pumpMessages)
            self.msg_queue.enqueue(message, politeness, event.source)

    def pumpMessages(self):
        """ Main gobject callback for live region support.  Handles both 
        purging the message queue and outputting any queued messages that
        were queued up in the handleEvent() method.
        """

        if len(self.msg_queue) > 0:
            debug.println(debug.eventDebugLevel, "\nvvvvv PRESENT LIVE REGION MESSAGE vvvvv")
            self.msg_queue.purgeByKeepAlive()
            politeness, timestamp, message, obj = self.msg_queue.dequeue()
            # Form output message.  No need to repeat labels and content.
            # TODO: really needs to be tested in real life cases.  Perhaps
            # a verbosity setting?
            if message['labels'] == message['content']:
                utts = message['content']
            else:
                utts = message['labels'] + message['content']

            if self.monitoring:
                self._script.presentMessage(utts)
            else:
                msg = "INFO: Not presenting message because monitoring is off"
                debug.println(debug.LEVEL_INFO, msg, True)

            # set the last live obj to be announced
            self.lastliveobj = obj

            # cache our message
            self._cacheMessage(utts)

        # We still want to maintain our queue if we are not monitoring
        if not self.monitoring:
            self.msg_queue.purgeByKeepAlive()

        msg = 'LIVE REGIONS: messages in queue: %i' % len(self.msg_queue)
        debug.println(debug.LEVEL_INFO, msg, True)
        debug.println(debug.eventDebugLevel, "^^^^^ PRESENT LIVE REGION MESSAGE ^^^^^\n")

        # See you again soon, stay in event loop if we still have messages.
        return len(self.msg_queue) > 0

    def getLiveNoneObjects(self):
        """Return the live objects that are registered and have a politeness
        of LIVE_NONE. """
        retval = []
        currenturi = self._script.bookmarks.getURIKey()
        for uri, objectid in self._politenessOverrides:
            if uri == currenturi and isinstance(objectid, tuple):
                retval.append(self._script.bookmarks.pathToObj(objectid))
        return retval

    def advancePoliteness(self, script, inputEvent):
        """Advance the politeness level of the given object"""

        if not _settingsManager.getSetting('inferLiveRegions'):
            self._script.presentMessage(messages.LIVE_REGIONS_OFF)
            return

        obj = orca_state.locusOfFocus
        objectid = self._getObjectId(obj)
        uri = self._script.bookmarks.getURIKey()

        try:
            # The current priority is either a previous override or the
            # live property.  If an exception is thrown, an override for 
            # this object has never occurred and the object does not have
            # live markup.  In either case, set the override to LIVE_NONE.
            cur_priority = self._politenessOverrides[(uri, objectid)] 
        except KeyError:
            cur_priority = self._liveStringToType(obj)

        if cur_priority == LIVE_OFF or cur_priority == LIVE_NONE:
            self._politenessOverrides[(uri, objectid)] = LIVE_POLITE
            self._script.presentMessage(messages.LIVE_REGIONS_LEVEL_POLITE)
        elif cur_priority == LIVE_POLITE:
            self._politenessOverrides[(uri, objectid)] = LIVE_ASSERTIVE
            self._script.presentMessage(messages.LIVE_REGIONS_LEVEL_ASSERTIVE)
        elif cur_priority == LIVE_ASSERTIVE:
            self._politenessOverrides[(uri, objectid)] = LIVE_RUDE
            self._script.presentMessage(messages.LIVE_REGIONS_LEVEL_RUDE)
        elif cur_priority == LIVE_RUDE:
            self._politenessOverrides[(uri, objectid)] = LIVE_OFF
            self._script.presentMessage(messages.LIVE_REGIONS_LEVEL_OFF)


    def goLastLiveRegion(self):
        """Move the caret to the last announced live region and speak the 
        contents of that object"""
        if self.lastliveobj:
            self._script.utilities.setCaretPosition(self.lastliveobj, 0)
            self._script.speakContents(self._script.utilities.getObjectContentsAtOffset(
                                       self.lastliveobj, 0))

    def reviewLiveAnnouncement(self, script, inputEvent):
        """Speak the given number cached message"""

        msgnum = int(inputEvent.event_string[1:])
        if not _settingsManager.getSetting('inferLiveRegions'):
            self._script.presentMessage(messages.LIVE_REGIONS_OFF)
            return

        if msgnum > len(self.msg_cache):
            self._script.presentMessage(messages.LIVE_REGIONS_NO_MESSAGE)
        else:
            self._script.presentMessage(self.msg_cache[-msgnum])

    def setLivePolitenessOff(self, script, inputEvent):
        """User toggle to set all live regions to LIVE_OFF or back to their
        original politeness."""

        if not _settingsManager.getSetting('inferLiveRegions'):
            self._script.presentMessage(messages.LIVE_REGIONS_OFF)
            return

        # start at the document frame
        docframe = self._script.utilities.documentFrame()
        # get the URI of the page.  It is used as a partial key.
        uri = self._script.bookmarks.getURIKey()

        # The user is currently monitoring live regions but now wants to 
        # change all live region politeness on page to LIVE_OFF
        if self.monitoring:
            self._script.presentMessage(messages.LIVE_REGIONS_ALL_OFF)
            self.msg_queue.clear()
            
            # First we'll save off a copy for quick restoration
            self._restoreOverrides = copy.copy(self._politenessOverrides)

            # Set all politeness overrides to LIVE_OFF.
            for override in self._politenessOverrides.keys():
                self._politenessOverrides[override] = LIVE_OFF

            # look through all the objects on the page and set/add to
            # politeness overrides.  This only adds live regions with good
            # markup.
            matches = self._script.utilities.findAllDescendants(
                docframe, self.matchLiveRegion)
            for match in matches:
                objectid = self._getObjectId(match)
                self._politenessOverrides[(uri, objectid)] = LIVE_OFF

            # Toggle our flag
            self.monitoring = False

        # The user wants to restore politeness levels
        else:
            for key, value in self._restoreOverrides.items():
                self._politenessOverrides[key] = value
            self._script.presentMessage(messages.LIVE_REGIONS_ALL_RESTORED)
            # Toggle our flag
            self.monitoring = True  

    def generateLiveRegionDescription(self, obj, **args):
        """Used in conjunction with whereAmI to output description and 
        politeness of the given live region object"""
        objectid = self._getObjectId(obj)
        uri = self._script.bookmarks.getURIKey()

        results = []

        # get the description if there is one.
        for relation in obj.getRelationSet():
            relationtype = relation.getRelationType()
            if relationtype == pyatspi.RELATION_DESCRIBED_BY:
                targetobj = relation.getTarget(0)
                try:
                    # We will add on descriptions if they don't duplicate
                    # what's already in the object's description.
                    # See http://bugzilla.gnome.org/show_bug.cgi?id=568467
                    # for more information.
                    #
                    description = targetobj.queryText().getText(0, -1)
                    if description.strip() != obj.description.strip():
                        results.append(description)
                except NotImplemented:
                    pass

        # get the politeness level as a string
        try:
            livepriority = self._politenessOverrides[(uri, objectid)]
            liveprioritystr = self._liveTypeToString(livepriority)
        except KeyError:
            liveprioritystr = 'none'

        # We will only output useful information
        # 
        if results or liveprioritystr != 'none':
            results.append(messages.LIVE_REGIONS_LEVEL % liveprioritystr)

        return results

    def matchLiveRegion(self, obj):
        """Predicate used to find a live region"""
        attrs = self._getAttrDictionary(obj)
        return 'container-live' in attrs

    def _findContainer(self, obj):
        isContainer = lambda x: self._getAttrDictionary(x).get('atomic')
        if isContainer(obj):
            return obj

        return pyatspi.findAncestor(obj, isContainer)

    def _getMessage(self, event):
        """Gets the message associated with a given live event."""
        attrs = self._getAttrDictionary(event.source)
        content = ""
        labels = ""
        
        # A message is divided into two parts: labels and content.  We
        # will first try to get the content.  If there is None, 
        # assume it is an invalid message and return None
        if event.type.startswith('object:children-changed:add'):
            if attrs.get('container-atomic') == 'true':
                content = self._script.utilities.expandEOCs(event.source)
            else:
                content = self._script.utilities.expandEOCs(event.any_data)

        elif event.type.startswith('object:text-changed:insert'):
            if attrs.get('container-atomic') != 'true':
                content = event.any_data
            else:
                container = self._findContainer(event.source)
                content = self._script.utilities.expandEOCs(container)

        if not content:
            return None

        content = content.strip()
        if len(content) == 1:
            content = chnames.getCharacterName(content)

        # Proper live regions typically come with proper aria labels. These
        # labels are typically exposed as names. Failing that, descriptions.
        # Looking for actual labels seems a non-performant waste of time.
        name = (event.source.name or event.source.description).strip()
        if name and name != content:
            labels = name

        # instantly send out notify messages
        if attrs.get('channel') == 'notify':
            utts = labels + content
            self._script.presentationInterrupt()
            self._script.presentMessage(utts)
            return None

        return {'content':[content], 'labels':[labels]}

    def flushMessages(self):
        self.msg_queue.clear()

    def _cacheMessage(self, utts):
        """Cache a message in our cache list of length CACHE_SIZE"""
        self.msg_cache.append(utts)
        if len(self.msg_cache) > CACHE_SIZE:
            self.msg_cache.pop(0)

    def _getLiveType(self, obj):
        """Returns the live politeness setting for a given object. Also,
        registers LIVE_NONE objects in politeness overrides when monitoring."""
        objectid = self._getObjectId(obj)
        uri = self._script.bookmarks.getURIKey()
        if (uri, objectid) in self._politenessOverrides:
            # look to see if there is a user politeness override
            return self._politenessOverrides[(uri, objectid)]
        else:
            livetype = self._liveStringToType(obj)
            # We'll save off a reference to LIVE_NONE if we are monitoring
            # to give the user a chance to change the politeness level.  It
            # is done here for performance sake (objectid, uri are expensive)
            if livetype == LIVE_NONE and self.monitoring:
                self._politenessOverrides[(uri, objectid)] = livetype
            return livetype

    def _getObjectId(self, obj):
        """Returns the HTML 'id' or a path to the object is an HTML id is
        unavailable"""
        attrs = self._getAttrDictionary(obj)
        if attrs is None:
            return self._getPath(obj)
        try:
            return attrs['id']
        except KeyError:
            return self._getPath(obj)

    def _liveStringToType(self, obj, attributes=None):
        """Returns the politeness enum for a given object"""
        attrs = attributes or self._getAttrDictionary(obj)
        try:
            if attrs['container-live'] == 'off': 
                return LIVE_OFF
            elif attrs['container-live'] == 'polite':  
                return LIVE_POLITE
            elif attrs['container-live'] == 'assertive': 
                return LIVE_ASSERTIVE
            elif attrs['container-live'] == 'rude': 
                return LIVE_RUDE
            else: return LIVE_NONE
        except KeyError:
            return LIVE_NONE

    def _liveTypeToString(self, politeness):
        """Returns the politeness level as a string given a politeness enum"""
        if politeness == LIVE_OFF: 
            return 'off'
        elif politeness == LIVE_POLITE: 
            return 'polite'
        elif politeness == LIVE_ASSERTIVE: 
            return 'assertive'
        elif politeness == LIVE_RUDE: 
            return 'rude'
        elif politeness == LIVE_NONE: 
            return 'none'
        else: return 'unknown'

    def _getAttrDictionary(self, obj):
        return self._script.utilities.objectAttributes(obj)
    
    def _getPath(self, obj):
        """ Returns, as a tuple of integers, the path from the given object 
        to the document frame."""
        docframe = self._script.utilities.documentFrame()
        path = []
        while True:
            if obj.parent is None or obj == docframe:
                path.reverse()
                return tuple(path)
            try:
                path.append(obj.getIndexInParent())
            except Exception:
                raise LookupError
            obj = obj.parent

    def toggleMonitoring(self, script, inputEvent):
        if not _settingsManager.getSetting('inferLiveRegions'):
            _settingsManager.setSetting('inferLiveRegions', True)
            self._script.presentMessage(messages.LIVE_REGIONS_MONITORING_ON)
        else:
            _settingsManager.setSetting('inferLiveRegions', False)
            self.flushMessages()
            self._script.presentMessage(messages.LIVE_REGIONS_MONITORING_OFF)

Filemanager

Name Type Size Permission Actions
__pycache__ Folder 0755
backends Folder 0755
scripts Folder 0755
__init__.py File 115 B 0644
acss.py File 3.49 KB 0644
bookmarks.py File 8.37 KB 0644
braille.py File 59.87 KB 0644
braille_generator.py File 20.5 KB 0644
braille_rolenames.py File 10.33 KB 0644
brlmon.py File 6.45 KB 0644
brltablenames.py File 7.15 KB 0644
caret_navigation.py File 13.67 KB 0644
chat.py File 33.63 KB 0644
chnames.py File 23.03 KB 0644
cmdnames.py File 55.65 KB 0644
colornames.py File 38.13 KB 0644
common_keyboardmap.py File 6.64 KB 0644
debug.py File 17.16 KB 0644
desktop_keyboardmap.py File 4.62 KB 0644
event_manager.py File 31.81 KB 0644
eventsynthesizer.py File 17.82 KB 0644
find.py File 12.77 KB 0644
flat_review.py File 51.84 KB 0644
formatting.py File 53.94 KB 0644
generator.py File 59.09 KB 0644
guilabels.py File 45.78 KB 0644
input_event.py File 36.41 KB 0644
keybindings.py File 16.39 KB 0644
keynames.py File 9.71 KB 0644
label_inference.py File 19.38 KB 0644
laptop_keyboardmap.py File 4.61 KB 0644
liveregions.py File 21.55 KB 0644
logger.py File 1.97 KB 0644
mathsymbols.py File 88.14 KB 0644
messages.py File 140.32 KB 0644
mouse_review.py File 18.91 KB 0644
notification_messages.py File 6.18 KB 0644
object_properties.py File 32.74 KB 0644
orca.py File 25 KB 0644
orca_gtkbuilder.py File 5.35 KB 0644
orca_gui_commandlist.py File 4.19 KB 0644
orca_gui_find.py File 8.12 KB 0644
orca_gui_navlist.py File 6.66 KB 0644
orca_gui_prefs.py File 139.07 KB 0644
orca_gui_profile.py File 4.06 KB 0644
orca_i18n.py File 3.18 KB 0644
orca_platform.py File 1.41 KB 0644
orca_state.py File 2.1 KB 0644
phonnames.py File 2.76 KB 0644
pronunciation_dict.py File 2.61 KB 0644
punctuation_settings.py File 13.64 KB 0644
script.py File 19.04 KB 0644
script_manager.py File 13.31 KB 0644
script_utilities.py File 183.59 KB 0644
settings.py File 12.82 KB 0644
settings_manager.py File 20.73 KB 0644
sound.py File 5.17 KB 0644
sound_generator.py File 11.99 KB 0644
speech.py File 11.4 KB 0644
speech_generator.py File 108.24 KB 0644
speechdispatcherfactory.py File 24.54 KB 0644
speechserver.py File 7.41 KB 0644
spellcheck.py File 10.07 KB 0644
structural_navigation.py File 122.53 KB 0644
text_attribute_names.py File 28.62 KB 0644
tutorialgenerator.py File 30.41 KB 0644