"""Read Slack Messages From Local IndexedDB Parses the Chromium IndexedDB backing Slack's desktop app (Mac App Store version). Finds the Redux state blob, decodes it with dfindexeddb's Blink V8 deserializer, and prints cached messages. Usage: ./slack-cli.py # Recent messages (all channels) ./slack-cli.py -c general # Filter by channel name (glob pattern) ./slack-cli.py -c 'team-*' -c general # Multiple channel filters ./slack-cli.py -x 'alerts-*' -x 'bot-*' # Exclude channels ./slack-cli.py -n 50 # Show last 50 messages ./slack-cli.py -u # Show unread messages only ./slack-cli.py -u -c general # Unread messages in a specific channel ./slack-cli.py -s 2h # Messages from the last 2 hours ./slack-cli.py -s 2026-04-15 # Messages since a specific date ./slack-cli.py --channels # List channels with message counts ./slack-cli.py --dump # Dump full Redux state to file """ import argparse import fnmatch import json import os import re import sys from collections import defaultdict from datetime import datetime, timedelta from pathlib import Path from dfindexeddb.indexeddb.chromium.blink import V8ScriptValueDecoder # ─── Constants ─────────────────────────────────────────────────────────────── SLACK_IDB_BASE = Path.home() / ( "Library/Containers/com.tinyspeck.slackmacgap" "/Data/Library/Application Support/Slack/IndexedDB" ) BLOB_DIR = SLACK_IDB_BASE / "https_app.slack.com_0.indexeddb.blob" # ─── Helpers ───────────────────────────────────────────────────────────────── def find_latest_blob() -> Path | None: """Find the Latest Blob File in the IndexedDB Blob Directory Slack stores a single large blob containing the entire Redux state. The blob number increments on every persist, so the latest file is what we want. """ blob_files = [b for b in BLOB_DIR.rglob("*") if b.is_file()] if not blob_files: return None blob_files.sort(key=lambda p: p.stat().st_mtime, reverse=True) return blob_files[0] def decode_blob(blob_path: Path) -> dict: """Decode a Blink IDB Value Wrapper Blob""" raw = blob_path.read_bytes() return V8ScriptValueDecoder.FromBytes(raw) def ts_to_datetime(ts: str) -> datetime: """Convert Slack Timestamp to Datetime""" try: return datetime.fromtimestamp(float(ts)) except (ValueError, TypeError, OSError): return datetime.min def resolve_user(state: dict, user_id: str) -> str: """Resolve a Slack User ID to Display Name""" if not isinstance(user_id, str): return str(user_id) members = state.get("members", {}) if not isinstance(members, dict): return user_id member = members.get(user_id) if not isinstance(member, dict): return user_id # Slack Redux State Stores Name Fields at Top Level name = member.get("display_name") or member.get("real_name") or member.get("name") if name: return name # Also Check Nested Profile profile = member.get("profile", {}) if isinstance(profile, dict): name = profile.get("display_name") or profile.get("real_name") if name: return name return user_id def parse_since(value: str) -> datetime: """Parse a --since Value Into a Datetime Supports relative durations (e.g. 30m, 2h, 3d) and absolute dates/datetimes (e.g. 2026-04-15, '2026-04-15 10:00'). """ # Relative Duration: m = re.fullmatch(r"(\d+)([mhd])", value.strip()) if m: amount = int(m.group(1)) unit = m.group(2) delta = {"m": timedelta(minutes=amount), "h": timedelta( hours=amount), "d": timedelta(days=amount)}[unit] return datetime.now() - delta # Absolute Datetime for fmt in ("%Y-%m-%d %H:%M:%S", "%Y-%m-%d %H:%M", "%Y-%m-%d"): try: return datetime.strptime(value.strip(), fmt) except ValueError: continue raise argparse.ArgumentTypeError( f"invalid --since value: {value!r} " f"(expected e.g. 30m, 2h, 3d, 2026-04-15, '2026-04-15 10:00')" ) def build_channel_names(state: dict) -> dict[str, str]: """Build Channel ID -> Name Lookup""" channels_store = state.get("channels", {}) names = {} if isinstance(channels_store, dict): for cid, cdata in channels_store.items(): if isinstance(cdata, dict): names[cid] = cdata.get("name", cdata.get("name_normalized", cid)) return names def get_workspace_domain(state: dict) -> str | None: """Get the Primary Workspace Domain From Teams Store""" teams = state.get("teams", {}) if not isinstance(teams, dict): return None for _tid, team in teams.items(): if isinstance(team, dict) and team.get("url"): return team.get("domain") # Fallback to First Team With a Domain for _tid, team in teams.items(): if isinstance(team, dict) and team.get("domain"): return team.get("domain") return None def slack_url(domain: str, channel_id: str, ts: str, thread_ts: str | None = None) -> str: """Build a Slack Deep Link URL Message timestamps become URL path segments by removing the dot: 1776219948.820859 -> p1776219948820859 Thread replies additionally include ?thread_ts=...&cid=... """ ts_url = "p" + ts.replace(".", "") url = f"https://{domain}.slack.com/archives/{channel_id}/{ts_url}" if thread_ts and thread_ts != ts: url += f"?thread_ts={thread_ts}&cid={channel_id}" return url # ─── Commands ──────────────────────────────────────────────────────────────── def build_read_cursors(state: dict) -> dict[str, float]: """Build Channel ID -> Read Cursor Timestamp Lookup The channelCursors store maps channel IDs to the timestamp of the last-read message. Messages with ts > cursor are unread. """ cursors = state.get("channelCursors", {}) result = {} if isinstance(cursors, dict): for cid, ts in cursors.items(): try: result[cid] = float(ts) except (ValueError, TypeError): continue return result def channel_matches(name: str, patterns: list[str]) -> bool: """Check if a Channel Name Matches Any of the Glob Patterns""" name_lower = name.lower() return any(fnmatch.fnmatch(name_lower, p.lower()) for p in patterns) def cmd_messages( state: dict, include_channels: list[str] | None, exclude_channels: list[str] | None, count: int, unread_only: bool = False, since: datetime | None = None, ): """Print Messages From Slack State""" messages_store = state.get("messages", {}) channel_names = build_channel_names(state) read_cursors = build_read_cursors(state) if unread_only else {} domain = get_workspace_domain(state) # Collect All Messages all_msgs = [] for cid, channel_msgs in messages_store.items(): if not isinstance(channel_msgs, dict): continue ch_name = channel_names.get(cid, cid) # Apply Channel Include / Exclude Filters if include_channels and not channel_matches(ch_name, include_channels): continue if exclude_channels and channel_matches(ch_name, exclude_channels): continue # Determine Read Cursor for Unread Filtering cursor = read_cursors.get(cid, 0.0) if unread_only else 0.0 # Convert --since Datetime to Unix Timestamp for Comparison since_ts = since.timestamp() if since else 0.0 for ts, msg in channel_msgs.items(): if not isinstance(msg, dict): continue # Skip Messages Before Cutoff (Unread Cursor or --since) try: ts_f = float(ts) except (ValueError, TypeError): continue if unread_only and ts_f <= cursor: continue if since and ts_f < since_ts: continue text = msg.get("text", "") if not text or not isinstance(text, str): continue user_id = msg.get("user", msg.get("bot_id", "?")) user_name = resolve_user(state, user_id) subtype = msg.get("subtype", "") # dfindexeddb Represents JS undefined as an Undefined object if not isinstance(subtype, str): subtype = "" # Resolve Thread Timestamp thread_ts = msg.get("thread_ts", "") if not isinstance(thread_ts, str): thread_ts = "" all_msgs.append({ "channel": ch_name, "channel_id": cid, "ts": ts, "thread_ts": thread_ts or None, "dt": ts_to_datetime(ts), "user": user_name, "user_id": user_id, "text": text, "subtype": subtype, }) # Sort by Timestamp (Most Recent Last) all_msgs.sort(key=lambda m: m["dt"]) # Group Thread Replies Under Their Parents threads: dict[str, list[dict]] = defaultdict(list) top_level: list[dict] = [] for msg in all_msgs: thread_ts = msg["thread_ts"] if thread_ts and thread_ts != msg["ts"]: threads[thread_ts].append(msg) else: top_level.append(msg) # Build Display List — Each Top-Level Entry Followed by Its Replies # item = (msg | None, depth) — None msg means orphan thread header # depth 0 = root (top-level message or orphan thread header) # depth 1 = thread reply display: list[tuple[dict | None, int]] = [] seen_parents: set[str] = {m["ts"] for m in top_level} for msg in top_level: display.append((msg, 0)) for reply in threads.get(msg["ts"], []): display.append((reply, 1)) # Collect Orphan Thread Groups — Replies Whose Parent Isn't Shown orphan_groups: list[tuple[str, list[dict]]] = [] for thread_ts, replies in threads.items(): if thread_ts not in seen_parents: orphan_groups.append((thread_ts, replies)) # Sort Orphan Groups by Earliest Reply Timestamp orphan_groups.sort(key=lambda g: g[1][0]["dt"]) # Append Each Orphan Group With a Header Placeholder for thread_ts, replies in orphan_groups: # Use First Reply's Channel and ID for the Header header = { "channel": replies[0]["channel"], "channel_id": replies[0]["channel_id"], "thread_ts": thread_ts, } display.append((None, 0)) # Placeholder for header for reply in replies: display.append((reply, 1)) # Patch the Placeholder With Header Info display[-len(replies) - 1] = (header, 0) # Print Last N Messages (Count depth=0 Entries Only) if len(display) > 0: # Walk Backwards to Find the Cutoff That Includes `count` Roots roots_seen = 0 start_idx = len(display) for i in range(len(display) - 1, -1, -1): if display[i][1] == 0: roots_seen += 1 if roots_seen > count: break start_idx = i visible = display[start_idx:] else: visible = [] ts_color = "\033[90m" ch_color = "\033[36m" user_color = "\033[33m" reset = "\033[0m" bar_str = f"\033[90m│\033[0m" for idx, (msg, depth) in enumerate(visible): # Peek Ahead to See if Next Item Is Still a Thread Reply next_is_reply = (idx + 1 < len(visible) and visible[idx + 1][1] > 0) # Orphan Thread Header if msg is not None and "dt" not in msg: header_line = f"\033[90m↳ thread in {ch_color}#{msg['channel']}{reset}" if domain and msg.get("channel_id") and msg.get("thread_ts"): link = slack_url(domain, msg["channel_id"], msg["thread_ts"]) header_line += f" \033[90m{link}{reset}" print(header_line) continue dt_str = msg["dt"].strftime("%Y-%m-%d %H:%M:%S") subtype_tag = f" [{msg['subtype']}]" if msg["subtype"] else "" bar = f"{bar_str} " if depth > 0 else "" # Build Slack Link for Top-Level Messages link_suffix = "" if domain and depth == 0: link = slack_url(domain, msg["channel_id"], msg["ts"]) link_suffix = f" {ts_color}{link}{reset}" print( f"{bar}{ts_color}{dt_str}{reset} " f"{ch_color}#{msg['channel']}{reset} " f"{user_color}{msg['user']}{reset}{subtype_tag}{link_suffix}" ) # Indent Message Text (Prefix Every Line for Multi-Line Messages) text_prefix = f"{bar_str} " if depth > 0 else " " for line in msg["text"][:500].split("\n"): print(f"{text_prefix}{line}") # Connecting Bar Between Thread Messages if next_is_reply: print(bar_str) else: print() root_count = sum(1 for _, d in visible if d == 0) total_roots = sum(1 for _, d in display if d == 0) label = "unread messages" if unread_only else "messages" if since: label += f" since {since.strftime('%Y-%m-%d %H:%M')}" print(f"--- Showing {root_count} of {total_roots} {label} ---") def cmd_channels(state: dict): """List Channels With Message Counts""" messages_store = state.get("messages", {}) channel_names = build_channel_names(state) counts = {} for cid, channel_msgs in messages_store.items(): if not isinstance(channel_msgs, dict): continue ch_name = channel_names.get(cid, cid) msg_count = sum( 1 for v in channel_msgs.values() if isinstance(v, dict) and v.get("text") ) if msg_count > 0: counts[ch_name] = msg_count for name, c in sorted(counts.items(), key=lambda x: -x[1]): print(f" {c:5d} #{name}") print(f"\n--- {len(counts)} channels with cached messages ---") def cmd_dump(state: dict, output: str): """Dump Full Redux State to File""" with open(output, "w") as f: json.dump(state, f, indent=2, default=str, ensure_ascii=False) size_mb = os.path.getsize(output) / 1024 / 1024 print(f"Dumped {size_mb:.1f}MB to {output}") # ─── Main ──────────────────────────────────────────────────────────────────── def main(): parser = argparse.ArgumentParser( description="Read Slack messages from local IndexedDB" ) parser.add_argument( "-c", "--channel", action="append", dest="channels", help="Include channels matching glob pattern (repeatable, e.g. -c 'team-*' -c general)" ) parser.add_argument( "-x", "--exclude", action="append", dest="exclude_channels", help="Exclude channels matching glob pattern (repeatable, e.g. -x 'alerts-*' -x 'bot-*')" ) parser.add_argument( "-n", "--count", type=int, default=30, help="Number of messages to show (default: 30)" ) parser.add_argument( "-u", "--unread", action="store_true", help="Show only unread messages (based on read cursor position)" ) parser.add_argument( "-s", "--since", type=parse_since, help="Show messages since time (e.g. 30m, 2h, 3d, 2026-04-15, '2026-04-15 10:00')" ) parser.add_argument( "--channels", action="store_true", dest="list_channels", help="List channels with message counts" ) parser.add_argument( "--dump", nargs="?", const="slack_state.json", help="Dump full state to JSON file" ) args = parser.parse_args() # Find and Decode the Blob blob_path = find_latest_blob() if not blob_path: print( "No blob files found. Is Slack installed and has it been opened?", file=sys.stderr, ) sys.exit(1) size_mb = blob_path.stat().st_size / 1024 / 1024 print(f"Reading blob: {blob_path} ({size_mb:.1f}MB)", file=sys.stderr) state = decode_blob(blob_path) # Dispatch Command if args.dump: cmd_dump(state, args.dump) elif args.list_channels: cmd_channels(state) else: cmd_messages(state, args.channels, args.exclude_channels, args.count, args.unread, args.since) if __name__ == "__main__": main()