logopen() does this internally, but you might want to manually
craft the "regex": property for a .fmt descriptor. In such cases,
you can use regex() or update() to combine the log.fmt with
definitions from the global fmt database.
regex()
Takes a .fmt json/dict, and generates the regex with named capture groups from it.
Create regex for log fmt{}.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
fmt |
dict |
Should contain record: and class:, but may define custom fields or aliases. |
required |
update |
bool |
Inject fields and other declarations from shared
.fmt database into the `fmt` dict. |
False |
Returns:
Type | Description |
---|---|
str |
Combined regex, for example `(?<remote_host>[\w\-.:]+) (?<remote_logname>[\w\-.:]+) (?<remote_user>[\-\w@.]+) …` |
Source code in logfmt1/logfmt1.py
def regex(fmt, update=False):
    r"""
    Create regex for log fmt{}.

    The rules for `fmt["class"]` are fetched from the shared rules database
    and merged with `fmt`. The record string is then rewritten, its
    placeholders are resolved to field definitions, and every placeholder is
    replaced by a named capture group `(?<id>…)`.

    Args:
        fmt (dict): Should contain record: and class:, but may define
            custom fields or aliases.
        update (bool): Inject fields and other declarations from shared
            .fmt database into `fmt` dict. When true, `fmt` is modified in
            place: missing rewrite/fields/expand/alias/container keys are
            created as empty dicts, and each placeholder used in the record
            is stored in `fmt["fields"]`.

    Returns:
        str: Combined regex, for example `(?<remote_host>[\w\-.:]+)
            (?<remote_logname>[\w\-.:]+) (?<remote_user>[\-\w@.]+)
            \[?(?<request_time>\d[\d:\w\s:./\-+,;]+)\]?
            "(?<request_line>(?<request_method>\w+) …␣…)"…`
    """
    rules = rulesdb.merge(
        fmt,
        rulesdb.get(fmt["class"])
    )
    fields = rules["fields"]
    record = fmt["record"]
    if update:
        for key in ("rewrite", "fields", "expand", "alias", "container"):
            if key not in fmt:
                fmt[key] = {}

    # pre-cleanup (for irrelevant format string `%!200,500<s` control prefixes)
    if "rewrite" in rules:
        for rx, repl in rules["rewrite"].items():
            record = rx_sub(rx, repl, record)

    # create fields from variant placeholders
    if "expand" in rules:
        rx_quote_alt = {  # (is_quoted + rx) can map to alternative regex
            (False, '[^"]*'): r"\S*",
            (True, r"\S+"): r'(?:[^"]*|\\")+',
        }
        for rx, expand in rules["expand"].items():
            # first group is the optional leading quote, second the whole
            # placeholder; any groups inside `rx` itself are ignored here
            for quote, match, *_ in re.findall(f'("?)({rx})', record):
                if match in fields:
                    continue
                # findall yields '"' or ''; the lookup table is keyed by
                # bool, so convert (a str never equals True/False)
                is_quoted = bool(quote)
                x = copy(expand)
                # id: is usually "$1", but might be "prefix_$2" or something
                if x["id"].find('$') >= 0:
                    x["id"] = rx_sub(rx, x["id"], match)
                    x["id"] = re.sub(r"\W+", "", x["id"]).lower()
                # recurse into other pattern types
                if "rx" not in x and "class" in x:
                    x["rx"] = regex({
                        "class": x["class"],
                        "record": rx_sub(rx, x.get("record") or "$1", match)
                    })
                # regex alternatives, depending on quotes preceding the placeholder
                if is_quoted and "if_quoted" in x:  # maybe an `if_quoted` definition makes this block redundant
                    x["rx"] = x["if_quoted"]
                elif (is_quoted, x["rx"]) in rx_quote_alt:
                    x["rx"] = rx_quote_alt[(is_quoted, x["rx"])]
                fields[match] = x

    # catch-all \S+ for completely unknown placeholders
    if "placeholder" in rules:
        for ph in re.findall(rules["placeholder"], record):
            if ph not in fields:
                fields[ph] = {"id": re.sub(r"\W+", "", ph), "rx": r"\S+"}

    # an unescaped "(" that does not start a "(?…)" construct
    plain_group = re.compile(r"(?<!\\)\((?!\?)", re.M)

    # do the actual replacement
    def sub_placeholder(m):
        ph = fields[m.group(0)]
        if update:
            fmt["fields"][m.group(0)] = ph  # store used placeholders in fmt
        rx = ph["rx"]
        name = ph["id"]
        # check for existing (…) capture group to mark up
        if plain_group.search(rx):
            # name only the first plain group; count must be passed by
            # keyword, positionally it would land in the wrong slot
            rx = plain_group.sub(f"(?<{name}>", rx, count=1)
        else:
            rx = f"(?<{name}>{rx})"
        return rx

    rx = re.sub(rules["placeholder"], sub_placeholder, record)
    return rename_duplicates(rx)
rx2re()
Convert generic (?<name>…)
to Python (?P<name>…)
regex capture group.
(logfmt1 definitions use standard syntax per default.)
Parameters:
Name | Type | Description | Default |
---|---|---|---|
rx |
str |
Generic/PCRE regex syntax |
required |
Returns:
Type | Description |
---|---|
str |
Python re syntax |
Source code in logfmt1/logfmt1.py
def rx2re(rx: str) -> str:
    r"""
    Convert generic `(?<name>…)` to Python `(?P<name>…)` regex capture group.
    (logfmt1 definitions use standard syntax per default.)

    Only `(?<` followed by word characters and a closing `>` is rewritten,
    so lookbehind assertions `(?<=…)` / `(?<!…)` are left untouched.

    Args:
        rx (str): Generic/PCRE regex syntax

    Returns:
        str: Python re syntax
    """
    # raw string: "\(" and "\w" are not valid str escapes in a plain literal
    return re.sub(r"\(\?<(?=\w+>)", "(?P<", rx)
update()
Updates the regex:
property in a .fmt dict. Also injects any used
fields:
or alias:
lists. (For now this is basically just a wrapper
of the regex()
builder.)
should be the other way round: regex() is meant to be a subset of update()
Source code in logfmt1/logfmt1.py
def update(fmt):
    """
    Rebuild the "regex" entry of a .fmt dict in place.

    Runs regex() with update=True and stores the resulting pattern
    under fmt["regex"]. Nothing is returned.

    Note: should be the other way round: regex() is meant to be a
    subset of update().
    """
    combined = regex(fmt, update=True)
    fmt["regex"] = combined