File pluginconf/depends.py artifact 1d2be7d94e part of check-in b5dc7df53f


# encoding: utf-8
# api: pluginconf
##type: class
# category: config
# title: Dependency verification
# description: Check depends: lines
# depends: pluginconf >= 0.7
# version: 0.5
# state: beta
# license: PD
# priority: optional
#
# This is a rather basic depends: checker, mostly for local and
# installable modules. It's largely built around streamtuner2
# requirements, and should be customized.
#
# DependencyValidation().depends()/.valid()
# ‾‾‾‾‾‾‾‾‾‾‾‾‾‾‾‾‾‾‾‾‾‾‾‾‾‾‾‾‾‾‾‾‾‾‾‾‾‾‾‾‾
#  Probes a new plugins` depends: list against installed base modules.
#  Utilizes each version: fields and allows for virtual modules, or
#  alternatives and honors alias: names.
#


import sys
import re
import pluginconf
try:
    from distutils.spawn import find_executable
except ImportError:
    try:
        from compat2and3 import find_executable
    except ImportError:
        def find_executable(name):
            pass
import zipfile
import logging


# Minimal depends: probing
# ‾‾‾‾‾‾‾‾‾‾‾‾‾‾‾‾‾‾‾‾‾‾‾‾
class DependencyValidation(object):
    """
    Now this definitely requires customization. Each plugin can carry
    a list of (soft-) dependency names.

    # depends: config, appcore >= 2.0, bin:wkhtmltoimage, python < 3.5

    Here only in-application modules are honored, system references
    ignored. Unknown plugin names are also skipped. A real install
    helper might want to auto-tick them on, etc. This example is just
    meant for probing downloadable plugins.

    The .valid() helper only asserts the api: string, or skips existing
    modules, and if they're more recent.
    While .depends() compares minimum versions against existing modules.

    In practice there's little need for full-blown dependency resolving
    for application-level modules.
    """
    
    """ supported APIs """
    api = ["python", "streamtuner2"]
    
    """ debugging """
    log = logging.getLogger("pluginconf.dependency")

    # prepare list of known plugins and versions
    def __init__(self, add={}, core=["st2", "uikit", "config", "action"]):
        self.have = {
            "python": {"version": sys.version}
        }

        # inject virtual modules
        for name, meta in add.items():
            if isinstance(meta, bool):
                meta = 1 if meta else -1
            if isinstance(meta, tuple):
                meta = ".".join(str(n) for n in meta)
            if isinstance(meta, (int, float, str)):
                meta = {"version": str(meta)}
            self.have[name] = meta

        # read plugins/*
        self.have.update(pluginconf.all_plugin_meta())

        # add core modules
        for name in core:
            self.have[name] = pluginconf.plugin_meta(module=name, extra_base=["config"])

        # aliases
        for name, meta in self.have.copy().items():
            if meta.get("alias"):
                for alias in re.split(r"\s*[,;]\s*", meta["alias"]):
                    self.have[alias] = self.have[name]

    # basic plugin pre-screening (skip __init__, filter by api:,
    # exclude installed & same-version plugins)
    def valid(self, new_plugin):
        if not "$name" in new_plugin:
            self.log.warning(".valid() checks online plugin lists, requires $name")
        id = new_plugin.get("$name", "__invalid")
        have_ver = self.have.get(id, {}).get("version", "0")
        if id.find("__") == 0:
            self.log.debug("wrong/no id")
        elif new_plugin.get("api") not in self.api:
            self.log.debug("not in allowed APIs")
        elif {new_plugin.get("status"), new_plugin.get("priority")} & {"obsolete", "broken"}:
            self.log.debug("wrong status (obsolete/broken)")
        elif have_ver >= new_plugin.get("version", "0.0"):
            self.log.debug("newer version already installed")
        else:
            return True

    # Verify depends: and breaks: against existing plugins/modules
    def depends(self, plugin):
        result = True
        if plugin.get("depends"):
            result &= self.and_or(self.split(plugin["depends"]), self.have)
        if plugin.get("breaks"):
            result &= self.neither(self.split(plugin["breaks"]), self.have)
        self.log.debug("plugin '%s' matching requirements: %i", plugin["id"], result)
        return result

    # Split trivial "pkg | alt, mod>=1, uikit<4.0" string into nested list [[dep],[alt,alt],[dep]]
    def split(self, dep_str):
        dep_cmp = []
        for alt_str in re.split(r"\s*[,;]+\s*", dep_str):
            alt_cmp = []
            # split alternatives |
            for part in re.split(r"\s*\|+\s*", alt_str):
                # skip deb:pkg-name, rpm:name, bin:name etc.
                if not len(part):
                    continue
                if part.find(":") >= 0:
                    self.have[part] = {"version": self.module_test(*part.split(":"))}
                # find comparison and version num
                part += " >= 0"
                m = re.search(r"([\w.:-]+)\s*\(?\s*([>=<!~]+)\s*([\d.]+([-~.]\w+)*)", part)
                if m and m.group(2):
                    alt_cmp.append([m.group(i) for i in (1, 2, 3)])
            if alt_cmp:
                dep_cmp.append(alt_cmp)
        return dep_cmp

    # Single comparison
    def cmp(self, d, have, absent=True):
        name, op, ver = d
        # absent=True is the relaxed check, will ignore unknown plugins
        # set absent=False or None for strict check (as in breaks: rule e.g.)
        if not have.get(name, {}).get("version"):
            return absent
        # curr = installed version
        curr = have[name]["version"]
        tbl = {
            ">=": curr >= ver,
            "<=": curr <= ver,
            "==": curr == ver,
            ">":  curr > ver,
            "<":  curr < ver,
            "!=": curr != ver,
        }
        r = tbl.get(op, True)
        #print "log.VERSION_COMPARE: ", name, " → (", curr, op, ver, ") == ", r
        return r

    # Compare nested structure of [[dep],[alt,alt]]
    def and_or(self, deps, have, r=True):
        #print deps
        return not False in [
            True in [self.cmp(d, have) for d in alternatives] for alternatives in deps
        ]

    # Breaks/Conflicts: check [[or],[or]]
    def neither(self, deps, have):
        return not True in [
            self.cmp(d, have, absent=None) for cnd in deps for d in cnd
        ]

    # Resolves/injects complex "bin:name" or "python:name" dependency URNs
    def module_test(self, type, name):
        return "1"  # disabled for now
        if "_" + type in dir(self):
            return "1" if bool(getattr(self, "_" + type)(name)) else "-1"

    # `bin:name` lookup
    def _bin(self, name):
        return find_executable(name)

    # `python:module` test
    def _python(self, name):
        return __import__("imp").find_module(name) is not None