Overview
Comment: | pylint fixes, support for ##type:, shift depends into separate module |
---|---|
Downloads: | Tarball | ZIP archive | SQL archive |
Timelines: | family | ancestors | descendants | both | trunk |
Files: | files | file ages | folders |
SHA3-256: |
6466a162bef7835d5e44b01291bd5965 |
User & Date: | mario on 2022-10-26 05:13:03 |
Other Links: | manifest | tags |
Context
2022-10-26
| ||
05:13 | add some more tests check-in: 3c5491d0b8 user: mario tags: trunk | |
05:13 | pylint fixes, support for ##type:, shift depends into separate module check-in: 6466a162be user: mario tags: trunk | |
2022-10-25
| ||
22:06 | enjoin pluginconf (python implementation here) check-in: 4b8c748142 user: mario tags: trunk | |
Changes
Modified pluginconf/__init__.py from [dd2e021c4c] to [7d506ba315].
1 2 | # encoding: utf-8 # api: python | | | 1 2 3 4 5 6 7 8 9 10 | # encoding: utf-8 # api: python ##type: extract # category: config # title: Plugin configuration # description: Read meta data, pyz/package contents, module locating # version: 0.7.7 # state: stable # classifiers: documentation # license: PD |
︙ | ︙ | |||
55 56 57 58 59 60 61 | # It's somewhat off-scope for plugin management, but used internally. # # argparse_map() # ‾‾‾‾‾‾‾‾‾‾‾‾‾‾ # Converts a list of config: options with arg: attribute for use as # argparser parameters. # | < < < < < < > < < < < < < < < | | < | > > | | < > > | > > | > | | > > | > > > > > > > > > > > > > > > > > > | | | | | > > > | > | | | | < | > > | | | | > | > > | | | < < > > > > | | | < | < | < < | < < | > > | | | | 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 | # It's somewhat off-scope for plugin management, but used internally. # # argparse_map() # ‾‾‾‾‾‾‾‾‾‾‾‾‾‾ # Converts a list of config: options with arg: attribute for use as # argparser parameters. # # # Generally this scheme concerns itself more with plugin basenames. # That is: module scripts in a package like `ext.plg1` and `ext.plg2`. # It can be initialized by injecting the plugin-package basename into # plugin_base = []. The associated paths will be used for module # lookup via pkgutil.iter_modules(). # # And a central module can be extended with new lookup locations best # by attaching new locations itself via module.__path__ + ["./local"] # for example. # # Plugin loading thus becomes as simple as __import__("ext.local"). # The attached plugin_state config dictionary in most cases can just # list module basenames, if there's only one set to manage. import sys import os import re import functools import pkgutil import inspect try: from gzip import decompress as gzip_decode # Py3 only except ImportError: try: from compat2and3 import gzip_decode # st2 stub except ImportError: import zlib def gzip_decode(bytestr): """ haphazard workaround """ return zlib.decompress(bytestr, 16 + zlib.MAX_WBITS) import zipfile import argparse __all__ = [ "get_data", "module_list", "plugin_meta", "add_plugin_defaults" ] # Injectables # ‾‾‾‾‾‾‾‾‾‾‾ """ injectable callback function for logging """ log_ERR = lambda *x: None """ File lookup relation for get_data(), should name a top-level package. (Equivalent PluginBase(package=…)) """ module_base = "config" """ Package/module names for module_list() and plugin_meta() lookups. All associated paths will be scanned for module/plugin basenames. (Equivalent to `searchpath` in PluginBase) """ plugin_base = ["channels"] # Compatiblity # ‾‾‾‾‾‾‾‾‾‾‾‾ def renamed_arguments(renamed): """ map old argument names """ def wrapped(func): def execute(*args, **kwargs): return func(*args, **{ renamed.get(key, key): value for key, value in kwargs.items() }) functools.update_wrapper(execute, func) return execute return wrapped # Resource retrieval # ‾‾‾‾‾‾‾‾‾‾‾‾‾‾‾‾‾‾ @renamed_arguments({"fn": "filename", "gz": "gzip"}) def get_data(filename, decode=False, gzip=False, file_base=None): """ Fetches file content from install path or from within PYZ archive. This is just an alias and convenience wrapper for pkgutil.get_data(). Utilizes the module_base / plugin_base as top-level reference. :arg str fn: filename in pyz or bundle :arg bool decode: text file decoding utf-8 :arg bool gz: automatic gzdecode :arg str file_base: alternative base module reference """ try: data = pkgutil.get_data(file_base or module_base, filename) if gzip: data = gzip_decode(data) if decode: return data.decode("utf-8", errors='ignore') return str(data) except: # log_ERR("get_data() didn't find:", fn, "in", file_base) pass # Plugin name lookup # ‾‾‾‾‾‾‾‾‾‾‾‾‾‾‾‾‾‾ def module_list(extra_paths=None): """ Search through ./plugins/ (and other configured plugin_base names → paths) and get module basenames. :arg list extra_paths: in addition to plugin_base list """ # Convert plugin_base package names into paths for iter_modules paths = [] for mp in plugin_base: if sys.modules.get(mp): paths += sys.modules[mp].__path__ elif os.path.exists(mp): paths.append(mp) # Should list plugins within zips as well as local paths ls = pkgutil.iter_modules(paths + (extra_paths or [])) return [name for loader, name, ispkg in ls] # Plugin => meta dict # ‾‾‾‾‾‾‾‾‾‾‾‾‾‾‾‾‾‾‾ def all_plugin_meta(): """ This is a trivial wrapper to assemble a complete dictionary of available/installed plugins. It associates each plugin name with a its meta{} fields. """ return { name: plugin_meta(module=name) for name in module_list() } # Plugin meta data extraction # ‾‾‾‾‾‾‾‾‾‾‾‾‾‾‾‾‾‾‾‾‾‾‾‾‾‾‾ @renamed_arguments({"filename": "fn"}) def plugin_meta(fn=None, src=None, module=None, frame=1, **kwargs): """ Extract plugin meta data block from different sources: :arg str fn: read literal files, or .pyz contents :arg str src: from already uncovered script code :arg str module: lookup per pkgutil, from plugin_base or top-level modules :arg int frame: extract comment header of caller (default) :arg list extra_base: additional search directories :arg ist max_length: maximum size to read from files """ # Try via pkgutil first, # find any plugins.* modules, or main packages if module: fn = module for base in plugin_base + kwargs.get("extra_base", []): try: src = get_data(fn=fn+".py", decode=True, file_base=base) if src: break except: continue # plugin_meta_extract() will print a notice later # Real filename/path elif fn and os.path.exists(fn): src = open(fn).read(kwargs.get("max_length", 6144)) # Else get source directly from caller elif not src and not fn: module = inspect.getmodule(sys._getframe(frame+1)) # decorator+1 fn = inspect.getsourcefile(module) src = inspect.getcomments(module) # Assume it's a filename within a zip elif fn: intfn = "" while fn and len(fn) and not os.path.exists(fn): |
︙ | ︙ | |||
230 231 232 233 234 235 236 | if not isinstance(src, str): src = src.decode("utf-8", errors='replace') return plugin_meta_extract(src, fn) # Comment and field extraction logic # ‾‾‾‾‾‾‾‾‾‾‾‾‾‾‾‾‾‾‾‾‾‾‾‾‾‾‾‾‾‾‾‾‾‾ | > > | | | | < > > > > | 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 | if not isinstance(src, str): src = src.decode("utf-8", errors='replace') return plugin_meta_extract(src, fn) # Comment and field extraction logic # ‾‾‾‾‾‾‾‾‾‾‾‾‾‾‾‾‾‾‾‾‾‾‾‾‾‾‾‾‾‾‾‾‾‾ def plugin_meta_extract(src="", fn=None, literal=False): """ Finds the first comment block. Splits key:value header fields from comment. Turns everything into an dict, with some stub fields if absent. :arg str src: from existing source code :arg int fn: set filename attribute :arg bool literla: just split comment from doc """ # Defaults meta = { "id": os.path.splitext(os.path.basename(fn or ""))[0], "fn": fn, "api": "python", "type": "module", |
︙ | ︙ | |||
275 276 277 278 279 280 281 | meta["config"] = plugin_meta_config(meta.get("config") or "") return meta # Unpack config: structures # ‾‾‾‾‾‾‾‾‾‾‾‾‾‾‾‾‾‾‾‾‾‾‾‾‾ | > > | | | > > > | | | > | < < < | 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 | meta["config"] = plugin_meta_config(meta.get("config") or "") return meta # Unpack config: structures # ‾‾‾‾‾‾‾‾‾‾‾‾‾‾‾‾‾‾‾‾‾‾‾‾‾ def plugin_meta_config(str): """ Further breaks up the meta['config'] descriptor. Creates an array from JSON/YAML option lists. :arg str str: unprocessed config: field Stubs out name, value, type, description if absent. # config: { name: 'var1', type: text, value: "default, ..." } { name=option2, type=boolean, $value=1, etc. } """ config = [] for entry in rx.config.findall(str): entry = entry[0] or entry[1] opt = { "type": None, "name": None, "description": "", |
︙ | ︙ | |||
320 321 322 323 324 325 326 | longstr="text", string="str", boolean="bool", checkbox="bool", integer="int", number="int", choice="select", options="select", table="dict", array="dict" ) # Comment extraction regexps # ‾‾‾‾‾‾‾‾‾‾‾‾‾‾‾‾‾‾‾‾‾‾‾‾‾‾ | > > | | | > | < | | 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 | longstr="text", string="str", boolean="bool", checkbox="bool", integer="int", number="int", choice="select", options="select", table="dict", array="dict" ) # Comment extraction regexps # ‾‾‾‾‾‾‾‾‾‾‾‾‾‾‾‾‾‾‾‾‾‾‾‾‾‾ class rx: """ Pretty crude comment splitting approach. But works well enough already. Technically a YAML parser would do better; but is likely overkill. """ comment = re.compile(r"""(^ {0,4}#.*\n)+""", re.M) hash = re.compile(r"""(^ {0,4}#{1,2} {0,3}\r*)""", re.M) keyval = re.compile(r""" ^([\w-]+):(.*$(?:\n(?![\w-]+:).+$)*) # plain key:value lines """, re.M | re.X) config = re.compile(r""" \{ ((?: [^\{\}]+ | \{[^\}]*\} )+) \} # JSOL/YAML scheme {...} dicts | \< (.+?) \> # old <input> HTML style """, re.X) |
︙ | ︙ | |||
349 350 351 352 353 354 355 | select_dict = re.compile(r"(\w+)\s*[=:>]+\s*([^=,|:]+)") select_list = re.compile(r"\s*([^,|;]+)\s*") # ArgumentParser options conversion # ‾‾‾‾‾‾‾‾‾‾‾‾‾‾‾‾‾‾‾‾‾‾‾‾‾‾‾‾‾‾‾‾‾ | > > | | | | | | | | | | | < | < | < | | < | | | < | | | < < > > < < < < < < < < < < < < < < < < < < < < < < < < < < < < < < < < < < < < < < < < < < < < < < < < < < < < < < < < < < < < < < < < < < < < < < < < < < < < < < < < < < < < < < < < < < < < < < < < < < < < < < < < < < < < < < < < < < < < < < < < < < < < < < < < < < < < < < < < > > | | | | | | < < > | > | > | | | | > | | | | | | > > | > | > | > > | | | < < | 377 378 379 380 381 382 383 384 385 386 387 388 389 390 391 392 393 394 395 396 397 398 399 400 401 402 403 404 405 406 407 408 409 410 411 412 413 414 415 416 417 418 419 420 421 422 423 424 425 426 427 428 429 430 431 432 433 434 435 436 437 438 439 440 441 442 443 444 445 446 447 448 449 450 451 452 453 454 455 456 457 458 459 460 461 462 463 464 465 466 467 468 469 470 471 472 473 474 475 476 477 478 479 480 481 482 483 484 485 486 487 488 489 490 491 | select_dict = re.compile(r"(\w+)\s*[=:>]+\s*([^=,|:]+)") select_list = re.compile(r"\s*([^,|;]+)\s*") # ArgumentParser options conversion # ‾‾‾‾‾‾‾‾‾‾‾‾‾‾‾‾‾‾‾‾‾‾‾‾‾‾‾‾‾‾‾‾‾ def argparse_map(opt): """ As variation of in-application config: options, this method converts cmdline argument specifiers. # config: { arg: -i, name: input[], type: str, description: input files } Which allows to collect argumentparser options from different plugins. The only difference to normal config entries is the `arg:` attribute. · It adds array arguments with a [] name suffix, or a `*` type suffix. Else even a `?` or `+` and numeric counts after the type flag. · Understands the types `str`, `int` and `bool`. · Entries may carry a `hidden: 1` or `required: 1` attribute. · And `help:` is an alias to `description:` And `default:` an alias for `value:` · While `type: select` utilizes the `select: a|b|c` format as usual. ArgParsers const=, metavar= flag, or type=file are not aliased here. Basically returns a dictionary that can be fed per **kwargs directly to an ArgumentParsers add_argument(). Iterate over each plugins meta['config'][] options to convert them. """ if not ("arg" in opt and opt["name"] and opt["type"]): return {} # Extract --flag names args = opt["arg"].split() + re.findall(r"-+\w+", opt["name"]) # Prepare mapping options typing = re.findall(r"bool|str|\[\]|const|false|true", opt["type"]) naming = re.findall(r"\[\]", opt["name"]) name = re.findall(r"(?<!-)\b\w+", opt["name"]) nargs = re.findall(r"\b\d+\b|[\?\*\+]", opt["type"]) or [None] is_arr = "[]" in (naming + typing) and nargs == [None] is_bool = "bool" in typing false_b = "false" in typing or opt["value"] in ("0", "false") # print("\nname=", name, "is_arr=", is_arr, "is_bool=", is_bool, # "bool_d=", false_b, "naming=", naming, "typing=", typing) # Populate combination as far as ArgumentParser permits # pylint: disable=bad-whitespace, bad-continuation kwargs = dict( args = args, dest = name[0] if not name[0] in args else None, action = is_arr and "append" or is_bool and false_b and "store_false" or is_bool and "store_true" or "store", nargs = nargs[0], default = opt.get("default") or opt["value"], type = None if is_bool else ("int" in typing and int or "bool" in typing and bool or str), choices = opt["select"].split("|") if "select" in opt else None, required = "required" in opt or None, help = opt["description"] if not "hidden" in opt else argparse.SUPPRESS ) return {k: w for k, w in kwargs.items() if w is not None} # Add plugin defaults to conf.* store # ‾‾‾‾‾‾‾‾‾‾‾‾‾‾‾‾‾‾‾‾‾‾‾‾‾‾‾‾‾‾‾‾‾‾‾ def add_plugin_defaults(conf_options, conf_plugins, meta, module=""): """ Utility function which collect defaults from plugin meta data to a config store. Which in the case of streamtuner2 is really just a dictionary `conf{}` and a plugin list in `conf.plugins{}`. :arg dict conf_options: storage for amassed options :arg dict conf_plugins: enable status based on plugin state/priority: :arg dict meta: input plugin meta data (invoke once per plugin) :arg str module: module name of meta: block """ # Option defaults, if not yet defined for opt in meta.get("config", []): if "name" not in opt or "value" not in opt: continue _value = opt.get("value", "") _name = opt.get("name") _type = opt.get("type") if _name in conf_options: continue # typemap if _type == "bool": val = _value.lower() in ("1", "true", "yes", "on") elif _type == "int": val = int(_value) elif _type in ("table", "list"): val = [ re.split(r"\s*[,;]\s*", s.strip()) for s in re.split(r"\s*[|]\s*", _value) ] elif _type == "dict": val = dict([ re.split(r"\s*(?:=>+|==*|-+>|:=+)\s*", s.strip(), 1) for s in re.split(r"\s*[|;,]\s*", _value) ]) else: val = str(_value) conf_options[_name] = val # Initial plugin activation status if module and module not in conf_plugins: conf_plugins[module] = meta.get("priority") in ( "core", "builtin", "always", "default", "standard" ) |
Added pluginconf/depends.py version [f07828cc9d].
> > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 | # encoding: utf-8 # api: pluginconf ##type: class # category: config # title: Dependency verification # description: Check depends: lines # depends: pluginconf >= 0.7 # version: 0.5 # state: beta # license: PD # priority: optional # # This is a rather basic depends: checker, mostly for local and # installable modules. It's largely built around streamtuner2 # requirements, and should be customized. # # DependencyValidation().depends()/.valid() # ‾‾‾‾‾‾‾‾‾‾‾‾‾‾‾‾‾‾‾‾‾‾‾‾‾‾‾‾‾‾‾‾‾‾‾‾‾‾‾‾‾ # Probes a new plugins` depends: list against installed base modules. # Utilizes each version: fields and allows for virtual modules, or # alternatives and honors alias: names. # import pluginconf import re try: from distutils.spawn import find_executable except ImportError: try: from compat2and3 import find_executable except ImportError: def find_executable(name): pass import zipfile import logging # Minimal depends: probing # ‾‾‾‾‾‾‾‾‾‾‾‾‾‾‾‾‾‾‾‾‾‾‾‾ class DependencyValidation(object): """ Now this definitely requires customization. Each plugin can carry a list of (soft-) dependency names. # depends: config, appcore >= 2.0, bin:wkhtmltoimage, python < 3.5 Here only in-application modules are honored, system references ignored. Unknown plugin names are also skipped. A real install helper might want to auto-tick them on, etc. This example is just meant for probing downloadable plugins. The .valid() helper only asserts the api: string, or skips existing modules, and if they're more recent. While .depends() compares minimum versions against existing modules. In practice there's little need for full-blown dependency resolving for application-level modules. """ """ supported APIs """ api = ["python", "streamtuner2"] """ debugging """ log = logging.getLogger("pluginconf.dependency") # prepare list of known plugins and versions def __init__(self, add={}, core=["st2", "uikit", "config", "action"]): self.have = { "python": {"version": sys.version} } # inject virtual modules for name, meta in add.items(): if isinstance(meta, bool): meta = 1 if meta else -1 if isinstance(meta, tuple): meta = ".".join(str(n) for n in meta) if isinstance(meta, (int, float, str)): meta = {"version": str(meta)} self.have[name] = meta # read plugins/* self.have.update(all_plugin_meta()) # add core modules for name in core: self.have[name] = plugin_meta(module=name, extra_base=["config"]) # aliases for name, meta in self.have.copy().items(): if meta.get("alias"): for alias in re.split(r"\s*[,;]\s*", meta["alias"]): self.have[alias] = self.have[name] # basic plugin pre-screening (skip __init__, filter by api:, # exclude installed & same-version plugins) def valid(self, new_plugin): id = new_plugin.get("$name", "__invalid") have_ver = self.have.get(id, {}).get("version", "0") if id.find("__") == 0: self.log.debug("wrong/no id") elif new_plugin.get("api") not in self.api: self.log.debug("not in allowed APIs") elif {new_plugin.get("status"), new_plugin.get("priority")} & {"obsolete", "broken"}: self.log.debug("wrong status (obsolete/broken)") elif have_ver >= new_plugin.get("version", "0.0"): self.log.debug("newer version already installed") else: return True # Verify depends: and breaks: against existing plugins/modules def depends(self, plugin): result = True if plugin.get("depends"): result &= self.and_or(self.split(plugin["depends"]), self.have) if plugin.get("breaks"): result &= self.neither(self.split(plugin["breaks"]), self.have) self.log.debug("plugin '%s' matching requirements: %i", plugin["id"], result) return result # Split trivial "pkg | alt, mod>=1, uikit<4.0" string into nested list [[dep],[alt,alt],[dep]] def split(self, dep_str): dep_cmp = [] for alt_str in re.split(r"\s*[,;]+\s*", dep_str): alt_cmp = [] # split alternatives | for part in re.split(r"\s*\|+\s*", alt_str): # skip deb:pkg-name, rpm:name, bin:name etc. if not len(part): continue if part.find(":") >= 0: self.have[part] = {"version": self.module_test(*part.split(":"))} # find comparison and version num part += " >= 0" m = re.search(r"([\w.:-]+)\s*\(?\s*([>=<!~]+)\s*([\d.]+([-~.]\w+)*)", part) if m and m.group(2): alt_cmp.append([m.group(i) for i in (1, 2, 3)]) if alt_cmp: dep_cmp.append(alt_cmp) return dep_cmp # Single comparison def cmp(self, d, have, absent=True): name, op, ver = d # absent=True is the relaxed check, will ignore unknown plugins # set absent=False or None for strict check (as in breaks: rule e.g.) if not have.get(name, {}).get("version"): return absent # curr = installed version curr = have[name]["version"] tbl = { ">=": curr >= ver, "<=": curr <= ver, "==": curr == ver, ">": curr > ver, "<": curr < ver, "!=": curr != ver, } r = tbl.get(op, True) #print "log.VERSION_COMPARE: ", name, " → (", curr, op, ver, ") == ", r return r # Compare nested structure of [[dep],[alt,alt]] def and_or(self, deps, have, r=True): #print deps return not False in [ True in [self.cmp(d, have) for d in alternatives] for alternatives in deps ] # Breaks/Conflicts: check [[or],[or]] def neither(self, deps, have): return not True in [ self.cmp(d, have, absent=None) for cnd in deps for d in cnd ] # Resolves/injects complex "bin:name" or "python:name" dependency URNs def module_test(self, type, name): return "1" # disabled for now if "_" + type in dir(self): return "1" if bool(getattr(self, "_" + type)(name)) else "-1" # `bin:name` lookup def _bin(self, name): return find_executable(name) # `python:module` test def _python(self, name): return __import__("imp").find_module(name) is not None |