⚠️ Early Version: This is an early version of ContextWorker. Documentation is actively being developed, and the API may change.
ContextWorker is the Background Processing Engine of the ContextUnity ecosystem. Built on Temporal, it provides:
- Durable Workflows — long-running processes that survive restarts and failures
- Scheduled Jobs — cron-based recurring tasks (harvesting, enrichment, sync)
- gRPC Service — trigger workflows from other services via ContextUnit protocol
- Agent System — polling-based background agents with registry pattern
Think of it as the "Hands" of the ecosystem — executing the work orchestrated by Router.
Worker contains NO business logic. It provides infrastructure for:
- Temporal workflow/activity execution
- Schedule management
- Agent lifecycle
- gRPC service for workflow triggers
Business logic lives in domain packages:
- Commerce harvesting →
ContextCommerce - AI enrichment →
ContextRouter
All inter-service communication uses ContextUnit:
from contextcore import ContextUnit
# Trigger workflow via gRPC
unit = ContextUnit(
payload={
"workflow_type": "harvest",
"supplier_code": "camping-trade",
"tenant_id": "myproject",
},
provenance=["commerce:trigger"],
)What is gRPC? gRPC is a high-performance RPC framework using Protocol Buffers. It provides type-safe, efficient service-to-service communication with built-in streaming.
┌─────────────────────────────────────────────────────────────────────┐
│ ContextRouter │
│ (The "Mind" — Orchestration) │
├─────────────────────────────────────────────────────────────────────┤
│ • Orchestrates AI agents (Gardener, Matcher) │
│ • Routes LLM requests │
│ • Calls Worker to trigger workflows │
└───────────────────────────────────┬─────────────────────────────────┘
│ triggers via gRPC
▼
┌─────────────────────────────────────────────────────────────────────┐
│ ContextWorker │
│ (The "Hands" — Execution) │
├─────────────────────────────────────────────────────────────────────┤
│ • Executes durable Temporal workflows │
│ • Manages scheduled jobs (harvest, enrich, sync) │
│ • Runs background agents (polling loops) │
│ • Exposes gRPC service for workflow triggers │
└───────────────────────────────────┬─────────────────────────────────┘
│ runs workflows defined in
▼
┌─────────────────────────────────────────────────────────────────────┐
│ ContextCommerce │
│ (The "Store" — Domain Logic) │
├─────────────────────────────────────────────────────────────────────┤
│ • Defines harvester workflows/activities │
│ • Defines sync workflows (Horoshop, Prom) │
│ • Product catalog and taxonomy │
└─────────────────────────────────────────────────────────────────────┘
| Service | Role | How Worker Uses It |
|---|---|---|
| ContextCore | Shared types, gRPC protos | worker.proto, ContextUnit |
| ContextRouter | AI orchestration | Executes Gardener/Matcher agents |
| ContextCommerce | E-commerce platform | Provides workflows/activities |
| ContextBrain | Knowledge storage | Taxonomy lookups via gRPC |
- ⚡ Temporal Integration — durable, fault-tolerant workflow execution
- 📅 Schedule Management — create, pause, trigger recurring jobs
- 🔌 Agent Registry —
@registerdecorator for background agents - 📡 gRPC Service — WorkerService for workflow triggers
- 📈 Scalable — run multiple worker instances for parallel processing
src/contextworker/
├── __main__.py # CLI entry point
├── config.py # Pydantic settings (WorkerConfig)
├── registry.py # Agent registry (@register decorator)
├── service.py # gRPC WorkerService
├── schedules.py # Temporal schedule management
│
├── core/
│ └── temporal.py # Temporal client setup
│
├── agents/ # Background polling agents
│ ├── gardener.py # Product enrichment polling
│ ├── harvester.py # Harvest trigger agent
│ └── lexicon.py # Lexicon sync agent
│
└── harvester/ # Harvester workflow (infrastructure)
├── workflow.py # HarvestWorkflow definition
└── activities.py # Download, parse, store activities
# Install
pip install contextworker
# Run worker (discovers agents automatically)
python -m contextworker
# Run specific agents
python -m contextworker --agents gardener harvester
# Create schedules for tenant
python -m contextworker.schedules create --tenant-id myproject# Standalone (infrastructure only)
pip install contextworker
# With Commerce modules (full stack)
pip install contextcommerce # Includes contextworker as dependency# Temporal
export TEMPORAL_HOST="localhost:7233"
export WORKER_TASK_QUEUE="default"
export WORKER_MAX_CONCURRENT="10"
# Database (for activities that access Commerce)
export DATABASE_URL="postgres://user:pass@localhost/db"
# Tenant
export TENANT_ID="default"
export PROJECT_DIR="."- Full Documentation — complete guides and API reference
- Technical Reference — architecture deep-dive
- Contributing Guide — Golden Paths for adding functionality
- Temporal Docs — workflow engine documentation
See our Contributing Guide for:
- Golden Path: Adding an Agent — registry pattern, lifecycle
- Golden Path: Adding a Workflow — Temporal patterns, activities
- Golden Path: Adding a Schedule — cron configuration
This project is licensed under the terms specified in LICENSE.md.