Skip to content

Commit 79a58b1

Browse files
Callum Styanclaude
andcommitted
feat: add backend polling for workspace agent metadata
Replace frontend polling with backend polling via WebSocket for agent metadata updates. The backend polls the database every 15 seconds and streams updates to the frontend over WebSocket. Changes: - Add watchWorkspaceAgentMetadataPolling function that polls DB every 15s - Add /watch-metadata-polling-ws endpoint for WebSocket-based streaming - Update AgentMetadata.tsx to use new WebSocket endpoint - Keep existing pubsub-based endpoints for backward compatibility 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude Sonnet 4.5 <[email protected]>
1 parent 73243da commit 79a58b1

File tree

4 files changed

+236
-23
lines changed

4 files changed

+236
-23
lines changed

coderd/coderd.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1431,6 +1431,7 @@ func New(options *Options) *API {
14311431
r.Get("/metadata", api.workspaceAgentMetadata)
14321432
r.Get("/watch-metadata", api.watchWorkspaceAgentMetadataSSE)
14331433
r.Get("/watch-metadata-ws", api.watchWorkspaceAgentMetadataWS)
1434+
r.Get("/watch-metadata-polling-ws", api.watchWorkspaceAgentMetadataPollingWS)
14341435
r.Get("/startup-logs", api.workspaceAgentLogsDeprecated)
14351436
r.Get("/logs", api.workspaceAgentLogs)
14361437
r.Get("/listening-ports", api.workspaceAgentListeningPorts)

coderd/workspaceagents.go

Lines changed: 171 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1646,6 +1646,177 @@ func (api *API) watchWorkspaceAgentMetadataWS(rw http.ResponseWriter, r *http.Re
16461646
api.watchWorkspaceAgentMetadata(rw, r, httpapi.OneWayWebSocketEventSender)
16471647
}
16481648

1649+
// @Summary Watch for workspace agent metadata updates via WebSockets (polling-based)
1650+
// @ID watch-for-workspace-agent-metadata-updates-via-websockets-polling
1651+
// @Security CoderSessionToken
1652+
// @Produce json
1653+
// @Tags Agents
1654+
// @Success 200 {object} codersdk.ServerSentEvent
1655+
// @Param workspaceagent path string true "Workspace agent ID" format(uuid)
1656+
// @Router /workspaceagents/{workspaceagent}/watch-metadata-polling-ws [get]
1657+
// @x-apidocgen {"skip": true}
1658+
func (api *API) watchWorkspaceAgentMetadataPollingWS(rw http.ResponseWriter, r *http.Request) {
1659+
api.watchWorkspaceAgentMetadataPolling(rw, r, httpapi.OneWayWebSocketEventSender)
1660+
}
1661+
1662+
func (api *API) watchWorkspaceAgentMetadataPolling(
1663+
rw http.ResponseWriter,
1664+
r *http.Request,
1665+
connect httpapi.EventSender,
1666+
) {
1667+
// Allow us to interrupt watch via cancel.
1668+
ctx, cancel := context.WithCancel(r.Context())
1669+
defer cancel()
1670+
r = r.WithContext(ctx) // Rewire context for cancellation.
1671+
1672+
workspaceAgent := httpmw.WorkspaceAgentParam(r)
1673+
log := api.Logger.Named("workspace_metadata_watcher_polling").With(
1674+
slog.F("workspace_agent_id", workspaceAgent.ID),
1675+
)
1676+
1677+
// We always use the original Request context because it contains
1678+
// the RBAC actor.
1679+
initialMD, err := api.Database.GetWorkspaceAgentMetadata(ctx, database.GetWorkspaceAgentMetadataParams{
1680+
WorkspaceAgentID: workspaceAgent.ID,
1681+
Keys: nil,
1682+
})
1683+
if err != nil {
1684+
// If we can't successfully pull the initial metadata, polling
1685+
// updates will be no-op so we may as well terminate the
1686+
// connection early.
1687+
httpapi.InternalServerError(rw, err)
1688+
return
1689+
}
1690+
1691+
log.Debug(ctx, "got initial metadata", "num", len(initialMD))
1692+
1693+
metadataMap := make(map[string]database.WorkspaceAgentMetadatum, len(initialMD))
1694+
for _, datum := range initialMD {
1695+
metadataMap[datum.Key] = datum
1696+
}
1697+
//nolint:ineffassign // Release memory.
1698+
initialMD = nil
1699+
1700+
sendEvent, senderClosed, err := connect(rw, r)
1701+
if err != nil {
1702+
httpapi.Write(ctx, rw, http.StatusInternalServerError, codersdk.Response{
1703+
Message: "Internal error setting up websocket.",
1704+
Detail: err.Error(),
1705+
})
1706+
return
1707+
}
1708+
// Prevent handler from returning until the sender is closed.
1709+
defer func() {
1710+
cancel()
1711+
<-senderClosed
1712+
}()
1713+
// Synchronize cancellation from WebSocket -> context, this lets us simplify the
1714+
// cancellation logic.
1715+
go func() {
1716+
select {
1717+
case <-ctx.Done():
1718+
case <-senderClosed:
1719+
cancel()
1720+
}
1721+
}()
1722+
1723+
var lastSend time.Time
1724+
sendMetadata := func() {
1725+
lastSend = time.Now()
1726+
values := maps.Values(metadataMap)
1727+
1728+
log.Debug(ctx, "sending metadata", "num", len(values))
1729+
1730+
_ = sendEvent(codersdk.ServerSentEvent{
1731+
Type: codersdk.ServerSentEventTypeData,
1732+
Data: convertWorkspaceAgentMetadata(values),
1733+
})
1734+
}
1735+
1736+
// We send updates exactly every second.
1737+
const sendInterval = time.Second * 1
1738+
sendTicker := time.NewTicker(sendInterval)
1739+
defer sendTicker.Stop()
1740+
1741+
// Log the request immediately instead of after it completes.
1742+
if rl := loggermw.RequestLoggerFromContext(ctx); rl != nil {
1743+
rl.WriteLog(ctx, http.StatusAccepted)
1744+
}
1745+
1746+
// Send initial metadata.
1747+
sendMetadata()
1748+
1749+
// Poll for metadata updates every 15 seconds.
1750+
fetchedMetadata := make(chan []database.WorkspaceAgentMetadatum)
1751+
go func() {
1752+
defer close(fetchedMetadata)
1753+
defer cancel()
1754+
1755+
ticker := time.NewTicker(15 * time.Second)
1756+
defer ticker.Stop()
1757+
1758+
for {
1759+
select {
1760+
case <-ctx.Done():
1761+
return
1762+
case <-ticker.C:
1763+
md, err := api.Database.GetWorkspaceAgentMetadata(ctx, database.GetWorkspaceAgentMetadataParams{
1764+
WorkspaceAgentID: workspaceAgent.ID,
1765+
Keys: nil, // Get all metadata
1766+
})
1767+
if err != nil {
1768+
if !database.IsQueryCanceledError(err) {
1769+
log.Error(ctx, "failed to get metadata", slog.Error(err))
1770+
_ = sendEvent(codersdk.ServerSentEvent{
1771+
Type: codersdk.ServerSentEventTypeError,
1772+
Data: codersdk.Response{
1773+
Message: "Failed to get metadata.",
1774+
Detail: err.Error(),
1775+
},
1776+
})
1777+
}
1778+
return
1779+
}
1780+
select {
1781+
case <-ctx.Done():
1782+
return
1783+
case fetchedMetadata <- md:
1784+
log.Debug(ctx, "fetched metadata from poll", "num", len(md))
1785+
}
1786+
}
1787+
}
1788+
}()
1789+
defer func() {
1790+
<-fetchedMetadata
1791+
}()
1792+
1793+
pendingChanges := true
1794+
for {
1795+
select {
1796+
case <-ctx.Done():
1797+
return
1798+
case md, ok := <-fetchedMetadata:
1799+
if !ok {
1800+
return
1801+
}
1802+
for _, datum := range md {
1803+
metadataMap[datum.Key] = datum
1804+
}
1805+
pendingChanges = true
1806+
continue
1807+
case <-sendTicker.C:
1808+
// We send an update even if there's no change every 5 seconds
1809+
// to ensure that the frontend always has an accurate "Result.Age".
1810+
if !pendingChanges && time.Since(lastSend) < 5*time.Second {
1811+
continue
1812+
}
1813+
pendingChanges = false
1814+
}
1815+
1816+
sendMetadata()
1817+
}
1818+
}
1819+
16491820
func (api *API) watchWorkspaceAgentMetadata(
16501821
rw http.ResponseWriter,
16511822
r *http.Request,

site/src/api/api.ts

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -127,6 +127,14 @@ export const watchAgentMetadata = (
127127
});
128128
};
129129

130+
export const watchAgentMetadataPolling = (
131+
agentId: string,
132+
): OneWayWebSocket<TypesGen.ServerSentEvent> => {
133+
return new OneWayWebSocket({
134+
apiRoute: `/api/v2/workspaceagents/${agentId}/watch-metadata-polling-ws`,
135+
});
136+
};
137+
130138
/**
131139
* @returns {OneWayWebSocket} A OneWayWebSocket that emits Server-Sent Events.
132140
*/

site/src/modules/resources/AgentMetadata.tsx

Lines changed: 56 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
import Skeleton from "@mui/material/Skeleton";
2-
import { API } from "api/api";
2+
import { watchAgentMetadataPolling } from "api/api";
33
import type {
4+
ServerSentEvent,
45
WorkspaceAgent,
56
WorkspaceAgentMetadata,
67
} from "api/typesGenerated";
@@ -21,6 +22,7 @@ import {
2122
useState,
2223
} from "react";
2324
import { cn } from "utils/cn";
25+
import type { OneWayWebSocket } from "utils/OneWayWebSocket";
2426

2527
type ItemStatus = "stale" | "valid" | "loading";
2628

@@ -46,7 +48,7 @@ interface AgentMetadataProps {
4648
initialMetadata?: WorkspaceAgentMetadata[];
4749
}
4850

49-
const pollInterval = 15000; // 15 seconds
51+
const maxSocketErrorRetryCount = 3;
5052

5153
export const AgentMetadata: FC<AgentMetadataProps> = ({
5254
agent,
@@ -55,39 +57,70 @@ export const AgentMetadata: FC<AgentMetadataProps> = ({
5557
const [activeMetadata, setActiveMetadata] = useState(initialMetadata);
5658

5759
useEffect(() => {
60+
// This is an unfortunate pitfall with this component's testing setup,
61+
// but even though we use the value of initialMetadata as the initial
62+
// value of the activeMetadata, we cannot put activeMetadata itself into
63+
// the dependency array. If we did, we would destroy and rebuild each
64+
// connection every single time a new message comes in from the socket,
65+
// because the socket has to be wired up to the state setter
5866
if (initialMetadata !== undefined) {
5967
return;
6068
}
6169

62-
let isMounted = true;
70+
let timeoutId: number | undefined;
71+
let activeSocket: OneWayWebSocket<ServerSentEvent> | null = null;
72+
let retries = 0;
6373

64-
const fetchMetadata = async () => {
65-
try {
66-
const metadata = await API.getWorkspaceAgentMetadata(agent.id);
67-
if (isMounted) {
68-
setActiveMetadata(metadata);
69-
}
70-
} catch (error) {
71-
if (isMounted) {
72-
console.error("Failed to fetch agent metadata:", error);
74+
const createNewConnection = () => {
75+
const socket = watchAgentMetadataPolling(agent.id);
76+
activeSocket = socket;
77+
78+
socket.addEventListener("error", () => {
79+
setActiveMetadata(undefined);
80+
window.clearTimeout(timeoutId);
81+
82+
// The error event is supposed to fire when an error happens
83+
// with the connection itself, which implies that the connection
84+
// would auto-close. Couldn't find a definitive answer on MDN,
85+
// though, so closing it manually just to be safe
86+
socket.close();
87+
activeSocket = null;
88+
89+
retries++;
90+
if (retries >= maxSocketErrorRetryCount) {
7391
displayError(
74-
"Failed to fetch agent metadata. Will retry automatically.",
92+
"Unexpected disconnect while watching Metadata changes. Please try refreshing the page.",
7593
);
94+
return;
7695
}
77-
}
78-
};
7996

80-
// Fetch immediately
81-
void fetchMetadata();
97+
displayError(
98+
"Unexpected disconnect while watching Metadata changes. Creating new connection...",
99+
);
100+
timeoutId = window.setTimeout(() => {
101+
createNewConnection();
102+
}, 3_000);
103+
});
82104

83-
// Then poll every 15 seconds
84-
const intervalId = setInterval(() => {
85-
void fetchMetadata();
86-
}, pollInterval);
105+
socket.addEventListener("message", (e) => {
106+
if (e.parseError) {
107+
displayError(
108+
"Unable to process newest response from server. Please try refreshing the page.",
109+
);
110+
return;
111+
}
112+
113+
const msg = e.parsedMessage;
114+
if (msg.type === "data") {
115+
setActiveMetadata(msg.data as WorkspaceAgentMetadata[]);
116+
}
117+
});
118+
};
87119

120+
createNewConnection();
88121
return () => {
89-
isMounted = false;
90-
clearInterval(intervalId);
122+
window.clearTimeout(timeoutId);
123+
activeSocket?.close();
91124
};
92125
}, [agent.id, initialMetadata]);
93126

0 commit comments

Comments
 (0)