Files
slack-cli/slack_cli/__main__.py

621 lines
22 KiB
Python

"""Read Slack Messages From Local IndexedDB
Parses the Chromium IndexedDB backing Slack's desktop app (Mac App Store
version). Finds the Redux state blob, decodes it with dfindexeddb's Blink
V8 deserializer, and prints cached messages.
Usage:
./slack-cli.py # Recent messages (all channels)
./slack-cli.py -c general # Filter by channel name (glob pattern)
./slack-cli.py -c 'team-*' -c general # Multiple channel filters
./slack-cli.py -x 'alerts-*' -x 'bot-*' # Exclude channels
./slack-cli.py -n 50 # Show last 50 messages
./slack-cli.py -u # Show unread messages only
./slack-cli.py -u -c general # Unread messages in a specific channel
./slack-cli.py -s 2h # Messages from the last 2 hours
./slack-cli.py -s 2026-04-15 # Messages since a specific date
./slack-cli.py --channels # List channels with message counts
./slack-cli.py --dump # Dump full Redux state to file
"""
import argparse
import fnmatch
import json
import os
import re
import sys
from collections import defaultdict
from datetime import datetime, timedelta
from pathlib import Path
from dfindexeddb.indexeddb.chromium.blink import V8ScriptValueDecoder
# ─── Constants ───────────────────────────────────────────────────────────────
SLACK_IDB_BASE = Path.home() / (
"Library/Containers/com.tinyspeck.slackmacgap"
"/Data/Library/Application Support/Slack/IndexedDB"
)
BLOB_DIR = SLACK_IDB_BASE / "https_app.slack.com_0.indexeddb.blob"
# ─── Helpers ─────────────────────────────────────────────────────────────────
def find_latest_blob() -> Path | None:
"""Find the Latest Blob File in the IndexedDB Blob Directory
Slack stores a single large blob containing the entire Redux state.
The blob number increments on every persist, so the latest file is
what we want.
"""
blob_files = [b for b in BLOB_DIR.rglob("*") if b.is_file()]
if not blob_files:
return None
blob_files.sort(key=lambda p: p.stat().st_mtime, reverse=True)
return blob_files[0]
def decode_blob(blob_path: Path) -> dict:
"""Decode a Blink IDB Value Wrapper Blob"""
raw = blob_path.read_bytes()
return V8ScriptValueDecoder.FromBytes(raw)
def ts_to_datetime(ts: str) -> datetime:
"""Convert Slack Timestamp to Datetime"""
try:
return datetime.fromtimestamp(float(ts))
except (ValueError, TypeError, OSError):
return datetime.min
def resolve_user(state: dict, user_id: str) -> str:
"""Resolve a Slack User ID to Display Name"""
if not isinstance(user_id, str):
return str(user_id)
members = state.get("members", {})
if not isinstance(members, dict):
return user_id
member = members.get(user_id)
if not isinstance(member, dict):
return user_id
# Slack Handle (e.g. "evan") > Display Name > Real Name > fallback to ID
name = member.get("name") # Slack handle / username
if name and name != user_id:
return name
name = member.get("display_name") or member.get("real_name")
if name:
return name
# Also Check Nested Profile
profile = member.get("profile", {})
if isinstance(profile, dict):
name = profile.get("display_name") or profile.get("real_name")
if name:
return name
return user_id
def resolve_mentions(state: dict, text: str) -> str:
"""Replace Slack User Mentions (<@USERID>) With @DisplayNames"""
if not isinstance(text, str):
return text
members = state.get("members", {})
if not isinstance(members, dict):
return text
def _replace(match):
user_id = match.group(1)
resolved = resolve_user(state, user_id)
return f"@{resolved}"
return re.sub(r"<@([A-Z0-9]+)>", _replace, text)
def parse_since(value: str) -> datetime:
"""Parse a --since Value Into a Datetime
Supports relative durations (e.g. 30m, 2h, 3d) and absolute
dates/datetimes (e.g. 2026-04-15, '2026-04-15 10:00').
"""
# Relative Duration: <number><unit>
m = re.fullmatch(r"(\d+)([mhd])", value.strip())
if m:
amount = int(m.group(1))
unit = m.group(2)
delta = {"m": timedelta(minutes=amount), "h": timedelta(
hours=amount), "d": timedelta(days=amount)}[unit]
return datetime.now() - delta
# Absolute Datetime
for fmt in ("%Y-%m-%d %H:%M:%S", "%Y-%m-%d %H:%M", "%Y-%m-%d"):
try:
return datetime.strptime(value.strip(), fmt)
except ValueError:
continue
raise argparse.ArgumentTypeError(
f"invalid --since value: {value!r} "
f"(expected e.g. 30m, 2h, 3d, 2026-04-15, '2026-04-15 10:00')"
)
def build_channel_names(state: dict) -> dict[str, str]:
"""Build Channel ID -> Name Lookup"""
channels_store = state.get("channels", {})
names = {}
if isinstance(channels_store, dict):
for cid, cdata in channels_store.items():
if isinstance(cdata, dict):
names[cid] = cdata.get("name", cdata.get("name_normalized", cid))
return names
def get_workspace_domain(state: dict) -> str | None:
"""Get the Primary Workspace Domain From Teams Store"""
teams = state.get("teams", {})
if not isinstance(teams, dict):
return None
for _tid, team in teams.items():
if isinstance(team, dict) and team.get("url"):
return team.get("domain")
# Fallback to First Team With a Domain
for _tid, team in teams.items():
if isinstance(team, dict) and team.get("domain"):
return team.get("domain")
return None
def slack_url(domain: str, channel_id: str, ts: str, thread_ts: str | None = None) -> str:
"""Build a Slack Deep Link URL
Message timestamps become URL path segments by removing the dot:
1776219948.820859 -> p1776219948820859
Thread replies additionally include ?thread_ts=...&cid=...
"""
ts_url = "p" + ts.replace(".", "")
url = f"https://{domain}.slack.com/archives/{channel_id}/{ts_url}"
if thread_ts and thread_ts != ts:
url += f"?thread_ts={thread_ts}&cid={channel_id}"
return url
# ─── Commands ────────────────────────────────────────────────────────────────
def build_read_cursors(state: dict) -> dict[str, float]:
"""Build Channel ID -> Read Cursor Timestamp Lookup
The channelCursors store maps channel IDs to the timestamp of the
last-read message. Messages with ts > cursor are unread.
"""
cursors = state.get("channelCursors", {})
result = {}
if isinstance(cursors, dict):
for cid, ts in cursors.items():
try:
result[cid] = float(ts)
except (ValueError, TypeError):
continue
return result
def channel_matches(name: str, patterns: list[str]) -> bool:
"""Check if a Channel Name Matches Any of the Glob Patterns"""
name_lower = name.lower()
return any(fnmatch.fnmatch(name_lower, p.lower()) for p in patterns)
def cmd_messages(
state: dict,
include_channels: list[str] | None,
exclude_channels: list[str] | None,
count: int,
unread_only: bool = False,
since: datetime | None = None,
):
"""Print Messages From Slack State"""
messages_store = state.get("messages", {})
channel_names = build_channel_names(state)
read_cursors = build_read_cursors(state) if unread_only else {}
domain = get_workspace_domain(state)
# Collect All Messages
all_msgs = []
for cid, channel_msgs in messages_store.items():
if not isinstance(channel_msgs, dict):
continue
ch_name = channel_names.get(cid, cid)
# Apply Channel Include / Exclude Filters
if include_channels and not channel_matches(ch_name, include_channels):
continue
if exclude_channels and channel_matches(ch_name, exclude_channels):
continue
# Determine Read Cursor for Unread Filtering
cursor = read_cursors.get(cid, 0.0) if unread_only else 0.0
# Convert --since Datetime to Unix Timestamp for Comparison
since_ts = since.timestamp() if since else 0.0
for ts, msg in channel_msgs.items():
if not isinstance(msg, dict):
continue
# Skip Messages Before Cutoff (Unread Cursor or --since)
try:
ts_f = float(ts)
except (ValueError, TypeError):
continue
if unread_only and ts_f <= cursor:
continue
if since and ts_f < since_ts:
continue
text = msg.get("text", "")
if not text or not isinstance(text, str):
continue
user_id = msg.get("user", msg.get("bot_id", "?"))
user_name = resolve_user(state, user_id)
text = resolve_mentions(state, text)
subtype = msg.get("subtype", "")
# dfindexeddb Represents JS undefined as an Undefined object
if not isinstance(subtype, str):
subtype = ""
# Resolve Thread Timestamp
thread_ts = msg.get("thread_ts", "")
if not isinstance(thread_ts, str):
thread_ts = ""
all_msgs.append({
"channel": ch_name,
"channel_id": cid,
"ts": ts,
"thread_ts": thread_ts or None,
"dt": ts_to_datetime(ts),
"user": user_name,
"user_id": user_id,
"text": text,
"subtype": subtype,
})
# Sort by Timestamp (Most Recent Last)
all_msgs.sort(key=lambda m: m["dt"])
# Group Thread Replies Under Their Parents
threads: dict[str, list[dict]] = defaultdict(list)
top_level: list[dict] = []
for msg in all_msgs:
thread_ts = msg["thread_ts"]
if thread_ts and thread_ts != msg["ts"]:
threads[thread_ts].append(msg)
else:
top_level.append(msg)
# Build Display List — Each Top-Level Entry Followed by Its Replies
# item = (msg, depth)
# depth 0 = root (top-level message OR thread parent)
# depth 1 = thread reply (or ellipsis row)
# Every root carries `has_replies` so the renderer can pick glyphs.
display: list[tuple[dict, int]] = []
seen_parents: set[str] = {m["ts"] for m in top_level}
for msg in top_level:
replies = threads.get(msg["ts"], [])
msg["has_replies"] = bool(replies)
if replies:
msg["is_thread_parent"] = True
display.append((msg, 0))
for reply in replies:
display.append((reply, 1))
# Collect Orphan Thread Groups — Replies Whose Parent Isn't a Top-Level Match
orphan_groups: list[tuple[str, list[dict]]] = []
for thread_ts, replies in threads.items():
if thread_ts not in seen_parents:
orphan_groups.append((thread_ts, replies))
orphan_groups.sort(key=lambda g: g[1][0]["dt"])
# Append Each Orphan Group — Resolve Parent From Cache When Possible.
# If parent isn't cached, emit a synthetic header with is_orphan=True so
# the renderer shows a consistent row + "parent not cached" body.
for thread_ts, replies in orphan_groups:
cid = replies[0]["channel_id"]
ch_name = replies[0]["channel"]
channel_msgs_raw = messages_store.get(cid)
parent_raw = (
channel_msgs_raw.get(thread_ts)
if isinstance(channel_msgs_raw, dict) else None
)
if isinstance(parent_raw, dict) and isinstance(parent_raw.get("text"), str):
user_id = parent_raw.get("user", parent_raw.get("bot_id", "?"))
subtype = parent_raw.get("subtype", "")
if not isinstance(subtype, str):
subtype = ""
parent_msg = {
"channel": ch_name,
"channel_id": cid,
"ts": thread_ts,
"thread_ts": thread_ts,
"dt": ts_to_datetime(thread_ts),
"user": resolve_user(state, user_id),
"user_id": user_id,
"text": resolve_mentions(state, parent_raw.get("text", "")),
"subtype": subtype,
"is_thread_parent": True,
"has_replies": True,
}
reply_count = parent_raw.get("reply_count")
shown = len(replies)
if isinstance(reply_count, int):
# Known: show ellipsis only if replies were actually skipped
skipped = reply_count - shown if reply_count > shown else 0
else:
# Unknown reply_count — signal ambiguous gap
skipped = None
else:
# Orphan — Parent Not in Local Cache
parent_msg = {
"channel": ch_name,
"channel_id": cid,
"ts": thread_ts,
"thread_ts": thread_ts,
"dt": ts_to_datetime(thread_ts),
"user": "???",
"user_id": "",
"text": "",
"subtype": "",
"is_thread_parent": True,
"is_orphan": True,
"has_replies": True,
}
# No reply_count available for orphans → generic ellipsis
skipped = None
display.append((parent_msg, 0))
# Suppress Ellipsis When We Know No Replies Were Skipped
if skipped != 0:
display.append(({"ellipsis": True, "skipped": skipped}, 1))
for reply in replies:
display.append((reply, 1))
# Print Last N Messages (Count depth=0 Entries Only)
if len(display) > 0:
# Walk Backwards to Find the Cutoff That Includes `count` Roots
roots_seen = 0
start_idx = len(display)
for i in range(len(display) - 1, -1, -1):
if display[i][1] == 0:
roots_seen += 1
if roots_seen > count:
break
start_idx = i
visible = display[start_idx:]
else:
visible = []
# ANSI Palette
dim = "\033[90m"
ts_color = "\033[90m"
ch_color = "\033[36m"
user_color = "\033[33m"
bullet_top = "\033[36m●\033[0m" # Top-level message (cyan dot)
bullet_thread = "\033[33m◆\033[0m" # Thread parent (yellow diamond)
reset = "\033[0m"
bar = f"{dim}{reset}" # Thread continuation gutter
branch_mid = f"{dim}{reset}" # Ellipsis branch
branch_end = f"{dim}{reset}" # Final reply branch
# Horizontal Rule Between Roots — Sized to Terminal Width (Clamped)
try:
term_width = os.get_terminal_size().columns
except OSError:
term_width = 78
rule_width = max(40, min(term_width, 100))
rule = f"{dim}{'' * rule_width}{reset}"
def wrap_text(text: str, prefix: str) -> None:
"""Print Message Body, Prefixing Every Line"""
for line in text.split("\n"):
print(f"{prefix}{line}")
prev_depth = None
for idx, (msg, depth) in enumerate(visible):
# Peek Ahead — Needed to Decide Between Mid vs End Reply Branch
next_is_reply = (
idx + 1 < len(visible) and visible[idx + 1][1] > 0
)
# Horizontal Rule Before Every New Root
if depth == 0:
print(rule)
# Ellipsis Row — Between Thread Parent and Shown Replies
if msg.get("ellipsis"):
skipped = msg.get("skipped")
if skipped:
inner = f"[… {skipped} earlier repl{'y' if skipped == 1 else 'ies'}]"
else:
inner = "[… older replies]"
print(f"{branch_mid} {dim}{inner}{reset}")
# Continuation Bar Below Ellipsis if More Replies Follow
if next_is_reply:
print(bar)
continue
dt_str = msg["dt"].strftime("%Y-%m-%d %H:%M:%S")
subtype_tag = f" [{msg['subtype']}]" if msg.get("subtype") else ""
if depth == 0:
# Root Row — Choose Bullet Based on Thread State
is_thread = msg.get("is_thread_parent", False)
bullet = bullet_thread if is_thread else bullet_top
link_suffix = ""
if domain and msg.get("channel_id") and msg.get("ts"):
link = slack_url(domain, msg["channel_id"], msg["ts"])
link_suffix = f" {ts_color}{link}{reset}"
print(
f"{bullet} {ts_color}{dt_str}{reset} "
f"{ch_color}#{msg['channel']}{reset} "
f"{user_color}{msg['user']}{reset}{subtype_tag}{link_suffix}"
)
# Orphan Subtitle (Kept Below Header for Layout Consistency)
if msg.get("is_orphan"):
print(f"{bar} {dim}(orphan — parent not cached){reset}")
# Body: Either the Orphan Placeholder Box or the Real Text
if msg.get("is_orphan"):
# ⚠ Renders as Double-Width in Most Terminals, So the
# top/bottom rules need to be 2 chars shorter than the
# printable width of the middle row.
box_top = f"{dim}{'' * 39}{reset}"
box_mid = f"{dim}{reset} \033[33m⚠ ORPHANED THREAD — PARENT UNKNOWN\033[0m {dim}{reset}"
box_bot = f"{dim}{'' * 39}{reset}"
print(f"{bar}")
print(f"{bar} {box_top}")
print(f"{bar} {box_mid}")
print(f"{bar} {box_bot}")
else:
# Top-Level and Resolved-Parent Bodies Both Indent 2 Spaces;
# thread parents use a gutter to signal replies follow.
body_prefix = f"{bar} " if msg.get("has_replies") else " "
wrap_text(msg["text"], body_prefix)
# Trailing Gutter Only When Replies Follow
if next_is_reply:
print(bar)
else:
print()
else:
# Reply Row — Use └ (Final) or ├ (Mid) Depending on Followups
branch = branch_mid if next_is_reply else branch_end
print(
f"{branch} {ts_color}{dt_str}{reset} "
f"{user_color}{msg['user']}{reset}{subtype_tag}"
)
# Mid Replies Keep the │ Gutter; Final Reply Indents Flat
text_prefix = f"{bar} " if next_is_reply else " "
wrap_text(msg["text"], text_prefix)
if next_is_reply:
print(bar)
else:
print()
prev_depth = depth
root_count = sum(1 for _, d in visible if d == 0)
total_roots = sum(1 for _, d in display if d == 0)
label = "unread messages" if unread_only else "messages"
if since:
label += f" since {since.strftime('%Y-%m-%d %H:%M')}"
print(f"--- Showing {root_count} of {total_roots} {label} ---")
def cmd_channels(state: dict):
"""List Channels With Message Counts"""
messages_store = state.get("messages", {})
channel_names = build_channel_names(state)
counts = {}
for cid, channel_msgs in messages_store.items():
if not isinstance(channel_msgs, dict):
continue
ch_name = channel_names.get(cid, cid)
msg_count = sum(
1 for v in channel_msgs.values()
if isinstance(v, dict) and v.get("text")
)
if msg_count > 0:
counts[ch_name] = msg_count
for name, c in sorted(counts.items(), key=lambda x: -x[1]):
print(f" {c:5d} #{name}")
print(f"\n--- {len(counts)} channels with cached messages ---")
def cmd_dump(state: dict, output: str):
"""Dump Full Redux State to File"""
with open(output, "w") as f:
json.dump(state, f, indent=2, default=str, ensure_ascii=False)
size_mb = os.path.getsize(output) / 1024 / 1024
print(f"Dumped {size_mb:.1f}MB to {output}")
# ─── Main ────────────────────────────────────────────────────────────────────
def main():
parser = argparse.ArgumentParser(
description="Read Slack messages from local IndexedDB"
)
parser.add_argument(
"-c", "--channel", action="append", dest="channels",
help="Include channels matching glob pattern (repeatable, e.g. -c 'team-*' -c general)"
)
parser.add_argument(
"-x", "--exclude", action="append", dest="exclude_channels",
help="Exclude channels matching glob pattern (repeatable, e.g. -x 'alerts-*' -x 'bot-*')"
)
parser.add_argument(
"-n", "--count", type=int, default=30,
help="Number of messages to show (default: 30)"
)
parser.add_argument(
"-u", "--unread", action="store_true",
help="Show only unread messages (based on read cursor position)"
)
parser.add_argument(
"-s", "--since", type=parse_since,
help="Show messages since time (e.g. 30m, 2h, 3d, 2026-04-15, '2026-04-15 10:00')"
)
parser.add_argument(
"--channels", action="store_true", dest="list_channels",
help="List channels with message counts"
)
parser.add_argument(
"--dump", nargs="?", const="slack_state.json",
help="Dump full state to JSON file"
)
args = parser.parse_args()
# Find and Decode the Blob
blob_path = find_latest_blob()
if not blob_path:
print(
"No blob files found. Is Slack installed and has it been opened?",
file=sys.stderr,
)
sys.exit(1)
size_mb = blob_path.stat().st_size / 1024 / 1024
print(f"Reading blob: {blob_path} ({size_mb:.1f}MB)", file=sys.stderr)
state = decode_blob(blob_path)
# Dispatch Command
if args.dump:
cmd_dump(state, args.dump)
elif args.list_channels:
cmd_channels(state)
else:
cmd_messages(state, args.channels, args.exclude_channels,
args.count, args.unread, args.since)
if __name__ == "__main__":
main()