// rubhub / crates / ci / src / runner_client.rs
//! Runner client service
//!
//! Connects to a master RubHub instance to receive and execute CI jobs.

use std::path::Path;
use std::process::Stdio;
use std::time::Duration;

use anyhow::{Context, Result};
use base64::Engine;
use reqwest::Client;
use serde::{Deserialize, Serialize};
use tokio::io::{AsyncBufReadExt, BufReader};
use tokio::process::Command;
use uuid::Uuid;

use crate::executor::local::generate_inner_script;
use crate::models::{CiWorkflow, WorkflowJob};
use rubhub_state::{RemoteJobAssignment, RunnerClientConfig, RunnerTags};

// Timing for the master connection lifecycle.
const HEARTBEAT_INTERVAL_SECS: u64 = 30; // how often the keep-alive POST is sent
const RECONNECT_DELAY_SECS: u64 = 5; // initial backoff after a disconnect
const MAX_RECONNECT_DELAY_SECS: u64 = 60; // cap for the exponential backoff

// Markers for step tracking (same as in local.rs)
// The generated job script prints these around each step; the log streamer
// strips them from the uploaded log and uses them to attribute exit codes
// to individual steps.
const STEP_START_MARKER: &str = ">>>RUBHUB_STEP_START:";
const STEP_END_MARKER: &str = ">>>RUBHUB_STEP_END:";
const MARKER_SUFFIX: &str = "<<<";

// Container image and named volumes used when executing jobs via podman.
const PODMAN_IMAGE: &str = "nixos/nix:latest"; // Nix-capable base image
const NIX_STORE_VOLUME: &str = "rubhub-nix-store"; // persistent /nix store shared across jobs
const NIX_CACHE_VOLUME: &str = "rubhub-nix-cache"; // persistent per-user nix cache

/// Start the runner client that connects to a master instance
///
/// Spawns a background task that connects to the configured master and
/// reconnects forever with exponential backoff (capped at
/// `MAX_RECONNECT_DELAY_SECS`). The backoff resets after a clean disconnect.
pub fn spawn_runner_client(config: RunnerClientConfig) {
    tokio::spawn(async move {
        // Current backoff, doubled after every attempt up to the cap.
        let mut delay_secs = RECONNECT_DELAY_SECS;

        loop {
            println!(
                "Runner client: Connecting to master at {}",
                config.master_url
            );

            if let Err(e) = run_runner_client(&config).await {
                eprintln!("Runner client: Error: {}", e);
            } else {
                println!("Runner client: Disconnected from master");
                // A clean disconnect resets the backoff to its base value.
                delay_secs = RECONNECT_DELAY_SECS;
            }

            // Sleep, then widen the backoff window for the next attempt.
            println!(
                "Runner client: Reconnecting in {} seconds...",
                delay_secs
            );
            tokio::time::sleep(Duration::from_secs(delay_secs)).await;
            delay_secs = (delay_secs * 2).min(MAX_RECONNECT_DELAY_SECS);
        }
    });
}

/// Run the runner client connection loop
async fn run_runner_client(config: &RunnerClientConfig) -> Result<()> {
    let client = Client::builder()
        .timeout(Duration::from_secs(0)) // No timeout for SSE
        .build()?;

    // Build tags for this runner
    let tags = RunnerTags::current();

    // Connect to SSE endpoint
    let connect_url = format!(
        "{}/.runners/connect?os={}&arch={}&tags={}",
        config.master_url,
        tags.os,
        tags.arch,
        tags.extra.join(",")
    );

    let response = client
        .get(&connect_url)
        .header("Authorization", format!("Bearer {}", config.token))
        .send()
        .await
        .context("Failed to connect to master")?;

    if !response.status().is_success() {
        anyhow::bail!(
            "Master rejected connection: {} {}",
            response.status(),
            response.text().await.unwrap_or_default()
        );
    }

    println!(
        "Runner client: Connected to master (os={}, arch={})",
        tags.os, tags.arch
    );

    // Spawn heartbeat task
    let heartbeat_client = client.clone();
    let heartbeat_url = format!("{}/.runners/heartbeat", config.master_url);
    let heartbeat_token = config.token.clone();
    let heartbeat_handle = tokio::spawn(async move {
        let mut interval = tokio::time::interval(Duration::from_secs(HEARTBEAT_INTERVAL_SECS));
        loop {
            interval.tick().await;
            let _ = send_heartbeat(&heartbeat_client, &heartbeat_url, &heartbeat_token).await;
        }
    });

    // Process SSE events
    let mut buffer = String::new();
    let mut bytes_stream = response.bytes_stream();

    use futures::StreamExt;
    while let Some(chunk) = bytes_stream.next().await {
        let chunk = chunk.context("Error reading SSE stream")?;
        buffer.push_str(&String::from_utf8_lossy(&chunk));

        // Process complete SSE messages
        while let Some(pos) = buffer.find("\n\n") {
            let message = buffer[..pos].to_string();
            buffer = buffer[pos + 2..].to_string();

            if let Some(data) = message.strip_prefix("data: ")
                && let Err(e) = handle_sse_event(config, &client, data).await
            {
                eprintln!("Runner client: Error handling event: {}", e);
            }
        }
    }

    // Clean up heartbeat task
    heartbeat_handle.abort();

    Ok(())
}

/// Handle an SSE event from the master
///
/// Parses the JSON payload of one `data:` line and dispatches it:
/// heartbeats are ignored, job assignments are executed (errors logged, not
/// propagated), and cancels are currently only acknowledged.
async fn handle_sse_event(
    config: &RunnerClientConfig,
    client: &Client,
    data: &str,
) -> Result<()> {
    // Wire format of events pushed by the master; the `type` field selects
    // the variant. This must stay in sync with the server side.
    #[derive(Deserialize)]
    #[serde(tag = "type")]
    enum Event {
        Heartbeat {
            #[allow(dead_code)]
            timestamp: String,
        },
        Job(RemoteJobAssignment),
        Cancel {
            job_id: Uuid,
        },
    }

    match serde_json::from_str::<Event>(data).context("Failed to parse SSE event")? {
        // Server heartbeat — nothing to do beyond having received it.
        Event::Heartbeat { .. } => {}
        Event::Job(assignment) => {
            println!(
                "Runner client: Received job {} for {}/{}",
                assignment.job_id, assignment.owner, assignment.project
            );
            // Execution failures are reported locally; the SSE loop keeps running.
            if let Err(e) = execute_job(config, client, assignment).await {
                eprintln!("Runner client: Job execution error: {}", e);
            }
        }
        Event::Cancel { job_id } => {
            println!("Runner client: Received cancel for job {}", job_id);
            // TODO: Implement job cancellation
        }
    }

    Ok(())
}

/// Execute a job received from the master
///
/// Flow: parse the workflow JSON from the assignment, download and unpack
/// the source tarball into a fresh temp directory, run each workflow job in
/// a container (stopping at the first failure), then POST the aggregate
/// result back to the master's `complete` endpoint.
///
/// # Errors
/// Fails on workflow parse errors, source download/extraction errors, or
/// when `run_job_in_container` itself errors. NOTE(review): in those error
/// cases the `complete` endpoint is never called, so the master is not told
/// the job ended — confirm whether the master times such jobs out.
async fn execute_job(
    config: &RunnerClientConfig,
    client: &Client,
    assignment: RemoteJobAssignment,
) -> Result<()> {
    let job_id = assignment.job_id;

    // Parse the workflow from JSON
    let workflow: CiWorkflow =
        serde_json::from_value(assignment.workflow).context("Failed to parse workflow")?;

    // Download source tarball
    let source_url = format!(
        "{}/.runners/jobs/{}/{}/{}/source.tar.gz",
        config.master_url, assignment.owner, assignment.project, job_id
    );

    println!("Runner client: Downloading source for job {}", job_id);

    let response = client
        .get(&source_url)
        .header("Authorization", format!("Bearer {}", config.token))
        .send()
        .await
        .context("Failed to download source")?;

    if !response.status().is_success() {
        anyhow::bail!("Failed to download source: {}", response.status());
    }

    // The entire tarball is buffered in memory before extraction.
    let tarball = response.bytes().await.context("Failed to read tarball")?;

    // Extract to temp directory. `temp_dir` must stay alive for the whole
    // function — dropping it deletes the directory and everything in it.
    let temp_dir = tempfile::tempdir().context("Failed to create temp directory")?;
    let work_dir = temp_dir.path();

    // Extract tarball
    let tar_gz = flate2::read::GzDecoder::new(&tarball[..]);
    let mut archive = tar::Archive::new(tar_gz);
    archive
        .unpack(work_dir)
        .context("Failed to extract tarball")?;

    println!(
        "Runner client: Executing job {} ({} jobs)",
        job_id,
        workflow.jobs.len()
    );

    // Execute each job in the workflow sequentially, collecting results.
    let mut overall_success = true;
    let mut job_results = Vec::new();

    for workflow_job in &workflow.jobs {
        println!("Runner client: Running job '{}'", workflow_job.name);

        let (exit_code, step_results) = run_job_in_container(
            config,
            client,
            job_id,
            &assignment.owner,
            &assignment.project,
            &assignment.job_token,
            &workflow,
            workflow_job,
            work_dir,
        )
        .await?;

        // A job succeeded iff the container exited with code 0.
        let job_success = exit_code == 0;
        job_results.push(JobResultUpdate {
            name: workflow_job.name.clone(),
            exit_code: Some(exit_code),
            steps: step_results,
        });

        if !job_success {
            overall_success = false;
            println!(
                "Runner client: Job '{}' failed with exit code {}",
                workflow_job.name, exit_code
            );
            break; // Stop on first failure
        }
    }

    // Report completion to master
    let complete_url = format!(
        "{}/.runners/jobs/{}/{}/{}/complete",
        config.master_url, assignment.owner, assignment.project, job_id
    );

    let complete_request = JobCompleteRequest {
        success: overall_success,
        // Exit code of the last job that actually ran (None if no jobs ran).
        exit_code: job_results.last().and_then(|j| j.exit_code),
        jobs: job_results,
        job_token: assignment.job_token.clone(),
    };

    let response = client
        .post(&complete_url)
        .header("Authorization", format!("Bearer {}", config.token))
        .json(&complete_request)
        .send()
        .await
        .context("Failed to report completion")?;

    // A failed completion report is logged but deliberately not treated as a
    // job error — the job itself already finished.
    if !response.status().is_success() {
        eprintln!(
            "Runner client: Failed to report completion: {}",
            response.status()
        );
    }

    println!(
        "Runner client: Job {} completed (success={})",
        job_id, overall_success
    );

    Ok(())
}

/// Run a single job in a container and stream logs back
async fn run_job_in_container(
    config: &RunnerClientConfig,
    client: &Client,
    job_id: Uuid,
    owner: &str,
    project: &str,
    job_token: &str,
    workflow: &CiWorkflow,
    job: &WorkflowJob,
    work_dir: &Path,
) -> Result<(i32, Vec<StepResultUpdate>)> {
    // Generate the job script with all steps
    let inner_script = generate_inner_script(job);

    // Write scripts to work directory
    let inner_script_path = work_dir.join(".rubhub-ci-inner.sh");
    let script_path = work_dir.join(".rubhub-ci-script.sh");
    std::fs::write(&inner_script_path, &inner_script)?;

    let script_content = if workflow.packages.is_empty() {
        "#!/bin/sh\nsh /home/anon/workspace/.rubhub-ci-inner.sh\n".to_string()
    } else {
        let packages = workflow.packages.join(" ");
        format!(
            "#!/bin/sh\nnix-shell -I nixpkgs=channel:nixpkgs-unstable --option sandbox false -p {} --run 'sh /home/anon/workspace/.rubhub-ci-inner.sh'\n",
            packages
        )
    };
    std::fs::write(&script_path, &script_content)?;

    // Run podman
    let mut child = Command::new("podman")
        .args([
            "run",
            "--rm",
            "--timeout=1800",
            "-e",
            "HOME=/home/anon",
            "-e",
            "USER=anon",
            "-v",
            &format!("{}:/home/anon/workspace", work_dir.display()),
            "-v",
            &format!("{}:/nix", NIX_STORE_VOLUME),
            "-v",
            &format!("{}:/home/anon/.cache/nix", NIX_CACHE_VOLUME),
            "-w",
            "/home/anon/workspace",
            PODMAN_IMAGE,
            "sh",
            "/home/anon/workspace/.rubhub-ci-script.sh",
        ])
        .stdout(Stdio::piped())
        .stderr(Stdio::piped())
        .spawn()
        .context("Failed to spawn podman")?;

    let stdout = child.stdout.take().context("Failed to get stdout")?;
    let stderr = child.stderr.take().context("Failed to get stderr")?;

    // Stream logs and parse step markers
    let step_results = stream_logs_to_master(
        config, client, job_id, owner, project, &job.name, job_token, job, stdout, stderr,
    )
    .await?;

    let status = child.wait().await.context("Failed to wait for podman")?;
    let exit_code = status.code().unwrap_or(1);

    Ok((exit_code, step_results))
}

/// Stream logs to master and parse step markers
///
/// Reads the container's stdout and stderr concurrently via `select!` until
/// both pipes close. Plain output lines are batched into a buffer that is
/// POSTed to the master's log endpoint at most every 500ms; stderr lines
/// carrying the RUBHUB step markers are consumed (never logged) and used to
/// record per-step exit codes.
///
/// Returns one `StepResultUpdate` per configured step, in the job's step
/// order; steps that never emitted an end marker keep `exit_code: None`.
async fn stream_logs_to_master(
    config: &RunnerClientConfig,
    client: &Client,
    job_id: Uuid,
    owner: &str,
    project: &str,
    job_name: &str,
    job_token: &str,
    job: &WorkflowJob,
    stdout: impl tokio::io::AsyncRead + Unpin,
    stderr: impl tokio::io::AsyncRead + Unpin,
) -> Result<Vec<StepResultUpdate>> {
    let mut stdout_lines = BufReader::new(stdout).lines();
    let mut stderr_lines = BufReader::new(stderr).lines();
    let mut stdout_done = false;
    let mut stderr_done = false;

    // Raw log bytes accumulated between flushes to the master.
    let mut log_buffer = Vec::new();
    // Pre-populate one result per step so order is preserved even for steps
    // that never run.
    let mut step_results: Vec<StepResultUpdate> = job
        .steps
        .iter()
        .map(|s| StepResultUpdate {
            name: s.name.clone(),
            exit_code: None,
        })
        .collect();

    let log_url = format!(
        "{}/.runners/jobs/{}/{}/{}/log",
        config.master_url, owner, project, job_id
    );

    // Flush logs periodically
    let mut last_flush = std::time::Instant::now();
    let flush_interval = Duration::from_millis(500);

    loop {
        if stdout_done && stderr_done {
            break;
        }

        tokio::select! {
            result = stdout_lines.next_line(), if !stdout_done => {
                match result {
                    Ok(Some(line)) => {
                        log_buffer.extend_from_slice(line.as_bytes());
                        log_buffer.push(b'\n');
                    }
                    // EOF (Ok(None)) and read errors both end stdout processing.
                    _ => stdout_done = true,
                }
            }
            result = stderr_lines.next_line(), if !stderr_done => {
                match result {
                    Ok(Some(line)) => {
                        // Check for step markers
                        if line.starts_with(STEP_START_MARKER) && line.ends_with(MARKER_SUFFIX) {
                            // Step start marker - don't write to log
                        } else if line.starts_with(STEP_END_MARKER) && line.ends_with(MARKER_SUFFIX) {
                            // Step end marker - parse exit code. Marker body is
                            // "<step name>:<exit code>"; rsplit_once tolerates
                            // ':' inside the step name itself.
                            let content = &line[STEP_END_MARKER.len()..line.len() - MARKER_SUFFIX.len()];
                            if let Some((step_name, exit_code_str)) = content.rsplit_once(':') {
                                // An unparseable exit code is treated as failure (1).
                                let exit_code = exit_code_str.parse::<i32>().unwrap_or(1);
                                if let Some(step) = step_results.iter_mut().find(|s| s.name == step_name) {
                                    step.exit_code = Some(exit_code);
                                }
                            }
                        } else {
                            // Regular stderr line
                            log_buffer.extend_from_slice(line.as_bytes());
                            log_buffer.push(b'\n');
                        }
                    }
                    _ => stderr_done = true,
                }
            }
        }

        // Flush logs periodically. NOTE(review): this check only runs after a
        // select! arm completes, so a quiet process delays the flush beyond
        // the interval — harmless, since nothing new arrived to send.
        if last_flush.elapsed() >= flush_interval && !log_buffer.is_empty() {
            send_log_chunk(
                client,
                &log_url,
                &config.token,
                job_name,
                job_token,
                &log_buffer,
            )
            .await?;
            log_buffer.clear();
            last_flush = std::time::Instant::now();
        }
    }

    // Flush remaining logs
    if !log_buffer.is_empty() {
        send_log_chunk(
            client,
            &log_url,
            &config.token,
            job_name,
            job_token,
            &log_buffer,
        )
        .await?;
    }

    Ok(step_results)
}

/// Send a log chunk to the master
///
/// Base64-encodes `data` and POSTs it to the log endpoint. A non-success
/// HTTP status is only logged to stderr; transport errors are propagated.
async fn send_log_chunk(
    client: &Client,
    url: &str,
    token: &str,
    job_name: &str,
    job_token: &str,
    data: &[u8],
) -> Result<()> {
    // Wire format for one log upload.
    #[derive(Serialize)]
    struct LogChunk {
        job_name: String,
        data: String,
        job_token: String,
    }

    let body = LogChunk {
        job_name: job_name.to_owned(),
        // Raw bytes are base64-encoded so they survive JSON transport.
        data: base64::engine::general_purpose::STANDARD.encode(data),
        job_token: job_token.to_owned(),
    };

    let status = client
        .post(url)
        .header("Authorization", format!("Bearer {}", token))
        .json(&body)
        .send()
        .await?
        .status();

    if !status.is_success() {
        eprintln!("Failed to send log chunk: {}", status);
    }

    Ok(())
}

/// Send a heartbeat to the master
///
/// POSTs an empty JSON object to the heartbeat endpoint. The response body
/// is ignored, but transport-level failures are propagated to the caller.
async fn send_heartbeat(client: &Client, url: &str, token: &str) -> Result<()> {
    // Empty payload — the request itself is the keep-alive signal.
    #[derive(Serialize)]
    struct Heartbeat {}

    let _response = client
        .post(url)
        .header("Authorization", format!("Bearer {}", token))
        .json(&Heartbeat {})
        .send()
        .await?;

    Ok(())
}

/// Final result payload POSTed to the master's `complete` endpoint.
#[derive(Debug, Serialize)]
struct JobCompleteRequest {
    /// True only when every executed job exited with code 0.
    success: bool,
    /// Exit code of the last job that ran, if any.
    exit_code: Option<i32>,
    /// Per-job results, in execution order.
    jobs: Vec<JobResultUpdate>,
    /// Token received with the job assignment, echoed back — presumably for
    /// server-side validation; confirm against the master's handler.
    job_token: String,
}

/// Result of one workflow job, including its per-step outcomes.
#[derive(Debug, Serialize)]
struct JobResultUpdate {
    /// Job name as declared in the workflow.
    name: String,
    /// Container exit code; `None` when the job did not run.
    exit_code: Option<i32>,
    /// Per-step results parsed from the step markers.
    steps: Vec<StepResultUpdate>,
}

/// Result of a single step inside a job.
#[derive(Debug, Serialize)]
pub struct StepResultUpdate {
    /// Step name as declared in the workflow job.
    name: String,
    /// Exit code parsed from the step's end marker; `None` when the step
    /// never emitted one (e.g. the job aborted before reaching it).
    exit_code: Option<i32>,
}