Classes and Logging and Gzip oh my
This commit is contained in:
parent
263b10b049
commit
7275f13738
119
explode_mbox.py
119
explode_mbox.py
@ -1,13 +1,13 @@
|
|||||||
import argparse
|
import argparse
|
||||||
|
import gzip
|
||||||
import hashlib
|
import hashlib
|
||||||
|
import logging
|
||||||
import os
|
import os
|
||||||
import pathlib
|
import pathlib
|
||||||
import sys
|
import sys
|
||||||
import time
|
import time
|
||||||
import typing
|
import typing
|
||||||
|
|
||||||
DEBUG = os.environ.get("DEBUG") == "enabled"
|
|
||||||
|
|
||||||
|
|
||||||
def main(sysargs=sys.argv[:]) -> int:
|
def main(sysargs=sys.argv[:]) -> int:
|
||||||
parser = argparse.ArgumentParser()
|
parser = argparse.ArgumentParser()
|
||||||
@ -20,75 +20,96 @@ def main(sysargs=sys.argv[:]) -> int:
|
|||||||
type=float,
|
type=float,
|
||||||
help="sleep seconds in between messages",
|
help="sleep seconds in between messages",
|
||||||
)
|
)
|
||||||
|
parser.add_argument(
|
||||||
|
"-D",
|
||||||
|
"--debug",
|
||||||
|
action="store_true",
|
||||||
|
help="increase logging verbosity to debug level",
|
||||||
|
)
|
||||||
|
|
||||||
args = parser.parse_args(sysargs[1:])
|
args = parser.parse_args(sysargs[1:])
|
||||||
|
|
||||||
for msg in _iter_mbox(args.mbox):
|
log_level = logging.INFO
|
||||||
if msg.length < 2:
|
if os.environ.get("DEBUG") == "enabled":
|
||||||
print("skipping invalid message")
|
log_level = logging.DEBUG
|
||||||
continue
|
|
||||||
dest = args.output_directory / msg.relpath
|
logging.basicConfig(level=log_level)
|
||||||
dest.parent.mkdir(parents=True, exist_ok=True)
|
|
||||||
print(f"writing message to {dest}")
|
MBoxExploder().explode(
|
||||||
dest.write_bytes(msg.as_bytes())
|
mbox=args.mbox, output_directory=args.output_directory, pause_seconds=args.zzz
|
||||||
time.sleep(args.zzz)
|
)
|
||||||
|
|
||||||
return 0
|
return 0
|
||||||
|
|
||||||
|
|
||||||
class Message:
|
class MBoxMessage:
|
||||||
def __init__(self):
|
def __init__(self):
|
||||||
self.lines = []
|
self.lines = []
|
||||||
|
|
||||||
|
def as_gz_bytes(self) -> bytes:
|
||||||
|
return gzip.compress(self.as_bytes())
|
||||||
|
|
||||||
def as_bytes(self) -> bytes:
|
def as_bytes(self) -> bytes:
|
||||||
return b"".join([l for l in self.lines])
|
return b"".join([l for l in self.lines])
|
||||||
|
|
||||||
@property
|
|
||||||
def length(self) -> int:
|
|
||||||
return len(self.lines)
|
|
||||||
|
|
||||||
@property
|
|
||||||
def signature(self) -> str:
|
def signature(self) -> str:
|
||||||
return hashlib.sha512(self.lines[0]).hexdigest()
|
return hashlib.sha512(self.as_bytes()).hexdigest()
|
||||||
|
|
||||||
@property
|
|
||||||
def relpath(self) -> str:
|
def relpath(self) -> str:
|
||||||
sig = self.signature
|
sig = self.signature()
|
||||||
return os.path.sep.join(
|
return os.path.sep.join([sig[0:2], sig[2:4], sig])
|
||||||
[sig[0:1], sig[1:3], sig[3:5], sig[5:7], f"{sig}.email"]
|
|
||||||
)
|
def gz_relpath(self) -> str:
|
||||||
|
return self.relpath() + ".gz"
|
||||||
|
|
||||||
|
|
||||||
def _iter_mbox(mbox: typing.BinaryIO) -> typing.Generator[Message, None, None]:
|
class MBoxExploder:
|
||||||
msg = Message()
|
def __init__(self):
|
||||||
cur_line = b""
|
self._log = logging.getLogger().getChild("mbox-exploder")
|
||||||
|
|
||||||
while True:
|
def explode(
|
||||||
byte = mbox.read(1)
|
self,
|
||||||
if len(byte) == 0:
|
mbox: typing.BinaryIO,
|
||||||
_debug(f"reached EOF")
|
output_directory: pathlib.Path,
|
||||||
msg.lines.append(cur_line)
|
pause_seconds: float,
|
||||||
yield msg
|
):
|
||||||
return
|
for i, msg in enumerate(self._iter_mbox(mbox)):
|
||||||
|
if len(msg.lines) < 2:
|
||||||
|
self._log.warn("skipping invalid message (%r)", i)
|
||||||
|
continue
|
||||||
|
dest = output_directory / msg.gz_relpath()
|
||||||
|
dest.parent.mkdir(parents=True, exist_ok=True)
|
||||||
|
self._log.info("writing message to %s", str(dest))
|
||||||
|
dest.write_bytes(msg.as_gz_bytes())
|
||||||
|
time.sleep(pause_seconds)
|
||||||
|
|
||||||
cur_line += byte
|
def _iter_mbox(
|
||||||
|
self, mbox: typing.BinaryIO
|
||||||
if byte != b"\n":
|
) -> typing.Generator[MBoxMessage, None, None]:
|
||||||
continue
|
msg = MBoxMessage()
|
||||||
|
|
||||||
if cur_line.startswith(b"From "):
|
|
||||||
_debug(f"reached new msg")
|
|
||||||
yield msg
|
|
||||||
msg = Message()
|
|
||||||
|
|
||||||
_debug(f"appending line {cur_line}")
|
|
||||||
msg.lines.append(cur_line)
|
|
||||||
cur_line = b""
|
cur_line = b""
|
||||||
|
|
||||||
|
while True:
|
||||||
|
byte = mbox.read(1)
|
||||||
|
if len(byte) == 0:
|
||||||
|
self._log.debug("reached EOF")
|
||||||
|
msg.lines.append(cur_line)
|
||||||
|
yield msg
|
||||||
|
return
|
||||||
|
|
||||||
def _debug(msg):
|
cur_line += byte
|
||||||
if not DEBUG:
|
|
||||||
return
|
if byte != b"\n":
|
||||||
print(f"DEBUG: {msg}", file=sys.stderr)
|
continue
|
||||||
|
|
||||||
|
if cur_line.startswith(b"From ") and len(msg.lines) > 1:
|
||||||
|
self._log.debug("reached new msg")
|
||||||
|
yield msg
|
||||||
|
msg = MBoxMessage()
|
||||||
|
|
||||||
|
self._log.debug("appending line %r", cur_line)
|
||||||
|
msg.lines.append(cur_line)
|
||||||
|
cur_line = b""
|
||||||
|
|
||||||
|
|
||||||
if __name__ == "__main__":
|
if __name__ == "__main__":
|
||||||
|
Loading…
Reference in New Issue
Block a user