1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
|
#!/usr/bin/env python3
import os
import re
import subprocess
import tempfile
import sys
from typing import List
from datetime import datetime, timedelta
from argparse import ArgumentParser
# strptime/strftime layout of the timestamps embedded in file names:
# day, month, two-digit year, a hyphen, then hour, minute, second
# (DDMMYY-HHMMSS). get_files() parses with it, merge() formats with it.
fmt = '%d%m%y-%H%M%S'
# A recording is described by a plain dict; get_files() fills in the keys
# 'filename', 'start' and 'stop'.
File = dict
# A list of recording dicts.
FileList = List[File]
def get_files(source_directory: str) -> FileList:
    """Collect the recordings found directly inside *source_directory*.

    Only directory entries named ``<start>_<stop>_id<N>[_<suffix>].mp3`` are
    considered, where ``<start>`` and ``<stop>`` are timestamps written in the
    module-level ``fmt`` layout. Entries whose name does not match, or whose
    digits do not form a real date/time, are skipped.

    Args:
        source_directory: Directory to scan. Sub-directories are not visited.

    Returns:
        One dict per recording with the keys ``filename`` (the entry name
        joined onto *source_directory*), ``start`` and ``stop`` (naive
        ``datetime`` objects parsed from the name). The list is ordered by
        start time, with the file name as a tie-breaker.

    Raises:
        OSError: If *source_directory* cannot be listed.
    """
    # Compiled once instead of once per directory entry.
    pattern = re.compile(r'^(\d{6}-\d{6})_(\d{6}-\d{6})_id(\d+)(_\w+)?\.mp3$')

    files = []
    for name in os.listdir(source_directory):
        m = pattern.match(name)
        if not m:
            continue
        try:
            start = datetime.strptime(m.group(1), fmt)
            stop = datetime.strptime(m.group(2), fmt)
        except ValueError:
            # The name has the right shape but the digits are not a valid
            # timestamp (e.g. day 99); treat it like any other foreign file
            # rather than aborting the whole scan.
            continue
        files.append({
            'filename': os.path.join(source_directory, name),
            'start': start,
            'stop': stop,
        })

    # Compare the datetimes directly: converting naive datetimes with
    # .timestamp() depends on the local timezone and can fail for dates the
    # platform cannot represent. The file name makes the order deterministic
    # when two recordings start at the same second.
    files.sort(key=lambda f: (f['start'], f['filename']))
    return files
def group_files(files: FileList) -> List[FileList]:
    """Split recordings into runs of back-to-back files.

    A recording stays in the current group when it starts no later than one
    second after the previous recording in that group stopped (this includes
    overlapping recordings); otherwise it opens a new group.

    Args:
        files: Recording dicts with ``start`` and ``stop`` datetimes. The
            input order is preserved, so callers should pass them sorted by
            start time.

    Returns:
        A list of groups, each a non-empty list of the original dicts.
        An empty input yields an empty list.
    """
    max_gap = timedelta(seconds=1)

    groups = []
    for info in files:
        # groups[-1][-1] is the most recently placed recording; a group is
        # never empty because it is created together with its first element.
        if groups and info['start'] - groups[-1][-1]['stop'] <= max_gap:
            groups[-1].append(info)
        else:
            groups.append([info])
    return groups
def merge(groups: List[FileList],
          output_directory: str,
          delete_source_files=False,
          vbr=False) -> None:
    """Concatenate every group of recordings into a single mp3 using ffmpeg.

    For each group a temporary list file for ffmpeg's concat demuxer is
    written, ffmpeg is run on it, and the list file is removed again. The
    result is stored in *output_directory* as ``<start>_<stop>_merged.mp3``,
    where the timestamps come from the first and last recording of the group
    and are formatted with the module-level ``fmt``.

    Args:
        groups: Groups of recording dicts (keys ``filename``, ``start``,
            ``stop``). Empty groups are ignored.
        output_directory: Directory that receives the merged files.
        delete_source_files: Remove a group's source files after ffmpeg
            exited successfully for that group.
        vbr: Re-encode with libmp3lame at ``-q:a 4`` instead of copying the
            audio stream unchanged.

    Raises:
        OSError: If the list file cannot be written, ffmpeg cannot be
            started, or a file cannot be removed.

    A non-zero ffmpeg exit status is reported with a printed message; the
    group's source files are then kept and processing continues with the
    next group.
    """
    # Exactly one codec selection is passed to ffmpeg; giving -codec:a twice
    # only works because the last occurrence wins.
    if vbr:
        codec_args = ['-codec:a', 'libmp3lame', '-q:a', '4']
    else:
        codec_args = ['-codec:a', 'copy']

    for g in groups:
        if not g:
            # Nothing to merge and no timestamps to name the output after.
            continue

        start = g[0]['start'].strftime(fmt)
        stop = g[-1]['stop'].strftime(fmt)
        output = os.path.join(output_directory, f'{start}_{stop}_merged.mp3')

        # delete=False: ffmpeg has to open the list by name after we close it.
        list_file = tempfile.NamedTemporaryFile(delete=False)
        try:
            # The with-block closes the handle even if a write fails.
            with list_file:
                for file in g:
                    # Inside a single-quoted concat entry a literal quote is
                    # written as: close quote, backslash-quote, reopen quote.
                    escaped = file['filename'].replace("'", "'\\''")
                    list_file.write(f"file '{escaped}'\n".encode())

            cmd = ['ffmpeg', '-y',
                   '-f', 'concat',
                   '-safe', '0',
                   '-i', list_file.name,
                   '-map_metadata', '-1',
                   *codec_args,
                   output]
            p = subprocess.run(cmd)
        finally:
            os.unlink(list_file.name)

        if p.returncode != 0:
            print(f'error: ffmpeg returned {p.returncode}')
            continue

        if delete_source_files:
            for file in g:
                os.unlink(file['filename'])
def main():
    """Command-line entry point: parse options, find recordings, merge them.

    Both directories default to the current working directory. When the
    input directory holds no matching recordings a message is printed and
    the process exits.
    """
    cwd = os.getcwd()

    cli = ArgumentParser()
    cli.add_argument('--input-directory', '-i', type=str, default=cwd,
                     help='Directory with files')
    cli.add_argument('--output-directory', '-o', type=str, default=cwd,
                     help='Output directory')
    cli.add_argument('-D', '--delete-source-files', action='store_true')
    cli.add_argument('--vbr', action='store_true',
                     help='Re-encode using VBR (-q:a 4)')
    args = cli.parse_args()

    source_dir = os.path.realpath(args.input_directory)
    recordings = get_files(source_dir)
    if not recordings:
        print(f"No mp3 files found in {args.input_directory}.")
        sys.exit()

    grouped = group_files(recordings)
    target_dir = os.path.realpath(args.output_directory)
    merge(grouped,
          target_dir,
          delete_source_files=args.delete_source_files,
          vbr=args.vbr)
# Run the command-line interface only when executed as a script,
# not when the module is imported.
if __name__ == "__main__":
    main()
|