text/x-rust
•
17.42 KB
•
598 lines
//! Runner client service
//!
//! Connects to a master RubHub instance to receive and execute CI jobs.
use std::path::Path;
use std::process::Stdio;
use std::time::Duration;
use anyhow::{Context, Result};
use base64::Engine;
use reqwest::Client;
use serde::{Deserialize, Serialize};
use tokio::io::{AsyncBufReadExt, BufReader};
use tokio::process::Command;
use uuid::Uuid;
use crate::GlobalState;
use crate::models::{CiWorkflow, WorkflowJob};
use rubhub_state::{RemoteJobAssignment, RunnerClientConfig, RunnerTags};
// How often the runner POSTs a heartbeat to the master.
const HEARTBEAT_INTERVAL_SECS: u64 = 30;
// Initial delay before reconnecting after a disconnect.
const RECONNECT_DELAY_SECS: u64 = 5;
// Cap for the exponential reconnect backoff.
const MAX_RECONNECT_DELAY_SECS: u64 = 60;
// Markers for step tracking (same as in ci.rs)
// Emitted on stderr by the generated job script; parsed in stream_logs_to_master.
const STEP_START_MARKER: &str = ">>>RUBHUB_STEP_START:";
const STEP_END_MARKER: &str = ">>>RUBHUB_STEP_END:";
const MARKER_SUFFIX: &str = "<<<";
// Container image used to run jobs (nix-shell provides requested packages).
const PODMAN_IMAGE: &str = "nixos/nix:latest";
// Named podman volumes shared across runs to cache the nix store / eval cache.
const NIX_STORE_VOLUME: &str = "rubhub-nix-store";
const NIX_CACHE_VOLUME: &str = "rubhub-nix-cache";
/// Start the runner client that connects to a master instance
pub fn spawn_runner_client(state: GlobalState, config: RunnerClientConfig) {
tokio::spawn(async move {
let mut reconnect_delay = RECONNECT_DELAY_SECS;
loop {
println!(
"Runner client: Connecting to master at {}",
config.master_url
);
match run_runner_client(&state, &config).await {
Ok(()) => {
println!("Runner client: Disconnected from master");
reconnect_delay = RECONNECT_DELAY_SECS; // Reset delay on clean disconnect
}
Err(e) => {
eprintln!("Runner client: Error: {}", e);
}
}
// Wait before reconnecting with exponential backoff
println!(
"Runner client: Reconnecting in {} seconds...",
reconnect_delay
);
tokio::time::sleep(Duration::from_secs(reconnect_delay)).await;
reconnect_delay = (reconnect_delay * 2).min(MAX_RECONNECT_DELAY_SECS);
}
});
}
/// Run the runner client connection loop
async fn run_runner_client(state: &GlobalState, config: &RunnerClientConfig) -> Result<()> {
let client = Client::builder()
.timeout(Duration::from_secs(0)) // No timeout for SSE
.build()?;
// Build tags for this runner
let tags = RunnerTags::current();
// Connect to SSE endpoint
let connect_url = format!(
"{}/.runners/connect?os={}&arch={}&tags={}",
config.master_url,
tags.os,
tags.arch,
tags.extra.join(",")
);
let response = client
.get(&connect_url)
.header("Authorization", format!("Bearer {}", config.token))
.send()
.await
.context("Failed to connect to master")?;
if !response.status().is_success() {
anyhow::bail!(
"Master rejected connection: {} {}",
response.status(),
response.text().await.unwrap_or_default()
);
}
println!(
"Runner client: Connected to master (os={}, arch={})",
tags.os, tags.arch
);
// Spawn heartbeat task
let heartbeat_client = client.clone();
let heartbeat_url = format!("{}/.runners/heartbeat", config.master_url);
let heartbeat_token = config.token.clone();
let heartbeat_handle = tokio::spawn(async move {
let mut interval = tokio::time::interval(Duration::from_secs(HEARTBEAT_INTERVAL_SECS));
loop {
interval.tick().await;
let _ = send_heartbeat(&heartbeat_client, &heartbeat_url, &heartbeat_token).await;
}
});
// Process SSE events
let mut buffer = String::new();
let mut bytes_stream = response.bytes_stream();
use futures::StreamExt;
while let Some(chunk) = bytes_stream.next().await {
let chunk = chunk.context("Error reading SSE stream")?;
buffer.push_str(&String::from_utf8_lossy(&chunk));
// Process complete SSE messages
while let Some(pos) = buffer.find("\n\n") {
let message = buffer[..pos].to_string();
buffer = buffer[pos + 2..].to_string();
if let Some(data) = message.strip_prefix("data: ")
&& let Err(e) = handle_sse_event(state, config, &client, data).await
{
eprintln!("Runner client: Error handling event: {}", e);
}
}
}
// Clean up heartbeat task
heartbeat_handle.abort();
Ok(())
}
/// Handle an SSE event from the master
async fn handle_sse_event(
state: &GlobalState,
config: &RunnerClientConfig,
client: &Client,
data: &str,
) -> Result<()> {
#[derive(Deserialize)]
#[serde(tag = "type")]
enum Event {
Heartbeat {
#[allow(dead_code)]
timestamp: String,
},
Job(RemoteJobAssignment),
Cancel {
job_id: Uuid,
},
}
let event: Event = serde_json::from_str(data).context("Failed to parse SSE event")?;
match event {
Event::Heartbeat { .. } => {
// Server heartbeat, just acknowledge
}
Event::Job(assignment) => {
println!(
"Runner client: Received job {} for {}/{}",
assignment.job_id, assignment.owner, assignment.project
);
if let Err(e) = execute_job(state, config, client, assignment).await {
eprintln!("Runner client: Job execution error: {}", e);
}
}
Event::Cancel { job_id } => {
println!("Runner client: Received cancel for job {}", job_id);
// TODO: Implement job cancellation
}
}
Ok(())
}
/// Execute a job received from the master
async fn execute_job(
state: &GlobalState,
config: &RunnerClientConfig,
client: &Client,
assignment: RemoteJobAssignment,
) -> Result<()> {
let job_id = assignment.job_id;
// Parse the workflow from JSON
let workflow: CiWorkflow =
serde_json::from_value(assignment.workflow).context("Failed to parse workflow")?;
// Download source tarball
let source_url = format!(
"{}/.runners/jobs/{}/{}/{}/source.tar.gz",
config.master_url, assignment.owner, assignment.project, job_id
);
println!("Runner client: Downloading source for job {}", job_id);
let response = client
.get(&source_url)
.header("Authorization", format!("Bearer {}", config.token))
.send()
.await
.context("Failed to download source")?;
if !response.status().is_success() {
anyhow::bail!("Failed to download source: {}", response.status());
}
let tarball = response.bytes().await.context("Failed to read tarball")?;
// Extract to temp directory
let temp_dir = tempfile::tempdir().context("Failed to create temp directory")?;
let work_dir = temp_dir.path();
// Extract tarball
let tar_gz = flate2::read::GzDecoder::new(&tarball[..]);
let mut archive = tar::Archive::new(tar_gz);
archive
.unpack(work_dir)
.context("Failed to extract tarball")?;
println!(
"Runner client: Executing job {} ({} jobs)",
job_id,
workflow.jobs.len()
);
// Execute each job in the workflow
let mut overall_success = true;
let mut job_results = Vec::new();
for workflow_job in &workflow.jobs {
println!("Runner client: Running job '{}'", workflow_job.name);
let (exit_code, step_results) = run_job_in_container(
state,
config,
client,
job_id,
&assignment.owner,
&assignment.project,
&assignment.job_token,
&workflow,
workflow_job,
work_dir,
)
.await?;
let job_success = exit_code == 0;
job_results.push(JobResultUpdate {
name: workflow_job.name.clone(),
exit_code: Some(exit_code),
steps: step_results,
});
if !job_success {
overall_success = false;
println!(
"Runner client: Job '{}' failed with exit code {}",
workflow_job.name, exit_code
);
break; // Stop on first failure
}
}
// Report completion to master
let complete_url = format!(
"{}/.runners/jobs/{}/{}/{}/complete",
config.master_url, assignment.owner, assignment.project, job_id
);
let complete_request = JobCompleteRequest {
success: overall_success,
exit_code: job_results.last().and_then(|j| j.exit_code),
jobs: job_results,
job_token: assignment.job_token.clone(),
};
let response = client
.post(&complete_url)
.header("Authorization", format!("Bearer {}", config.token))
.json(&complete_request)
.send()
.await
.context("Failed to report completion")?;
if !response.status().is_success() {
eprintln!(
"Runner client: Failed to report completion: {}",
response.status()
);
}
println!(
"Runner client: Job {} completed (success={})",
job_id, overall_success
);
Ok(())
}
/// Run a single job in a container and stream logs back
async fn run_job_in_container(
_state: &GlobalState,
config: &RunnerClientConfig,
client: &Client,
job_id: Uuid,
owner: &str,
project: &str,
job_token: &str,
workflow: &CiWorkflow,
job: &WorkflowJob,
work_dir: &Path,
) -> Result<(i32, Vec<StepResultUpdate>)> {
// Generate the job script with all steps
let inner_script = generate_inner_script(job);
// Write scripts to work directory
let inner_script_path = work_dir.join(".rubhub-ci-inner.sh");
let script_path = work_dir.join(".rubhub-ci-script.sh");
std::fs::write(&inner_script_path, &inner_script)?;
let script_content = if workflow.packages.is_empty() {
"#!/bin/sh\nsh /home/anon/workspace/.rubhub-ci-inner.sh\n".to_string()
} else {
let packages = workflow.packages.join(" ");
format!(
"#!/bin/sh\nnix-shell -I nixpkgs=channel:nixpkgs-unstable --option sandbox false -p {} --run 'sh /home/anon/workspace/.rubhub-ci-inner.sh'\n",
packages
)
};
std::fs::write(&script_path, &script_content)?;
// Run podman
let mut child = Command::new("podman")
.args([
"run",
"--rm",
"--timeout=1800",
"-e",
"HOME=/home/anon",
"-e",
"USER=anon",
"-v",
&format!("{}:/home/anon/workspace", work_dir.display()),
"-v",
&format!("{}:/nix", NIX_STORE_VOLUME),
"-v",
&format!("{}:/home/anon/.cache/nix", NIX_CACHE_VOLUME),
"-w",
"/home/anon/workspace",
PODMAN_IMAGE,
"sh",
"/home/anon/workspace/.rubhub-ci-script.sh",
])
.stdout(Stdio::piped())
.stderr(Stdio::piped())
.spawn()
.context("Failed to spawn podman")?;
let stdout = child.stdout.take().context("Failed to get stdout")?;
let stderr = child.stderr.take().context("Failed to get stderr")?;
// Stream logs and parse step markers
let step_results = stream_logs_to_master(
config, client, job_id, owner, project, &job.name, job_token, job, stdout, stderr,
)
.await?;
let status = child.wait().await.context("Failed to wait for podman")?;
let exit_code = status.code().unwrap_or(1);
Ok((exit_code, step_results))
}
/// Stream logs to master and parse step markers
async fn stream_logs_to_master(
config: &RunnerClientConfig,
client: &Client,
job_id: Uuid,
owner: &str,
project: &str,
job_name: &str,
job_token: &str,
job: &WorkflowJob,
stdout: impl tokio::io::AsyncRead + Unpin,
stderr: impl tokio::io::AsyncRead + Unpin,
) -> Result<Vec<StepResultUpdate>> {
let mut stdout_lines = BufReader::new(stdout).lines();
let mut stderr_lines = BufReader::new(stderr).lines();
let mut stdout_done = false;
let mut stderr_done = false;
let mut log_buffer = Vec::new();
let mut step_results: Vec<StepResultUpdate> = job
.steps
.iter()
.map(|s| StepResultUpdate {
name: s.name.clone(),
exit_code: None,
})
.collect();
let log_url = format!(
"{}/.runners/jobs/{}/{}/{}/log",
config.master_url, owner, project, job_id
);
// Flush logs periodically
let mut last_flush = std::time::Instant::now();
let flush_interval = Duration::from_millis(500);
loop {
if stdout_done && stderr_done {
break;
}
tokio::select! {
result = stdout_lines.next_line(), if !stdout_done => {
match result {
Ok(Some(line)) => {
log_buffer.extend_from_slice(line.as_bytes());
log_buffer.push(b'\n');
}
_ => stdout_done = true,
}
}
result = stderr_lines.next_line(), if !stderr_done => {
match result {
Ok(Some(line)) => {
// Check for step markers
if line.starts_with(STEP_START_MARKER) && line.ends_with(MARKER_SUFFIX) {
// Step start marker - don't write to log
} else if line.starts_with(STEP_END_MARKER) && line.ends_with(MARKER_SUFFIX) {
// Step end marker - parse exit code
let content = &line[STEP_END_MARKER.len()..line.len() - MARKER_SUFFIX.len()];
if let Some((step_name, exit_code_str)) = content.rsplit_once(':') {
let exit_code = exit_code_str.parse::<i32>().unwrap_or(1);
if let Some(step) = step_results.iter_mut().find(|s| s.name == step_name) {
step.exit_code = Some(exit_code);
}
}
} else {
// Regular stderr line
log_buffer.extend_from_slice(line.as_bytes());
log_buffer.push(b'\n');
}
}
_ => stderr_done = true,
}
}
}
// Flush logs periodically
if last_flush.elapsed() >= flush_interval && !log_buffer.is_empty() {
send_log_chunk(
client,
&log_url,
&config.token,
job_name,
job_token,
&log_buffer,
)
.await?;
log_buffer.clear();
last_flush = std::time::Instant::now();
}
}
// Flush remaining logs
if !log_buffer.is_empty() {
send_log_chunk(
client,
&log_url,
&config.token,
job_name,
job_token,
&log_buffer,
)
.await?;
}
Ok(step_results)
}
/// Send a log chunk to the master
async fn send_log_chunk(
client: &Client,
url: &str,
token: &str,
job_name: &str,
job_token: &str,
data: &[u8],
) -> Result<()> {
#[derive(Serialize)]
struct LogChunk {
job_name: String,
data: String,
job_token: String,
}
let chunk = LogChunk {
job_name: job_name.to_string(),
data: base64::engine::general_purpose::STANDARD.encode(data),
job_token: job_token.to_string(),
};
let response = client
.post(url)
.header("Authorization", format!("Bearer {}", token))
.json(&chunk)
.send()
.await?;
if !response.status().is_success() {
eprintln!("Failed to send log chunk: {}", response.status());
}
Ok(())
}
/// Send a heartbeat to the master
async fn send_heartbeat(client: &Client, url: &str, token: &str) -> Result<()> {
#[derive(Serialize)]
struct Heartbeat {}
let _ = client
.post(url)
.header("Authorization", format!("Bearer {}", token))
.json(&Heartbeat {})
.send()
.await?;
Ok(())
}
/// Generate the inner script that runs all steps with markers
fn generate_inner_script(job: &WorkflowJob) -> String {
let mut script = String::new();
script.push_str("#!/bin/sh\n\n");
for step in &job.steps {
script.push_str(&format!(
"echo '{}{}{}' >&2\n",
STEP_START_MARKER, step.name, MARKER_SUFFIX
));
script.push_str("(\n");
script.push_str(&step.run);
if !step.run.ends_with('\n') {
script.push('\n');
}
script.push_str(")\n");
script.push_str("__exit=$?\n");
script.push_str(&format!(
"echo '{}{}:'\"$__exit\"'{}' >&2\n",
STEP_END_MARKER, step.name, MARKER_SUFFIX
));
script.push_str("[ $__exit -ne 0 ] && exit $__exit\n\n");
}
script.push_str("exit 0\n");
script
}
// Payload POSTed to the master when a remote job finishes.
#[derive(Serialize)]
struct JobCompleteRequest {
    success: bool,              // true only if every workflow job exited 0
    exit_code: Option<i32>,     // exit code of the last job that ran
    jobs: Vec<JobResultUpdate>, // per-job results (may be shorter than the workflow on early failure)
    job_token: String,          // per-job token issued with the assignment
}
// Result of one workflow job within the completion report.
#[derive(Serialize)]
struct JobResultUpdate {
    name: String,
    exit_code: Option<i32>,
    steps: Vec<StepResultUpdate>,
}
// Result of one step; exit_code stays None if no END marker was observed.
#[derive(Serialize)]
struct StepResultUpdate {
    name: String,
    exit_code: Option<i32>,
}