Login
4 branches 0 tags
Ben (T14/NixOS) Improved flake 41eb128 11 days ago 252 Commits
rubhub / crates / ci / src / executor / mod.rs
//! CI job execution
//!
//! Handles dispatching jobs to local or remote executors.

pub mod local;
pub mod remote;

use std::path::Path;
use std::sync::Arc;
use std::sync::atomic::{AtomicBool, Ordering};

use anyhow::Result;
use time::OffsetDateTime;
use tokio::fs;
use tokio::sync::mpsc;

use crate::models::{CiJob, CiJobRequest, CiJobResult, CiJobStatus, CiStepResult};
use crate::storage::save_job;
use rubhub_state::RunnerRegistry;

/// Configuration for the CI executor
///
/// The worker clones this once per incoming job before dispatching it; every
/// field is an `Arc`, so all clones refer to the same underlying values.
#[derive(Clone)]
pub struct ExecutorConfig {
    /// Root path handed to the run's `job_dir` / `job_result_dir` helpers and
    /// to `save_job` when creating directories and persisting run state.
    pub ci_root: Arc<Path>,
    /// Not read by the dispatch code in this file.
    /// NOTE(review): presumably the root of the hosted git repositories,
    /// consumed by the `local` / `remote` executors — confirm.
    pub git_root: Arc<Path>,
    /// Flag consulted (with `Ordering::Relaxed`) to decide whether a job may
    /// be run locally when no remote runner is available.
    pub local_podman_available: Arc<AtomicBool>,
    /// Registry passed to `remote::find_available_runner` and
    /// `remote::assign_job_to_runner` to locate and hand work to runners.
    pub runner_registry: Arc<RunnerRegistry>,
}

/// Start the background CI worker.
///
/// Spawns a task that drains `rx` until the channel is closed. Each received
/// request is dispatched on its own task (with its own clone of `config`), so
/// a slow job never blocks the queue. Dispatch failures are logged to stderr
/// and otherwise ignored.
pub fn spawn_worker(config: ExecutorConfig, mut rx: mpsc::Receiver<CiJobRequest>) {
    let worker = async move {
        while let Some(request) = rx.recv().await {
            // Each job task needs to own its configuration.
            let job_config = config.clone();
            let job = async move {
                if let Err(err) = dispatch_job(&job_config, request).await {
                    eprintln!("CI: Job dispatch error: {}", err);
                }
            };
            tokio::spawn(job);
        }
    };
    tokio::spawn(worker);
}

/// Dispatch a job to either a remote runner or local execution.
///
/// Remote runners take priority. When one is available, a run record is
/// created, its directories are prepared under `config.ci_root`, the record is
/// persisted as `Running`, and the run is handed to the runner together with a
/// freshly generated job token.
///
/// If the hand-off to the runner fails, the run directory created for the
/// remote attempt is removed (best effort) and dispatch continues with the
/// local backend. Local execution is only attempted when
/// `config.local_podman_available` is set — this applies both when no runner
/// was found and when the remote hand-off failed. If neither backend is usable
/// the situation is logged and `Ok(())` is returned.
///
/// # Errors
///
/// Returns an error if creating the run directories, saving the run state, or
/// serializing the workflow fails, or if `local::process_job` returns an
/// error. A failed remote assignment is not an error by itself.
async fn dispatch_job(config: &ExecutorConfig, request: CiJobRequest) -> Result<()> {
    // Check for available remote runners first (priority)
    if let Some(runner_id) = remote::find_available_runner(&config.runner_registry).await {
        let ci_root = &config.ci_root;

        // Create the job record
        let mut run = CiJob::new(
            request.owner.clone(),
            request.project.clone(),
            request.workflow_name.clone(),
            request.branch.clone(),
            request.commit_hash.clone(),
        );

        // Initialize job results from workflow jobs: one result per job, each
        // holding one step result per workflow step.
        run.jobs = request
            .workflow
            .jobs
            .iter()
            .map(|j| {
                let steps = j
                    .steps
                    .iter()
                    .map(|s| CiStepResult::new(s.name.clone()))
                    .collect();
                CiJobResult::new(j.name.clone(), steps)
            })
            .collect();

        // Generate job token for runner authentication
        let job_token = uuid::Uuid::new_v4().to_string();
        run.job_token = Some(job_token.clone());

        // Ensure run directory exists
        let run_dir = run.job_dir(ci_root);
        fs::create_dir_all(&run_dir).await?;

        // Create directories for each job's logs
        for workflow_job in &request.workflow.jobs {
            let job_result_dir = run.job_result_dir(ci_root, &workflow_job.name);
            fs::create_dir_all(&job_result_dir).await?;
        }

        // Save initial run state
        save_job(&run, ci_root).await?;

        // Serialize workflow to JSON for the runner
        let workflow_json = serde_json::to_value(&request.workflow)?;

        // Mark as running
        run.status = CiJobStatus::Running;
        run.started_at = Some(OffsetDateTime::now_utc());
        save_job(&run, ci_root).await?;

        // Assign to remote runner
        let assignment = remote::assign_job_to_runner(
            &config.runner_registry,
            runner_id,
            &run,
            workflow_json,
            job_token,
        )
        .await;

        match assignment {
            Ok(_) => {
                println!(
                    "CI: Dispatched run {} to remote runner {}",
                    run.id, runner_id
                );
                return Ok(());
            }
            Err(e) => {
                eprintln!(
                    "CI: Failed to assign job to runner: {}, falling back to local",
                    e
                );
                // Clean up the job directory we created before falling back.
                // This stays best-effort, but a failure is reported instead of
                // being dropped silently so stale run records can be found.
                if let Err(cleanup_err) = fs::remove_dir_all(&run_dir).await {
                    eprintln!(
                        "CI: Failed to clean up directory of run {}: {}",
                        run.id, cleanup_err
                    );
                }
                // Fall through: the local backend is subject to the same
                // availability check as when no runner was found at all.
            }
        }
    }

    // No remote runner took the job, check for local podman
    if config.local_podman_available.load(Ordering::Relaxed) {
        return local::process_job(config, request).await;
    }

    // No CI backend available
    eprintln!(
        "CI: No CI backend available for {}/{} (no remote runners or local podman)",
        request.owner, request.project
    );
    Ok(())
}