logopen() does this internally, but you might want to manually craft the "regex": for a .fmt descriptor. In such cases, you can use regex() or update() to combine the log.fmt with definitions from the global fmt database.

regex()

Takes a .fmt json/dict, and generates the regex with named capture groups from it.

Create regex for log fmt{}.

Parameters:

Name Type Description Default
fmt dict

Should contain record: and class:, but may define custom fields or aliases.

required
update bool

Inject fields and other declarations from shared .fmt database into fmt dict.

False

Returns:

Type Description
str

Combined regex, for example (?<remote_host>[\w\-.:]+) (?<remote_logname>[\w\-.:]+) (?<remote_user>[\-\w@.]+) \[?(?<request_time>\d[\d:\w\s:./\-+,;]+)\]? "(?<request_line>(?<request_method>\w+) …␣…)"…

Source code in logfmt1/logfmt1.py
def regex(fmt, update=False):
    """
        Create regex for log fmt{}.

        Args:
            fmt (dict): Should contain record: and class:, but may define
                custom fields or aliases.

            update (bool: Inject fields and other declarations from shared
                .fmt database into `fmt` dict.

        Returns:
          str: Combined regex, for example `(?<remote_host>[\\w\\-.:]+)
               (?<remote_logname>[\\w\\-.:]+) (?<remote_user>[\\-\\w@.]+)
               \\[?(?<request_time>\\d[\\d:\\w\\s:./\\-+,;]+)\\]? 
               "(?<request_line>(?<request_method>\\w+) …␣…)"…`
    """

    rules = rulesdb.merge(
        fmt,
        rulesdb.get(fmt["class"])
    )
    fields = rules["fields"]
    record = fmt["record"]
    if update:
        for field in ["rewrite", "fields", "expand", "alias", "container"]:
            if not field in fmt:
                fmt[field] = {}

    # pre-cleanup (for irrelevant format string `%!200,500<s` control prefixes)
    if "rewrite" in rules:
        for rx, repl in rules["rewrite"].items():
            record = rx_sub(rx, repl, record)

    # create fields from variant placeholders
    if "expand" in rules:
        rx_quote_alt = { # (is_quoted + rx) can map to alternative regex
            (False,  '[^"]*'):  "\S*",
            (True,   "\S+"):    "(?:[^\"]*|\\\\\")+",
        }
        for rx, expand in rules["expand"].items():
            for is_quoted, match, *uu in re.findall(f"(\"?)({rx})", record):
                if match in fields:
                    continue
                x = copy(expand)
                # id: is usually "$1", but might be "prefix_$2" or something
                if x["id"].find('$') >= 0:
                    x["id"] = rx_sub(rx, x["id"], match)
                    x["id"] = re.sub("\W+", "", x["id"]).lower()
                # recurse into other pattern types
                if not "rx" in x and "class" in x:
                    x["rx"] = regex({
                        "class": x["class"],
                        "record": rx_sub(rx, x.get("record") or "$1", match)
                    })
                # regex alternatives, depending on quotes preceeding the placeholder
                if is_quoted and "if_quoted" in x:  # maybe an `if_quoted` definition makes this block redundant
                    x["rx"] = x["if_quoted"]
                elif (is_quoted, x["rx"]) in rx_quote_alt:
                    x["rx"] = rx_quote_alt[(is_quoted, x["rx"])]
                fields[match] = x

    # catch-all \S+ for completely unknown placeholders
    if "placeholder" in rules:
        for ph in re.findall(rules["placeholder"], record):
            if not ph in fields:
                id = re.sub("\W+", "", ph)
                fields[ph] = { "id": id, "rx": "\S+" }

    # do the actual replacement
    def sub_placeholder(m):
        ph = fields[m.group(0)]
        if update:
            fmt["fields"][m.group(0)] = ph  # store used placeholders in fmt
        rx = ph["rx"]
        id = ph["id"]
        # check for existing (…) capture group to mark up
        if re.search("(?<!\\\\)\((?!\?)", rx):
            rx = re.sub("(?<!\\\\)\((?!\?)", f"(?<{id}>", rx, re.M, 1)
        else:
            rx = f"(?<{id}>{rx})"
        return rx
    rx = re.sub(rules["placeholder"], sub_placeholder, record)
    rx = rename_duplicates(rx)
    return rx

rx2re()

Convert generic (?<name>…) to Python (?P<name>…) regex capture group. (logfmt1 definitions use standard syntax per default.)

Parameters:

Name Type Description Default
rx str

Generic/PCRE regex syntax

required

Returns:

Type Description
str

Python re syntax

Source code in logfmt1/logfmt1.py
def rx2re(rx):
    """
        Convert generic `(?<name>…)` to Python `(?P<name>…)` regex capture group.
        (logfmt1 definitions use standard syntax per default.)

        Args:
            rx (str): Generic/PCRE regex syntax

        Returns:
            str: Python re syntax
    """
    return re.sub("\(\?<(?=\w+>)", "(?P<", rx)

update()

Updates the regex: property in a .fmt dict. Also injects any used fields: or alias: lists. (For now this is basically just a wrapper of the regex() builder.)

should be the other way round: regex() is meant to be a subset of update()

Source code in logfmt1/logfmt1.py
def update(fmt):
    """ should be the other way round: regex() is meant to be a subset of update() """
    fmt["regex"] = regex(fmt, update=True)