text/x-rust
•
5.41 KB
•
195 lines
//! Runner registry for remote CI runners
use std::sync::Arc;
use dashmap::DashMap;
use serde::{Deserialize, Serialize};
use time::OffsetDateTime;
use tokio::sync::mpsc;
use uuid::Uuid;
/// Capability tags a runner advertises; jobs are matched against them.
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
pub struct RunnerTags {
    /// Operating system name, e.g. `linux`, `darwin`, `freebsd`
    pub os: String,
    /// CPU architecture name, e.g. `x86_64`, `aarch64`
    pub arch: String,
    /// Free-form additional tags such as `nixos`.
    /// Defaults to an empty list when missing from the serialized form.
    #[serde(default)]
    pub extra: Vec<String>,
}
impl RunnerTags {
    /// Create tags for the current system.
    ///
    /// OS and architecture names are normalized to the vocabulary documented on
    /// the struct fields. Rust reports macOS as `"macos"`, which is mapped to
    /// `"darwin"`. Architecture aliases such as `"amd64"`/`"arm64"` are mapped to
    /// `"x86_64"`/`"aarch64"`.
    pub fn current() -> Self {
        Self {
            os: Self::normalize_os(std::env::consts::OS).to_string(),
            arch: Self::normalize_arch(std::env::consts::ARCH).to_string(),
            extra: vec!["nixos".to_string()], // Hardcoded for now
        }
    }

    /// Check if this runner can handle jobs requiring the given tags.
    ///
    /// A `None` requirement matches any value. OS and architecture names are
    /// normalized on both sides before comparing. For example, a job requiring
    /// `"amd64"` matches a runner tagged `"x86_64"`, and a job requiring
    /// `"darwin"` matches a runner tagged `"macos"`. Extra tags are not
    /// considered.
    pub fn matches(&self, required_os: Option<&str>, required_arch: Option<&str>) -> bool {
        if let Some(os) = required_os {
            if Self::normalize_os(os) != Self::normalize_os(&self.os) {
                return false;
            }
        }
        if let Some(arch) = required_arch {
            if Self::normalize_arch(arch) != Self::normalize_arch(&self.arch) {
                return false;
            }
        }
        true
    }

    /// Map an OS name to the canonical tag form (`macos` -> `darwin`).
    fn normalize_os(os: &str) -> &str {
        match os {
            "macos" => "darwin",
            other => other,
        }
    }

    /// Map a CPU architecture alias to the canonical tag form
    /// (`amd64` -> `x86_64`, `arm64` -> `aarch64`).
    fn normalize_arch(arch: &str) -> &str {
        match arch {
            "amd64" => "x86_64",
            "arm64" => "aarch64",
            other => other,
        }
    }
}
/// Event pushed to a connected runner over SSE.
///
/// Serialized as an internally tagged object: the variant name goes into a
/// `"type"` field alongside the variant's own fields.
#[derive(Clone, Debug, Serialize)]
#[serde(tag = "type")]
pub enum RunnerJobEvent {
    /// Keep-alive sent periodically by the server
    Heartbeat {
        /// Heartbeat timestamp, serialized as RFC 3339
        #[serde(with = "time::serde::rfc3339")]
        timestamp: OffsetDateTime,
    },
    /// Hand a job to the runner
    Job(RemoteJobAssignment),
    /// Ask the runner to cancel a running job
    Cancel {
        /// ID of the job to cancel
        job_id: Uuid,
    },
}
/// Job assignment payload delivered to a runner in a [`RunnerJobEvent::Job`] event.
#[derive(Clone, Debug, Deserialize, Serialize)]
pub struct RemoteJobAssignment {
    /// ID of the assigned job
    pub job_id: Uuid,
    /// Project owner name
    pub owner: String,
    /// Project name
    pub project: String,
    /// Branch name
    pub branch: String,
    /// Commit hash
    pub commit_hash: String,
    /// Workflow definition kept as untyped JSON so this module does not need
    /// to depend on the workflow types (which would create a circular dependency)
    pub workflow: serde_json::Value,
}
/// State kept for a single remote runner while it is connected.
#[derive(Debug)]
pub struct ConnectedRunner {
    /// Unique runner ID, generated in [`ConnectedRunner::new`]
    pub id: Uuid,
    /// Hash of the runner's token (used by the registry for auth lookup)
    pub token_hash: String,
    /// Capabilities advertised by the runner
    pub tags: RunnerTags,
    /// Time the runner state was created
    pub connected_at: OffsetDateTime,
    /// Time of the last heartbeat (initialized to the connection time)
    pub last_heartbeat: OffsetDateTime,
    /// Job currently assigned to the runner, `None` when idle
    pub current_job: Option<Uuid>,
    /// Sending half of the channel used to push events to this runner
    pub event_tx: mpsc::Sender<RunnerJobEvent>,
}

impl ConnectedRunner {
    /// Build the state for a freshly connected runner.
    ///
    /// Generates a new v7 UUID as the runner ID and sets both timestamps to
    /// the current UTC time. The runner starts out with no job assigned.
    pub fn new(
        token_hash: String,
        tags: RunnerTags,
        event_tx: mpsc::Sender<RunnerJobEvent>,
    ) -> Self {
        let connected_at = OffsetDateTime::now_utc();
        let id = Uuid::now_v7();
        Self {
            id,
            token_hash,
            tags,
            last_heartbeat: connected_at,
            connected_at,
            current_job: None,
            event_tx,
        }
    }

    /// Returns `true` when no job is assigned to this runner.
    pub fn is_idle(&self) -> bool {
        self.current_job.is_none()
    }
}
/// In-memory registry of connected runners.
///
/// Runners are indexed by ID and by the hash of their token. The token index
/// holds at most one runner ID per token hash; registering again with the same
/// token makes the index point at the newest runner.
#[derive(Debug, Default)]
pub struct RunnerRegistry {
    /// Map from runner ID to runner state
    runners: DashMap<Uuid, Arc<tokio::sync::RwLock<ConnectedRunner>>>,
    /// Map from token_hash to runner ID (for auth lookup)
    token_to_runner: DashMap<String, Uuid>,
}

impl RunnerRegistry {
    /// Create an empty registry.
    pub fn new() -> Self {
        Self::default()
    }

    /// Register a new runner and return its ID.
    ///
    /// If another runner already registered with the same token hash, the
    /// token index is repointed to this runner. The older runner remains in the
    /// ID map until it is unregistered.
    pub fn register(&self, runner: ConnectedRunner) -> Uuid {
        let id = runner.id;
        let token_hash = runner.token_hash.clone();
        self.runners
            .insert(id, Arc::new(tokio::sync::RwLock::new(runner)));
        self.token_to_runner.insert(token_hash, id);
        id
    }

    /// Unregister a runner by ID, returning its state if it was registered.
    ///
    /// The token index entry is removed only if it still points at this runner.
    /// Without that check, tearing down a stale connection after the same token
    /// re-registered would erase the new runner's mapping and make it
    /// unreachable through [`get_by_token`](Self::get_by_token).
    pub async fn unregister(&self, runner_id: Uuid) -> Option<Arc<tokio::sync::RwLock<ConnectedRunner>>> {
        let (_, runner) = self.runners.remove(&runner_id)?;
        // The read guard is a temporary and is released at the end of this statement.
        let token_hash = runner.read().await.token_hash.clone();
        self.token_to_runner
            .remove_if(&token_hash, |_, mapped_id| *mapped_id == runner_id);
        Some(runner)
    }

    /// Get a runner by ID.
    pub fn get(&self, runner_id: Uuid) -> Option<Arc<tokio::sync::RwLock<ConnectedRunner>>> {
        self.runners.get(&runner_id).map(|r| Arc::clone(r.value()))
    }

    /// Get a runner by token hash.
    pub fn get_by_token(&self, token_hash: &str) -> Option<Arc<tokio::sync::RwLock<ConnectedRunner>>> {
        // Copy the ID out first so the token-map guard is released before the
        // runner map is accessed; this avoids holding two DashMap guards at once.
        let runner_id = *self.token_to_runner.get(token_hash)?;
        self.get(runner_id)
    }

    /// Get all runner IDs.
    pub fn runner_ids(&self) -> Vec<Uuid> {
        self.runners.iter().map(|r| *r.key()).collect()
    }

    /// Count of connected runners.
    pub fn count(&self) -> usize {
        self.runners.len()
    }

    /// Find an idle runner matching the given requirements.
    ///
    /// Returns the first match in map-iteration order, which is unspecified.
    /// The answer is only a snapshot: nothing here reserves the runner, so a
    /// concurrent caller may assign it a job before the returned ID is used.
    pub async fn find_idle_runner(
        &self,
        required_os: Option<&str>,
        required_arch: Option<&str>,
    ) -> Option<Uuid> {
        // Snapshot the handles first. DashMap iteration holds a synchronous
        // shard lock, and keeping it across the `.await` below would block
        // register/unregister on that shard while a runner lock is contended.
        let candidates: Vec<_> = self
            .runners
            .iter()
            .map(|entry| Arc::clone(entry.value()))
            .collect();
        for handle in candidates {
            let runner = handle.read().await;
            if runner.is_idle() && runner.tags.matches(required_os, required_arch) {
                return Some(runner.id);
            }
        }
        None
    }
}