diff options
Diffstat (limited to 'tools/merge-recordings.py')
-rwxr-xr-x | tools/merge-recordings.py | 130 |
1 files changed, 130 insertions, 0 deletions
diff --git a/tools/merge-recordings.py b/tools/merge-recordings.py new file mode 100755 index 0000000..637858e --- /dev/null +++ b/tools/merge-recordings.py @@ -0,0 +1,130 @@ +#!/usr/bin/env python3 +import os +import re +import subprocess +import tempfile +import sys + +from datetime import datetime, timedelta +from argparse import ArgumentParser + + +fmt = '%d%m%y-%H%M%S' + +File = dict +FileList = list[File] + + +def get_files(source_directory: str) -> FileList: + files = [] + for f in os.listdir(source_directory): + m = re.match(r'^(\d{6}-\d{6})_(\d{6}-\d{6})_id(\d+)(_\w+)?\.mp3$', f) + if not m: + continue + + files.append({ + 'filename': os.path.join(source_directory, f), + 'start': datetime.strptime(m.group(1), fmt), + 'stop': datetime.strptime(m.group(2), fmt) + }) + files.sort(key=lambda f: f['start'].timestamp()) + return files + + +def group_files(files: FileList) -> list[FileList]: + groups = [] + group_idx = None + + for info in files: + # if group_idx is not None: + # print(info['start'], groups[group_idx][-1]['stop']) + # print(' ', info['start'] - groups[group_idx][-1]['stop']) + # print() + + if group_idx is None or \ + not groups[group_idx] or \ + info['start'] - groups[group_idx][-1]['stop'] <= timedelta(seconds=1): + if group_idx is None: + groups.append([]) + group_idx = 0 + else: + group_idx += 1 + groups.append([]) + groups[group_idx].append(info) + + return groups + + +def merge(groups: list[FileList], + output_directory: str, + delete_source_files=False, + vbr=False) -> None: + for g in groups: + success = False + + fd = tempfile.NamedTemporaryFile(delete=False) + try: + for file in g: + line = f'file \'{file["filename"]}\'\n' + # print(line.strip()) + fd.write(line.encode()) + fd.close() + + start = g[0]['start'].strftime(fmt) + stop = g[-1]['stop'].strftime(fmt) + fn = f'{start}_{stop}_merged.mp3' + output = os.path.join(output_directory, fn) + + cmd = ['ffmpeg', '-y', + '-f', 'concat', + '-safe', '0', + '-i', fd.name, + '-map_metadata', '-1', + '-codec:a', 'libmp3lame'] + if vbr: + cmd.extend(['-codec:a', 'libmp3lame', '-q:a', '4']) + else: + cmd.extend(['-codec:a', 'copy']) + cmd.append(output) + + p = subprocess.run(cmd, capture_output=False) + if p.returncode != 0: + print(f'error: ffmpeg returned {p.returncode}') + else: + success = True + finally: + os.unlink(fd.name) + + if success and delete_source_files: + for file in g: + os.unlink(file['filename']) + + +def main(): + default_dir = os.getcwd() + + parser = ArgumentParser() + parser.add_argument('--input-directory', '-i', type=str, default=default_dir, + help='Directory with files') + parser.add_argument('--output-directory', '-o', type=str, default=default_dir, + help='Output directory') + parser.add_argument('-D', '--delete-source-files', action='store_true') + parser.add_argument('--vbr', action='store_true', + help='Re-encode using VBR (-q:a 4)') + args = parser.parse_args() + + files = get_files(os.path.realpath(args.input_directory)) + if not len(files): + print(f"No mp3 files found in {args.input_directory}.") + sys.exit() + + groups = group_files(files) + + merge(groups, + os.path.realpath(args.output_directory), + delete_source_files=args.delete_source_files, + vbr=args.vbr) + + +if __name__ == '__main__': + main() |