clean up mbox thing
This commit is contained in:
parent
951c3bcebe
commit
f45663f248
116
explode_mbox.py
116
explode_mbox.py
@ -1,116 +0,0 @@
|
||||
import argparse
|
||||
import gzip
|
||||
import hashlib
|
||||
import logging
|
||||
import os
|
||||
import pathlib
|
||||
import sys
|
||||
import time
|
||||
import typing
|
||||
|
||||
|
||||
def main(sysargs=sys.argv[:]) -> int:
|
||||
parser = argparse.ArgumentParser()
|
||||
parser.add_argument("mbox", type=argparse.FileType("rb"))
|
||||
parser.add_argument("output_directory", type=pathlib.Path)
|
||||
parser.add_argument(
|
||||
"-s",
|
||||
"--zzz",
|
||||
default=0.005,
|
||||
type=float,
|
||||
help="sleep seconds in between messages",
|
||||
)
|
||||
parser.add_argument(
|
||||
"-D",
|
||||
"--debug",
|
||||
action="store_true",
|
||||
help="increase logging verbosity to debug level",
|
||||
)
|
||||
|
||||
args = parser.parse_args(sysargs[1:])
|
||||
|
||||
log_level = logging.INFO
|
||||
if os.environ.get("DEBUG") == "enabled":
|
||||
log_level = logging.DEBUG
|
||||
|
||||
logging.basicConfig(level=log_level)
|
||||
|
||||
MBoxExploder().explode(
|
||||
mbox=args.mbox, output_directory=args.output_directory, pause_seconds=args.zzz
|
||||
)
|
||||
|
||||
return 0
|
||||
|
||||
|
||||
class MBoxMessage:
|
||||
def __init__(self):
|
||||
self.lines = []
|
||||
|
||||
def as_gz_bytes(self) -> bytes:
|
||||
return gzip.compress(self.as_bytes())
|
||||
|
||||
def as_bytes(self) -> bytes:
|
||||
return b"".join([l for l in self.lines])
|
||||
|
||||
def signature(self) -> str:
|
||||
return hashlib.sha512(self.as_bytes()).hexdigest()
|
||||
|
||||
def relpath(self) -> str:
|
||||
sig = self.signature()
|
||||
return os.path.sep.join([sig[0:2], sig[2:4], sig])
|
||||
|
||||
def gz_relpath(self) -> str:
|
||||
return self.relpath() + ".gz"
|
||||
|
||||
|
||||
class MBoxExploder:
|
||||
def __init__(self):
|
||||
self._log = logging.getLogger().getChild("mbox-exploder")
|
||||
|
||||
def explode(
|
||||
self,
|
||||
mbox: typing.BinaryIO,
|
||||
output_directory: pathlib.Path,
|
||||
pause_seconds: float,
|
||||
):
|
||||
for i, msg in enumerate(self._iter_mbox(mbox)):
|
||||
if len(msg.lines) < 2:
|
||||
self._log.warn("skipping invalid message (%r)", i)
|
||||
continue
|
||||
dest = output_directory / msg.gz_relpath()
|
||||
dest.parent.mkdir(parents=True, exist_ok=True)
|
||||
self._log.info("writing message to %s", str(dest))
|
||||
dest.write_bytes(msg.as_gz_bytes())
|
||||
time.sleep(pause_seconds)
|
||||
|
||||
def _iter_mbox(
|
||||
self, mbox: typing.BinaryIO
|
||||
) -> typing.Generator[MBoxMessage, None, None]:
|
||||
msg = MBoxMessage()
|
||||
cur_line = b""
|
||||
|
||||
while True:
|
||||
byte = mbox.read(1)
|
||||
if len(byte) == 0:
|
||||
self._log.debug("reached EOF")
|
||||
msg.lines.append(cur_line)
|
||||
yield msg
|
||||
return
|
||||
|
||||
cur_line += byte
|
||||
|
||||
if byte != b"\n":
|
||||
continue
|
||||
|
||||
if cur_line.startswith(b"From ") and len(msg.lines) > 1:
|
||||
self._log.debug("reached new msg")
|
||||
yield msg
|
||||
msg = MBoxMessage()
|
||||
|
||||
self._log.debug("appending line %r", cur_line)
|
||||
msg.lines.append(cur_line)
|
||||
cur_line = b""
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
sys.exit(main())
|
Loading…
Reference in New Issue
Block a user