#!/usr/bin/env python3
# Copyright (c) 2025 Amin Bandali <[email protected]>
#
# Copying and distribution of this file, with or without modification,
# are permitted in any medium without royalty provided the copyright
# notice and this notice are preserved. This file is offered as-is,
# without any warranty.
#
# Accompanying writeup:
#
#   https://kelar.org/~bandali/2025/07/25/protesilaos-videos-archive.html
#   gopher://kelar.org/1/~bandali/2025/07/25/protesilaos-videos-archive.txt
#
# Changelog:
#
# 0.1 - Initial release
from __future__ import annotations
from collections.abc import Iterator
from enum import IntEnum
from types import FrameType
import argparse
import datetime
import glob
import logging
import os
import re
import signal
import sys
try:
import orjson as json
except ImportError:
    print('warn: orjson library not available, falling back to built-in json')
import json
try:
import markdown
except ImportError:
print('error: markdown library not found')
sys.exit(1)
try:
import internetarchive
except ImportError:
print('error: internetarchive library not found')
sys.exit(1)
try:
import requests
except ImportError:
print('error: requests library not found')
sys.exit(1)
try:
import yt_dlp
except ImportError:
print('error: yt_dlp library not found')
sys.exit(1)
PROT_WEBSITE = 'https://protesilaos.com'
PROGRESS_FILE_NAME = '.pva-progress.jsonl'
LOG_FILE_NAME = '.pva.log'
MD_FILE_EXTS = ('.md', '.markdown', '.mdwn', '.txt')
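# yt-dlp options: resume partial downloads, keep going past per-video
# errors, name files by video id, and sleep between requests and
# downloads to throttle politely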
YT_OPTS = {
'continuedl': True,
'ignoreerrors': True,
'outtmpl': '%(id)s.%(ext)s',
'sleep_interval_requests': 2,
'sleep_interval': 3,
'max_sleep_interval': 5
}
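# front-matter keys that should not be copied into the archive.org
# item metadata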
EXCLUDED_META = {'layout', 'permalink'}
logger: logging.Logger # will be initialized in 'main'
progress_dict = {}
# {
# name: {
# mediaid | name: FileStatus,
# [...]
# },
# [...]
# }
_sigint_received = False
class FileStatus(IntEnum):
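    """Per-file progress states recorded in the progress file."""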
DOWNLOAD_SUCCESS = 1
DOWNLOAD_FAIL = 2
UPLOAD_SUCCESS = 3
UPLOAD_FAIL = 4
class PVALogger(logging.Logger):
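    """Logger subclass installed via logging.setLoggerClass."""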
def __init__(self, name: str):
super().__init__(name=name)
class PVAFormatter(logging.Formatter):
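    """Formatter whose timestamps are timezone-aware datetimes, so
    '%f' and '%z' work in datefmt."""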
def converter(self, timestamp: float | None):
if timestamp:
return datetime.datetime.fromtimestamp(
timestamp,
datetime.datetime.now(datetime.timezone.utc).astimezone().tzinfo)
        else:
            # time.struct_time has no strftime; return an aware datetime
            return datetime.datetime.now().astimezone()
def formatTime(self, record: logging.LogRecord,
datefmt: str | None = None):
ct = self.converter(record.created)
if datefmt:
s = ct.strftime(datefmt)
else:
s = ct.strftime(self.default_time_format)
if self.default_msec_format:
s = self.default_msec_format % (s, record.msecs)
return s
def strip_balanced(s: str, s0: str) -> str:
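    """Return s with one leading and one trailing s0 removed, but only
    when s both starts and ends with s0."""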
if s.startswith(s0) and s.endswith(s0):
return s.removeprefix(s0).removesuffix(s0)
else:
return s
def strip_quotes(s: str) -> str:
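    """Strip a balanced pair of single quotes, then double quotes."""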
return strip_balanced(strip_balanced(s, "'"), '"')
def simplify_meta(meta: dict[str, list[str]]) -> dict[str, list[str] | str]:
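    """Unquote python-markdown meta values and collapse single-element
    lists into bare strings."""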
m = {} # type: dict[str, list[str] | str]
for k in meta:
m_k = meta[k] # type: list[str] | str
if isinstance(m_k, str):
m_k = strip_quotes(m_k)
elif isinstance(m_k, list):
m_k = [strip_quotes(s) for s in m_k if isinstance(s, str)]
if len(m_k) == 1:
m_k = m_k[0]
m[k] = m_k
return m
def get_md_files(root_dir: str) -> Iterator[str]:
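    """Recursively yield markdown-like files under root_dir, as paths
    relative to root_dir."""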
for root, _dirs, files in os.walk(root_dir):
for file in files:
if file.lower().endswith(MD_FILE_EXTS):
abs_path = os.path.join(root, file)
yield abs_path.removeprefix(root_dir).lstrip(os.sep)
def read_md_file(md: markdown.Markdown, root_dir: str, file: str) -> dict:
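    """Convert one markdown file; if its front matter declares a
    mediaid, return a dict with the rendered body, archive.org item id,
    source URL, date, and media ids to fetch, else return {}."""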
try:
file_path = os.path.join(root_dir, file)
with open(file_path, 'r', encoding='utf-8') as f:
try:
html = md.reset().convert(f.read())
meta = md.Meta # type: ignore[attr-defined]
if 'mediaid' in meta:
meta = simplify_meta(meta)
mediaids = meta['mediaid'] # type: list[str] | str
if not any(mediaids):
logger.warning(f'only blank mediaid in file {f.name}; skipping')
return {}
(file_noext, file_ext) = os.path.splitext(file)
basename = os.path.basename(file)
url_frag = file_noext.lstrip('_')
item_id = 'prot-' + url_frag.replace(os.sep, '-')
source = '' # type: str
if 'permalink' in meta and (source := meta['permalink']):
source = PROT_WEBSITE + source
else:
source = f'{PROT_WEBSITE}/{url_frag}/'
date = ''
meta_date = '' # type: str
if 'date' in meta and (meta_date := meta['date']):
                        # %z already matches offsets that contain a colon
                        # (e.g. +02:00), so a separate %:z entry is unneeded
                        date_formats = ['%Y-%m-%dT%H:%M:%S%z',
                                        '%Y-%m-%d']
for date_format in date_formats:
try:
dt = datetime.datetime.strptime(meta_date, date_format)
date = dt.strftime('%Y-%m-%d')
break
except ValueError:
pass
if not date:
if (m := re.match(r'(\d+)-(\d+)-(\d+)-.*', basename)):
year, month, day = m.groups()
date = f'{year}-{month}-{day}'
file_dict = {
'name': file,
'status': {},
'ext': file_ext,
'basename': basename,
'path': file_path,
'body': html,
'item_id': item_id,
'source': source,
'date': date,
'media': {},
'meta': meta
}
if isinstance(mediaids, str):
file_dict['media'][mediaids] = ''
elif isinstance(mediaids, list):
for mediaid in mediaids:
file_dict['media'][mediaid] = ''
return file_dict
else:
return {}
except UnicodeDecodeError:
logger.warning(f'UnicodeDecodeError when trying to read file {f.name}; skipping')
return {}
except (FileNotFoundError, PermissionError, OSError):
logger.warning(f'failed to read file {file}; skipping')
return {}
def download_from_yt(file: dict, cookie_file: str | None):
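    """Download each of the file's media ids with yt-dlp, skipping ids
    that already have a complete (non-.part) download on disk, and
    record a FileStatus per id."""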
yt_opts = YT_OPTS.copy()
yt_opts['outtmpl'] = file['item_id'] + '-' + yt_opts['outtmpl']
if cookie_file:
yt_opts['cookiefile'] = cookie_file
with yt_dlp.YoutubeDL(yt_opts) as y:
for mediaid in file['media']:
mediaid_files = glob.glob(f'*{mediaid}.*')
if (not mediaid_files
or any(s.endswith('.part') for s in mediaid_files)):
logger.info(f'downloading {mediaid}')
info = y.extract_info(mediaid)
mediaid_files_after = glob.glob(f'*{mediaid}.*')
if (info and mediaid_files_after
and not any(s.endswith('.part') for s in mediaid_files_after)):
# assert y._download_retcode == 0
file['media'][mediaid] = y.prepare_filename(info)
file['status'][mediaid] = FileStatus.DOWNLOAD_SUCCESS
logger.info(f'downloaded {mediaid} successfully')
else:
file['status'][mediaid] = FileStatus.DOWNLOAD_FAIL
logger.warning(f'failed to download {mediaid}')
if _sigint_received:
return
def upload_to_archive(file: dict):
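    """Upload the downloaded media files, then the markdown source
    itself, to the file's Internet Archive item."""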
item_id = file['item_id']
item = internetarchive.get_item(item_id)
metadata = {k: v for k, v in file['meta'].items()
if k not in EXCLUDED_META}
metadata.update({
'collection': 'protesilaos-videos',
'creator': 'Protesilaos Stavrou',
'title': file['meta']['title'],
'date': file['date'],
'description': file['body'],
'source': file['source'],
        'licenseurl': 'https://creativecommons.org/licenses/by-sa/4.0/',
'mediatype': 'movies'
})
logger.info(f'uploading item {item_id} to Internet Archive')
for mediaid in file['media']:
media_name = file['media'][mediaid]
media_size = 0
try:
media_size = os.path.getsize(media_name)
except OSError:
pass
if (file['status'][mediaid] in (FileStatus.DOWNLOAD_SUCCESS,
FileStatus.UPLOAD_FAIL)
and os.path.exists(media_name)
and media_size > 0):
logger.info(f'uploading file {media_name}')
r = item.upload({media_name: media_name}, metadata=metadata)
# assert len(r) == 1
assert isinstance(r[0], requests.Response)
if r[0].ok:
file['status'][mediaid] = FileStatus.UPLOAD_SUCCESS
logger.info(f'uploaded file {media_name} successfully')
else:
file['status'][mediaid] = FileStatus.UPLOAD_FAIL
logger.warning(f'failed to upload file {media_name}:')
logger.warning(f'response.status_code: {r[0].status_code}')
logger.warning(f'response.url: {r[0].url}')
logger.warning(f'response.text: {r[0].text}')
if _sigint_received:
return
# and lastly upload the markdown file
remote_name = item_id + file['ext']
local_name = file['path']
file_name = file['name']
media_status = {k: v for k, v in file['status'].items()
if k != file_name}
    # it's important that at least one media file be uploaded before
    # the markdown file, so that the item is assigned the right
    # mediatype, which cannot be changed afterwards.
if (any(media_status[m] == FileStatus.UPLOAD_SUCCESS
for m in media_status)
        and (file_name not in file['status']
             or file['status'][file_name] == FileStatus.UPLOAD_FAIL)):
logger.info(f'uploading file {file_name}')
r = item.upload({remote_name: local_name})
# assert len(r) == 1
assert isinstance(r[0], requests.Response)
if r[0].ok:
file['status'][file_name] = FileStatus.UPLOAD_SUCCESS
logger.info(f"uploaded file {file_name} successfully")
else:
file['status'][file_name] = FileStatus.UPLOAD_FAIL
logger.warning(f'failed to upload file {file_name}:')
logger.warning(f'response.status_code: {r[0].status_code}')
logger.warning(f'response.url: {r[0].url}')
logger.warning(f'response.text: {r[0].text}')
logger.info(f'finished uploading item {item_id} to Internet Archive')
def update_item_description(file: dict):
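    """Update the archive.org item's description to the rendered
    markdown body."""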
item_id = file['item_id']
item = internetarchive.get_item(item_id)
r = item.modify_metadata(metadata={'description': file['body']})
assert isinstance(r, requests.Response)
logger.info(f'response.status_code: {r.status_code}'
f' - updated description for item {item_id}')
def read_progress_file():
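    """Ensure the progress file exists, then merge its JSON lines into
    progress_dict."""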
try:
with open(PROGRESS_FILE_NAME, 'x', encoding='utf-8'):
logger.info(f'created empty file {PROGRESS_FILE_NAME}')
except FileExistsError:
logger.info(f'found existing {PROGRESS_FILE_NAME}')
except (PermissionError, OSError):
logger.error(f'failed to create file {PROGRESS_FILE_NAME}')
try:
with open(PROGRESS_FILE_NAME, 'r', encoding='utf-8') as f:
for line in f:
                stripped = line.rstrip('\n')
                j = {}
                try:
                    j = json.loads(stripped)
                except (json.JSONDecodeError, TypeError) as e:
                    logger.error(f'failed to decode progress line as JSON: {stripped}')
logger.error(f' {str(e)}')
if j:
progress_dict.update(j)
except (FileNotFoundError, PermissionError, OSError):
logger.error(f'failed to read file {PROGRESS_FILE_NAME}')
def write_progress_file():
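    """Write progress_dict out as JSON lines, one top-level key per
    line."""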
if progress_dict:
try:
with open(PROGRESS_FILE_NAME, 'w', encoding='utf-8') as f:
for k in progress_dict:
                    j = json.dumps({k: progress_dict[k]})
                    if isinstance(j, bytes):  # orjson.dumps returns bytes
                        j = j.decode('utf-8')
                    f.write(f'{j}\n')
except (FileNotFoundError, PermissionError, OSError):
logger.error(f'failed to write file {PROGRESS_FILE_NAME}')
def exit_if_sigint():
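    """Flush progress and exit with status 130 (128 + SIGINT) if a
    SIGINT was received."""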
if _sigint_received:
write_progress_file()
sys.exit(130)
def process_files(root_dir: str, cookie_file: str | None) -> None:
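    """Gather markdown files that still need work, sort them, then
    download and upload each one, checkpointing progress throughout."""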
md = markdown.Markdown(extensions=['extra', 'meta', 'smarty'])
read_progress_file()
remaining_files = []
logger.info('gathering list of markdown files with mediaid'
' (this may take a while)')
for md_filename in get_md_files(root_dir):
        if (md_filename not in progress_dict
            or md_filename not in progress_dict[md_filename]):
if (f := read_md_file(md, root_dir, md_filename)):
remaining_files.append(f)
exit_if_sigint()
    logger.info('sorting files lexicographically')
remaining_files.sort(key=lambda f: f['basename'])
for f in remaining_files:
logger.info(f"processing markdown file {f['name']}")
if f['name'] in progress_dict:
f['status'].update(progress_dict[f['name']])
else:
progress_dict[f['name']] = {}
download_from_yt(f, cookie_file)
progress_dict[f['name']].update(f['status'])
exit_if_sigint()
upload_to_archive(f)
progress_dict[f['name']].update(f['status'])
exit_if_sigint()
write_progress_file()
def signal_handler(signum: int, frame: FrameType | None):
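    """Record the SIGINT; the main loop checks _sigint_received and
    exits at the next safe point."""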
if signum == signal.SIGINT:
global _sigint_received
_sigint_received = True
logger.info('SIGINT received, will exit gracefully ASAP')
def initialize_logger(level: int = logging.INFO)\
-> tuple[logging.Logger, logging.Formatter,
logging.StreamHandler, logging.FileHandler]:
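    """Set up the module logger with a console handler and a file
    handler, both using PVAFormatter."""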
logging.setLoggerClass(PVALogger)
logger = logging.getLogger(__name__)
formatter = PVAFormatter(
fmt='%(asctime)s - %(levelname)s - %(message)s',
datefmt='%Y-%m-%dT%H:%M:%S.%f%z')
ch = logging.StreamHandler()
fh = logging.FileHandler(filename=LOG_FILE_NAME, encoding='utf-8')
for h in (ch, fh):
h.name = __name__
h.setFormatter(formatter)
logger.addHandler(h)
logger.setLevel(level)
return logger, formatter, ch, fh
def main() -> int:
signal.signal(signal.SIGINT, signal_handler)
global logger
logger, _, _, _ = initialize_logger()
parser = argparse.ArgumentParser(
description="Prepare and upload Prot's videos to Internet Archive")
parser.add_argument(
'root_dir',
help='directory to search (recursively) for markdown files')
parser.add_argument(
'-c', '--cookie-file',
help='path to a cookie file to be used with yt-dlp')
parser.add_argument(
'-w', '--working-dir',
help='working directory, for storing videos and progress file')
args = parser.parse_args()
root_dir = os.path.expanduser(args.root_dir)
working_dir = os.path.expanduser(args.working_dir) \
if args.working_dir else None
cookie_file = os.path.expanduser(args.cookie_file) \
if args.cookie_file else None
if working_dir:
try:
os.chdir(working_dir)
logger.info(f'changed working directory to {working_dir}')
except FileNotFoundError:
logger.warning('failed to change working directory to'
f' {working_dir} because it does not exist')
if cookie_file:
if os.path.exists(cookie_file):
logger.info(f'using cookie file {cookie_file} with yt-dlp')
else:
logger.warning(f'cookie file {cookie_file} does not exist')
process_files(root_dir, cookie_file)
return 0
if __name__ == '__main__':
sys.exit(main())