- Quick Start
- Manual Setup
- Project Structure
- Architecture
- Database Schema
- API Documentation
- CLI Usage
- Additional Information
- Plugin Model
- Testing in Kubernetes with Kind
- Improvement
To quickly get started with the Task Service, run the following commands:

```bash
make bootstrap
docker-compose -f docker-compose.demo.yaml up
```

This will start all the necessary services, including the server, database, and worker.
For a more detailed setup process, follow these steps:

Install Pixi, activate the shell, and start the database:

```bash
make bootstrap
# Run Database
docker-compose up -d
```

Start the server:

```bash
make run-server
```

Access the server at http://127.0.0.1:8080.

Build and test the CLI:

```bash
make build-cli
./bin/task-cli --help
```

Install the dependencies and start the NextJS dashboard (from `clients/dashboard/`):

```bash
npm install
npm run dev
```

Access the dashboard at http://127.0.0.1:3000.
Start worker instances:

```bash
./bin/task-cli serve --log-level debug
```

The project is organized as follows:

```text
task/
├── cmd/
│   ├── cli/          # CLI for task management
│   └── server/       # Server entry point
├── pkg/
│   ├── config/       # Configuration management
│   ├── gen/          # gRPC generated code
│   ├── plugins/      # Plugin model
│   ├── worker/       # Worker code
│   └── x/            # Utility functions
├── idl/
│   └── proto/        # Protocol buffer definitions
├── clients/
│   └── dashboard/    # NextJS dashboard
├── charts/
│   └── task/         # Helm charts for deployment
├── server/
│   ├── repository/   # Database ORM
│   └── root/         # Server root
│       └── route/    # All server routes
└── docs/             # Documentation files
```
The Task Service follows a distributed architecture with separate components for the control plane and data plane. Here's a high-level overview of the system:
```mermaid
graph TD
    %% Clients
    A[Dashboard Client] -->|Sends Request| B(Server)
    C[CLI Client] -->|Sends Request| B(Server)

    %% Control Plane
    subgraph Control Plane
        B(Server) -->|Reads/Writes| D[(PostgreSQL Database)]
    end

    %% Data Plane
    subgraph Data Plane
        E[Agent] -->|Initiates Connection| B[Server]
        B[Server] -->|Publish W| E[Agent]
        E -->|Creates CRD| H[CRD]
        F[Controller] -->|Watches CRD| H
        F -->|Executes Task| J[Task Execution]
        F -->|Sends Status Update| B
    end
```
This architecture allows for:
- Separation of concerns between the control plane (server) and data plane (workers)
- Scalability of worker nodes to handle increased workloads
- Asynchronous task execution through message queuing
- Real-time status updates from workers to the server
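To make the queue-based decoupling concrete, here is a minimal in-process sketch in Go. It is not how the project wires its components together (the real system uses River with PostgreSQL as the queue backend); Go channels stand in for the queue and for the status-update path, and the names `Job`, `StatusUpdate`, `simulateTask`, and `runPool` are hypothetical.

```go
package main

import (
	"fmt"
	"sync"
)

// Job is a hypothetical unit of work published by the control plane.
type Job struct {
	TaskID int
	Type   string
}

// StatusUpdate is what a worker reports back to the control plane.
type StatusUpdate struct {
	TaskID int
	Status string
	Worker int
}

// simulateTask stands in for real task execution; type "fail" always fails.
func simulateTask(j Job) error {
	if j.Type == "fail" {
		return fmt.Errorf("task %d failed", j.TaskID)
	}
	return nil
}

// runPool starts n workers that consume jobs from a shared queue and report
// RUNNING plus a terminal status for each job back on an updates channel.
func runPool(n int, jobs []Job, exec func(Job) error) []StatusUpdate {
	queue := make(chan Job)            // stands in for the message queue
	updates := make(chan StatusUpdate) // stands in for worker -> server status reports
	var wg sync.WaitGroup

	for w := 1; w <= n; w++ {
		wg.Add(1)
		go func(id int) {
			defer wg.Done()
			for j := range queue {
				updates <- StatusUpdate{TaskID: j.TaskID, Status: "RUNNING", Worker: id}
				status := "SUCCEEDED"
				if err := exec(j); err != nil {
					status = "FAILED"
				}
				updates <- StatusUpdate{TaskID: j.TaskID, Status: status, Worker: id}
			}
		}(w)
	}

	// Control plane side: publish every job, then close the queue.
	go func() {
		for _, j := range jobs {
			queue <- j
		}
		close(queue)
	}()

	// Close updates once all workers have drained the queue.
	go func() {
		wg.Wait()
		close(updates)
	}()

	// Collect status updates as the control plane would record them.
	var all []StatusUpdate
	for u := range updates {
		all = append(all, u)
	}
	return all
}
```

Adding workers only changes `n`; the publisher and the status collector are unaffected, which is the scalability property the list above describes.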
The server interacts with the database for persistent storage of tasks and their history. Here's a summary of the database operations:
- Read Operations
  - Get Task by ID
    - Purpose: Retrieve details of a specific task
    - Frequency: On-demand, triggered by API requests
  - List All Tasks
    - Purpose: Retrieve a list of all tasks
    - Frequency: On-demand, typically for dashboard or reporting
  - List Task History
    - Purpose: Retrieve the status change history of a specific task
    - Frequency: On-demand, for detailed task analysis
- Write Operations
  - Create New Task
    - Purpose: Store a newly created task
    - Frequency: Each time a new task is submitted
  - Update Task Status
    - Purpose: Modify the status of an existing task
    - Frequency: As task states change (e.g., from queued to running to completed)
  - Create Task History Entry
    - Purpose: Log task status changes and creation events
    - Frequency: On task creation and each status change
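The write operations above come in pairs: creating a task or changing its status also appends a history entry. A minimal in-memory sketch of that pairing, assuming a mutex-guarded store in place of PostgreSQL (in the real service the two writes would presumably share a database transaction); `Store`, `CreateTask`, `UpdateStatus`, `GetTask`, and `History` are hypothetical names, not the project's repository API, and the initial `PENDING` status is an assumption.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

type Task struct {
	ID        int
	Name      string
	Status    string
	CreatedAt time.Time
}

type HistoryEntry struct {
	TaskID    int
	Status    string
	Details   string
	CreatedAt time.Time
}

// Store keeps tasks and their history; holding the mutex across both writes
// makes a status change and its history entry appear together.
type Store struct {
	mu      sync.Mutex
	nextID  int
	tasks   map[int]*Task
	history map[int][]HistoryEntry
}

func NewStore() *Store {
	return &Store{tasks: map[int]*Task{}, history: map[int][]HistoryEntry{}}
}

// CreateTask stores a new task and logs its creation as the first history entry.
func (s *Store) CreateTask(name string) int {
	s.mu.Lock()
	defer s.mu.Unlock()
	s.nextID++
	now := time.Now()
	s.tasks[s.nextID] = &Task{ID: s.nextID, Name: name, Status: "PENDING", CreatedAt: now}
	s.history[s.nextID] = append(s.history[s.nextID], HistoryEntry{s.nextID, "PENDING", "task created", now})
	return s.nextID
}

// UpdateStatus changes a task's status and appends a history entry in one step.
func (s *Store) UpdateStatus(id int, status, details string) error {
	s.mu.Lock()
	defer s.mu.Unlock()
	t, ok := s.tasks[id]
	if !ok {
		return fmt.Errorf("task %d not found", id)
	}
	t.Status = status
	s.history[id] = append(s.history[id], HistoryEntry{id, status, details, time.Now()})
	return nil
}

// GetTask returns a copy of the task so callers cannot mutate the store.
func (s *Store) GetTask(id int) (Task, bool) {
	s.mu.Lock()
	defer s.mu.Unlock()
	t, ok := s.tasks[id]
	if !ok {
		return Task{}, false
	}
	return *t, true
}

// History returns the status-change history of a task, oldest first.
func (s *Store) History(id int) []HistoryEntry {
	s.mu.Lock()
	defer s.mu.Unlock()
	return append([]HistoryEntry(nil), s.history[id]...)
}
```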
The Task Service uses a PostgreSQL database to store task and task history information. Below is an Entity-Relationship Diagram (ERD) representing the database schema:
```mermaid
erDiagram
    %% Task Model
    TASK {
        int id PK
        string name
        int type
        int status
        jsonb payload
        int retries
        int priority
        timestamp created_at
    }

    %% TaskHistory Model
    TASK_HISTORY {
        int id PK
        int task_id FK
        int status
        string details
        timestamp created_at
    }

    %% Relationships
    TASK ||--o{ TASK_HISTORY : has

    %% Indexes (described as comments)
    %% Indexes for TASK
    %% - idx_type_status (type, status)
    %% - idx_created_at (created_at)
    %% - idx_status_created_at (status, created_at)

    %% Indexes for TASK_HISTORY
    %% - idx_task_id_created_at (task_id, created_at)
```
Note: Ideally, we should create separate tables for tasks 📝 and task executions ⚙️. When a task is created, it should be added to the task table. Upon triggering an execution, a corresponding entry should be created in the execution table, and the execution data should be published to the PostgreSQL queue for processing 📬. This way, the task status remains unchanged, and only the execution status is updated in the execution table ✅.
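A minimal sketch of the separation this note proposes, assuming hypothetical `Task` and `Execution` records held in memory: triggering a run creates a new execution and enqueues it, while the task record itself is never modified. The `queue` slice only stands in for the PostgreSQL-backed queue, and `Scheduler`, `Trigger`, and `Complete` are illustrative names.

```go
package main

import "fmt"

// Task is the definition; it has no execution status of its own.
type Task struct {
	ID   int
	Name string
	Type string
}

// Execution is one run of a task and carries the mutable status.
type Execution struct {
	ID     int
	TaskID int
	Status string
}

type Scheduler struct {
	tasks      map[int]Task
	executions map[int]*Execution
	queue      []int // execution IDs waiting to be processed (stand-in for the queue)
	nextExecID int
}

func NewScheduler() *Scheduler {
	return &Scheduler{tasks: map[int]Task{}, executions: map[int]*Execution{}}
}

func (s *Scheduler) AddTask(t Task) { s.tasks[t.ID] = t }

// Trigger records a new execution for an existing task and enqueues it.
func (s *Scheduler) Trigger(taskID int) (int, error) {
	if _, ok := s.tasks[taskID]; !ok {
		return 0, fmt.Errorf("task %d not found", taskID)
	}
	s.nextExecID++
	s.executions[s.nextExecID] = &Execution{ID: s.nextExecID, TaskID: taskID, Status: "QUEUED"}
	s.queue = append(s.queue, s.nextExecID)
	return s.nextExecID, nil
}

// Complete updates only the execution's status; the task is left untouched.
func (s *Scheduler) Complete(execID int, status string) error {
	e, ok := s.executions[execID]
	if !ok {
		return fmt.Errorf("execution %d not found", execID)
	}
	e.Status = status
	return nil
}
```

Because each trigger gets its own execution row, the same task can be run many times and each run keeps its own status history.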
- TASK: Stores information about individual tasks
  - `id`: Unique identifier for the task (Primary Key)
  - `name`: Name of the task
  - `type`: Type of the task (e.g., send_email, run_query)
  - `status`: Current status of the task (e.g., pending, running, completed)
  - `payload`: JSON object containing task-specific parameters
  - `retries`: Number of retry attempts for the task
  - `priority`: Priority level of the task
  - `created_at`: Timestamp of task creation
- TASK_HISTORY: Tracks the history of status changes for tasks
  - `id`: Unique identifier for the history entry (Primary Key)
  - `task_id`: Foreign Key referencing the TASK table
  - `status`: Status of the task at the time of the history entry
  - `details`: Additional details about the status change
  - `created_at`: Timestamp of the history entry creation
- One TASK can have many TASK_HISTORY entries (one-to-many relationship)
To optimize query performance, the following indexes are implemented:
- TASK table
  - `idx_type_status`: Composite index on `type` and `status` columns
  - `idx_created_at`: Index on `created_at` column
  - `idx_status_created_at`: Composite index on `status` and `created_at` columns
- TASK_HISTORY table
  - `idx_task_id_created_at`: Composite index on `task_id` and `created_at` columns
These indexes improve the efficiency of common queries such as filtering tasks by type and status, sorting by creation time, and retrieving task history.
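As an illustration of the query shapes these indexes are meant to serve, here is a small Go helper that builds a parameterized `SELECT` for listing tasks. The table name `tasks`, the column list (taken from the ERD), and the function name `buildListTasksQuery` are assumptions; the ORM in `server/repository/` may generate different SQL.

```go
package main

import (
	"fmt"
	"strings"
)

// buildListTasksQuery returns SQL plus positional args ($1, $2, ... as in
// PostgreSQL). A type (+ status) filter can use idx_type_status, a status-only
// filter ordered by created_at can use idx_status_created_at, and an
// unfiltered ORDER BY created_at can use idx_created_at.
func buildListTasksQuery(taskType, status *int, limit, offset int) (string, []any) {
	var where []string
	var args []any
	if taskType != nil {
		args = append(args, *taskType)
		where = append(where, fmt.Sprintf("type = $%d", len(args)))
	}
	if status != nil {
		args = append(args, *status)
		where = append(where, fmt.Sprintf("status = $%d", len(args)))
	}
	q := "SELECT id, name, type, status, payload, retries, priority, created_at FROM tasks"
	if len(where) > 0 {
		q += " WHERE " + strings.Join(where, " AND ")
	}
	args = append(args, limit, offset)
	q += fmt.Sprintf(" ORDER BY created_at DESC LIMIT $%d OFFSET $%d", len(args)-1, len(args))
	return q, args
}
```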
The worker process follows a specific flow for task execution and error handling. Here's a detailed view of the worker's operation:
```mermaid
graph TD
    A[Receive Message] --> B{Update Status: RUNNING}
    B -->|Success| C[Run Task]
    B -->|Failure| D[Log Error]
    D --> L[Move to Next Message]
    C --> E{Task Execution}
    E -->|Success| F[Update Status: SUCCEEDED]
    E -->|Failure| G[Retry Logic]
    G --> H{Retry Attempt <= 3?}
    H -->|Yes| I[Backoff]
    I --> J[Update Status: RETRYING]
    J --> C
    H -->|No| K[Update Status: FAILED]
    F --> L[Move to Next Message]
    K --> L
```
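The retry branch of the diagram can be sketched in Go as follows. This illustrates the flow rather than reproducing the worker's code: the limit of 3 retries comes from the diagram, while the exponential backoff schedule, the `setStatus` callback, `errTransient`, and the name `runWithRetry` are assumptions.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

const maxRetries = 3 // from "Retry Attempt <= 3?" in the diagram

// errTransient is an example error a plugin might return; used in the usage note.
var errTransient = errors.New("transient failure")

// runWithRetry marks the task RUNNING, executes run, and retries failures with
// exponential backoff (base, 2*base, 4*base). setStatus stands in for the
// status updates sent to the server. After maxRetries failed retries it
// reports FAILED and returns the last error, wrapped.
func runWithRetry(run func() error, setStatus func(string), baseDelay time.Duration) error {
	setStatus("RUNNING")
	var err error
	for attempt := 0; attempt <= maxRetries; attempt++ {
		if attempt > 0 {
			time.Sleep(baseDelay << (attempt - 1)) // backoff before each retry
			setStatus("RETRYING")
		}
		if err = run(); err == nil {
			setStatus("SUCCEEDED")
			return nil
		}
	}
	setStatus("FAILED")
	return fmt.Errorf("task failed after %d retries: %w", maxRetries, err)
}
```

For example, `runWithRetry(func() error { return errTransient }, func(s string) { fmt.Println(s) }, 100*time.Millisecond)` would print RUNNING, then RETRYING three times, then FAILED, sleeping 100 ms, 200 ms, and 400 ms before the three retries. That is four attempts in total: the first run plus three retries.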
The Task Service CLI provides several commands to manage tasks. Here's a detailed overview of each command and its available flags:
Create a new task with the specified name, type, and parameters.
```bash
task-cli task create [task name] --type [task type] --parameter [key=value]
```

Flags:

- `--type`, `-t`: Type of the task (e.g., send_email, run_query)
- `--parameter`, `-p`: Additional parameters for the task as key=value pairs (can be used multiple times)
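Because `--parameter` can be repeated, the CLI has to turn a list of `key=value` strings into the `map[string]string` that plugins receive in the Plugin Model. A minimal sketch of that parsing step, assuming the value is everything after the first `=` and that duplicate keys are rejected; `parseParameters` is a hypothetical helper, not the CLI's actual code.

```go
package main

import (
	"fmt"
	"strings"
)

// parseParameters converts repeated key=value flag values into a map.
// The value is everything after the first '=', so values may contain '='.
func parseParameters(pairs []string) (map[string]string, error) {
	params := make(map[string]string, len(pairs))
	for _, p := range pairs {
		key, value, ok := strings.Cut(p, "=")
		key = strings.TrimSpace(key)
		if !ok || key == "" {
			return nil, fmt.Errorf("invalid parameter %q: expected key=value", p)
		}
		if _, dup := params[key]; dup {
			return nil, fmt.Errorf("duplicate parameter %q", key)
		}
		params[key] = value
	}
	return params, nil
}
```

Note that the shell strips the quotes in `subject="Weekly Update"`, so the program receives `subject=Weekly Update`.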
Example:
```bash
task-cli task create "Send Newsletter" --type send_email --parameter [email protected] --parameter subject="Weekly Update"
```

Retrieve and display the details of a specific task by its ID.

```bash
task-cli task get --id [task ID] [flags]
```

Flags:

- `--id`, `-i`: ID of the task (required)
- `--output`, `-o`: Output format (table, json, yaml) (default: "table")
Example:
```bash
task-cli task get --id 123 --output json
```

Retrieve and display the history of a specific task by its ID.

```bash
task-cli history --id [task ID] [flags]
```

Flags:

- `--id`, `-i`: ID of the task (required)
- `--output`, `-o`: Output format (table, json, yaml) (default: "table")
Example:
```bash
task-cli history --id 123 --output yaml
```

Retrieve and display a list of all tasks.

```bash
task-cli task list [flags]
```

Flags:

- `--output`, `-o`: Output format (table, json, yaml) (default: "table")
- `--pageNumber`, `-n`: Page number for pagination (default: 1)
- `--pageCount`, `-c`: Number of items per page (default: 30)
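`--pageNumber` and `--pageCount` map naturally onto SQL `LIMIT`/`OFFSET`. A minimal sketch, assuming 1-based page numbers and falling back to the documented defaults (page 1, 30 items) for non-positive values; whether the server computes offsets exactly this way is an assumption, and `pageToLimitOffset` is a hypothetical name.

```go
package main

const (
	defaultPageNumber = 1  // documented default for --pageNumber
	defaultPageCount  = 30 // documented default for --pageCount
)

// pageToLimitOffset converts a 1-based page number and page size into
// LIMIT/OFFSET values, substituting defaults for non-positive input.
func pageToLimitOffset(pageNumber, pageCount int) (limit, offset int) {
	if pageNumber < 1 {
		pageNumber = defaultPageNumber
	}
	if pageCount < 1 {
		pageCount = defaultPageCount
	}
	return pageCount, (pageNumber - 1) * pageCount
}
```

Under this sketch, `task-cli task list --pageNumber 2 --pageCount 20` would request rows 21 through 40 (limit 20, offset 20).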
Examples:
```bash
task-cli task list
task-cli task list --output json
task-cli task list --pageNumber 2 --pageCount 20
```

Retrieve the status counts of all tasks in the system.

```bash
task-cli task status
```

Aliases: `s`, `stat`

Example:

```bash
task-cli task status
task-cli task s
```

This command will display the count of tasks for each status (e.g., PENDING, RUNNING, SUCCEEDED, FAILED).
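Conceptually, the status command is a group-by over task statuses. A minimal client-side sketch, assuming the statuses are already loaded into memory (a server would more likely run a `GROUP BY status` query); `countByStatus` and `formatCounts` are hypothetical helpers, and the alphabetical output order is a choice made here only to keep the output stable.

```go
package main

import (
	"fmt"
	"sort"
	"strings"
)

// countByStatus tallies how many tasks are in each status.
func countByStatus(statuses []string) map[string]int {
	counts := make(map[string]int)
	for _, s := range statuses {
		counts[s]++
	}
	return counts
}

// formatCounts renders counts as "STATUS: n" lines sorted by status name,
// so output is stable regardless of map iteration order.
func formatCounts(counts map[string]int) string {
	keys := make([]string, 0, len(counts))
	for k := range counts {
		keys = append(keys, k)
	}
	sort.Strings(keys)
	var b strings.Builder
	for _, k := range keys {
		fmt.Fprintf(&b, "%s: %d\n", k, counts[k])
	}
	return b.String()
}
```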
Run end-to-end tests against the system to verify its functionality.

```bash
task-cli end2end [flags]
```

Flags:

- `--num-tasks`, `-n`: Number of tasks to create for the test (default: 100, max: 100)

Example:

```bash
task-cli end2end
task-cli end2end -n 50
```

This command will:
- Create the specified number of tasks (default 100)
- Monitor the tasks' completion status for up to 3 minutes
- Display progress every 5 seconds
- Report the final result (success or partial completion)
The test creates a mix of "run_query" and "send_email" task types to simulate a realistic workload.
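The monitoring loop described above (poll until all tasks finish, report progress periodically, give up after a deadline) can be sketched with a ticker and a timer. This is an assumption about how such a loop might look, not the CLI's implementation: `waitForTasks` and its `countDone` and `report` callbacks are hypothetical, and the timeout and interval are parameters so the sketch can be exercised quickly.

```go
package main

import "time"

// waitForTasks polls countDone every interval until it reports total finished
// tasks or the timeout expires. report is called with progress on each tick.
// It returns the last observed count and whether all tasks finished in time.
func waitForTasks(total int, interval, timeout time.Duration,
	countDone func() int, report func(done, total int)) (int, bool) {
	deadline := time.NewTimer(timeout)
	defer deadline.Stop()
	ticker := time.NewTicker(interval)
	defer ticker.Stop()

	done := 0
	for {
		select {
		case <-ticker.C:
			done = countDone()
			report(done, total)
			if done >= total {
				return done, true
			}
		case <-deadline.C:
			return done, false
		}
	}
}
```

With the documented values, the call would be `waitForTasks(100, 5*time.Second, 3*time.Minute, countDone, report)`; a `false` result corresponds to partial completion.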
The following flags are available for all task commands:

- `--log-level`: Set the logging level (default: "error")
- `--address`: Control plane address (default: "http://127.0.0.1:8080")

Example:

```bash
task-cli task list --log-level debug
```

All commands that display task information support three output formats:

- `table`: Displays the information in a formatted table (default)
- `json`: Outputs the data in JSON format
- `yaml`: Outputs the data in YAML format

Use the `--output` or `-o` flag to specify the desired format.
- The control plane (server) manages task creation, scheduling, and status updates
- The data plane (workers) executes tasks (currently part of the same binary)
- River (RiverQueue) handles communication between the control and data planes, using PostgreSQL as the queue backend
- Explore the UI or CLI to create and manage tasks
The Task Service uses a plugin-based architecture to allow for extensibility and customization of task execution. This model enables users to create their own task types and implement custom logic for task execution.
- Plugin Interface: All plugins must implement the `Plugin` interface defined in `@pkg/plugins/plugins.go`. This interface requires a `Run` method:

  ```go
  type Plugin interface {
      Run(parameters map[string]string) error
  }
  ```

- Plugin Registration: Plugins are registered in the `NewPlugin` function in `@pkg/plugins/plugins.go`. This function acts as a factory, creating the appropriate plugin based on the task type:

  ```go
  func NewPlugin(pluginType string) (Plugin, error) {
      switch pluginType {
      case email.PLUGIN_NAME:
          return &email.Email{}, nil
      case query.PLUGIN_NAME:
          return &query.Query{}, nil
      // Add more plugin types here
      default:
          return nil, fmt.Errorf("unknown plugin type: %s", pluginType)
      }
  }
  ```

- Custom Plugin Implementation: Users can create their own plugins by implementing the `Plugin` interface. For example, the `Email` plugin in `@pkg/email/email.go`:

  ```go
  var PLUGIN_NAME = "send_email"

  type Email struct{}

  func (e *Email) Run(parameters map[string]string) error {
      // Implementation of email sending logic
      return nil
  }
  ```

- Task Execution: When a task is executed, the system uses the `NewPlugin` function to create the appropriate plugin based on the task type. It then calls the `Run` method of the plugin, passing any necessary parameters.
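Put together, task execution is a factory lookup followed by a `Run` call. The sketch below is self-contained, so it re-declares a `Plugin` interface matching the one above and uses two stub plugins; `echoPlugin`, `failPlugin`, `newPlugin`, and `executeTask` are hypothetical stand-ins for the real `email` and `query` packages and the worker's execution code.

```go
package main

import (
	"errors"
	"fmt"
)

// Plugin mirrors the interface in pkg/plugins/plugins.go.
type Plugin interface {
	Run(parameters map[string]string) error
}

// echoPlugin succeeds only when the required "message" parameter is present.
type echoPlugin struct{}

func (echoPlugin) Run(parameters map[string]string) error {
	if parameters["message"] == "" {
		return errors.New(`missing required parameter "message"`)
	}
	return nil
}

// failPlugin always fails, to show how plugin errors propagate.
type failPlugin struct{}

func (failPlugin) Run(map[string]string) error { return errors.New("plugin failed") }

// newPlugin plays the role of NewPlugin: map a task type to an implementation.
func newPlugin(taskType string) (Plugin, error) {
	switch taskType {
	case "echo":
		return echoPlugin{}, nil
	case "fail":
		return failPlugin{}, nil
	default:
		return nil, fmt.Errorf("unknown plugin type: %s", taskType)
	}
}

// executeTask resolves the plugin for a task type and runs it with the
// task's parameters, wrapping any plugin error with the task type.
func executeTask(taskType string, params map[string]string) error {
	p, err := newPlugin(taskType)
	if err != nil {
		return err
	}
	if err := p.Run(params); err != nil {
		return fmt.Errorf("running %s task: %w", taskType, err)
	}
	return nil
}
```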
To create a new plugin:
- Create a new package in the `@pkg/plugins` directory for your plugin.
- Implement the `Plugin` interface in your new package.
- Add your plugin to the `NewPlugin` function in `@pkg/plugins/plugins.go`.
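With the steps above, every new plugin also means editing the `switch` in `NewPlugin`. One common Go alternative, shown here as a sketch and not as what the project does, is a registry map that plugin packages fill in from their own `init` functions. `Register`, `New`, `Registered`, and the `greet` plugin are hypothetical.

```go
package main

import (
	"fmt"
	"sort"
	"sync"
)

// Plugin mirrors the interface in pkg/plugins/plugins.go.
type Plugin interface {
	Run(parameters map[string]string) error
}

var (
	mu       sync.RWMutex
	registry = map[string]func() Plugin{}
)

// Register makes a plugin constructor available under a task type name.
// It panics on duplicates so two packages cannot silently claim one type.
func Register(name string, factory func() Plugin) {
	mu.Lock()
	defer mu.Unlock()
	if _, exists := registry[name]; exists {
		panic("plugin already registered: " + name)
	}
	registry[name] = factory
}

// New returns a fresh plugin instance for the given task type.
func New(name string) (Plugin, error) {
	mu.RLock()
	factory, ok := registry[name]
	mu.RUnlock()
	if !ok {
		return nil, fmt.Errorf("unknown plugin type: %s", name)
	}
	return factory(), nil
}

// Registered lists the known task types in sorted order.
func Registered() []string {
	mu.RLock()
	defer mu.RUnlock()
	names := make([]string, 0, len(registry))
	for n := range registry {
		names = append(names, n)
	}
	sort.Strings(names)
	return names
}

// greet is an example plugin; in a real layout it would live in its own
// package and call Register from that package's init function.
type greet struct{ last string }

func (g *greet) Run(parameters map[string]string) error {
	name, ok := parameters["name"]
	if !ok {
		return fmt.Errorf("greet: missing parameter %q", "name")
	}
	g.last = "hello, " + name
	return nil
}

func init() {
	Register("greet", func() Plugin { return &greet{} })
}
```

The trade-off is that registration becomes implicit: a plugin package must be imported (often with a blank import) for its `init` to run, whereas the explicit `switch` keeps every task type visible in one place.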
This modular approach allows for easy extension of the Task Service with new task types and functionalities.
This section guides you through setting up and testing the Task Service in a local Kubernetes cluster using Kind (Kubernetes in Docker) and Helm charts.
- Create a Kind cluster:

  ```bash
  kind create cluster --name task-service
  ```

- Confirm that kubectl can reach the new cluster (kind switches the current context to `kind-task-service` when it creates the cluster):

  ```bash
  kubectl cluster-info --context kind-task-service
  ```

- Add the necessary Helm repositories:

  ```bash
  make helm
  ```

- Install the Task Service Helm chart:

  ```bash
  helm install task-service ./charts/task -n task --create-namespace
  ```

- Check that all pods are running:

  ```bash
  kubectl get pods -n task
  ```

- Port-forward the Task Service:

  ```bash
  kubectl port-forward -n task svc/task 8080:80
  ```

- Access the service at http://127.0.0.1:8080.
- Use the CLI to verify the connection:

  ```bash
  ./bin/task-cli task list --address http://127.0.0.1:8080
  ```

To delete the Kind cluster and all resources:

```bash
kind delete cluster --name task-service
```

This setup allows you to test the entire Task Service stack, including the server, workers, and dependencies, in a local Kubernetes environment. It is a convenient way to validate the Helm charts and confirm that all components work together in a Kubernetes setting.