Skip to content
Open
2 changes: 2 additions & 0 deletions coderd/agentapi/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@ type Options struct {
DerpMapFn func() *tailcfg.DERPMap
TailnetCoordinator *atomic.Pointer[tailnet.Coordinator]
StatsReporter *workspacestats.Reporter
MetadataBatcher *MetadataBatcher
AppearanceFetcher *atomic.Pointer[appearance.Fetcher]
PublishWorkspaceUpdateFn func(ctx context.Context, userID uuid.UUID, event wspubsub.WorkspaceEvent)
PublishWorkspaceAgentLogsUpdateFn func(ctx context.Context, workspaceAgentID uuid.UUID, msg agentsdk.LogsNotifyMessage)
Expand Down Expand Up @@ -179,6 +180,7 @@ func New(opts Options, workspace database.Workspace) *API {
Database: opts.Database,
Pubsub: opts.Pubsub,
Log: opts.Log,
Batcher: opts.MetadataBatcher,
}

api.LogsAPI = &LogsAPI{
Expand Down
47 changes: 33 additions & 14 deletions coderd/agentapi/metadata.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ type MetadataAPI struct {
Database database.Store
Pubsub pubsub.Pubsub
Log slog.Logger
Batcher *MetadataBatcher

TimeNowFn func() time.Time // defaults to dbtime.Now()
}
Expand Down Expand Up @@ -122,21 +123,26 @@ func (a *MetadataAPI) BatchUpdateMetadata(ctx context.Context, req *agentproto.B
)
}

err = a.Database.UpdateWorkspaceAgentMetadata(rbacCtx, dbUpdate)
if err != nil {
return nil, xerrors.Errorf("update workspace agent metadata in database: %w", err)
}
// Use batcher if available, otherwise fall back to direct database write.
if a.Batcher != nil {
a.Batcher.Add(workspaceAgent.ID, dbUpdate.Key, dbUpdate.Value, dbUpdate.Error, dbUpdate.CollectedAt)
} else {
err = a.Database.UpdateWorkspaceAgentMetadata(rbacCtx, dbUpdate)
if err != nil {
return nil, xerrors.Errorf("update workspace agent metadata in database: %w", err)
}

payload, err := json.Marshal(WorkspaceAgentMetadataChannelPayload{
CollectedAt: collectedAt,
Keys: dbUpdate.Key,
})
if err != nil {
return nil, xerrors.Errorf("marshal workspace agent metadata channel payload: %w", err)
}
err = a.Pubsub.Publish(WatchWorkspaceAgentMetadataChannel(workspaceAgent.ID), payload)
if err != nil {
return nil, xerrors.Errorf("publish workspace agent metadata: %w", err)
payload, err := json.Marshal(WorkspaceAgentMetadataChannelPayload{
CollectedAt: collectedAt,
Keys: dbUpdate.Key,
})
if err != nil {
return nil, xerrors.Errorf("marshal workspace agent metadata channel payload: %w", err)
}
err = a.Pubsub.Publish(WatchWorkspaceAgentMetadataChannel(workspaceAgent.ID), payload)
if err != nil {
return nil, xerrors.Errorf("publish workspace agent metadata: %w", err)
}
}

// If the metadata keys were too large, we return an error so the agent can
Expand All @@ -160,6 +166,19 @@ type WorkspaceAgentMetadataChannelPayload struct {
Keys []string `json:"keys"`
}

// WorkspaceAgentMetadataBatchPayload is published to the batched metadata
// channel with agent IDs that have metadata updates. Listeners should
// re-fetch metadata for these agents from the database.
type WorkspaceAgentMetadataBatchPayload struct {
AgentIDs []uuid.UUID `json:"agent_ids"`
}

func WatchWorkspaceAgentMetadataChannel(id uuid.UUID) string {
return "workspace_agent_metadata:" + id.String()
}

// WatchWorkspaceAgentMetadataBatchChannel returns the global channel name for
// batched metadata updates across all agents.
func WatchWorkspaceAgentMetadataBatchChannel() string {
return "workspace_agent_metadata_batch"
}
Loading
Loading