logopen()

Open a log file together with its associated .fmt specifier and iterate over the log lines as dictionaries. It is a file-style wrapper that yields parsed dictionaries instead of string lines.

for row in logfmt1.logopen("/var/log/apache2/access.log", debug=True):
    print(row["remote_host"])

You may want to keep a reference to the iterator, though, so that you can use its .names() method and .alias dictionary manually.

__init__(self, logfn='', fmt=None, debug=False, fail=False, duplicate=True) special

Open log file and its associated .fmt specifier, to iterate over log lines as dictionary.

Parameters:

Name Type Description Default
logfn str

Filename of .log file to open. Which should be accompanied by a .log.fmt declaration to allow unpacking lines into dictionary.

''
fmt dict

As an alternative to an existing .log.fmt file, a predefined class can be given with fmt={"class":"syslog"}. You can even supply a fixed {"record":"%a %t %e %F"} format string this way.

None
debug bool

In case of log extraction failures, prints (stdout) some regex debugging.

False
fail bool

In case of failure, just error out instead of continuing the iterator.

False
duplicate bool

Automatically expand aliases. This effectively copies row entries.

True

Attributes:

Name Type Description
f file

Read handle onto log file

debug bool

Debug flag

fail bool

Exception flag

alias dict

List of row aliases

container dict

Rules for field expansion

rx re.compile

Compiled regex

Exceptions:

Type Description
StopIteration

For EOF or if the regex failed and fail=True

FileNotFoundError

If logfn doesn't exist etc.

Returns:

Type Description
iterator

Traverses log file line-wise, but yields dictionaries.

Source code in logfmt1/logfmt1.py
def __init__(self, logfn="", fmt=None, debug=False, fail=False, duplicate=True):
    """
        Open log file and its associated .fmt specifier, to iterate over log lines
        as dictionary.

        Args:
            logfn (str):  Filename of .log file to open.  Which should be
                accompanied by a .log.fmt declaration to allow unpacking
                lines into dictionary.
            fmt (dict): Alternatively to existing .log.fmt, a predefined class
                might be given with `fmt={"class":"syslog"}`.  You might
                even add a fixated `{"record":"%a %t %e %F"}` format string
                this way.
            debug (bool): In case of log extraction failures, prints (stdout)
                some regex debugging.
            fail (bool): In case of failure, just error out instead of continuing
                the iterator.
            duplicate (bool): Automatically expand aliases.  This effectively
                copies row entries.

        Attributes:
            f (file): Read handle onto log file
            debug (bool): Debug flag
            fail (bool): Exception flag
            duplicate (bool): Alias expansion flag
            alias (dict): List of row aliases
            container (dict): Rules for field expansion
            rx (re.compile): Compiled regex

        Raises:
            StopIteration: For EOF or if the regex failed and fail=True
            FileNotFoundError: If logfn doesn't exist etc.

        Returns:
            iterator: Traverses log file line-wise, but yields dictionaries.
    """
    self.debug = debug
    self.fail = fail
    self.duplicate = duplicate
    # A missing or unreadable log file propagates to the caller as OSError.
    self.f = open(logfn, "r", encoding="utf-8")
    try:
        if not fmt:
            try:
                # Context manager so the descriptor handle is closed right away.
                with open(f"{logfn}.fmt", "r", encoding="utf-8") as fmt_file:
                    fmt = json.load(fmt_file)
            except (OSError, ValueError) as e:
                # Missing/unreadable descriptor or invalid JSON/encoding:
                # report on stderr and fall back to a default format class.
                sys.stderr.write(str(e) + "\n")
                sys.stderr.write("Use `update-logfmt` or `modseccfg`→File→Install→update_logfmt.py to generate a *.log.fmt descriptor.\n")
                fmt = {"class":"apache combined"}
        # Combine the given/loaded descriptor with the rules stored for its class.
        fmt = rulesdb.merge(
            fmt,   # this should be in regex/update
            rulesdb.get(fmt.get("class"))
        )
        self.alias = fmt.get("alias", {})
        self.container = fmt.get("container", {})
        self.rx = re.compile(rx2re(regex(fmt)))
    except BaseException:
        # Do not leak the already opened log handle if setup fails midway.
        self.f.close()
        raise

debug_rx(self, line)

ANSI output for debugging regex/fmt string. Automatically invoked for failing lines if debug=True was given.

failed regex + log line

Parameters:

Name Type Description Default
line str

Current raw line (string) from log file.

required

Output: Prints directly to stdout using ANSI escape sequences to highlight where regex failed on input line. It's not very exact anymore, but reasonably speedy.

Source code in logfmt1/logfmt1.py
def debug_rx(self, line: str):
    """
        ANSI output for debugging regex/fmt string. Automatically
        invoked for failing lines if `debug=True` was given.

        ![failed regex + log line](https://imgur.com/QBKzDsK.png)

        Args:
          line: Current raw line (string) from log file.
        Output:
          Prints directly to stdout using ANSI escape sequences to
          highlight where regex failed on input line. It's not very
          exact anymore, but reasonably speedy.
    """

    rx = self.rx.pattern
    line = line.rstrip()
    # Fallback label, so `fail` is always defined even when the pattern
    # contains no named capture group and the loop below never runs.
    fail = "<unknown-last-capture>"
    # iteratively strip trailing (?P<...>) capture groups until the rest matches
    while "(?P<" in rx:
        last = rx.rindex("(?P<")
        if last < 1:
            # only the leading group is left; nothing shorter to test against
            fail = "<unknown-last-capture>"
            break
        fail = rx[last:]
        # cut the last group, then drop dangling text after the final ")"
        rx = re.sub("[^)]*$", "", rx[:last])
        try:
            if re.match(rx, line):
                break  # works now, so `fail` was the culprit
        except re.error:
            # likely broke regex nesting, try removing next (?...)
            pass
    try:
        hit = re.match(rx, line)
    except re.error:
        # the truncated pattern may still be unbalanced
        hit = None
    matched = hit.group(0) if hit else ""
    print("\033[36m" + "failed regex section: \033[1;33;41m" + fail + "\033[40;0m")
    print("\033[42m" + matched + "\033[41m" + line[len(matched):] + "\033[40;0m")

names(self)

Get column names from generated .fmt regex.

Returns:

Type Description
list

dictionary keys of row (without aliases).

Source code in logfmt1/logfmt1.py
def names(self):
    """
        Get column names from generated .fmt regex.

        Returns:
          list: dictionary keys of row (without aliases).
    """
    # Raw string: "\(" / "\?" / "\w" are invalid escapes in a plain literal
    # and trigger a warning on current Python versions.
    return re.findall(r"\(\?P?<(\w+)>", self.rx.pattern)