-
Notifications
You must be signed in to change notification settings - Fork 0
/
pptslim.py
executable file
·168 lines (137 loc) · 5.26 KB
/
pptslim.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
#!/usr/bin/env python3
import argparse
import asyncio
import mimetypes
import os
import shutil
import tempfile
import zipfile
from pathlib import Path
from functools import partial
'''
Draft:
ffmpeg -loop 1 -i dummy.jpg -t 1 -c:v libx264 -tune stillimage -c:a aac -b:a 192k -pix_fmt yuv420p -shortest dummy.mp4
'''
def sizeof_fmt(num, suffix='B'):
'''from https://stackoverflow.com/a/1094933/2281355'''
for unit in ['','K','M','G']:
if abs(num) < 1024.0:
return "%3.1f %s%s" % (num, unit, suffix)
num /= 1024.0
return "%.1f %s%s" % (num, 'T', suffix)
async def run_subp_async(args, **kwargs):
proc = await asyncio.create_subprocess_exec(
*args, **kwargs,
stdout=asyncio.subprocess.PIPE)
try:
print('Running command', args, ':')
async def _read_stream(stream):
while True:
ln = await stream.readline()
if not ln: break
print(f'>>> ' + ln.decode(errors='ignore').rstrip('\n'))
await _read_stream(proc.stdout)
await proc.wait()
except Exception as err:
proc.kill()
await proc.wait()
if isinstance(err, asyncio.TimeoutError):
return None
else:
raise
def prepare_replacing_file(zipf, entry, mime):
'''For an entry in the zip file, produce a smaller file at path of the same
format. Return a thunk(dest) for writing to the destination,
None otherwise.
'''
# the dummy file should be generated of the correct format.
# ffmpeg can handle this by using an appropriate extension, but who cares
async def write_mp4(dest):
shutil.copyfile(Path(__file__).parent / 'assets/dummy.mp4', dest)
async def write_image(dest):
with zipf.open(entry.filename) as src, \
tempfile.NamedTemporaryFile() as tmp:
shutil.copyfileobj(src, tmp)
tmp.flush()
await run_subp_async(['convert', tmp.name + '[0]', dest])
async def write_png(dest):
with zipf.open(entry.filename) as src, \
tempfile.NamedTemporaryFile(suffix='.png') as tmp:
shutil.copyfileobj(src, tmp)
tmp.flush()
await run_subp_async(
['pngquant', tmp.name,
'--speed', '1',
'--nofs', '-v',
'--output', dest])
if entry.compress_size < 300 * 1024: # 300k? is 300k a lot?
return None
if mime.startswith('video/'):
return write_mp4
elif mime == 'image/png':
return write_png
elif mime.startswith('image/'):
return write_image
else:
print('!! Unknown/unsupported format [{}] for file {}, skipping...'
.format(mime, entry.filename))
return None
def gen_parser():
parser = argparse.ArgumentParser(description='Process some integers.')
parser.add_argument('input', help='path to input file')
parser.add_argument('output', nargs='?', help='path to output file')
parser.add_argument('--in-place',
dest='inplace',
action='store_true',
help='If set, the input file would be overwritten.')
return parser
async def main(args):
if args.output and args.inplace:
print('Error: output and --in-place cannot be both used.')
return
elif not args.output and not args.inplace:
print('Error: Missing the path to output file. If you want to overwrite, use --in-place.')
return
if args.inplace:
args.output = args.input
tmpdir = tempfile.TemporaryDirectory()
f = tempfile.NamedTemporaryFile(suffix='.zip')
with open(args.input, 'rb') as finp:
shutil.copyfileobj(finp, f)
zipf = zipfile.ZipFile(f)
infolist = filter(lambda x: x.filename.startswith('ppt/media/'), zipf.infolist())
zip_changed = False
for ent in infolist:
entpath = Path(ent.filename)
mime = mimetypes.types_map.get(entpath.suffix, '<<unknown>>')
# uncomment if you are only troubled with gifs...
# if mime.startswith('image/') and mime != 'image/gif':
# continue
thunk = prepare_replacing_file(zipf, ent, mime)
if thunk is not None:
print('> {}, type={}, size={} ({} stored)'.format(
ent.filename, mime,
sizeof_fmt(ent.file_size),
sizeof_fmt(ent.compress_size)))
zip_changed = True
dest_dir = Path(tmpdir.name, Path(ent.filename).parent)
dest_dir.mkdir(parents=True, exist_ok=True)
dest = dest_dir / entpath.name
await thunk(str(dest))
new_sz = dest.stat().st_size
print(' --> new size: {}'.format(sizeof_fmt(new_sz)))
if zip_changed:
print('****** Updating zip...')
await run_subp_async(['zip', '-r', f.name, '.'], cwd=tmpdir.name)
else:
print('****** No file is changed!')
zipf.close()
# here, copyfileobj on f copies the unmodified content instead, why???
shutil.copyfile(f.name, args.output)
print('New size ==>', sizeof_fmt(Path(args.output).stat().st_size))
# it should clean up itself, but we do it explicitly here
print('****** Cleaning up...')
tmpdir.cleanup()
if __name__ == '__main__':
parser = gen_parser()
asyncio.run(main(parser.parse_args()))