Login
4 branches 0 tags
Ben (Desktop/Arch) Code cleanup ada8ea6 11 days ago 251 Commits
rubhub / crates / ci / src / executor / remote.rs
//! Remote CI runner management
//!
//! Handles heartbeat checking, job dispatch, and runner lifecycle management.

use std::path::Path;
use std::sync::Arc;
use std::time::Duration;

use time::OffsetDateTime;
use tokio::time::interval;

use crate::models::{CiJob, CiJobStatus};
use rubhub_state::{CurrentJob, RemoteJobAssignment, RunnerJobEvent, RunnerRegistry};

/// Maximum time, in seconds, since a runner's last recorded heartbeat before
/// the heartbeat checker treats the runner as timed out.
const HEARTBEAT_TIMEOUT_SECS: i64 = 60;

/// How often, in seconds, the heartbeat checker scans the registry for
/// timed-out runners.
const HEARTBEAT_CHECK_INTERVAL_SECS: u64 = 10;

/// How often, in seconds, the server sends a heartbeat event to every
/// registered runner.
const SERVER_HEARTBEAT_INTERVAL_SECS: u64 = 30;

/// Start the background task that evicts runners whose heartbeat has gone stale.
///
/// Every `HEARTBEAT_CHECK_INTERVAL_SECS` seconds the task walks the registry and
/// compares each runner's `last_heartbeat` against the time the scan started.
/// A runner that has been silent for more than `HEARTBEAT_TIMEOUT_SECS` seconds
/// is handed to `handle_runner_disconnect` together with a snapshot of the job
/// it was working on (if any).
///
/// The task is detached: it loops forever and its `JoinHandle` is dropped.
pub fn spawn_heartbeat_checker(runner_registry: Arc<RunnerRegistry>, ci_root: Arc<Path>) {
    tokio::spawn(async move {
        // The allowed silence never changes, so build it once up front.
        let max_silence = time::Duration::seconds(HEARTBEAT_TIMEOUT_SECS);
        let mut ticker = interval(Duration::from_secs(HEARTBEAT_CHECK_INTERVAL_SECS));

        loop {
            ticker.tick().await;

            // One reference instant per scan, shared by every runner checked.
            let scan_started = OffsetDateTime::now_utc();

            let ids = runner_registry.runner_ids();
            for runner_id in ids {
                // The runner may have been removed since the id list was taken.
                let Some(runner) = runner_registry.get(runner_id) else {
                    continue;
                };

                // Copy out what we need inside a tight scope so the read guard
                // is released before the disconnect handler runs.
                let (silent_for, current_job) = {
                    let guard = runner.read().await;
                    (
                        scan_started - guard.last_heartbeat,
                        guard.current_job.clone(),
                    )
                };

                if silent_for > max_silence {
                    handle_runner_disconnect(&runner_registry, &ci_root, runner_id, current_job)
                        .await;
                }
            }
        }
    });
}

/// Start the background task that pushes a heartbeat event to every runner.
///
/// Every `SERVER_HEARTBEAT_INTERVAL_SECS` seconds one
/// `RunnerJobEvent::Heartbeat` carrying the current UTC time is built and a
/// clone of it is sent on each registered runner's `event_tx`.
///
/// The task is detached: it loops forever and its `JoinHandle` is dropped.
pub fn spawn_server_heartbeat_sender(runner_registry: Arc<RunnerRegistry>) {
    let period = Duration::from_secs(SERVER_HEARTBEAT_INTERVAL_SECS);

    tokio::spawn(async move {
        let mut ticker = interval(period);

        loop {
            ticker.tick().await;

            // Built once per tick; every runner receives the same timestamp.
            let event = RunnerJobEvent::Heartbeat {
                timestamp: OffsetDateTime::now_utc(),
            };

            let ids = runner_registry.runner_ids();
            for runner_id in ids {
                let Some(runner) = runner_registry.get(runner_id) else {
                    continue;
                };

                // Clone the sender so the read guard is gone before we await
                // on the channel.
                let tx = runner.read().await.event_tx.clone();

                // A failed send is deliberately ignored: the runner may be in
                // the middle of disconnecting.
                // NOTE(review): the send is awaited per runner in sequence; if
                // `event_tx` is a bounded channel, a full queue on one runner
                // would delay heartbeats to the rest - confirm.
                let _ = tx.send(event.clone()).await;
            }
        }
    });
}

/// Tear down a runner that is considered gone.
///
/// Logs the event, marks the runner's in-flight job (if any) as failed on disk
/// via `mark_job_failed`, and finally removes the runner from the registry.
/// A failure to update the job is logged to stderr and does not prevent the
/// runner from being unregistered.
///
/// Note that the log line always says "timed out", whatever the caller's
/// reason for invoking this function.
async fn handle_runner_disconnect(
    runner_registry: &RunnerRegistry,
    ci_root: &Path,
    runner_id: uuid::Uuid,
    current_job: Option<CurrentJob>,
) {
    println!("Runner {} timed out, disconnecting", runner_id);

    // A job that was still assigned to this runner can no longer finish.
    if let Some(job) = &current_job {
        let outcome = mark_job_failed(ci_root, job, "Runner disconnected").await;
        if let Err(err) = outcome {
            eprintln!("Failed to mark job {} as failed: {}", job.job_id, err);
        }
    }

    // Drop the runner from the registry regardless of the job update outcome.
    runner_registry.unregister(runner_id).await;
}

/// Mark a job, and every sub-job that has not finished yet, as failed on disk.
///
/// The job metadata is read from
/// `<ci_root>/<owner>/<project>/<job_id>/job.json`, updated in memory and
/// written back through a sibling `job.json.tmp` file that is then renamed
/// over the original.
///
/// * `ci_root` - root directory under which job directories live.
/// * `current_job` - identifies the job (owner, project, job id).
/// * `reason` - human-readable cause; only used in the log line.
///
/// # Errors
///
/// Returns an error if `job.json` cannot be read or parsed, if the updated
/// metadata cannot be serialized, or if writing/renaming the temporary file
/// fails. In the last case the temporary file is removed on a best-effort
/// basis so no stale `job.json.tmp` is left behind.
async fn mark_job_failed(
    ci_root: &Path,
    current_job: &CurrentJob,
    reason: &str,
) -> anyhow::Result<()> {
    // Construct job directory directly from CurrentJob
    let job_dir = ci_root
        .join(&current_job.owner)
        .join(&current_job.project)
        .join(current_job.job_id.to_string());

    // Load the job metadata
    let job_file = job_dir.join("job.json");
    let content = tokio::fs::read_to_string(&job_file).await?;
    let mut job: CiJob = serde_json::from_str(&content)?;

    // Use one timestamp so the run and all of its sub-jobs report the same
    // end time instead of values a few microseconds apart.
    let finished_at = OffsetDateTime::now_utc();

    // Update job status
    job.status = CiJobStatus::Failed;
    job.finished_at = Some(finished_at);

    // Fail every sub-job that is still running or has not started yet;
    // sub-jobs in any other state keep their recorded result.
    for job_result in &mut job.jobs {
        if job_result.status == CiJobStatus::Running || job_result.status == CiJobStatus::Pending {
            job_result.status = CiJobStatus::Failed;
            job_result.finished_at = Some(finished_at);
        }
    }

    // Save updated job metadata via write-to-temp + rename so readers never
    // observe a half-written job.json.
    let content = serde_json::to_string_pretty(&job)?;
    let tmp_file = job_file.with_extension("json.tmp");
    let persisted = match tokio::fs::write(&tmp_file, &content).await {
        Ok(()) => tokio::fs::rename(&tmp_file, &job_file).await,
        Err(err) => Err(err),
    };
    if let Err(err) = persisted {
        // Best-effort cleanup: the original error is what the caller needs,
        // so a failure to remove the temp file is intentionally ignored.
        let _ = tokio::fs::remove_file(&tmp_file).await;
        return Err(err.into());
    }

    println!("Job {} marked as failed: {}", current_job.job_id, reason);

    Ok(())
}

/// Assign a job to a remote runner.
///
/// Records the job as the runner's `current_job` and then sends a
/// `RunnerJobEvent::Job` carrying the assignment on the runner's event
/// channel. The runner's lock is released before the send is awaited.
///
/// * `runner_registry` - registry used to look up the runner.
/// * `runner_id` - id of the runner that should receive the job.
/// * `job` - the job being dispatched; its id, owner, project, branch and
///   commit hash are copied into the assignment.
/// * `workflow_json` - workflow definition forwarded to the runner as-is.
/// * `job_token` - token forwarded to the runner as part of the assignment.
///
/// # Errors
///
/// Returns an error if no runner with `runner_id` is registered, or if the
/// event could not be sent. When the send fails the runner's `current_job` is
/// cleared again (provided it still refers to this job), so the runner is not
/// left marked as busy with a job it never received and a later disconnect
/// does not fail a job the caller may already have rescheduled.
pub async fn assign_job_to_runner(
    runner_registry: &RunnerRegistry,
    runner_id: uuid::Uuid,
    job: &CiJob,
    workflow_json: serde_json::Value,
    job_token: String,
) -> anyhow::Result<()> {
    let runner = runner_registry
        .get(runner_id)
        .ok_or_else(|| anyhow::anyhow!("Runner not found"))?;

    // Create job assignment
    let assignment = RemoteJobAssignment {
        job_id: job.id,
        owner: job.owner.clone(),
        project: job.project.clone(),
        branch: job.branch.clone(),
        commit_hash: job.commit_hash.clone(),
        workflow: workflow_json,
        job_token,
    };

    // Reserve the runner first, then grab a sender clone so the write lock is
    // not held while awaiting the channel.
    let tx = {
        let mut runner = runner.write().await;
        runner.current_job = Some(CurrentJob {
            job_id: job.id,
            owner: job.owner.clone(),
            project: job.project.clone(),
        });
        runner.event_tx.clone()
    };

    if tx.send(RunnerJobEvent::Job(assignment)).await.is_err() {
        // Undo the reservation. Only clear the slot if it still holds this
        // job, in case something else changed it while the lock was released.
        let mut runner = runner.write().await;
        if runner
            .current_job
            .as_ref()
            .is_some_and(|current| current.job_id == job.id)
        {
            runner.current_job = None;
        }
        return Err(anyhow::anyhow!("Failed to send job to runner"));
    }

    println!("Assigned job {} to runner {}", job.id, runner_id);

    Ok(())
}

/// Find an available remote runner for a job.
///
/// Thin wrapper around `RunnerRegistry::find_idle_runner`; returns whatever
/// that call yields, i.e. the id of a runner or `None`.
pub async fn find_available_runner(runner_registry: &RunnerRegistry) -> Option<uuid::Uuid> {
    // For now, find any idle runner
    // In the future, this could match based on workflow requirements
    // NOTE(review): both arguments are passed as `None`, presumably meaning
    // "no filter" - confirm against `find_idle_runner`'s signature.
    runner_registry.find_idle_runner(None, None).await
}