GUI editor to tame mod_security rules

⌈⌋ ⎇ branch:  modseccfg


Artifact [0acb9933c7]

Artifact 0acb9933c783fe76a772723e093c9b8449b0fe6400616ea7a21c22bcc3593016:

  • File modseccfg/utils.py — part of check-in [21714627ef] at 2020-11-26 00:27:07 on branch trunk — Fix is_glob taking precedence over not writable() in log reader, more details for subprocess.run() tasks (instead of os.system() now) (user: mario size: 6093)

# encoding: utf-8
# api: python
# type: functions
# category: utils
# title:  Utils & Config
# description: various shortcut functions, config data, UI and remoting wrappers
# version: 0.4
# depends: pluginconf (>= 0.7.2), python:appdirs, python:pathlib
# config:
#   { name: sshfs_mount, type: str, value: "~/.config/modseccfg/mnt/", description: "Remote connection (sshfs) mount point", help: "This will be used for `modseccfg vps123:` invocations to bind the servers / root locally." }
#    { name: "test[key]", type: bool, value: 1, description: "Array[key] test" }
# state: alpha
#
# Contains some utility code, and Python module monkeypatching.
#
# With `srvroot` all file access gets encapsulated, so it works
# locally or over sshfs.
#
# The `conf` dict is read from `~/.config/modseccfg/settings.json`.
# Defaults are built in, or module-extracted via pluginconf. The
# settings GUI also courtesy of.
#


import sys
import os
import pathlib
import re
import functools
import subprocess
import atexit
import json
import pluginconf, pluginconf.gui
import appdirs
try: import frosch; frosch.hook()
except: pass


#-- config defaults
conf = {
    # mainwindow
    "theme": "DefaultNoNagging",
    "switch_auto": 0,
    "keyboard_binds": 1,
    # writer
    "edit_sys_files": False,
    "backup_files": True,
    "backup_dir": "~/backup-config/",
    # logs
    "log_entries": 5000,
    "log_filter": "(?!404|429)[45]\d\d",
    "log_skip_rx" : "PetalBot|/.well-known/ignore.cgi",
    "add_stub_logs": 1,    # data/common_false_*.log
    # utils
    "sshfs_mount": "~/mnt/",
    "sshfs_o": "",
    "conf_dir": appdirs.user_config_dir("modseccfg", "io"),
    "conf_file": "settings.json",
    "plugins": {
        "__init__": 1,
        "mainwindow": 1,
        "appsettings": 1,
        "utils": 1,
        "vhosts": 1,
        "logs": 1,
        "writer": 1
    }
}

#-- plugin lookup
pluginconf.module_base = __name__
pluginconf.plugin_base = [__package__]
for module,meta in pluginconf.all_plugin_meta().items():
    pluginconf.add_plugin_defaults(conf, {}, meta, module)


#-- path
def expandpath(dir):
    return str(pathlib.Path(dir).expanduser())

#-- @decorator to override module function
@functools.singledispatch
def inject(mod):
    def decorator(func):
        setattr(mod, func.__name__, func)
    return decorator
#-- patch re for \h support
@inject(re)
def compile(regex, *kargs, re_compile_orig=re.compile, **kwargs):
    if type(regex) is str:
        regex = re.sub(r'\\h(?![^\[]*\])', r'[\ \t\f]', regex)
        #print("re_compile: " + regex)
    return re_compile_orig(regex, *kargs, **kwargs)
@inject(re)
def grep(regex, list, flags=0):
    return [s for s in list if re.search(regex, s, flags)]
    





#-- remote/sshfs bindings
#
# This wraps any modseccfg file operations on config or log files.
# If modseccfg is started with a ssh:/ parameter, then we'll connect
# the remote file system. All file IO uses the mount prefix henceforth;
# thusly enabling remote log scans and config editing.
# (Because X11 forwarding with Python/Tkinter is unworkable at best.)
#
class remote:

    # initialize if argv[] contains any `(user@)hostname:/`
    def __init__(self, srv=[]):
        if not srv:
            self.local = 1
            self.mnt = ""
            self.srv = self.srvname = ""
        else:
            self.local = 0
            self.srvname = re.sub(":.*?$", "", srv[0])
            self.srv = self.srvname + ":/"   # must be / root
            self.mnt = expandpath(conf["sshfs_mount"]) + "/" + self.srv
            os.makedirs(self.mnt, 0o0700, True)
            self.mount()

    def mount(self):
        opts = []
        for opt in re.findall("(?<!-)\\b[\w=]+", conf.get("sshfs_o", "")):
            opts += ["-o", opt]
        cmd = ["sshfs"] + opts + [self.srv+"/", self.mnt]
        if self.mnt and self.srv:
            print(f"srvroot.mount = {cmd}")
            subprocess.run(cmd)
            atexit.register(self.umount)

    def umount(self):
        cmd = ["fusermount", "-u", self.mnt]
        if self.mnt and self.srv:
            print(f"srvroot.umount = {cmd}")
            subprocess.run(cmd)
            os.rmdir(self.mnt)

    def fn(self, fn):
        return self.mnt + fn

    def read(self, fn):
        if not self.exists(fn):
            if not re.search("letsencrypt|ssl", fn):
                print("WARNING: file not found", self.mnt, fn)
            return ""
        with open(self.fn(fn), "r", encoding="utf8") as f:
            return f.read()

    def write(self, fn, src):
        with open(self.fn(fn), "w", encoding="utf8") as f:
            return f.write(src)

    def popen(self, cmd, action="r"):
        if not self.local:
            cmd = ["ssh", self.srvname] + cmd
            print(f"srvroot.popen = {cmd}")
        if action=="r":
            return subprocess.Popen(cmd, stdout=subprocess.PIPE).stdout
        elif action=="e":
            return subprocess.Popen(cmd, stderr=subprocess.PIPE).stderr
        else:
            return subprocess.Popen(cmd, stdin=subprocess.PIPE).stdin

    def exists(self, fn):
        return os.path.exists(self.fn(fn))

    def writable(self, fn):
        if self.local:
            return os.access(self.fn(fn), os.W_OK)
        else:
            return True  # need a real test here
    writeable=writable  # alias
            
# initialize with argv[]
srvroot = remote(re.grep("\w+:", sys.argv[1:]))


#-- read config file
def cfg_read():
    fn = conf["conf_dir"] + "/" + conf["conf_file"]
    if os.path.exists(fn):
        conf.update(json.load(open(fn, "r", encoding="utf8")))

# write config file
def cfg_write():
    os.makedirs(conf["conf_dir"], 0o755, True)
    #print(str(conf))
    fn = conf["conf_dir"] + "/" + conf["conf_file"]
    json.dump(conf, open(fn, "w", encoding="utf8"), indent=4)

# show config option dialog
def cfg_window(mainself, *kargs):
    fn_py = __file__.replace("utils", "*")
    save = pluginconf.gui.window(conf, conf["plugins"], files=[fn_py], theme=conf["theme"])
    if save:
        cfg_write()

# initialze conf{}
cfg_read()