Script
#!/usr/bin/env python3
import argparse, hashlib, json, os, shutil, subprocess, sys, tempfile
from pathlib import Path
from tqdm import tqdm
from PIL import Image, ImageFile
import magic
from oletools.olevba import VBA_Parser
# Pillow module-level switch, set once at import time so it applies to every
# Image.open()/convert() call below.
# NOTE(review): presumably this makes Pillow decode images whose data ends
# early instead of raising while loading -- confirm against the Pillow docs.
ImageFile.LOAD_TRUNCATED_IMAGES = True
# Lower-case file suffixes (including the leading dot) used to route each file
# to a sanitizer. They are only ever used for membership tests, so they are
# stored as immutable sets.
# NOTE(review): macro-enabled Office suffixes (.docm/.xlsm/.pptm) are handled
# by remove_macros_office but are not listed in DOC_EXTS -- confirm intent.
SAFE_IMAGE_EXTS = frozenset((
    '.jpg', '.jpeg', '.png', '.gif', '.tiff', '.bmp', '.webp',
))
VIDEO_EXTS = frozenset((
    '.mp4', '.mov', '.mkv', '.avi', '.webm', '.flv',
))
DOC_EXTS = frozenset((
    '.doc', '.docx',
    '.xls', '.xlsx',
    '.ppt', '.pptx',
    '.odt', '.ods', '.odp',
    '.rtf', '.txt',
))
PDF_EXTS = frozenset(('.pdf',))
ARCHIVE_EXTS = frozenset((
    '.zip', '.7z', '.tar', '.gz', '.bz2',
))
def sha256(path: Path, chunk_size: int = 1 << 20) -> str:
    """Return the hexadecimal SHA-256 digest of the file at *path*.

    The file is streamed in blocks so arbitrarily large files can be hashed
    without loading them into memory.

    Args:
        path: File to hash.
        chunk_size: Number of bytes read per iteration; must be positive.
            The digest does not depend on this value.

    Returns:
        The digest as a 64-character lower-case hex string.

    Raises:
        ValueError: If *chunk_size* is not positive.
        OSError: If the file cannot be opened or read.
    """
    if chunk_size <= 0:
        # read(0) returns b'' immediately, which would silently yield the
        # digest of an empty file instead of the real one.
        raise ValueError('chunk_size must be a positive integer')
    digest = hashlib.sha256()
    with Path(path).open('rb') as fh:
        for block in iter(lambda: fh.read(chunk_size), b''):
            digest.update(block)
    return digest.hexdigest()
def safe_mkdir(p: Path):
    """Create directory *p* together with any missing parents.

    Calling it on a directory that already exists is a no-op.
    """
    p.mkdir(parents=True, exist_ok=True)
def sanitize_image(src: Path, dst: Path, quality=80):
    """Re-encode the image at *src* as an RGB JPEG.

    The picture is decoded with Pillow, converted to RGB and written out
    again, so the result is a freshly encoded file rather than a copy of the
    original bytes.

    Args:
        src: Image file to read.
        dst: Desired output location; its suffix is replaced with ``.jpg``.
        quality: JPEG quality passed to Pillow.

    Returns:
        The path of the written JPEG, or ``None`` if the image could not be
        decoded or saved.
    """
    out = dst.with_suffix('.jpg')
    try:
        with Image.open(src) as im:
            rgb = im.convert('RGB')
            safe_mkdir(dst.parent)
            rgb.save(out, 'JPEG', quality=quality, optimize=True)
    except Exception:
        # Best effort: any decode/encode failure means "could not sanitize",
        # and the caller quarantines the source. Unlike a bare ``except:``
        # this still lets KeyboardInterrupt/SystemExit stop the run.
        return None
    return out
def sanitize_video(src: Path, dst: Path):
    """Re-encode the video at *src* to H.264/AAC in an MP4 container.

    ffmpeg is run with ``-map_metadata -1`` so container metadata from the
    source is not copied. The encode is written to a temporary sibling file
    and renamed into place only on success, so a failed run never leaves a
    truncated ``.mp4`` in the output tree.

    Args:
        src: Video file to read.
        dst: Desired output location; its suffix is replaced with ``.mp4``.

    Returns:
        The path of the written MP4, or ``None`` if ffmpeg is missing or
        failed.
    """
    safe_mkdir(dst.parent)
    out = dst.with_suffix('.mp4')
    # Keep the ``.mp4`` suffix on the temporary name: ffmpeg picks the
    # container format from the output file extension.
    partial = out.with_name(out.stem + '.partial.mp4')
    cmd = [
        'ffmpeg', '-y', '-i', str(src),
        '-c:v', 'libx264', '-preset', 'fast', '-crf', '26',
        '-c:a', 'aac', '-b:a', '96k',
        '-map_metadata', '-1',
        str(partial),
    ]
    try:
        subprocess.run(
            cmd,
            check=True,
            # Do not let ffmpeg read (and possibly block on) our terminal.
            stdin=subprocess.DEVNULL,
            stdout=subprocess.DEVNULL,
            stderr=subprocess.DEVNULL,
        )
        os.replace(str(partial), str(out))
    except (subprocess.SubprocessError, OSError):
        try:
            partial.unlink()
        except OSError:
            pass
        return None
    return out
def sanitize_pdf(src: Path, dst: Path):
    """Rewrite the PDF at *src* through Ghostscript's ``pdfwrite`` device.

    The output is written to a temporary sibling file and renamed into place
    only on success, so a failed run never leaves a partial PDF behind.

    Args:
        src: PDF file to read.
        dst: Desired output location; its suffix is replaced with ``.pdf``.

    Returns:
        The path of the rewritten PDF, or ``None`` if Ghostscript is missing
        or failed.
    """
    safe_mkdir(dst.parent)
    out = dst.with_suffix('.pdf')
    partial = out.with_name(out.stem + '.partial.pdf')
    cmd = [
        'gs',
        # Explicit sandboxing: the input is untrusted and may embed PostScript.
        '-dSAFER',
        '-dNOPAUSE', '-dBATCH',
        '-sDEVICE=pdfwrite',
        '-dPDFSETTINGS=/printer',
        '-dCompatibilityLevel=1.4',
        # Ghostscript treats '%' in OutputFile as a page-number format
        # specifier; a literal percent sign must be doubled.
        '-sOutputFile=' + str(partial).replace('%', '%%'),
        str(src),
    ]
    try:
        subprocess.run(
            cmd,
            check=True,
            stdout=subprocess.DEVNULL,
            stderr=subprocess.DEVNULL,
        )
        os.replace(str(partial), str(out))
    except (subprocess.SubprocessError, OSError):
        try:
            partial.unlink()
        except OSError:
            pass
        return None
    return out
def remove_macros_office(src: Path, dst_dir: Path):
    """Produce a working copy of *src* in *dst_dir*, converting it if it has VBA macros.

    For Microsoft Office suffixes the file is inspected with olevba. When
    macros are detected, LibreOffice converts the document to the matching
    OpenDocument format and that result becomes the working copy. In every
    other case (no macros, non-Office suffix, or detection/conversion
    raising) the source is copied unchanged.

    The working copy is always named ``nomacro_<original name>`` and keeps the
    original suffix even when its content is OpenDocument.

    NOTE(review): when detection or conversion raises, the unchanged source
    (macros included) is what ends up under the ``nomacro_`` name; callers
    must not treat the name as proof that macros were removed.

    Args:
        src: Document to inspect.
        dst_dir: Directory that receives the working copy.

    Returns:
        Path of the working copy, or ``None`` if conversion produced no file
        or the copy itself failed.
    """
    safe_mkdir(dst_dir)
    tmp = dst_dir / ('nomacro_' + src.name)
    suffix = src.suffix.lower()
    office_suffixes = {
        '.doc', '.docm', '.docx',
        '.xls', '.xlsm', '.xlsx',
        '.ppt', '.pptm', '.pptx',
    }
    try:
        if suffix in office_suffixes:
            try:
                parser = VBA_Parser(str(src))
                try:
                    has_macros = parser.detect_vba_macros()
                finally:
                    # Release the file handle held by the parser.
                    parser.close()
                if has_macros:
                    if suffix.startswith('.doc'):
                        target = 'odt'
                    elif suffix.startswith('.xls'):
                        target = 'ods'
                    else:
                        target = 'odp'
                    # Create a macro-free copy by converting to OpenDocument.
                    with tempfile.TemporaryDirectory() as td:
                        subprocess.run(
                            ['libreoffice', '--headless', '--convert-to', target,
                             '--outdir', td, str(src)],
                            check=True,
                            stdout=subprocess.DEVNULL,
                            stderr=subprocess.DEVNULL,
                            timeout=120,
                        )
                        produced = sorted(Path(td).iterdir())
                        if not produced:
                            # Do not mistake a stale file from an earlier run
                            # for the result of this conversion.
                            return None
                        shutil.move(str(produced[0]), str(tmp))
                    return tmp
            except Exception:
                # Deliberate best effort: fall back to a plain copy below.
                pass
        shutil.copy2(src, tmp)
        return tmp
    except Exception:
        return None
def _odf_target(suffix: str) -> str:
    """Map a lower-case document suffix to the LibreOffice ODF export format."""
    if suffix in {'.doc', '.docx', '.docm', '.rtf', '.txt', '.odt'}:
        return 'odt'
    if suffix in {'.xls', '.xlsx', '.xlsm', '.ods'}:
        return 'ods'
    return 'odp'


def convert_office(src: Path, dst_dir: Path):
    """Convert an office document to OpenDocument and to a sanitized PDF.

    Both conversions are done by headless LibreOffice inside temporary
    directories, so only finished results are placed in *dst_dir*. The PDF
    is additionally passed through ``sanitize_pdf`` and stored as
    ``<stem>_sanitized.pdf``. Each step is best effort and independent of the
    other.

    Args:
        src: Document to convert.
        dst_dir: Directory that receives the converted files.

    Returns:
        A tuple ``(odf, pdf)`` where each element is the path of the produced
        file or ``None`` if that conversion did not succeed.
    """
    safe_mkdir(dst_dir)
    target = _odf_target(src.suffix.lower())

    odf = None
    try:
        with tempfile.TemporaryDirectory() as td:
            subprocess.run(
                ['libreoffice', '--headless', '--convert-to', target,
                 '--outdir', td, str(src)],
                check=True,
                stdout=subprocess.DEVNULL,
                stderr=subprocess.DEVNULL,
                timeout=120,
            )
            for produced in Path(td).iterdir():
                destination = dst_dir / produced.name
                shutil.move(str(produced), str(destination))
                # Only report the file once it has actually been moved.
                odf = destination
    except Exception:
        # Best effort: a failed ODF export must not prevent the PDF export.
        pass

    pdf_out = dst_dir / (src.stem + '_sanitized.pdf')
    try:
        # Export into a scratch directory so the unsanitized PDF never lands
        # in dst_dir, where it could overwrite an already sanitized
        # "<stem>.pdf" that came from a different input file.
        with tempfile.TemporaryDirectory() as td:
            subprocess.run(
                ['libreoffice', '--headless', '--convert-to', 'pdf',
                 '--outdir', td, str(src)],
                check=True,
                stdout=subprocess.DEVNULL,
                stderr=subprocess.DEVNULL,
                timeout=120,
            )
            raw_pdf = Path(td) / (src.stem + '.pdf')
            if raw_pdf.exists() and sanitize_pdf(raw_pdf, pdf_out) is not None:
                return odf, pdf_out
    except Exception:
        pass
    return odf, None
def unpack_archive(src: Path, tempdir: Path):
    """Extract the archive *src* into *tempdir* using the ``7z`` command.

    Args:
        src: Archive file to extract.
        tempdir: Directory that receives the extracted tree; created if needed.

    Returns:
        ``True`` if 7z exited successfully, ``False`` if it failed, is not
        installed, or the directory could not be created.
    """
    try:
        safe_mkdir(tempdir)
        subprocess.run(
            ['7z', 'x', '-y', '-o' + str(tempdir), str(src)],
            check=True,
            # With no terminal attached, an interactive prompt (presumably
            # e.g. a password request for an encrypted archive) gets EOF and
            # fails instead of hanging the whole run.
            stdin=subprocess.DEVNULL,
            stdout=subprocess.DEVNULL,
            stderr=subprocess.DEVNULL,
        )
    except (subprocess.SubprocessError, OSError):
        return False
    return True
def pack_zip(srcdir: Path, dstzip: Path):
    """Zip the contents of *srcdir* into an archive at *dstzip*.

    ``shutil.make_archive`` appends ``.zip`` itself, so it is handed *dstzip*
    with its last suffix removed.
    """
    safe_mkdir(dstzip.parent)
    archive_base = dstzip.with_suffix('')
    shutil.make_archive(str(archive_base), 'zip', root_dir=str(srcdir))
# Substrings of a MIME type that mark script or native executable content.
# 'x-sh' also matches 'x-shellscript' and 'x-sharedlib'; 'executable' covers
# 'x-executable', 'x-pie-executable' and 'portable-executable'.
_EXECUTABLE_MIME_MARKERS = (
    'javascript', 'x-sh', 'executable',
    'x-dosexec', 'x-msdownload', 'x-mach-binary',
)
# File suffixes treated as executable regardless of the detected MIME type.
_EXECUTABLE_SUFFIXES = frozenset((
    '.exe', '.dll', '.so', '.scr', '.bat', '.cmd', '.ps1',
    '.com', '.msi', '.vbs', '.js', '.jar', '.sh',
))


def is_executable(mime: str, suf: str):
    """Return True if the MIME type or suffix indicates executable content.

    Args:
        mime: Detected MIME type; may be empty or ``None`` when detection
            failed.
        suf: File suffix including the leading dot; compared
            case-insensitively.

    Returns:
        ``True`` when *mime* contains a known executable marker or *suf* is a
        known executable suffix, otherwise ``False``.
    """
    low = (mime or '').lower()
    if any(marker in low for marker in _EXECUTABLE_MIME_MARKERS):
        return True
    return (suf or '').lower() in _EXECUTABLE_SUFFIXES
def _quarantine_copy(src: Path, rel: Path, quarantine: Path, rec: dict, action: str) -> dict:
    """Copy *src* to ``quarantine/rel`` and record *action* and the copy on *rec*."""
    target = quarantine / rel
    safe_mkdir(target.parent)
    shutil.copy2(src, target)
    rec['action'] = action
    rec['out'].append(str(target))
    return rec


def _record_or_quarantine(rec: dict, result, src: Path, rel: Path, quarantine: Path,
                          ok_action: str, fail_action: str) -> dict:
    """Record a sanitizer's output on *rec*, or quarantine *src* if it returned None."""
    if result is None:
        return _quarantine_copy(src, rel, quarantine, rec, fail_action)
    rec['action'] = ok_action
    rec['out'].append(str(result))
    rec['sha256_out'].append(sha256(result))
    return rec


def process_file(src: Path, in_root: Path, out_root: Path, quarantine: Path):
    """Sanitize one file and return a report record describing what was done.

    The file is routed by suffix and detected MIME type, in this order:
    archive, image, video, PDF, office document, executable. Anything that
    cannot be sanitized, and anything unrecognised, is copied into
    *quarantine*. The source file is never modified or removed.

    Archives are extracted to a temporary directory, every regular member is
    processed recursively into a staging directory named after the archive,
    and that staging directory is zipped.

    Args:
        src: File to process; must be located under *in_root*.
        in_root: Root the relative output path is computed against.
        out_root: Root of the sanitized output tree.
        quarantine: Root of the quarantine tree.

    Returns:
        A dict with keys ``src``, ``sha256_src``, ``action``, ``out`` and
        ``sha256_out``; archive records additionally carry ``children`` with
        the records of their members.
    """
    rel = src.relative_to(in_root)
    out_base = out_root / rel
    rec = {'src': str(src), 'sha256_src': sha256(src), 'action': None, 'out': [], 'sha256_out': []}
    try:
        m = magic.from_file(str(src), mime=True)
    except Exception:
        # MIME detection is only a hint; fall back to suffix-based routing.
        m = ''
    suf = src.suffix.lower()

    if suf in ARCHIVE_EXTS:
        # Keep the archive's parent directories in the staging path so that
        # same-named archives in different folders do not share a directory.
        staging = out_root / rel.with_suffix('')
        nested_quarantine = quarantine / rel.with_suffix('')
        children = []
        with tempfile.TemporaryDirectory() as td:
            tdpath = Path(td)
            if not unpack_archive(src, tdpath):
                return _quarantine_copy(src, rel, quarantine, rec, 'archive_failed_quarantine')
            for member in tdpath.rglob('*'):
                # Never follow symlinks from an untrusted archive: they could
                # point at files outside the extraction directory.
                if member.is_symlink() or not member.is_file():
                    continue
                try:
                    children.append(process_file(member, tdpath, staging, nested_quarantine))
                except Exception as e:
                    # One bad member must not abort the rest of the archive.
                    children.append({'src': str(member), 'action': 'error', 'error': str(e)})
        # The staging directory does not exist yet if every member was
        # quarantined or the archive was empty; packing would raise otherwise.
        safe_mkdir(staging)
        packed = out_root / rel.with_suffix('.zip')
        pack_zip(staging, packed)
        rec['action'] = 'archive_processed_repacked'
        rec['out'].append(str(packed))
        rec['sha256_out'].append(sha256(packed))
        rec['children'] = children
        return rec

    if suf in SAFE_IMAGE_EXTS or m.startswith('image/'):
        return _record_or_quarantine(rec, sanitize_image(src, out_base), src, rel, quarantine,
                                     'image_reencoded', 'image_quarantined')

    if suf in VIDEO_EXTS or m.startswith('video/'):
        return _record_or_quarantine(rec, sanitize_video(src, out_base), src, rel, quarantine,
                                     'video_reencoded', 'video_quarantined')

    if suf in PDF_EXTS or m == 'application/pdf':
        # The 'pdf_rasterized' label is kept for report compatibility.
        return _record_or_quarantine(rec, sanitize_pdf(src, out_base), src, rel, quarantine,
                                     'pdf_rasterized', 'pdf_quarantined')

    office_markers = ('wordprocessingml', 'presentation', 'spreadsheet', 'msword',
                      'vnd.openxmlformats-officedocument')
    if suf in DOC_EXTS or any(k in m for k in office_markers):
        nm = remove_macros_office(src, quarantine / 'nomacro')
        target_src = nm if nm is not None else src
        odf, pdf = convert_office(target_src, out_root / rel.parent)
        if not odf:
            return _quarantine_copy(src, rel, quarantine, rec, 'office_quarantined')
        rec['action'] = 'office_converted_macros_removed'
        rec['out'].append(str(odf))
        # Record a hash for every output so 'out' and 'sha256_out' line up.
        rec['sha256_out'].append(sha256(odf))
        if pdf:
            rec['out'].append(str(pdf))
            rec['sha256_out'].append(sha256(pdf))
        return rec

    if is_executable(m, suf):
        return _quarantine_copy(src, rel, quarantine, rec, 'executable_quarantined')
    return _quarantine_copy(src, rel, quarantine, rec, 'unknown_quarantined')
def walk_and_process(in_root: Path, out_root: Path, report: Path):
    """Process every file under *in_root* and write a JSON report.

    Files are collected once up front. Anything already located inside the
    quarantine directory, or inside *out_root* when it is a different
    directory than *in_root*, is skipped so that results of this or an
    earlier run are not fed back in as input. A failure on one file is
    recorded as an ``error`` entry and does not stop the run.

    Args:
        in_root: Directory to scan recursively.
        out_root: Directory receiving sanitized files; quarantined files go
            to ``out_root/QUARANTINE``.
        report: Path of the JSON report file to write.

    Returns:
        The list of per-file report records that was written to *report*.
    """
    quarantine = out_root / 'QUARANTINE'
    excluded_roots = [quarantine]
    if out_root != in_root:
        excluded_roots.append(out_root)

    def is_generated(path: Path) -> bool:
        # True when *path* lies inside a directory this tool writes to.
        return any(root in path.parents for root in excluded_roots)

    files = [p for p in in_root.rglob('*') if p.is_file() and not is_generated(p)]
    reports = []
    for f in tqdm(files, desc="Processing", unit="file", ncols=80):
        try:
            reports.append(process_file(f, in_root, out_root, quarantine))
        except Exception as e:
            # Top-level boundary: record the failure and keep going.
            reports.append({'src': str(f), 'action': 'error', 'error': str(e)})
    report.write_text(json.dumps(reports, indent=2), encoding='utf-8')
    return reports
def main():
    """Parse command-line arguments and run the sanitization pass.

    Exits with status 1 when the input path does not exist, is not a
    directory, or is the same directory as the output. The output directory
    is only created after the input has been validated.
    """
    p = argparse.ArgumentParser(description='Aggressive sanitization: rasterize PDFs, re-encode media, remove macros, process archives recursively')
    p.add_argument('--path', required=True, help='input folder')
    p.add_argument('--output', required=True, help='output folder')
    p.add_argument('--report', default='report.json', help='report file name (json)')
    args = p.parse_args()

    inp = Path(args.path).resolve()
    out = Path(args.output).resolve()
    if not inp.exists():
        print('Input does not exist', file=sys.stderr)
        sys.exit(1)
    if not inp.is_dir():
        # A plain file would be "scanned" as an empty tree and silently
        # produce an empty report.
        print('Input is not a folder', file=sys.stderr)
        sys.exit(1)
    if inp == out:
        # Sanitized files are written under the same relative path as their
        # source, so sharing one directory would overwrite originals.
        print('Input and output folders must be different', file=sys.stderr)
        sys.exit(1)

    safe_mkdir(out)
    report_path = out / args.report
    print('Starting aggressive sanitization. Originals are not deleted. Keep backups.')
    walk_and_process(inp, out, report_path)
    print('Done. Report:', report_path)
# Run the command-line interface only when executed as a script, not on import.
if __name__ == '__main__':
    main()
This script reduces common risks: it strips metadata, re-encodes media, converts documents via LibreOffice, and sanitizes PDFs. It recursively scans the specified folder for videos, images, and documents.