Login
4 branches 0 tags
Ben (Desktop/Arch) Fixed CI icon 1fa3c4d 16 days ago 236 Commits
rubhub / src / services / ci.rs
//! CI service
//!
//! Handles CI job execution triggered by repository events.

use std::path::{Path, PathBuf};
use std::process::Stdio;

use anyhow::{Context, Result};
use time::OffsetDateTime;
use tokio::fs;
use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader};
use tokio::process::Command;
use tokio::sync::mpsc;
use tokio_stream::StreamExt;
use tokio_stream::wrappers::BroadcastStream;

use crate::GlobalState;
use crate::models::{CiJob, CiJobRequest, CiJobStatus, CiWorkflow};
use crate::services::repository;
use rubhub_state::RepoEvent;

/// Repository path that is listed for workflow definition files (`*.yaml` / `*.yml`).
const WORKFLOW_DIR: &str = ".rubhub/workflows";
/// Container image that podman runs workflow scripts in; it must provide `nix-shell`
/// because workflows with packages are run through it.
const PODMAN_IMAGE: &str = "nixos/nix:latest";

/// Spawn the CI event listener that watches for `BranchUpdated` events.
///
/// For every branch update, the branch's workflow files are inspected and matching
/// jobs are queued on `job_tx`. The listener runs until the broadcast channel closes.
///
/// If the listener falls behind the broadcast channel, the missed events are
/// logged and the listener keeps running. Previously, the first `Lagged` error
/// ended the loop and silently disabled CI for the rest of the process lifetime.
pub fn spawn_ci_listener(state: GlobalState, job_tx: mpsc::Sender<CiJobRequest>) {
    let rx = state.event_tx.subscribe();

    tokio::spawn(async move {
        let mut stream = BroadcastStream::new(rx);

        while let Some(item) = stream.next().await {
            let event = match item {
                Ok(event) => event,
                Err(e) => {
                    // Some events were dropped because we lagged; keep listening.
                    eprintln!("CI: Event listener lagged, some events were missed: {}", e);
                    continue;
                }
            };

            if let RepoEvent::BranchUpdated { info, branch } = event
                && let Err(e) = handle_branch_update(
                    &state,
                    &info.owner,
                    &info.project,
                    &branch,
                    &info.commit_hash,
                    &job_tx,
                )
                .await
            {
                eprintln!(
                    "CI: Error handling branch update for {}/{}: {}",
                    info.owner, info.project, e
                );
            }
        }
    });
}

/// Handle a branch update event: discover workflow files and queue matching jobs.
///
/// Lists [`WORKFLOW_DIR`] on `branch`. Each `*.yaml` / `*.yml` entry is read,
/// decoded as UTF-8 and parsed as a [`CiWorkflow`]. A [`CiJobRequest`] is sent on
/// `job_tx` for every workflow whose branch filter accepts `branch`.
///
/// If the workflow directory cannot be listed, the function does nothing. Files
/// that cannot be read, decoded or parsed are logged and skipped, as are failed
/// sends. The function currently always returns `Ok(())`.
async fn handle_branch_update(
    state: &GlobalState,
    owner: &str,
    project: &str,
    branch: &str,
    commit_hash: &str,
    job_tx: &mpsc::Sender<CiJobRequest>,
) -> Result<()> {
    // A missing (or unlistable) workflows directory simply means "no CI".
    let Ok(tree) = repository::get_git_tree(state, owner, project, branch, WORKFLOW_DIR).await
    else {
        return Ok(());
    };

    let yaml_entries = tree
        .iter()
        .filter(|e| e.filename.ends_with(".yaml") || e.filename.ends_with(".yml"));

    for entry in yaml_entries {
        let file_path = format!("{}/{}", WORKFLOW_DIR, entry.filename);

        let raw = match repository::get_git_file(state, owner, project, branch, &file_path).await
        {
            Ok(raw) => raw,
            Err(err) => {
                eprintln!("CI: Failed to read workflow file {}: {}", file_path, err);
                continue;
            }
        };

        let Ok(yaml) = String::from_utf8(raw) else {
            eprintln!("CI: Workflow file {} is not valid UTF-8", file_path);
            continue;
        };

        let workflow: CiWorkflow = match serde_yaml::from_str(&yaml) {
            Ok(parsed) => parsed,
            Err(err) => {
                eprintln!("CI: Failed to parse workflow {}: {}", file_path, err);
                continue;
            }
        };

        // Respect the workflow's own branch filter.
        if !workflow.should_run_on_branch(branch) {
            continue;
        }

        let workflow_name = workflow.name.clone();
        let request = CiJobRequest {
            owner: owner.to_owned(),
            project: project.to_owned(),
            branch: branch.to_owned(),
            commit_hash: commit_hash.to_owned(),
            workflow_name,
            workflow,
        };

        if let Err(err) = job_tx.send(request).await {
            eprintln!("CI: Failed to queue job: {}", err);
        }
    }

    Ok(())
}

/// Spawn the CI worker that processes jobs from the queue
pub fn spawn_ci_worker(state: GlobalState, mut rx: mpsc::Receiver<CiJobRequest>) {
    tokio::spawn(async move {
        while let Some(request) = rx.recv().await {
            let state = state.clone();
            tokio::spawn(async move {
                if let Err(e) = process_job(&state, request).await {
                    eprintln!("CI: Job execution error: {}", e);
                }
            });
        }
    });
}

/// Process a single CI job
async fn process_job(state: &GlobalState, request: CiJobRequest) -> Result<()> {
    let ci_root = &state.config.ci_root;

    // Create job record
    let mut job = CiJob::new(
        request.owner.clone(),
        request.project.clone(),
        request.workflow_name.clone(),
        request.branch.clone(),
        request.commit_hash.clone(),
    );

    // Ensure job directory exists
    let job_dir = job.job_dir(ci_root);
    fs::create_dir_all(&job_dir).await?;

    // Save initial job state
    save_job(&job, ci_root).await?;

    println!(
        "CI: Starting job {} for {}/{} ({})",
        job.id, job.owner, job.project, job.workflow_name
    );

    // Clone repo to temp directory
    let temp_dir = tempfile::tempdir().context("Failed to create temp directory")?;
    let work_dir = temp_dir.path();

    let repo_path = state
        .config
        .git_root
        .join(&request.owner)
        .join(&request.project);

    // Clone the repo (shallow clone of the specific branch)
    let clone_status = Command::new("git")
        .args(["clone", "--depth", "1", "--branch", &request.branch, "--"])
        .arg(&repo_path)
        .arg(work_dir)
        .stdout(Stdio::null())
        .stderr(Stdio::null())
        .status()
        .await
        .context("Failed to spawn git clone")?;

    if !clone_status.success() {
        job.status = CiJobStatus::Failed;
        job.finished_at = Some(OffsetDateTime::now_utc());
        save_job(&job, ci_root).await?;

        // Write error to stderr log
        fs::write(job.stderr_file(ci_root), "Failed to clone repository\n").await?;

        return Ok(());
    }

    // Update job status to running
    job.status = CiJobStatus::Running;
    job.started_at = Some(OffsetDateTime::now_utc());
    save_job(&job, ci_root).await?;

    // Run in podman
    let exit_code = run_in_podman(&job, &request.workflow, work_dir, ci_root).await?;

    // Update final status
    job.status = if exit_code == 0 {
        CiJobStatus::Success
    } else {
        CiJobStatus::Failed
    };
    job.exit_code = Some(exit_code);
    job.finished_at = Some(OffsetDateTime::now_utc());
    save_job(&job, ci_root).await?;

    println!(
        "CI: Job {} finished with status {:?} (exit code: {})",
        job.id, job.status, exit_code
    );

    Ok(())
}

/// Run a workflow script in a podman container
async fn run_in_podman(
    job: &CiJob,
    workflow: &CiWorkflow,
    work_dir: &Path,
    ci_root: &Path,
) -> Result<i32> {
    // Build the shell command
    // We write the script to a temp file to avoid shell escaping issues
    let script_file = work_dir.join(".rubhub-ci-script.sh");
    std::fs::write(&script_file, &workflow.script)?;

    let shell_cmd = if workflow.packages.is_empty() {
        // No packages, just run the script directly
        "sh /home/anon/workspace/.rubhub-ci-script.sh".to_string()
    } else {
        // Use nix-shell to provide packages
        // -I nixpkgs=... points to unstable for newer packages
        // --option sandbox false is needed because nix sandboxing doesn't work well inside containers
        let packages = workflow.packages.join(" ");
        format!(
            "nix-shell -I nixpkgs=channel:nixpkgs-unstable --option sandbox false -p {} --run 'sh /home/anon/workspace/.rubhub-ci-script.sh'",
            packages
        )
    };

    // Run podman (30 min timeout, container deleted after run)
    let mut child = Command::new("podman")
        .args([
            "run",
            "--rm",
            "--timeout=1800",
            "-e",
            "HOME=/home/anon",
            "-e",
            "USER=anon",
            "-v",
            &format!("{}:/home/anon/workspace", work_dir.display()),
            "-w",
            "/home/anon/workspace",
            PODMAN_IMAGE,
            "sh",
            "-c",
            &shell_cmd,
        ])
        .stdout(Stdio::piped())
        .stderr(Stdio::piped())
        .spawn()
        .context("Failed to spawn podman")?;

    // Stream stdout and stderr to separate files as they happen
    let stdout = child.stdout.take().context("Failed to get stdout")?;
    let stderr = child.stderr.take().context("Failed to get stderr")?;

    let stdout_path = job.stdout_file(ci_root);
    let stderr_path = job.stderr_file(ci_root);

    let stdout_task = tokio::spawn(stream_to_file(stdout, stdout_path));
    let stderr_task = tokio::spawn(stream_to_file(stderr, stderr_path));

    // Wait for process to complete
    let status = child.wait().await.context("Failed to wait for podman")?;

    // Wait for log streaming to finish
    let _ = stdout_task.await;
    let _ = stderr_task.await;

    Ok(status.code().unwrap_or(1))
}

/// Stream reader content to a file line by line
async fn stream_to_file<R: tokio::io::AsyncRead + Unpin>(reader: R, path: PathBuf) {
    let mut lines = BufReader::new(reader).lines();
    let Ok(mut file) = fs::File::create(&path).await else {
        return;
    };

    while let Ok(Some(line)) = lines.next_line().await {
        let _ = file.write_all(line.as_bytes()).await;
        let _ = file.write_all(b"\n").await;
    }
}

/// Save job metadata to disk
async fn save_job(job: &CiJob, ci_root: &Path) -> Result<()> {
    let job_file = job.job_file(ci_root);
    let content = serde_json::to_string_pretty(job)?;
    fs::write(&job_file, content).await?;
    Ok(())
}

/// Load a job from disk
pub async fn load_job(ci_root: &Path, owner: &str, project: &str, job_id: &str) -> Result<CiJob> {
    let job_file = ci_root
        .join(owner)
        .join(project)
        .join(job_id)
        .join("job.json");

    let content = fs::read_to_string(&job_file)
        .await
        .with_context(|| format!("Failed to read job file: {}", job_file.display()))?;

    let job: CiJob = serde_json::from_str(&content)?;
    Ok(job)
}

/// Load stdout and stderr logs for a job
pub async fn load_job_logs(
    ci_root: &Path,
    owner: &str,
    project: &str,
    job_id: &str,
) -> (String, String) {
    let job_dir = ci_root.join(owner).join(project).join(job_id);

    let stdout = fs::read_to_string(job_dir.join("stdout.log"))
        .await
        .unwrap_or_default();

    let stderr = fs::read_to_string(job_dir.join("stderr.log"))
        .await
        .unwrap_or_default();

    (stdout, stderr)
}

/// List all jobs for a project, sorted by creation time (newest first)
pub async fn list_jobs(ci_root: &Path, owner: &str, project: &str) -> Result<Vec<CiJob>> {
    let project_dir = ci_root.join(owner).join(project);

    if !project_dir.exists() {
        return Ok(vec![]);
    }

    let mut jobs = Vec::new();
    let mut entries = fs::read_dir(&project_dir).await?;

    while let Some(entry) = entries.next_entry().await? {
        let path = entry.path();
        if path.is_dir() {
            let job_file = path.join("job.json");
            if job_file.exists()
                && let Ok(content) = fs::read_to_string(&job_file).await
                && let Ok(job) = serde_json::from_str::<CiJob>(&content)
            {
                jobs.push(job);
            }
        }
    }

    // Sort by created_at descending (newest first)
    jobs.sort_by(|a, b| b.created_at.cmp(&a.created_at));

    Ok(jobs)
}