Login
4 branches 0 tags
Ben (Desktop/Arch) Allow for tags in issues d1f00f7 12 days ago 249 Commits
rubhub / src / services / ci.rs
//! CI service
//!
//! Handles CI job execution triggered by repository events.

use std::path::{Path, PathBuf};
use std::process::Stdio;

use anyhow::{Context, Result};
use time::OffsetDateTime;
use tokio::fs;
use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader};
use tokio::process::Command;
use tokio::sync::mpsc;
use tokio_stream::StreamExt;
use tokio_stream::wrappers::BroadcastStream;

use crate::GlobalState;
use crate::models::{
    CiJob, CiJobRequest, CiJobResult, CiJobResultResponse, CiJobStatus, CiRunStatusResponse,
    CiStepResult, CiStepResultResponse, CiWorkflow, WorkflowJob,
};
use crate::services::repository;
use rubhub_state::RepoEvent;

/// Repository-relative directory that is listed for workflow definition files.
const WORKFLOW_DIR: &str = ".rubhub/workflows";
/// Container image handed to `podman run` when a job executes locally.
const PODMAN_IMAGE: &str = "nixos/nix:latest";

// Markers for step tracking - verbose to avoid false positives.
// `generate_inner_script` echoes them to stderr around every step and
// `stream_combined_with_markers` consumes them instead of logging them:
//   start: STEP_START_MARKER + step name + MARKER_SUFFIX
//   end:   STEP_END_MARKER + step name + ":" + exit code + MARKER_SUFFIX
const STEP_START_MARKER: &str = ">>>RUBHUB_STEP_START:";
const STEP_END_MARKER: &str = ">>>RUBHUB_STEP_END:";
const MARKER_SUFFIX: &str = "<<<";

/// Probe the local `podman` binary by running `podman --version`.
///
/// Returns the version banner printed on stdout, trimmed of surrounding
/// whitespace (invalid UTF-8 is replaced lossily).
///
/// # Errors
/// Fails when the command cannot be run or when it exits unsuccessfully.
pub async fn validate_podman() -> Result<String> {
    let mut probe = Command::new("podman");
    probe.arg("--version");

    let result = probe.output().await.context("Failed to run podman")?;

    if result.status.success() {
        let banner = String::from_utf8_lossy(&result.stdout);
        return Ok(banner.trim().to_string());
    }

    Err(anyhow::anyhow!("podman --version failed"))
}

/// Spawn the CI event listener that watches for `BranchUpdated` events.
///
/// Subscribes to `state.event_tx` and, on a background task, forwards every
/// `RepoEvent::BranchUpdated` to `handle_branch_update`, which queues jobs on
/// `job_tx`. Other event kinds are ignored. The task ends when the stream
/// ends (it yields `None`).
///
/// Error items produced by the stream (the receiver fell behind and events
/// were dropped) are logged and skipped; they must not terminate the
/// listener, otherwise a single burst of events would disable CI triggering
/// for the rest of the process lifetime.
pub fn spawn_ci_listener(state: GlobalState, job_tx: mpsc::Sender<CiJobRequest>) {
    let rx = state.event_tx.subscribe();

    tokio::spawn(async move {
        let mut stream = BroadcastStream::new(rx);

        while let Some(item) = stream.next().await {
            let event = match item {
                Ok(event) => event,
                Err(e) => {
                    // Keep listening: only the missed events are lost.
                    eprintln!("CI: Event listener fell behind, events were dropped: {}", e);
                    continue;
                }
            };

            if let RepoEvent::BranchUpdated { info, branch } = event
                && let Err(e) = handle_branch_update(
                    &state,
                    &info.owner,
                    &info.project,
                    &branch,
                    &info.commit_hash,
                    &job_tx,
                )
                .await
            {
                eprintln!(
                    "CI: Error handling branch update for {}/{}: {}",
                    info.owner, info.project, e
                );
            }
        }
    });
}

/// React to a branch update: discover workflow files and queue a job for each
/// workflow that applies to `branch`.
///
/// Workflow files are the `.yaml` / `.yml` entries of `WORKFLOW_DIR` on the
/// updated branch. A file that cannot be read, is not UTF-8, or does not parse
/// is logged and skipped; the remaining files are still processed. Failing to
/// list the directory is treated as "no workflows" and is not an error.
///
/// Always returns `Ok(())`; queueing failures are logged only.
async fn handle_branch_update(
    state: &GlobalState,
    owner: &str,
    project: &str,
    branch: &str,
    commit_hash: &str,
    job_tx: &mpsc::Sender<CiJobRequest>,
) -> Result<()> {
    // A failed listing is taken to mean there is no workflows directory.
    let Ok(tree) = repository::get_git_tree(state, owner, project, branch, WORKFLOW_DIR).await
    else {
        return Ok(());
    };

    for entry in tree.iter() {
        // Only YAML files are considered workflow definitions.
        let is_workflow_file =
            entry.filename.ends_with(".yaml") || entry.filename.ends_with(".yml");
        if !is_workflow_file {
            continue;
        }

        let file_path = format!("{}/{}", WORKFLOW_DIR, entry.filename);

        // Fetch the raw file contents from the branch.
        let raw = match repository::get_git_file(state, owner, project, branch, &file_path).await
        {
            Ok(bytes) => bytes,
            Err(e) => {
                eprintln!("CI: Failed to read workflow file {}: {}", file_path, e);
                continue;
            }
        };

        // Decode and parse the workflow definition.
        let Ok(text) = String::from_utf8(raw) else {
            eprintln!("CI: Workflow file {} is not valid UTF-8", file_path);
            continue;
        };

        let workflow: CiWorkflow = match serde_yaml::from_str(&text) {
            Ok(parsed) => parsed,
            Err(e) => {
                eprintln!("CI: Failed to parse workflow {}: {}", file_path, e);
                continue;
            }
        };

        // Queue a job only when the workflow's branch filter accepts this branch.
        if workflow.should_run_on_branch(branch) {
            let request = CiJobRequest {
                owner: owner.to_owned(),
                project: project.to_owned(),
                branch: branch.to_owned(),
                commit_hash: commit_hash.to_owned(),
                workflow_name: workflow.name.clone(),
                workflow,
            };

            if let Err(e) = job_tx.send(request).await {
                eprintln!("CI: Failed to queue job: {}", e);
            }
        }
    }

    Ok(())
}

/// Start the background worker that drains the CI job queue.
///
/// Each request received on `rx` is dispatched on its own task, so requests
/// are not processed one at a time. The worker stops once the channel yields
/// `None`. Dispatch errors are logged.
pub fn spawn_ci_worker(state: GlobalState, mut rx: mpsc::Receiver<CiJobRequest>) {
    let worker = async move {
        loop {
            let Some(request) = rx.recv().await else {
                break;
            };

            let job_state = state.clone();
            tokio::spawn(async move {
                match dispatch_job(&job_state, request).await {
                    Ok(()) => {}
                    Err(e) => eprintln!("CI: Job dispatch error: {}", e),
                }
            });
        }
    };

    tokio::spawn(worker);
}

/// Dispatch a job to either a remote runner or local execution.
///
/// Order of preference:
/// 1. If a remote runner is available, the run record is created, persisted
///    as running, and handed to that runner together with a fresh job token.
/// 2. Otherwise - or when handing the run to the runner fails - the job is
///    executed locally via `process_job`, but only if the
///    `local_podman_ci_available` flag is set.
/// 3. If neither backend is usable the request is dropped with a log message.
///
/// # Errors
/// Returns an error when preparing the run record for a remote runner fails
/// (directory creation, saving, workflow serialization) or when local
/// processing fails.
async fn dispatch_job(state: &GlobalState, request: CiJobRequest) -> Result<()> {
    use crate::services::runner;

    // Check for available remote runners first (priority)
    if let Some(runner_id) = runner::find_available_runner(state).await {
        // Create the job record
        let ci_root = &state.config.ci_root;

        let mut run = CiJob::new(
            request.owner.clone(),
            request.project.clone(),
            request.workflow_name.clone(),
            request.branch.clone(),
            request.commit_hash.clone(),
        );

        // Initialize job results from workflow jobs
        run.jobs = request
            .workflow
            .jobs
            .iter()
            .map(|j| {
                let steps = j
                    .steps
                    .iter()
                    .map(|s| CiStepResult::new(s.name.clone()))
                    .collect();
                CiJobResult::new(j.name.clone(), steps)
            })
            .collect();

        // Generate job token for runner authentication
        let job_token = uuid::Uuid::new_v4().to_string();
        run.job_token = Some(job_token.clone());

        // Ensure run directory exists
        let run_dir = run.job_dir(ci_root);
        fs::create_dir_all(&run_dir).await?;

        // Create directories for each job's logs
        for workflow_job in &request.workflow.jobs {
            let job_result_dir = run.job_result_dir(ci_root, &workflow_job.name);
            fs::create_dir_all(&job_result_dir).await?;
        }

        // Save initial run state
        save_job(&run, ci_root).await?;

        // Serialize workflow to JSON for the runner
        let workflow_json = serde_json::to_value(&request.workflow)?;

        // Mark as running
        run.status = CiJobStatus::Running;
        run.started_at = Some(OffsetDateTime::now_utc());
        save_job(&run, ci_root).await?;

        // Assign to remote runner
        match runner::assign_job_to_runner(state, runner_id, &run, workflow_json, job_token).await
        {
            Ok(_) => {
                println!(
                    "CI: Dispatched run {} to remote runner {}",
                    run.id, runner_id
                );
                return Ok(());
            }
            Err(e) => {
                eprintln!(
                    "CI: Failed to assign job to runner: {}, falling back to local",
                    e
                );
                // Clean up the job directory we created before falling back.
                // Local processing creates its own run record.
                let _ = fs::remove_dir_all(&run_dir).await;
                // Fall through to the local backend check below, so the
                // fallback honours the same availability gate as the
                // no-runner case instead of starting podman unconditionally.
            }
        }
    }

    // No remote runner took the job, check for local podman
    if state
        .local_podman_ci_available
        .load(std::sync::atomic::Ordering::Relaxed)
    {
        return process_job(state, request).await;
    }

    // No CI backend available
    eprintln!(
        "CI: No CI backend available for {}/{} (no remote runners or local podman)",
        request.owner, request.project
    );
    Ok(())
}

/// Process a single CI run (multiple jobs, each with multiple steps)
async fn process_job(state: &GlobalState, request: CiJobRequest) -> Result<()> {
    let ci_root = &state.config.ci_root;

    // Create CI run record
    let mut run = CiJob::new(
        request.owner.clone(),
        request.project.clone(),
        request.workflow_name.clone(),
        request.branch.clone(),
        request.commit_hash.clone(),
    );

    // Initialize job results from workflow jobs
    run.jobs = request
        .workflow
        .jobs
        .iter()
        .map(|j| {
            let steps = j
                .steps
                .iter()
                .map(|s| CiStepResult::new(s.name.clone()))
                .collect();
            CiJobResult::new(j.name.clone(), steps)
        })
        .collect();

    // Ensure run directory exists
    let run_dir = run.job_dir(ci_root);
    fs::create_dir_all(&run_dir).await?;

    // Create directories for each job's logs
    for workflow_job in &request.workflow.jobs {
        let job_result_dir = run.job_result_dir(ci_root, &workflow_job.name);
        fs::create_dir_all(&job_result_dir).await?;
    }

    // Save initial run state
    save_job(&run, ci_root).await?;

    println!(
        "CI: Starting run {} for {}/{} ({}) with {} jobs",
        run.id,
        run.owner,
        run.project,
        run.workflow_name,
        run.jobs.len()
    );

    // Clone repo to temp directory
    let temp_dir = tempfile::tempdir().context("Failed to create temp directory")?;
    let work_dir = temp_dir.path();

    let repo_path = state
        .config
        .git_root
        .join(&request.owner)
        .join(&request.project);

    // Clone the repo (shallow clone of the specific branch)
    let clone_status = Command::new("git")
        .args(["clone", "--depth", "1", "--branch", &request.branch, "--"])
        .arg(&repo_path)
        .arg(work_dir)
        .stdout(Stdio::null())
        .stderr(Stdio::null())
        .status()
        .await
        .context("Failed to spawn git clone")?;

    if !clone_status.success() {
        run.status = CiJobStatus::Failed;
        run.finished_at = Some(OffsetDateTime::now_utc());
        // Mark all jobs as failed
        for job_result in &mut run.jobs {
            job_result.status = CiJobStatus::Failed;
        }
        save_job(&run, ci_root).await?;
        return Ok(());
    }

    // Update run status to running
    run.status = CiJobStatus::Running;
    run.started_at = Some(OffsetDateTime::now_utc());
    save_job(&run, ci_root).await?;

    // Execute jobs sequentially (each job = separate container)
    let mut overall_success = true;
    for (job_idx, workflow_job) in request.workflow.jobs.iter().enumerate() {
        // Mark this job as running
        run.jobs[job_idx].status = CiJobStatus::Running;
        run.jobs[job_idx].started_at = Some(OffsetDateTime::now_utc());
        save_job(&run, ci_root).await?;

        println!(
            "CI: Running job '{}' ({}/{})",
            workflow_job.name,
            job_idx + 1,
            request.workflow.jobs.len()
        );

        // Run all steps in a single container
        let exit_code = run_job_in_container(
            &mut run,
            job_idx,
            &request.workflow,
            workflow_job,
            work_dir,
            ci_root,
        )
        .await?;

        // Update job result
        run.jobs[job_idx].exit_code = Some(exit_code);
        run.jobs[job_idx].finished_at = Some(OffsetDateTime::now_utc());
        run.jobs[job_idx].status = if exit_code == 0 {
            CiJobStatus::Success
        } else {
            CiJobStatus::Failed
        };
        save_job(&run, ci_root).await?;

        if exit_code != 0 {
            overall_success = false;
            println!(
                "CI: Job '{}' failed with exit code {}",
                workflow_job.name, exit_code
            );
            break; // Stop on first failure
        }
    }

    // Update overall status
    run.status = if overall_success {
        CiJobStatus::Success
    } else {
        CiJobStatus::Failed
    };
    run.exit_code = run.jobs.iter().rev().filter_map(|j| j.exit_code).next();
    run.finished_at = Some(OffsetDateTime::now_utc());
    save_job(&run, ci_root).await?;

    println!("CI: Run {} finished with status {:?}", run.id, run.status);

    Ok(())
}

// Named volumes for nix - shared across all CI runs, users, and repos
/// Named volume mounted at `/nix` in job containers (the nix store).
const NIX_STORE_VOLUME: &str = "rubhub-nix-store";
/// Named volume mounted at `/home/anon/.cache/nix` in job containers.
const NIX_CACHE_VOLUME: &str = "rubhub-nix-cache";

/// Run a job (all its steps) in a single container
async fn run_job_in_container(
    run: &mut CiJob,
    job_idx: usize,
    workflow: &CiWorkflow,
    job: &WorkflowJob,
    work_dir: &Path,
    ci_root: &Path,
) -> Result<i32> {
    // Generate the job script with all steps
    let inner_script = generate_inner_script(job);

    // Write scripts to work directory
    let inner_script_path = work_dir.join(".rubhub-ci-inner.sh");
    let script_path = work_dir.join(".rubhub-ci-script.sh");
    std::fs::write(&inner_script_path, &inner_script)?;

    let script_content = if workflow.packages.is_empty() {
        // No packages - just run the inner script directly
        "#!/bin/sh\nsh /home/anon/workspace/.rubhub-ci-inner.sh\n".to_string()
    } else {
        // Wrap with nix-shell
        let packages = workflow.packages.join(" ");
        format!(
            "#!/bin/sh\nnix-shell -I nixpkgs=channel:nixpkgs-unstable --option sandbox false -p {} --run 'sh /home/anon/workspace/.rubhub-ci-inner.sh'\n",
            packages
        )
    };
    std::fs::write(&script_path, &script_content)?;

    // Run podman (30 min timeout)
    // Use named volumes for /nix and ~/.cache/nix to cache:
    // - /nix: the nix store (downloaded packages)
    // - ~/.cache/nix: channel expressions (nixexprs)
    let mut child = Command::new("podman")
        .args([
            "run",
            "--rm",
            "--timeout=1800",
            "-e",
            "HOME=/home/anon",
            "-e",
            "USER=anon",
            "-v",
            &format!("{}:/home/anon/workspace", work_dir.display()),
            "-v",
            &format!("{}:/nix", NIX_STORE_VOLUME),
            "-v",
            &format!("{}:/home/anon/.cache/nix", NIX_CACHE_VOLUME),
            "-w",
            "/home/anon/workspace",
            PODMAN_IMAGE,
            "sh",
            "/home/anon/workspace/.rubhub-ci-script.sh",
        ])
        .stdout(Stdio::piped())
        .stderr(Stdio::piped())
        .spawn()
        .context("Failed to spawn podman")?;

    // Get stdout and stderr handles
    let stdout = child.stdout.take().context("Failed to get stdout")?;
    let stderr = child.stderr.take().context("Failed to get stderr")?;

    let log_path = run.job_log_file(ci_root, &job.name);

    // Stream both stdout and stderr to a single file, parsing step markers from stderr
    let run_clone = run.clone();
    let ci_root_clone = ci_root.to_path_buf();
    let stream_task = tokio::spawn(stream_combined_with_markers(
        stdout,
        stderr,
        log_path,
        run_clone,
        job_idx,
        ci_root_clone,
    ));

    // Wait for process to complete
    let status = child.wait().await.context("Failed to wait for podman")?;

    // Wait for log streaming to finish and get updated run state
    if let Ok(updated_run) = stream_task.await {
        *run = updated_run;
    }

    Ok(status.code().unwrap_or(1))
}

/// Generate the inner `sh` script that runs all steps of `job` with markers.
///
/// For every step the script:
/// 1. echoes a start marker (`STEP_START_MARKER` + name + `MARKER_SUFFIX`)
///    to stderr,
/// 2. runs the step's commands in a subshell and records its exit status,
/// 3. echoes an end marker carrying the name and exit status to stderr,
/// 4. exits with that status if it is non-zero, skipping later steps.
///
/// Step names are shell-escaped before being placed inside the single-quoted
/// `echo` arguments, so a name containing `'` (for example `Don't fail`)
/// neither breaks the script nor gets interpreted as shell syntax; the marker
/// printed at runtime still contains the name verbatim.
fn generate_inner_script(job: &WorkflowJob) -> String {
    let mut script = String::from("#!/bin/sh\n\n");

    for step in &job.steps {
        // Inside single quotes the only special character is the quote
        // itself: close the string, emit an escaped quote, reopen it.
        let quoted_name = step.name.replace('\'', "'\\''");

        // Emit start marker to stderr
        script.push_str(&format!(
            "echo '{}{}{}' >&2\n",
            STEP_START_MARKER, quoted_name, MARKER_SUFFIX
        ));

        // Run the step in a subshell to capture exit code
        script.push_str("(\n");
        script.push_str(&step.run);
        if !step.run.ends_with('\n') {
            script.push('\n');
        }
        script.push_str(")\n");
        script.push_str("__exit=$?\n");

        // Emit end marker with exit code to stderr
        script.push_str(&format!(
            "echo '{}{}:'\"$__exit\"'{}' >&2\n",
            STEP_END_MARKER, quoted_name, MARKER_SUFFIX
        ));

        // Exit if step failed
        script.push_str("[ $__exit -ne 0 ] && exit $__exit\n\n");
    }

    // Explicit success exit - without this, the script would exit with 1
    // because the last `[ $__exit -ne 0 ]` test fails when __exit is 0
    script.push_str("exit 0\n");

    script
}

/// Stream both stdout and stderr to a single file, parsing step markers from stderr
async fn stream_combined_with_markers(
    stdout: impl tokio::io::AsyncRead + Unpin,
    stderr: impl tokio::io::AsyncRead + Unpin,
    path: PathBuf,
    mut run: CiJob,
    job_idx: usize,
    ci_root: PathBuf,
) -> CiJob {
    let mut stdout_lines = BufReader::new(stdout).lines();
    let mut stderr_lines = BufReader::new(stderr).lines();
    let Ok(mut file) = fs::File::create(&path).await else {
        return run;
    };

    // Track when each stream is done
    let mut stdout_done = false;
    let mut stderr_done = false;

    loop {
        if stdout_done && stderr_done {
            break;
        }

        tokio::select! {
            result = stdout_lines.next_line(), if !stdout_done => {
                match result {
                    Ok(Some(line)) => {
                        let _ = file.write_all(line.as_bytes()).await;
                        let _ = file.write_all(b"\n").await;
                    }
                    _ => stdout_done = true,
                }
            }
            result = stderr_lines.next_line(), if !stderr_done => {
                match result {
                    Ok(Some(line)) => {
                        // Check for step markers
                        if line.starts_with(STEP_START_MARKER) && line.ends_with(MARKER_SUFFIX) {
                            let step_name = &line[STEP_START_MARKER.len()..line.len() - MARKER_SUFFIX.len()];
                            if let Some(step) = run.jobs[job_idx]
                                .steps
                                .iter_mut()
                                .find(|s| s.name == step_name)
                            {
                                step.status = CiJobStatus::Running;
                                if let Err(e) = save_job(&run, &ci_root).await {
                                    eprintln!("CI: Failed to save job state: {}", e);
                                }
                            }
                        } else if line.starts_with(STEP_END_MARKER) && line.ends_with(MARKER_SUFFIX) {
                            let content = &line[STEP_END_MARKER.len()..line.len() - MARKER_SUFFIX.len()];
                            if let Some((step_name, exit_code_str)) = content.rsplit_once(':') {
                                let exit_code = exit_code_str.parse::<i32>().unwrap_or(1);
                                if let Some(step) = run.jobs[job_idx]
                                    .steps
                                    .iter_mut()
                                    .find(|s| s.name == step_name)
                                {
                                    step.exit_code = Some(exit_code);
                                    step.status = if exit_code == 0 {
                                        CiJobStatus::Success
                                    } else {
                                        CiJobStatus::Failed
                                    };
                                    if let Err(e) = save_job(&run, &ci_root).await {
                                        eprintln!("CI: Failed to save job state: {}", e);
                                    }
                                }
                            }
                        } else {
                            // Regular stderr line - write to file
                            let _ = file.write_all(line.as_bytes()).await;
                            let _ = file.write_all(b"\n").await;
                        }
                    }
                    _ => stderr_done = true,
                }
            }
        }
    }

    run
}

/// Persist `job` as pretty-printed JSON at its job file under `ci_root`.
///
/// The JSON is first written to a sibling file whose extension is replaced by
/// `json.tmp` and then renamed over the real job file, so the job file is
/// swapped in one step rather than written in place.
///
/// # Errors
/// Fails when serialization, the write, or the rename fails.
async fn save_job(job: &CiJob, ci_root: &Path) -> Result<()> {
    let target = job.job_file(ci_root);
    let staging = target.with_extension("json.tmp");

    let serialized = serde_json::to_string_pretty(job)?;

    // Write the staging file first, then move it into place.
    fs::write(&staging, &serialized).await?;
    fs::rename(&staging, &target).await?;

    Ok(())
}

/// Read and deserialize the job stored at
/// `<ci_root>/<owner>/<project>/<job_id>/job.json`.
///
/// # Errors
/// Fails when the file cannot be read (the error names the path) or when its
/// contents are not a valid serialized `CiJob`.
pub async fn load_job(ci_root: &Path, owner: &str, project: &str, job_id: &str) -> Result<CiJob> {
    let mut job_file = ci_root.to_path_buf();
    job_file.extend([owner, project, job_id, "job.json"]);

    let raw = fs::read_to_string(&job_file)
        .await
        .with_context(|| format!("Failed to read job file: {}", job_file.display()))?;

    Ok(serde_json::from_str::<CiJob>(&raw)?)
}

/// Return the contents of
/// `<ci_root>/<owner>/<project>/<run_id>/jobs/<job_name>/output.log`.
///
/// Any read failure (missing file, invalid UTF-8, ...) yields an empty string.
pub async fn load_job_log(
    ci_root: &Path,
    owner: &str,
    project: &str,
    run_id: &str,
    job_name: &str,
) -> String {
    let mut log_path = ci_root.join(owner);
    for component in [project, run_id, "jobs", job_name, "output.log"] {
        log_path.push(component);
    }

    match fs::read_to_string(log_path).await {
        Ok(text) => text,
        Err(_) => String::new(),
    }
}

/// Build the JSON API view of a CI run: run status plus, for every job, its
/// status, exit code, step results and full log text.
///
/// # Errors
/// Fails when the run's job file cannot be loaded. Missing or unreadable job
/// logs are not errors; such a job gets an empty `log`.
pub async fn load_run_with_all_logs(
    ci_root: &Path,
    owner: &str,
    project: &str,
    run_id: &str,
) -> Result<CiRunStatusResponse> {
    let run = load_job(ci_root, owner, project, run_id).await?;

    let mut jobs = Vec::new();
    for job_result in run.jobs.iter() {
        let log = load_job_log(ci_root, owner, project, run_id, &job_result.name).await;

        // Convert each recorded step into its response form.
        let mut steps: Vec<CiStepResultResponse> = Vec::new();
        for step in job_result.steps.iter() {
            steps.push(CiStepResultResponse {
                name: step.name.clone(),
                status: step.status.as_str().to_string(),
                exit_code: step.exit_code,
            });
        }

        jobs.push(CiJobResultResponse {
            name: job_result.name.clone(),
            status: job_result.status.as_str().to_string(),
            exit_code: job_result.exit_code,
            steps,
            log,
        });
    }

    let response = CiRunStatusResponse {
        id: run.id.to_string(),
        status: run.status.as_str().to_string(),
        jobs,
    };
    Ok(response)
}

/// List all jobs for a project, sorted by creation time (newest first).
///
/// Scans the entries of `<ci_root>/<owner>/<project>` and loads
/// `<entry>/job.json` from each. Entries without a readable, parsable job
/// file (including entries that are not directories) are skipped. A missing
/// project directory yields an empty list.
///
/// All filesystem access goes through the async API, and existence is
/// established by attempting the operation rather than checking first, so
/// the executor is not blocked and there is no check-then-use race.
///
/// # Errors
/// Fails when the project directory exists but cannot be listed.
pub async fn list_jobs(ci_root: &Path, owner: &str, project: &str) -> Result<Vec<CiJob>> {
    let project_dir = ci_root.join(owner).join(project);

    let mut entries = match fs::read_dir(&project_dir).await {
        Ok(entries) => entries,
        // No directory means no job has been recorded for this project yet.
        Err(e) if e.kind() == std::io::ErrorKind::NotFound => return Ok(Vec::new()),
        Err(e) => return Err(e.into()),
    };

    let mut jobs = Vec::new();
    while let Some(entry) = entries.next_entry().await? {
        // Reading fails for non-directories and for directories without a
        // job file, which skips exactly the entries that hold no job.
        let job_file = entry.path().join("job.json");
        if let Ok(content) = fs::read_to_string(&job_file).await
            && let Ok(job) = serde_json::from_str::<CiJob>(&content)
        {
            jobs.push(job);
        }
    }

    // Sort by created_at descending (newest first)
    jobs.sort_by(|a, b| b.created_at.cmp(&a.created_at));

    Ok(jobs)
}