Files
pi-lsp/src/daemon.ts

413 lines
14 KiB
TypeScript

// Daemon Server - Owns long-lived LspClient instances keyed by
// (server.id, rootDir). Accepts NDJSON requests over a Unix socket and
// dispatches them to the appropriate client, lazily spawning servers and
// reaping idle ones via ServerConfig.idleTtlMs.
import * as fs from "node:fs";
import * as net from "node:net";
import * as path from "node:path";
import { LspClient } from "./client.ts";
import { findRoot, findServerById, pathToUri } from "./root.ts";
import type { ServerConfig } from "./types.ts";
import { WorkspaceWatcher, type FileEvent } from "./watcher.ts";
import {
logPath,
socketPath,
tryConnect,
type DaemonRequest,
type DaemonResponse,
type LaunchContext,
} from "./daemonProtocol.ts";
const DEFAULT_IDLE_TTL_MS = 5 * 60 * 1000;
const WATCHER_READY_TIMEOUT_MS = 5000;
// Client Entry - One LspClient per (server.id, rootDir), plus the bookkeeping
// needed to keep files in sync and evict on idleness.
interface ClientEntry {
key: string;
server: ServerConfig;
rootDir: string;
client: LspClient;
// ready: gates concurrent requests during startup so we only initialize once.
ready: Promise<void>;
// opened: URI -> last-synced mtimeMs. Used to decide didOpen vs didChange vs nothing.
opened: Map<string, number>;
// serializer: per-entry mutex so file-sync (didOpen/didChange) can't race
// with itself when two requests for the same file land concurrently.
serializer: Promise<unknown>;
idleTimer: NodeJS.Timeout | null;
ttlMs: number;
lastUsed: number;
watcher: WorkspaceWatcher | null;
unsubscribeWatchers: (() => void) | null;
}
const entries = new Map<string, ClientEntry>();
// Log - Single helper so we can prefix and easily silence in tests.
function log(...args: unknown[]) {
process.stdout.write(
`[${new Date().toISOString()}] ` +
args.map((a) => (typeof a === "string" ? a : JSON.stringify(a))).join(" ") +
"\n",
);
}
// Get Or Create Entry - Looks up the cached client for a server+file,
// spawning a fresh LspClient if needed. The returned entry is guaranteed
// to have its `ready` promise resolved before the caller uses it. The
// server registry is resolved against any `.pi-lsp.json` reachable from
// `filePath`, so per-repo config overrides take effect at spawn time.
async function getOrCreateEntry(
filePath: string,
serverId: string,
launch: LaunchContext,
): Promise<ClientEntry> {
const server = findServerById(filePath, serverId);
const rootDir = findRoot(filePath, server.rootMarkers);
const key = `${server.id}::${rootDir}`;
const existing = entries.get(key);
if (existing) {
await existing.ready;
return existing;
}
// Cold Start - Build the entry synchronously so concurrent callers all
// await the same `ready` promise instead of racing to spawn duplicates.
const client = new LspClient(server);
const ttlMs = server.idleTtlMs ?? DEFAULT_IDLE_TTL_MS;
const entry: ClientEntry = {
key,
server,
rootDir,
client,
ready: (async () => {
log(`spawn`, server.id, rootDir);
await client.start(rootDir, launch.env);
await client.waitForReady();
log(`ready`, server.id);
})(),
opened: new Map(),
serializer: Promise.resolve(),
idleTimer: null,
ttlMs,
lastUsed: Date.now(),
watcher: null,
unsubscribeWatchers: null,
};
entries.set(key, entry);
try {
await entry.ready;
} catch (err) {
entries.delete(key);
throw err;
}
await attachWatcher(entry);
bumpIdle(entry);
return entry;
}
// Attach Watcher - Registration can happen during initialize, before the daemon subscribes.
async function attachWatcher(entry: ClientEntry): Promise<void> {
if (process.env.PI_LSP_DISABLE_WATCHERS) return;
const sync = async () => {
const patterns = entry.client.getFileWatchers();
if (patterns.length === 0 && !entry.watcher) return;
if (!entry.watcher) {
entry.watcher = new WorkspaceWatcher(entry.rootDir, (events) =>
forwardEvents(entry, events),
);
log(`watcher`, entry.server.id, entry.rootDir, `patterns=${patterns.length}`);
}
if (process.env.LSP_DEBUG) {
log(`watcher patterns`, entry.server.id, JSON.stringify(patterns));
}
entry.watcher.setPatterns(patterns);
if (patterns.length > 0) await waitForWatcherReady(entry);
};
entry.unsubscribeWatchers = entry.client.onWatchersChanged(() => void sync());
await sync();
}
async function waitForWatcherReady(entry: ClientEntry): Promise<void> {
if (!entry.watcher) return;
let timeout: NodeJS.Timeout | null = null;
let timedOut = false;
try {
await Promise.race([
entry.watcher.ready(),
new Promise<void>((resolve) => {
timeout = setTimeout(() => {
timedOut = true;
resolve();
}, WATCHER_READY_TIMEOUT_MS);
}),
]);
} finally {
if (timeout) clearTimeout(timeout);
}
if (timedOut) {
log(`watcher ready timeout`, entry.server.id, entry.rootDir);
}
}
function forwardEvents(entry: ClientEntry, events: FileEvent[]): void {
try {
if (process.env.LSP_DEBUG) {
log(`watcher fire`, entry.server.id, JSON.stringify(events));
}
entry.client.sendNotification("workspace/didChangeWatchedFiles", {
changes: events,
});
} catch (err) {
log(`watcher send failed`, entry.server.id, (err as Error).message);
}
}
// Bump Idle - Resets the idle eviction timer. Called on every request that
// touches the entry. We log evictions so the daemon's behavior is visible.
function bumpIdle(entry: ClientEntry) {
entry.lastUsed = Date.now();
if (entry.idleTimer) clearTimeout(entry.idleTimer);
entry.idleTimer = setTimeout(() => evict(entry, "idle"), entry.ttlMs);
}
function evict(entry: ClientEntry, reason: string) {
if (!entries.has(entry.key)) return;
log(`evict`, entry.key, reason);
entries.delete(entry.key);
if (entry.idleTimer) clearTimeout(entry.idleTimer);
if (entry.unsubscribeWatchers) entry.unsubscribeWatchers();
void entry.watcher?.dispose();
void entry.client.dispose();
// Auto Shutdown - If this was the last entry, there's nothing left to
// manage. Tear down the daemon so it doesn't sit idle forever.
if (entries.size === 0) {
shutdownDaemon("all entries evicted");
}
}
// Sync File - Ensures the language server has the current contents of the
// file. Sends didOpen on first access, didChange on subsequent calls when
// the on-disk mtime has advanced. Serialized per-entry to avoid races.
async function syncFile(
entry: ClientEntry,
filePath: string,
): Promise<{ uri: string; changed: boolean }> {
const uri = pathToUri(filePath);
const run = async () => {
const stat = fs.statSync(filePath);
const prev = entry.opened.get(uri);
if (prev === undefined) {
entry.client.openDocument(filePath);
entry.opened.set(uri, stat.mtimeMs);
return { uri, changed: true };
} else if (prev !== stat.mtimeMs) {
entry.client.notifyChange(filePath);
entry.opened.set(uri, stat.mtimeMs);
return { uri, changed: true };
}
return { uri, changed: false };
};
// Chain onto the per-entry serializer so concurrent syncs queue up.
const next = entry.serializer.then(run, run);
entry.serializer = next.catch(() => undefined);
return next;
}
// Inject textDocument.uri - Mirrors the helper in commands.ts; we don't
// reuse it because the daemon path operates on raw method strings rather
// than the LspCommand union.
function withDoc(uri: string, params: Record<string, unknown>): Record<string, unknown> {
const existing = (params.textDocument as Record<string, unknown>) ?? {};
return { ...params, textDocument: { uri, ...existing } };
}
// Handle Request - Dispatches a single parsed DaemonRequest. Returns a
// DaemonResponse; never throws (errors are returned as { ok: false }).
async function handle(req: DaemonRequest): Promise<DaemonResponse> {
try {
switch (req.op) {
case "request": {
const filePath = path.resolve(req.file);
const entry = await getOrCreateEntry(filePath, req.serverId, req.launch);
const { uri } = await syncFile(entry, filePath);
bumpIdle(entry);
const result = await entry.client.sendRequest(
req.method,
withDoc(uri, req.params),
);
return { id: req.id, ok: true, result };
}
case "diagnostics": {
const filePath = path.resolve(req.file);
const timeoutMs = req.timeoutMs ?? 1500;
// Fan-Out - Run diagnostics against all requested servers in
// parallel. Individual failures are captured, not thrown.
const results: Record<string, unknown> = {};
const settled = await Promise.allSettled(
req.serverIds.map(async (serverId) => {
const entry = await getOrCreateEntry(filePath, serverId, req.launch);
const { uri, changed } = await syncFile(entry, filePath);
bumpIdle(entry);
if (changed) entry.client.clearDiagnostics(uri);
const diag = await entry.client.waitForDiagnostics(uri, timeoutMs);
return { serverId, diag };
}),
);
for (const outcome of settled) {
if (outcome.status === "fulfilled") {
results[outcome.value.serverId] = outcome.value.diag;
}
}
return { id: req.id, ok: true, result: results };
}
case "status": {
const result = {
socket: socketPath(),
servers: Array.from(entries.values()).map((e) => ({
id: e.server.id,
rootDir: e.rootDir,
openedFiles: Array.from(e.opened.keys()),
idleMs: Date.now() - e.lastUsed,
ttlMs: e.ttlMs,
})),
};
return { id: req.id, ok: true, result };
}
case "destroy_server": {
// Manual Kill - Evict entries matching the server ID (or all if
// unspecified). This is explicitly destructive; the caller knows
// what it's doing.
const toDestroy = req.serverId
? Array.from(entries.values()).filter(
(e) => e.server.id === req.serverId,
)
: Array.from(entries.values());
for (const entry of toDestroy) {
evict(entry, "manual destroy");
}
// Full Shutdown - If destroying all servers and nothing is left
// (including the case where no entries existed), tear down the
// daemon so it doesn't sit idle.
if (!req.serverId && entries.size === 0) {
setImmediate(() => shutdownDaemon("destroy all"));
}
return {
id: req.id,
ok: true,
result: { destroyed: toDestroy.map((e) => e.key) },
};
}
case "shutdown": {
// Acknowledge first, then tear down on next tick so the response
// has a chance to flush before we close listeners.
setImmediate(() => shutdownDaemon("shutdown request"));
return { id: req.id, ok: true, result: { stopping: true } };
}
}
} catch (err) {
return {
id: req.id,
ok: false,
error: (err as Error)?.message ?? String(err),
};
}
}
// Handle Connection - Reads NDJSON from a client socket; each line is one
// independent request. Multiple requests may share a connection.
function handleConnection(sock: net.Socket) {
let buf = "";
sock.on("data", async (chunk) => {
buf += chunk.toString("utf8");
let nl: number;
// Process All Complete Lines - Leftover stays in buf for the next chunk.
while ((nl = buf.indexOf("\n")) !== -1) {
const line = buf.slice(0, nl);
buf = buf.slice(nl + 1);
if (!line.trim()) continue;
let req: DaemonRequest;
try {
req = JSON.parse(line);
} catch (err) {
sock.write(
JSON.stringify({
id: 0,
ok: false,
error: `bad json: ${(err as Error).message}`,
}) + "\n",
);
continue;
}
const resp = await handle(req);
sock.write(JSON.stringify(resp) + "\n");
}
});
sock.on("error", (err) => log("conn error", err.message));
}
let server: net.Server | null = null;
// Shutdown Daemon - Stops accepting connections, disposes all LspClients,
// removes the socket file, and exits. Called on SIGTERM/SIGINT and via
// the explicit `shutdown` op.
function shutdownDaemon(reason: string) {
log(`shutdown`, reason);
if (server) server.close();
for (const entry of entries.values()) {
if (entry.idleTimer) clearTimeout(entry.idleTimer);
if (entry.unsubscribeWatchers) entry.unsubscribeWatchers();
void entry.watcher?.dispose();
void entry.client.dispose();
}
entries.clear();
try {
fs.unlinkSync(socketPath());
} catch {
// Ignore - already gone.
}
// Give pending writes a moment, then exit.
setTimeout(() => process.exit(0), 100);
}
// Start Daemon - Binds the Unix socket, handling stale-socket cleanup.
// If another daemon is already listening, we exit cleanly so racing
// `ensureDaemon` callers converge on a single instance.
export async function startDaemon(): Promise<void> {
const sock = socketPath();
// Stale Socket Detection - If something exists at the path, try to
// connect. A successful connect means another daemon owns it (we exit);
// a failed connect means the socket file is stale (we unlink it).
if (fs.existsSync(sock)) {
try {
const probe = await tryConnect(sock, 200);
probe.destroy();
log(`another daemon already listening on ${sock}, exiting`);
process.exit(0);
} catch {
try {
fs.unlinkSync(sock);
} catch {
// Ignore - listen() will surface a clearer error if this matters.
}
}
}
server = net.createServer(handleConnection);
await new Promise<void>((resolve, reject) => {
server!.once("error", reject);
server!.listen(sock, () => {
// Restrict Permissions - Socket is per-user; nobody else should poke at it.
try {
fs.chmodSync(sock, 0o600);
} catch {
// Ignore - best effort.
}
resolve();
});
});
log(`listening on ${sock} (logs: ${logPath()})`);
process.on("SIGTERM", () => shutdownDaemon("SIGTERM"));
process.on("SIGINT", () => shutdownDaemon("SIGINT"));
}