You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
box-o-sand/explode_mbox.py

87 lines
1.9 KiB

import argparse
import hashlib
import os
import pathlib
import sys
import typing
DEBUG = os.environ.get("DEBUG") == "enabled"
def main(sysargs=sys.argv[:]) -> int:
parser = argparse.ArgumentParser()
parser.add_argument("mbox", type=argparse.FileType("rb"))
parser.add_argument("output_directory", type=pathlib.Path)
args = parser.parse_args(sysargs[1:])
for msg in _iter_mbox(args.mbox):
if msg.length < 2:
print("skipping invalid message")
continue
dest = args.output_directory / msg.relpath
dest.parent.mkdir(parents=True, exist_ok=True)
print(f"writing message to {dest}")
dest.write_bytes(msg.as_bytes())
return 0
class Message:
def __init__(self):
self.lines = []
def as_bytes(self) -> bytes:
return b"".join([l for l in self.lines])
@property
def length(self) -> int:
return len(self.lines)
@property
def signature(self) -> str:
return hashlib.sha512(self.lines[0]).hexdigest()
@property
def relpath(self) -> str:
sig = self.signature
return os.path.sep.join(
[sig[0:1], sig[1:3], sig[3:5], sig[5:7], f"{sig}.email"]
)
def _iter_mbox(mbox: typing.BinaryIO) -> typing.Generator[Message, None, None]:
msg = Message()
cur_line = b""
while True:
byte = mbox.read(1)
if len(byte) == 0:
_debug(f"reached EOF")
msg.lines.append(cur_line)
yield msg
return
cur_line += byte
if byte != b"\n":
continue
if cur_line.startswith(b"From "):
_debug(f"reached new msg")
yield msg
msg = Message()
_debug(f"appending line {cur_line}")
msg.lines.append(cur_line)
cur_line = b""
def _debug(msg):
if not DEBUG:
return
print(f"DEBUG: {msg}", file=sys.stderr)
if __name__ == "__main__":
sys.exit(main())