You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
box-o-sand/explode_mbox.py

117 lines
2.9 KiB

import argparse
import gzip
import hashlib
import logging
import os
import pathlib
import sys
import time
import typing
def main(sysargs=sys.argv[:]) -> int:
parser = argparse.ArgumentParser()
parser.add_argument("mbox", type=argparse.FileType("rb"))
parser.add_argument("output_directory", type=pathlib.Path)
parser.add_argument(
"-s",
"--zzz",
default=0.005,
type=float,
help="sleep seconds in between messages",
)
parser.add_argument(
"-D",
"--debug",
action="store_true",
help="increase logging verbosity to debug level",
)
args = parser.parse_args(sysargs[1:])
log_level = logging.INFO
if os.environ.get("DEBUG") == "enabled":
log_level = logging.DEBUG
logging.basicConfig(level=log_level)
MBoxExploder().explode(
mbox=args.mbox, output_directory=args.output_directory, pause_seconds=args.zzz
)
return 0
class MBoxMessage:
def __init__(self):
self.lines = []
def as_gz_bytes(self) -> bytes:
return gzip.compress(self.as_bytes())
def as_bytes(self) -> bytes:
return b"".join([l for l in self.lines])
def signature(self) -> str:
return hashlib.sha512(self.as_bytes()).hexdigest()
def relpath(self) -> str:
sig = self.signature()
return os.path.sep.join([sig[0:2], sig[2:4], sig])
def gz_relpath(self) -> str:
return self.relpath() + ".gz"
class MBoxExploder:
def __init__(self):
self._log = logging.getLogger().getChild("mbox-exploder")
def explode(
self,
mbox: typing.BinaryIO,
output_directory: pathlib.Path,
pause_seconds: float,
):
for i, msg in enumerate(self._iter_mbox(mbox)):
if len(msg.lines) < 2:
self._log.warn("skipping invalid message (%r)", i)
continue
dest = output_directory / msg.gz_relpath()
dest.parent.mkdir(parents=True, exist_ok=True)
self._log.info("writing message to %s", str(dest))
dest.write_bytes(msg.as_gz_bytes())
time.sleep(pause_seconds)
def _iter_mbox(
self, mbox: typing.BinaryIO
) -> typing.Generator[MBoxMessage, None, None]:
msg = MBoxMessage()
cur_line = b""
while True:
byte = mbox.read(1)
if len(byte) == 0:
self._log.debug("reached EOF")
msg.lines.append(cur_line)
yield msg
return
cur_line += byte
if byte != b"\n":
continue
if cur_line.startswith(b"From ") and len(msg.lines) > 1:
self._log.debug("reached new msg")
yield msg
msg = MBoxMessage()
self._log.debug("appending line %r", cur_line)
msg.lines.append(cur_line)
cur_line = b""
if __name__ == "__main__":
sys.exit(main())