#!/usr/bin/env -S uv run --python=python3.12 --script
# /// script
# requires-python = ">=3.12"
# dependencies = ["dfindexeddb"]
# ///
"""Analyze Slack IndexedDB Structure

Dumps the full schema of the IndexedDB: databases, object stores, record
types, and the Redux state top-level keys with sizes/types.

Used for generating documentation.
"""

import pathlib
import shutil
import tempfile
from collections import Counter

from dfindexeddb.indexeddb.chromium.blink import V8ScriptValueDecoder
from dfindexeddb.indexeddb.chromium.record import FolderReader

HOME = pathlib.Path.home()
# Slack's sandboxed macOS container; IndexedDB lives under Application Support.
IDB_BASE = HOME / (
    "Library/Containers/com.tinyspeck.slackmacgap"
    "/Data/Library/Application Support/Slack/IndexedDB"
)
LDB_DIR = IDB_BASE / "https_app.slack.com_0.indexeddb.leveldb"
BLOB_DIR = IDB_BASE / "https_app.slack.com_0.indexeddb.blob"


def analyze_leveldb():
    """Analyze LevelDB Record Structure.

    Prints the filesystem layout of the LevelDB/blob directories, then parses
    every record (from a temp copy, to avoid Slack's LOCK) and reports:
    databases, object stores with sample keys, and record-type counts.
    """
    print("=" * 70)
    print("LEVELDB / INDEXEDDB STRUCTURE")
    print("=" * 70)

    # Filesystem Layout
    print("\n## Filesystem Layout")
    print(f"\nLevelDB dir: {LDB_DIR}")
    for p in sorted(LDB_DIR.iterdir()):
        size = p.stat().st_size
        print(f"  {p.name:30s} {size:>12,} bytes")
    print(f"\nBlob dir: {BLOB_DIR}")
    for p in sorted(BLOB_DIR.rglob("*")):
        if p.is_file():
            size = p.stat().st_size
            rel = p.relative_to(BLOB_DIR)
            print(f"  {str(rel):30s} {size:>12,} bytes")

    # Copy DB to avoid the lock held by a running Slack instance.
    tmp = pathlib.Path(tempfile.mkdtemp())
    try:
        shutil.copytree(str(LDB_DIR), str(tmp / "db"))
        (tmp / "db" / "LOCK").unlink(missing_ok=True)

        # Parse Records
        reader = FolderReader(tmp / "db")
        key_types = Counter()
        db_meta = {}  # db_id -> {name, obj_stores}
        obj_store_names = {}  # (db_id, os_id) -> name
        for rec in reader.GetRecords(load_blobs=False):
            kt = type(rec.key).__name__
            key_types[kt] += 1
            db_id = rec.database_id or 0
            if kt == "DatabaseNameKey":
                dn = getattr(rec.key, "database_name", None)
                if dn:
                    db_meta.setdefault(db_id, {"name": None})["name"] = str(dn)
            if kt == "ObjectStoreMetaDataKey":
                # metadata_type 0 is the store's name record — TODO confirm
                # against dfindexeddb's ObjectStoreMetaDataKey docs.
                md_type = getattr(rec.key, "metadata_type", None)
                os_id = getattr(rec.key, "object_store_id", None)
                if md_type == 0 and rec.value:
                    obj_store_names[(db_id, os_id)] = str(rec.value)
            if kt == "ObjectStoreDataKey":
                user_key = getattr(rec.key, "encoded_user_key", None)
                val = rec.value
                blob_size = getattr(val, "blob_size", None) if val else None
                version = getattr(val, "version", None) if val else None
                key_val = getattr(user_key, "value", None) if user_key else None
                os_id = rec.object_store_id
                info = db_meta.setdefault(db_id, {"name": None})
                stores = info.setdefault("obj_stores", {})
                store_info = stores.setdefault(
                    os_id, {"keys": [], "sample_key": None}
                )
                # Keep only the first user key seen as a representative sample.
                if key_val and not store_info["sample_key"]:
                    store_info["sample_key"] = str(key_val)[:80]
                store_info["blob_size"] = blob_size
                store_info["version"] = version
    finally:
        # Always remove the temp copy, even if record parsing raises.
        shutil.rmtree(tmp)

    # Print Databases
    print("\n## Databases")
    for db_id in sorted(db_meta.keys()):
        info = db_meta[db_id]
        name = info.get("name", "?")
        print(f"\n  database_id={db_id}: \"{name}\"")
        for (did, osid), osname in sorted(obj_store_names.items()):
            if did == db_id:
                print(f"    object_store_id={osid}: \"{osname}\"")
                store_info = info.get("obj_stores", {}).get(osid, {})
                if store_info.get("sample_key"):
                    print(f"      sample_key: {store_info['sample_key']}")
                if store_info.get("blob_size"):
                    print(f"      blob_size: {store_info['blob_size']:,}")

    # Print Record Types
    print("\n## Record Type Counts")
    for kt, count in key_types.most_common():
        print(f"  {count:6d} {kt}")


def analyze_redux_state():
    """Analyze Redux State Blob Structure.

    Decodes the most recently modified blob file as a V8-serialized script
    value and prints the Redux top-level keys (sorted by stringified size)
    plus a schema sketch of selected key stores.
    """
    print("\n")
    print("=" * 70)
    print("REDUX STATE BLOB STRUCTURE")
    print("=" * 70)

    # Find Blob: newest file is assumed to hold the current Redux state.
    blobs = sorted(
        BLOB_DIR.rglob("*"),
        key=lambda p: p.stat().st_mtime if p.is_file() else 0,
    )
    blob_files = [b for b in blobs if b.is_file()]
    if not blob_files:
        print("No blob files found!")
        return
    blob_path = blob_files[-1]
    size = blob_path.stat().st_size
    print(f"\nBlob: {blob_path.relative_to(IDB_BASE)} ({size:,} bytes)")
    state = V8ScriptValueDecoder.FromBytes(blob_path.read_bytes())

    # Top-Level Keys
    print("\n## Top-Level Keys (sorted by size)")
    entries = []
    for k in sorted(state.keys()):
        v = state[k]
        # str() length is a rough proxy for the store's footprint.
        size = len(str(v))
        t = type(v).__name__
        child_count = len(v) if isinstance(v, dict) else None
        entries.append((size, k, t, child_count))
    entries.sort(reverse=True)
    for size, k, t, child_count in entries:
        cc = f" ({child_count} entries)" if child_count is not None else ""
        print(f"  {size:>12,} chars  {k} ({t}){cc}")

    # Detailed Structure of Key Stores
    detail_keys = [
        "messages", "channels", "members", "reactions", "files",
        "bots", "teams", "userGroups", "channelHistory", "allThreads",
        "searchResults", "prefs", "userPrefs", "membership",
    ]
    print("\n## Key Store Schemas")
    for store_key in detail_keys:
        store = state.get(store_key)
        if store is None:
            continue
        print(f"\n### {store_key}")
        print(f"  type: {type(store).__name__}")
        if isinstance(store, dict):
            print(f"  entry_count: {len(store)}")
            # Find a representative entry.
            for entry_key, entry_val in store.items():
                if isinstance(entry_val, dict) and len(entry_val) > 3:
                    # Flat record: dump its field names/types/values.
                    print(f"  sample_key: \"{entry_key}\"")
                    print("  fields:")
                    for fk, fv in entry_val.items():
                        ft = type(fv).__name__
                        fval = repr(fv)[:80]
                        print(f"    {fk}: {ft} = {fval}")
                    break
                elif isinstance(entry_val, dict):
                    # BUGFIX: this branch was previously guarded by
                    # `not isinstance(entry_val, dict)` and thus unreachable.
                    # Nested dict of dicts (e.g., messages -> channel -> ts -> msg).
                    for inner_key, inner_val in entry_val.items():
                        if isinstance(inner_val, dict):
                            print(
                                f"  structure: {store_key}[channel_id]"
                                f"[timestamp] -> message"
                            )
                            break
                    break
        elif hasattr(store, "properties"):
            print(f"  JSArray with {len(store.properties)} properties")


if __name__ == "__main__":
    analyze_leveldb()
    analyze_redux_state()