diff --git a/explode_mbox.py b/explode_mbox.py
index 9d8ba48..f7a3d10 100644
--- a/explode_mbox.py
+++ b/explode_mbox.py
@@ -1,13 +1,13 @@
 import argparse
+import gzip
 import hashlib
+import logging
 import os
 import pathlib
 import sys
 import time
 import typing
 
-DEBUG = os.environ.get("DEBUG") == "enabled"
-
 
 def main(sysargs=sys.argv[:]) -> int:
     parser = argparse.ArgumentParser()
@@ -20,75 +20,111 @@ def main(sysargs=sys.argv[:]) -> int:
         type=float,
         help="sleep seconds in between messages",
     )
+    parser.add_argument(
+        "-D",
+        "--debug",
+        action="store_true",
+        help="increase logging verbosity to debug level",
+    )
 
     args = parser.parse_args(sysargs[1:])
 
-    for msg in _iter_mbox(args.mbox):
-        if msg.length < 2:
-            print("skipping invalid message")
-            continue
-        dest = args.output_directory / msg.relpath
-        dest.parent.mkdir(parents=True, exist_ok=True)
-        print(f"writing message to {dest}")
-        dest.write_bytes(msg.as_bytes())
-        time.sleep(args.zzz)
+    # Either the -D/--debug flag or the legacy DEBUG=enabled environment
+    # variable switches logging to debug level.
+    log_level = logging.INFO
+    if args.debug or os.environ.get("DEBUG") == "enabled":
+        log_level = logging.DEBUG
+
+    logging.basicConfig(level=log_level)
+
+    MBoxExploder().explode(
+        mbox=args.mbox, output_directory=args.output_directory, pause_seconds=args.zzz
+    )
+
     return 0
 
 
-class Message:
+class MBoxMessage:
+    """One mbox message, stored as the list of raw byte lines read for it."""
+
     def __init__(self):
         self.lines = []
 
+    def as_gz_bytes(self) -> bytes:
+        """Return the message content compressed with gzip."""
+        return gzip.compress(self.as_bytes())
+
     def as_bytes(self) -> bytes:
+        """Return all stored lines concatenated in order."""
         return b"".join([l for l in self.lines])
 
-    @property
-    def length(self) -> int:
-        return len(self.lines)
-
-    @property
     def signature(self) -> str:
-        return hashlib.sha512(self.lines[0]).hexdigest()
+        """Return the hex SHA-512 digest of the full message content."""
+        return hashlib.sha512(self.as_bytes()).hexdigest()
 
-    @property
     def relpath(self) -> str:
-        sig = self.signature
-        return os.path.sep.join(
-            [sig[0:1], sig[1:3], sig[3:5], sig[5:7], f"{sig}.email"]
-        )
+        """Return a relative path: two 2-character digest prefixes, then the digest."""
+        sig = self.signature()
+        return os.path.sep.join([sig[0:2], sig[2:4], sig])
+
+    def gz_relpath(self) -> str:
+        """Return relpath() with a ".gz" suffix appended."""
+        return self.relpath() + ".gz"
 
 
-def _iter_mbox(mbox: typing.BinaryIO) -> typing.Generator[Message, None, None]:
-    msg = Message()
-    cur_line = b""
+class MBoxExploder:
+    """Split an mbox stream into one gzip-compressed file per message."""
+
+    def __init__(self):
+        self._log = logging.getLogger().getChild("mbox-exploder")
 
-    while True:
-        byte = mbox.read(1)
-        if len(byte) == 0:
-            _debug(f"reached EOF")
-            msg.lines.append(cur_line)
-            yield msg
-            return
+    def explode(
+        self,
+        mbox: typing.BinaryIO,
+        output_directory: pathlib.Path,
+        pause_seconds: float,
+    ):
+        """Write every message of mbox beneath output_directory, pausing after each."""
+        for i, msg in enumerate(self._iter_mbox(mbox)):
+            if len(msg.lines) < 2:
+                self._log.warning("skipping invalid message (%r)", i)
+                continue
+            dest = output_directory / msg.gz_relpath()
+            dest.parent.mkdir(parents=True, exist_ok=True)
+            self._log.info("writing message to %s", str(dest))
+            dest.write_bytes(msg.as_gz_bytes())
+            time.sleep(pause_seconds)
 
-        cur_line += byte
-
-        if byte != b"\n":
-            continue
-
-        if cur_line.startswith(b"From "):
-            _debug(f"reached new msg")
-            yield msg
-            msg = Message()
-
-        _debug(f"appending line {cur_line}")
-        msg.lines.append(cur_line)
+    def _iter_mbox(
+        self, mbox: typing.BinaryIO
+    ) -> typing.Generator[MBoxMessage, None, None]:
+        """Yield the messages read from mbox, split at From-prefixed lines."""
+        msg = MBoxMessage()
         cur_line = b""
 
+        while True:
+            byte = mbox.read(1)
+            if len(byte) == 0:
+                self._log.debug("reached EOF")
+                msg.lines.append(cur_line)
+                yield msg
+                return
 
-def _debug(msg):
-    if not DEBUG:
-        return
-    print(f"DEBUG: {msg}", file=sys.stderr)
+            cur_line += byte
+
+            if byte != b"\n":
+                continue
+
+            # A From line only starts a new message once the current one
+            # already holds more than one line.
+            if cur_line.startswith(b"From ") and len(msg.lines) > 1:
+                self._log.debug("reached new msg")
+                yield msg
+                msg = MBoxMessage()
+
+            self._log.debug("appending line %r", cur_line)
+            msg.lines.append(cur_line)
+            cur_line = b""
 
 
 if __name__ == "__main__":