Login
4 branches 0 tags
Ben (Desktop/Arch) Remote CI 3cb47c1 14 days ago 247 Commits
rubhub / crates / state / src / runner.rs
//! Runner registry for remote CI runners

use std::sync::Arc;

use dashmap::DashMap;
use serde::{Deserialize, Serialize};
use time::OffsetDateTime;
use tokio::sync::mpsc;
use uuid::Uuid;

/// Tags describing runner capabilities.
///
/// The `os` and `arch` fields are what [`RunnerTags::matches`] compares
/// against a job's requirements; `extra` is carried along but not consulted
/// by `matches`.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct RunnerTags {
    /// Operating system: linux, darwin, freebsd
    ///
    /// NOTE(review): `RunnerTags::current` fills this from
    /// `std::env::consts::OS`, which reports `macos` rather than `darwin` on
    /// Apple hosts — confirm which spelling job requirements are expected to use.
    pub os: String,
    /// CPU architecture: x86_64, aarch64
    pub arch: String,
    /// Additional tags like "nixos".
    ///
    /// Falls back to an empty list when the field is missing from the
    /// deserialized input (`#[serde(default)]`).
    #[serde(default)]
    pub extra: Vec<String>,
}

impl RunnerTags {
    /// Build the tag set describing the machine this code was compiled for.
    ///
    /// The OS is taken verbatim from `std::env::consts::OS`. The architecture
    /// comes from `std::env::consts::ARCH`, with the `amd64` / `arm64` aliases
    /// folded into `x86_64` / `aarch64`; any other value is passed through
    /// unchanged.
    pub fn current() -> Self {
        // Canonicalise the architecture name while it is still a `&'static str`
        // so only the final value is allocated.
        let canonical_arch = match std::env::consts::ARCH {
            "x86_64" | "amd64" => "x86_64",
            "aarch64" | "arm64" => "aarch64",
            unrecognised => unrecognised,
        };

        Self {
            os: std::env::consts::OS.to_owned(),
            arch: canonical_arch.to_owned(),
            extra: vec![String::from("nixos")], // Hardcoded for now
        }
    }

    /// Report whether this runner satisfies a job's platform requirements.
    ///
    /// A requirement of `None` places no constraint on that dimension; a
    /// `Some` requirement must equal the runner's value exactly
    /// (case-sensitive string comparison). The `extra` tags are not consulted.
    pub fn matches(&self, required_os: Option<&str>, required_arch: Option<&str>) -> bool {
        let os_ok = required_os.map_or(true, |wanted| self.os == wanted);
        let arch_ok = required_arch.map_or(true, |wanted| self.arch == wanted);
        os_ok && arch_ok
    }
}

/// Events sent to runners via SSE.
///
/// Only `Serialize` is derived, so this type is written by the server side
/// and never parsed back here. With `#[serde(tag = "type")]` the enum is
/// internally tagged: each serialized event carries a `"type"` field holding
/// the variant name next to the variant's own fields.
#[derive(Debug, Clone, Serialize)]
#[serde(tag = "type")]
pub enum RunnerJobEvent {
    /// Periodic heartbeat from server
    Heartbeat {
        /// Time attached to the heartbeat, serialized as an RFC 3339 string.
        #[serde(with = "time::serde::rfc3339")]
        timestamp: OffsetDateTime,
    },
    /// Job assignment; the payload's fields are emitted alongside the tag.
    Job(RemoteJobAssignment),
    /// Cancel a running job
    Cancel {
        /// ID of the job to cancel.
        job_id: Uuid,
    },
}

/// Job assignment sent to runner.
///
/// Carried inside [`RunnerJobEvent::Job`]. Unlike the event enum this type
/// also derives `Deserialize`, so it can be parsed back from its serialized
/// form.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct RemoteJobAssignment {
    /// Unique ID of the job being assigned.
    pub job_id: Uuid,
    /// Owner of the project the job belongs to.
    pub owner: String,
    /// Name of the project the job belongs to.
    pub project: String,
    /// Branch the job runs against.
    pub branch: String,
    /// Commit hash the job runs against.
    pub commit_hash: String,
    /// Workflow definition as JSON (allows avoiding circular dependency)
    pub workflow: serde_json::Value,
}

/// A connected remote runner.
///
/// Holds the runner's identity, capabilities, liveness timestamps, current
/// job (if any) and the channel used to push events to it.
#[derive(Debug)]
pub struct ConnectedRunner {
    /// Unique ID of this connection; `ConnectedRunner::new` generates it.
    pub id: Uuid,
    /// Hash of the runner's token; the registry indexes runners by this value.
    pub token_hash: String,
    /// Capabilities advertised by the runner.
    pub tags: RunnerTags,
    /// When the runner connected.
    pub connected_at: OffsetDateTime,
    /// Time of the most recent heartbeat; starts out equal to `connected_at`.
    pub last_heartbeat: OffsetDateTime,
    /// Job currently assigned to the runner; `None` means the runner is idle.
    pub current_job: Option<Uuid>,
    /// Channel to send events to this runner
    pub event_tx: mpsc::Sender<RunnerJobEvent>,
}

impl ConnectedRunner {
    /// Build the state for a runner that has just connected.
    ///
    /// The runner receives a freshly generated v7 UUID, starts without a job,
    /// and has both `connected_at` and `last_heartbeat` set to the same
    /// current UTC instant.
    pub fn new(
        token_hash: String,
        tags: RunnerTags,
        event_tx: mpsc::Sender<RunnerJobEvent>,
    ) -> Self {
        // Read the clock once so both timestamps are exactly equal.
        let connected_at = OffsetDateTime::now_utc();
        let id = Uuid::now_v7();

        Self {
            id,
            token_hash,
            tags,
            connected_at,
            last_heartbeat: connected_at,
            current_job: None,
            event_tx,
        }
    }

    /// Whether the runner has no job assigned at the moment.
    pub fn is_idle(&self) -> bool {
        self.current_job.is_none()
    }
}

/// In-memory registry of connected runners.
///
/// Both indexes are concurrent maps, so the registry is used through `&self`.
/// Each runner's state sits behind its own `Arc<tokio::sync::RwLock<_>>`,
/// letting callers keep a handle to a runner independently of the map.
#[derive(Debug, Default)]
pub struct RunnerRegistry {
    /// Map from runner ID to runner state
    runners: DashMap<Uuid, Arc<tokio::sync::RwLock<ConnectedRunner>>>,
    /// Map from token_hash to runner ID (for auth lookup)
    token_to_runner: DashMap<String, Uuid>,
}

impl RunnerRegistry {
    /// Create an empty registry.
    pub fn new() -> Self {
        Self::default()
    }

    /// Register a new runner and return its ID.
    ///
    /// The runner is stored under `runner.id` and indexed by its token hash.
    /// If a runner with the same token hash is already registered, the token
    /// index is repointed at the new runner; the older one stays reachable by
    /// ID until it is unregistered.
    pub fn register(&self, runner: ConnectedRunner) -> Uuid {
        let id = runner.id;
        let token_hash = runner.token_hash.clone();
        self.runners
            .insert(id, Arc::new(tokio::sync::RwLock::new(runner)));
        self.token_to_runner.insert(token_hash, id);
        id
    }

    /// Unregister a runner by ID.
    ///
    /// Returns the removed runner, or `None` if no runner with that ID was
    /// registered. The token index entry is dropped only while it still
    /// points at `runner_id`, so unregistering a stale connection cannot
    /// erase the mapping of a newer runner that reconnected with the same
    /// token.
    pub async fn unregister(&self, runner_id: Uuid) -> Option<Arc<tokio::sync::RwLock<ConnectedRunner>>> {
        let (_, runner) = self.runners.remove(&runner_id)?;

        // The read guard is a temporary that is released at the end of this
        // statement, before the token index is touched.
        let token_hash = runner.read().await.token_hash.clone();

        self.token_to_runner
            .remove_if(&token_hash, |_, mapped_id| *mapped_id == runner_id);

        Some(runner)
    }

    /// Get a runner by ID.
    pub fn get(&self, runner_id: Uuid) -> Option<Arc<tokio::sync::RwLock<ConnectedRunner>>> {
        self.runners
            .get(&runner_id)
            .map(|entry| Arc::clone(entry.value()))
    }

    /// Get a runner by token hash.
    ///
    /// Returns `None` when the token hash is unknown or when the runner it
    /// maps to is no longer registered.
    pub fn get_by_token(&self, token_hash: &str) -> Option<Arc<tokio::sync::RwLock<ConnectedRunner>>> {
        // Copy the ID out first so the guard on the token index is released
        // before the second map is queried.
        let runner_id = *self.token_to_runner.get(token_hash)?;
        self.get(runner_id)
    }

    /// Get all runner IDs.
    pub fn runner_ids(&self) -> Vec<Uuid> {
        self.runners.iter().map(|entry| *entry.key()).collect()
    }

    /// Count of connected runners.
    pub fn count(&self) -> usize {
        self.runners.len()
    }

    /// Find an idle runner matching the given requirements.
    ///
    /// Returns the ID of the first runner, in map iteration order, that has
    /// no current job and whose tags satisfy `required_os` / `required_arch`
    /// (`None` means "any"). The runner is not reserved: `current_job` is
    /// left untouched, so the caller is responsible for claiming it.
    pub async fn find_idle_runner(
        &self,
        required_os: Option<&str>,
        required_arch: Option<&str>,
    ) -> Option<Uuid> {
        // Snapshot the handles before awaiting. A DashMap iterator keeps a
        // shard guard alive, and holding it across `.await` would block
        // synchronous `register` / `unregister` calls on that shard for as
        // long as a runner lock stays contended, with a risk of deadlock.
        let candidates: Vec<Arc<tokio::sync::RwLock<ConnectedRunner>>> = self
            .runners
            .iter()
            .map(|entry| Arc::clone(entry.value()))
            .collect();

        for candidate in candidates {
            let runner = candidate.read().await;
            if runner.is_idle() && runner.tags.matches(required_os, required_arch) {
                return Some(runner.id);
            }
        }
        None
    }
}