Files
slack-cli/scripts/analyze_structure.py
2026-04-16 12:10:21 -04:00

197 lines
6.9 KiB
Python
Executable File

#!/usr/bin/env -S uv run --python=python3.12 --script
# /// script
# requires-python = ">=3.12"
# dependencies = ["dfindexeddb"]
# ///
"""Analyze Slack IndexedDB Structure
Dumps the full schema of the IndexedDB: databases, object stores,
record types, and the Redux state top-level keys with sizes/types.
Used for generating documentation.
"""
import pathlib
import shutil
import tempfile
from collections import Counter
from dfindexeddb.indexeddb.chromium.blink import V8ScriptValueDecoder
from dfindexeddb.indexeddb.chromium.record import FolderReader
# Slack's sandboxed macOS desktop client keeps its IndexedDB inside the app
# container; these paths assume that layout (Mac App Store build).
HOME = pathlib.Path.home()
IDB_BASE = HOME / (
    "Library/Containers/com.tinyspeck.slackmacgap"
    "/Data/Library/Application Support/Slack/IndexedDB"
)
# LevelDB directory holding the IndexedDB records for https://app.slack.com.
LDB_DIR = IDB_BASE / "https_app.slack.com_0.indexeddb.leveldb"
# Side-car blob directory for large values (read by analyze_redux_state).
BLOB_DIR = IDB_BASE / "https_app.slack.com_0.indexeddb.blob"
def analyze_leveldb():
    """Analyze LevelDB Record Structure.

    Prints the filesystem layout of the LevelDB and blob directories, then
    copies the LevelDB to a temporary directory (so the live LOCK held by
    Slack doesn't block reads), parses every record, and reports databases,
    object stores, a sample key per store, and per-key-type record counts.

    Side effects: writes to stdout; creates and removes a temp directory.
    """
    print("=" * 70)
    print("LEVELDB / INDEXEDDB STRUCTURE")
    print("=" * 70)
    # Filesystem Layout
    print("\n## Filesystem Layout")
    print(f"\nLevelDB dir: {LDB_DIR}")
    for p in sorted(LDB_DIR.iterdir()):
        size = p.stat().st_size
        print(f" {p.name:30s} {size:>12,} bytes")
    print(f"\nBlob dir: {BLOB_DIR}")
    for p in sorted(BLOB_DIR.rglob("*")):
        if p.is_file():
            size = p.stat().st_size
            rel = p.relative_to(BLOB_DIR)
            print(f" {str(rel):30s} {size:>12,} bytes")
    # Copy DB to Avoid Lock; remove the copy even if parsing raises
    # (previously the rmtree was unguarded and an exception leaked the copy).
    tmp = pathlib.Path(tempfile.mkdtemp())
    try:
        shutil.copytree(str(LDB_DIR), str(tmp / "db"))
        (tmp / "db" / "LOCK").unlink(missing_ok=True)
        # Parse Records
        reader = FolderReader(tmp / "db")
        key_types = Counter()  # record-key class name -> occurrence count
        db_meta = {}  # db_id -> {"name": ..., "obj_stores": {...}}
        obj_store_names = {}  # (db_id, os_id) -> store name
        for rec in reader.GetRecords(load_blobs=False):
            kt = type(rec.key).__name__
            key_types[kt] += 1
            db_id = rec.database_id or 0
            if kt == "DatabaseNameKey":
                dn = getattr(rec.key, "database_name", None)
                if dn:
                    db_meta.setdefault(db_id, {"name": None})["name"] = str(dn)
            if kt == "ObjectStoreMetaDataKey":
                md_type = getattr(rec.key, "metadata_type", None)
                os_id = getattr(rec.key, "object_store_id", None)
                # metadata_type 0 appears to carry the store's name — the
                # value is taken verbatim as the display name.
                if md_type == 0 and rec.value:
                    obj_store_names[(db_id, os_id)] = str(rec.value)
            if kt == "ObjectStoreDataKey":
                user_key = getattr(rec.key, "encoded_user_key", None)
                val = rec.value
                blob_size = getattr(val, "blob_size", None) if val else None
                version = getattr(val, "version", None) if val else None
                key_val = getattr(user_key, "value", None) if user_key else None
                os_id = rec.object_store_id
                info = db_meta.setdefault(db_id, {"name": None})
                stores = info.setdefault("obj_stores", {})
                store_info = stores.setdefault(os_id, {"keys": [], "sample_key": None})
                # Keep only the first user key seen as the representative sample.
                if key_val and not store_info["sample_key"]:
                    store_info["sample_key"] = str(key_val)[:80]
                store_info["blob_size"] = blob_size
                store_info["version"] = version
    finally:
        shutil.rmtree(tmp)
    # Print Databases
    print("\n## Databases")
    for db_id in sorted(db_meta.keys()):
        info = db_meta[db_id]
        # Entries are always created with {"name": None}, so the key exists
        # and .get("name", "?") could never fall back; use `or` so databases
        # whose name record was never seen print "?" instead of None.
        name = info.get("name") or "?"
        print(f"\n database_id={db_id}: \"{name}\"")
        for (did, osid), osname in sorted(obj_store_names.items()):
            if did == db_id:
                print(f" object_store_id={osid}: \"{osname}\"")
                store_info = info.get("obj_stores", {}).get(osid, {})
                if store_info.get("sample_key"):
                    print(f" sample_key: {store_info['sample_key']}")
                if store_info.get("blob_size"):
                    print(f" blob_size: {store_info['blob_size']:,}")
    # Print Record Types
    print("\n## Record Type Counts")
    for kt, count in key_types.most_common():
        print(f" {count:6d} {kt}")
def analyze_redux_state():
    """Analyze Redux State Blob Structure.

    Finds the most recently modified file in the blob directory (taken to be
    the latest Redux state snapshot), decodes it as a V8-serialized script
    value, then prints top-level keys ordered by serialized size and a
    field-level schema for a fixed list of interesting stores.

    Side effects: writes to stdout only.
    """
    print("\n")
    print("=" * 70)
    print("REDUX STATE BLOB STRUCTURE")
    print("=" * 70)
    # Find Blob: stat only real files rather than keying directories to 0
    # and filtering after the sort.
    blob_files = sorted(
        (p for p in BLOB_DIR.rglob("*") if p.is_file()),
        key=lambda p: p.stat().st_mtime,
    )
    if not blob_files:
        print("No blob files found!")
        return
    blob_path = blob_files[-1]  # newest by mtime
    size = blob_path.stat().st_size
    print(f"\nBlob: {blob_path.relative_to(IDB_BASE)} ({size:,} bytes)")
    state = V8ScriptValueDecoder.FromBytes(blob_path.read_bytes())
    # Top-Level Keys
    print("\n## Top-Level Keys (sorted by size)")
    entries = []
    for k in sorted(state.keys()):
        v = state[k]
        size = len(str(v))  # crude size proxy: length of the stringified value
        t = type(v).__name__
        child_count = len(v) if isinstance(v, dict) else None
        entries.append((size, k, t, child_count))
    entries.sort(reverse=True)
    for size, k, t, child_count in entries:
        cc = f" ({child_count} entries)" if child_count is not None else ""
        print(f" {size:>12,} chars {k} ({t}){cc}")
    # Detailed Structure of Key Stores
    detail_keys = [
        "messages", "channels", "members", "reactions",
        "files", "bots", "teams", "userGroups",
        "channelHistory", "allThreads", "searchResults",
        "prefs", "userPrefs", "membership",
    ]
    print("\n## Key Store Schemas")
    for store_key in detail_keys:
        store = state.get(store_key)
        if store is None:
            continue
        print(f"\n### {store_key}")
        print(f" type: {type(store).__name__}")
        if isinstance(store, dict):
            print(f" entry_count: {len(store)}")
            # Find a representative entry
            for entry_key, entry_val in store.items():
                if isinstance(entry_val, dict) and len(entry_val) > 3:
                    # Flat record: dump field names, types, truncated values.
                    print(f" sample_key: \"{entry_key}\"")
                    print(" fields:")
                    for fk, fv in entry_val.items():
                        ft = type(fv).__name__
                        fval = repr(fv)[:80]
                        print(f" {fk}: {ft} = {fval}")
                    break
                elif isinstance(entry_val, dict):
                    # Nested dict of dicts (e.g., messages -> channel -> ts -> msg).
                    # BUG FIX: the guard was `not isinstance(entry_val, dict)`
                    # wrapping an inner `isinstance(entry_val, dict)` check,
                    # which made this branch unreachable.
                    for inner_val in entry_val.values():
                        if isinstance(inner_val, dict):
                            print(f" structure: {store_key}[channel_id][timestamp] -> message")
                            break
                    break
        elif hasattr(store, "properties"):
            print(f" JSArray with {len(store.properties)} properties")
if __name__ == "__main__":
    # Run both reports in order: LevelDB schema first, then the Redux blob.
    analyze_leveldb()
    analyze_redux_state()