logopen()
Open log file and its associated .fmt specifier, to iterate over log lines as dictionary. File-style wrapper that yields parsed dictionaries instead of string lines.
for row in logfmt1.logopen("/var/log/apache2/access.log", debug=True):
print(row["remote_host"])
However, you might want to keep a reference to the iterator, so that you can use its .names() method and its .alias dictionary manually.
__init__(self, logfn='', fmt=None, debug=False, fail=False, duplicate=True)
special
Open log file and its associated .fmt specifier, to iterate over log lines as dictionary.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
logfn |
str |
Filename of .log file to open. Which should be accompanied by a .log.fmt declaration to allow unpacking lines into dictionary. |
'' |
fmt |
dict |
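Alternatively to an existing .log.fmt, a predefined class might be given with fmt={"class":"syslog"}. You might even add a fixated {"record":"%a %t %e %F"} format string this way. |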
None |
debug |
bool |
In case of log extraction failures, prints (stdout) some regex debugging. |
False |
fail |
bool |
In case of failure, just error out instead of continuing the iterator. |
False |
duplicate |
bool |
Automatically expand aliases. This effectively copies row entries. |
True |
Attributes:
Name | Type | Description |
---|---|---|
f |
file |
Read handle onto log file |
debug |
bool |
Debug flag |
fail |
bool |
Exception flag |
alias |
dict |
List of row aliases |
container |
dict |
Rules for field expansion |
rx |
re.compile |
Compiled regex |
Exceptions:
Type | Description |
---|---|
StopIteration |
For EOF or if the regex failed and fail=True |
FileNotFoundError |
If logfn doesn't exist etc. |
Returns:
Type | Description |
---|---|
iterator |
Traverses log file line-wise, but yields dictionaries. |
Source code in logfmt1/logfmt1.py
def __init__(self, logfn="", fmt=None, debug=False, fail=False, duplicate=True):
    """
    Open log file and its associated .fmt specifier, to iterate over log lines
    as dictionary.

    Args:
        logfn (str): Filename of .log file to open. Which should be
            accompanied by a .log.fmt declaration to allow unpacking
            lines into dictionary.
        fmt (dict): Alternatively to existing .log.fmt, a predefined class
            might be given with `fmt={"class":"syslog"}`. You might
            even add a fixated `{"record":"%a %t %e %F"}` format string
            this way.
        debug (bool): In case of log extraction failures, prints (stdout)
            some regex debugging.
        fail (bool): In case of failure, just error out instead of continuing
            the iterator.
        duplicate (bool): Automatically expand aliases. This effectively
            copies row entries.

    Attributes:
        f (file): Read handle onto log file
        debug (bool): Debug flag
        fail (bool): Exception flag
        duplicate (bool): Alias expansion flag
        alias (dict): List of row aliases
        container (dict): Rules for field expansion
        rx (re.compile): Compiled regex

    Raises:
        StopIteration: For EOF or if the regex failed and fail=True
        FileNotFoundError: If logfn doesn't exist etc.

    Returns:
        iterator: Traverses log file line-wise, but yields dictionaries.
    """
    self.debug = debug
    self.fail = fail
    self.duplicate = duplicate
    # TODO: wrap in try + nicer error message
    self.f = open(logfn, "r", encoding="utf-8")
    if not fmt:
        try:
            # context manager, so the descriptor handle is closed right
            # after parsing instead of lingering until garbage collection
            with open(f"{logfn}.fmt", "r", encoding="utf-8") as fmt_file:
                fmt = json.load(fmt_file)
        except Exception as e:
            # best-effort: report the problem, then fall back to a default
            # class rather than aborting
            sys.stderr.write(str(e) + "\n")
            sys.stderr.write("Use `update-logfmt` or `modseccfg`→File→Install→update_logfmt.py to generate a *.log.fmt descriptor.\n")
            fmt = {"class":"apache combined"}
            #fmt = rulesdb.find_by_glob(logfn)
    # combine the given/loaded descriptor with the rulesdb entry of its class
    fmt = rulesdb.merge(
        fmt,   # this should be in regex/update
        rulesdb.get(fmt.get("class"))
    )
    self.alias = fmt.get("alias", {})
    self.container = fmt.get("container", {})
    self.rx = re.compile(rx2re(regex(fmt)))
debug_rx(self, line)
ANSI output for debugging regex/fmt string. Automatically
invoked for failing lines if debug=True
was given.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
line |
str |
Current raw line (string) from log file. |
required |
Output: Prints directly to stdout using ANSI escape sequences to highlight where regex failed on input line. It's not very exact anymore, but reasonably speedy.
Source code in logfmt1/logfmt1.py
def debug_rx(self, line:str) -> None:
    """
    ANSI output for debugging regex/fmt string. Automatically
    invoked for failing lines if `debug=True` was given.

    ![failed regex + log line](https://imgur.com/QBKzDsK.png)

    Args:
        line: Current raw line (string) from log file.

    Output:
        Prints directly to stdout using ANSI escape sequences to
        highlight where regex failed on input line. It's not very
        exact anymore, but reasonably speedy.
    """
    rx = self.rx.pattern
    line = line.rstrip()
    # default, so the report below still works when the pattern contains
    # no named capture group at all (loop body never runs)
    fail = "<unknown-last-capture>"
    # iteratively strip (?P<...>) capture groups from the end
    while "(?P<" in rx:
        last = rx.rindex("(?P<")
        if last < 1:
            # nothing left in front of the first group to test against
            fail = "<unknown-last-capture>"
            break
        fail = rx[last:]
        # cut the last group, and any trailing remainder after the
        # previous closing paren
        rx = re.sub("[^)]*$", "", rx[:last])
        try:
            if re.match(rx, line):
                break  # works now, so `fail` was the culprit
        except re.error:
            # likely broke regex nesting, try removing next (?P<...>)
            continue
    try:
        found = re.match(rx, line)
    except re.error:
        found = None
    matched = found.group(0) if found else ""
    print("\033[36m" + "failed regex section: \033[1;33;41m" + fail + "\033[40;0m")
    print("\033[42m" + matched + "\033[41m" + line[len(matched):] + "\033[40;0m")
names(self)
Get column names from generated .fmt regex.
Returns:
Type | Description |
---|---|
list |
dictionary keys of row (without aliases). |
Source code in logfmt1/logfmt1.py
def names(self) -> list:
    """
    Get column names from generated .fmt regex.

    Returns:
        list: dictionary keys of row (without aliases).
    """
    # raw string: `\(`, `\?` and `\w` are regex escapes, not string escapes
    return re.findall(r"\(\?P?<(\w+)>", self.rx.pattern)