text/x-rust
•
22.78 KB
•
724 lines
//! CI service
//!
//! Handles CI job execution triggered by repository events.
use std::path::{Path, PathBuf};
use std::process::Stdio;
use anyhow::{Context, Result};
use time::OffsetDateTime;
use tokio::fs;
use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader};
use tokio::process::Command;
use tokio::sync::mpsc;
use tokio_stream::StreamExt;
use tokio_stream::wrappers::BroadcastStream;
use crate::GlobalState;
use crate::models::{
CiJob, CiJobRequest, CiJobResult, CiJobResultResponse, CiJobStatus, CiRunStatusResponse,
CiStepResult, CiStepResultResponse, CiWorkflow, WorkflowJob,
};
use crate::services::repository;
use rubhub_state::RepoEvent;
/// Repository-relative directory scanned for workflow files (`*.yaml` / `*.yml`)
/// whenever a branch is updated.
const WORKFLOW_DIR: &str = ".rubhub/workflows";
/// Container image used for local (podman) job execution.
const PODMAN_IMAGE: &str = "nixos/nix:latest";
// Markers the generated job script prints to stderr around each step so the log
// streamer can track per-step status. They are deliberately verbose to avoid
// false positives from ordinary program output.
/// Prefix of the line printed before a step runs: `<prefix><step name><suffix>`.
const STEP_START_MARKER: &str = ">>>RUBHUB_STEP_START:";
/// Prefix of the line printed after a step ran: `<prefix><step name>:<exit code><suffix>`.
const STEP_END_MARKER: &str = ">>>RUBHUB_STEP_END:";
/// Suffix terminating both kinds of marker line.
const MARKER_SUFFIX: &str = "<<<";
/// Probe the local podman installation.
///
/// Runs `podman --version` and hands back its trimmed stdout (the version
/// banner) when the command succeeds.
///
/// # Errors
/// Fails when the `podman` binary cannot be spawned or when it exits with a
/// non-zero status.
pub async fn validate_podman() -> Result<String> {
    let probe = Command::new("podman")
        .arg("--version")
        .output()
        .await
        .context("Failed to run podman")?;

    anyhow::ensure!(probe.status.success(), "podman --version failed");

    let banner = String::from_utf8_lossy(&probe.stdout);
    Ok(banner.trim().to_owned())
}
/// Spawn the CI event listener that watches for `BranchUpdated` events.
///
/// Subscribes to `state.event_tx` and, for every `RepoEvent::BranchUpdated`,
/// calls `handle_branch_update` to queue the matching workflows on `job_tx`.
/// An error while handling one event is logged and does not stop the listener.
///
/// If this receiver falls behind the broadcast channel, the missed events are
/// reported and skipped and the listener keeps running. It only stops when the
/// broadcast stream ends.
pub fn spawn_ci_listener(state: GlobalState, job_tx: mpsc::Sender<CiJobRequest>) {
    let rx = state.event_tx.subscribe();
    tokio::spawn(async move {
        let mut stream = BroadcastStream::new(rx);
        while let Some(item) = stream.next().await {
            // `BroadcastStream` yields an `Err` item when this receiver lagged.
            // The stream is still usable afterwards; ending the loop here would
            // silently disable CI for the rest of the process.
            let event = match item {
                Ok(event) => event,
                Err(e) => {
                    eprintln!(
                        "CI: Event listener fell behind, skipping missed events: {}",
                        e
                    );
                    continue;
                }
            };

            if let RepoEvent::BranchUpdated { info, branch } = event
                && let Err(e) = handle_branch_update(
                    &state,
                    &info.owner,
                    &info.project,
                    &branch,
                    &info.commit_hash,
                    &job_tx,
                )
                .await
            {
                eprintln!(
                    "CI: Error handling branch update for {}/{}: {}",
                    info.owner, info.project, e
                );
            }
        }
    });
}
/// Handle a branch update: find the workflows on `branch` and queue the ones that apply.
///
/// Lists `WORKFLOW_DIR` on `branch`. If it cannot be listed (for example because
/// the directory does not exist), nothing is queued. Every `.yaml` / `.yml` entry
/// is read, decoded as UTF-8 and parsed as a `CiWorkflow`; a file that fails any
/// of these steps is logged and skipped. Each workflow whose
/// `should_run_on_branch(branch)` is true is sent on `job_tx` as a `CiJobRequest`
/// for `commit_hash`.
///
/// Always returns `Ok(())`: per-file problems and queueing failures are only logged.
async fn handle_branch_update(
    state: &GlobalState,
    owner: &str,
    project: &str,
    branch: &str,
    commit_hash: &str,
    job_tx: &mpsc::Sender<CiJobRequest>,
) -> Result<()> {
    // No listable workflows directory means CI is not configured for this branch.
    let Ok(entries) =
        repository::get_git_tree(state, owner, project, branch, WORKFLOW_DIR).await
    else {
        return Ok(());
    };

    let yaml_entries = entries
        .iter()
        .filter(|entry| entry.filename.ends_with(".yaml") || entry.filename.ends_with(".yml"));

    for entry in yaml_entries {
        let file_path = format!("{}/{}", WORKFLOW_DIR, entry.filename);

        let raw = match repository::get_git_file(state, owner, project, branch, &file_path).await
        {
            Ok(bytes) => bytes,
            Err(e) => {
                eprintln!("CI: Failed to read workflow file {}: {}", file_path, e);
                continue;
            }
        };

        let Ok(text) = String::from_utf8(raw) else {
            eprintln!("CI: Workflow file {} is not valid UTF-8", file_path);
            continue;
        };

        let workflow = match serde_yaml::from_str::<CiWorkflow>(&text) {
            Ok(parsed) => parsed,
            Err(e) => {
                eprintln!("CI: Failed to parse workflow {}: {}", file_path, e);
                continue;
            }
        };

        if workflow.should_run_on_branch(branch) {
            // `workflow_name` must be cloned before `workflow` is moved into the request.
            let request = CiJobRequest {
                owner: owner.to_string(),
                project: project.to_string(),
                branch: branch.to_string(),
                commit_hash: commit_hash.to_string(),
                workflow_name: workflow.name.clone(),
                workflow,
            };
            if let Err(e) = job_tx.send(request).await {
                eprintln!("CI: Failed to queue job: {}", e);
            }
        }
    }

    Ok(())
}
/// Spawn the CI worker that drains the job queue.
///
/// Every `CiJobRequest` received on `rx` is handed to `dispatch_job` on its own
/// task, so queued runs proceed concurrently. Dispatch errors are logged. The
/// worker exits once the channel is closed and empty.
pub fn spawn_ci_worker(state: GlobalState, mut rx: mpsc::Receiver<CiJobRequest>) {
    tokio::spawn(async move {
        loop {
            let Some(request) = rx.recv().await else {
                break;
            };
            let job_state = state.clone();
            tokio::spawn(async move {
                match dispatch_job(&job_state, request).await {
                    Ok(()) => {}
                    Err(e) => eprintln!("CI: Job dispatch error: {}", e),
                }
            });
        }
    });
}
/// Dispatch a CI run to a remote runner, or execute it locally as a fallback.
///
/// Remote runners take priority. When `runner::find_available_runner` returns a
/// runner, a run record is created on disk (directories, initial state, then a
/// `Running` state) and handed to `runner::assign_job_to_runner`.
///
/// If that assignment fails, the record just created is marked `Failed`, so it
/// is not left "running" with no runner reporting on it. Dispatch then falls
/// through to the local path.
///
/// Local execution via `process_job` is only attempted when the
/// `local_podman_ci_available` flag is set. Otherwise an error is logged and the
/// request is dropped.
///
/// # Errors
/// Propagates filesystem and serialization errors raised while preparing the
/// remote run record, and any error returned by `process_job`.
async fn dispatch_job(state: &GlobalState, request: CiJobRequest) -> Result<()> {
    use crate::services::runner;

    // Remote runners take priority over local execution.
    if let Some(runner_id) = runner::find_available_runner(state).await {
        let ci_root = &state.config.ci_root;

        // Create the run record, with one result entry (and its steps) per workflow job.
        let mut run = CiJob::new(
            request.owner.clone(),
            request.project.clone(),
            request.workflow_name.clone(),
            request.branch.clone(),
            request.commit_hash.clone(),
        );
        run.jobs = request
            .workflow
            .jobs
            .iter()
            .map(|j| {
                let steps = j
                    .steps
                    .iter()
                    .map(|s| CiStepResult::new(s.name.clone()))
                    .collect();
                CiJobResult::new(j.name.clone(), steps)
            })
            .collect();

        // Ensure the run directory and each job's log directory exist.
        let run_dir = run.job_dir(ci_root);
        fs::create_dir_all(&run_dir).await?;
        for workflow_job in &request.workflow.jobs {
            let job_result_dir = run.job_result_dir(ci_root, &workflow_job.name);
            fs::create_dir_all(&job_result_dir).await?;
        }

        // Save initial run state
        save_job(&run, ci_root).await?;

        // Serialize workflow to JSON for the runner
        let workflow_json = serde_json::to_value(&request.workflow)?;

        // Mark as running
        run.status = CiJobStatus::Running;
        run.started_at = Some(OffsetDateTime::now_utc());
        save_job(&run, ci_root).await?;

        match runner::assign_job_to_runner(state, runner_id, &run, workflow_json).await {
            Ok(_) => {
                println!(
                    "CI: Dispatched run {} to remote runner {}",
                    run.id, runner_id
                );
                return Ok(());
            }
            Err(e) => {
                eprintln!("CI: Failed to assign job to runner: {}", e);
                // This record is already persisted as Running, but no runner will
                // ever report on it. Close it out before the local fallback creates
                // a fresh run. Saving is best-effort so the fallback still happens.
                run.status = CiJobStatus::Failed;
                run.finished_at = Some(OffsetDateTime::now_utc());
                for job_result in &mut run.jobs {
                    job_result.status = CiJobStatus::Failed;
                }
                if let Err(save_err) = save_job(&run, ci_root).await {
                    eprintln!(
                        "CI: Failed to record unassigned run {} as failed: {}",
                        run.id, save_err
                    );
                }
            }
        }
    }

    // No remote runner took the job; use local podman if it is available.
    if state
        .local_podman_ci_available
        .load(std::sync::atomic::Ordering::Relaxed)
    {
        return process_job(state, request).await;
    }

    // No CI backend available
    eprintln!(
        "CI: No CI backend available for {}/{} (no remote runners or local podman)",
        request.owner, request.project
    );
    Ok(())
}
/// Process a single CI run (multiple jobs, each with multiple steps)
async fn process_job(state: &GlobalState, request: CiJobRequest) -> Result<()> {
let ci_root = &state.config.ci_root;
// Create CI run record
let mut run = CiJob::new(
request.owner.clone(),
request.project.clone(),
request.workflow_name.clone(),
request.branch.clone(),
request.commit_hash.clone(),
);
// Initialize job results from workflow jobs
run.jobs = request
.workflow
.jobs
.iter()
.map(|j| {
let steps = j
.steps
.iter()
.map(|s| CiStepResult::new(s.name.clone()))
.collect();
CiJobResult::new(j.name.clone(), steps)
})
.collect();
// Ensure run directory exists
let run_dir = run.job_dir(ci_root);
fs::create_dir_all(&run_dir).await?;
// Create directories for each job's logs
for workflow_job in &request.workflow.jobs {
let job_result_dir = run.job_result_dir(ci_root, &workflow_job.name);
fs::create_dir_all(&job_result_dir).await?;
}
// Save initial run state
save_job(&run, ci_root).await?;
println!(
"CI: Starting run {} for {}/{} ({}) with {} jobs",
run.id,
run.owner,
run.project,
run.workflow_name,
run.jobs.len()
);
// Clone repo to temp directory
let temp_dir = tempfile::tempdir().context("Failed to create temp directory")?;
let work_dir = temp_dir.path();
let repo_path = state
.config
.git_root
.join(&request.owner)
.join(&request.project);
// Clone the repo (shallow clone of the specific branch)
let clone_status = Command::new("git")
.args(["clone", "--depth", "1", "--branch", &request.branch, "--"])
.arg(&repo_path)
.arg(work_dir)
.stdout(Stdio::null())
.stderr(Stdio::null())
.status()
.await
.context("Failed to spawn git clone")?;
if !clone_status.success() {
run.status = CiJobStatus::Failed;
run.finished_at = Some(OffsetDateTime::now_utc());
// Mark all jobs as failed
for job_result in &mut run.jobs {
job_result.status = CiJobStatus::Failed;
}
save_job(&run, ci_root).await?;
return Ok(());
}
// Update run status to running
run.status = CiJobStatus::Running;
run.started_at = Some(OffsetDateTime::now_utc());
save_job(&run, ci_root).await?;
// Execute jobs sequentially (each job = separate container)
let mut overall_success = true;
for (job_idx, workflow_job) in request.workflow.jobs.iter().enumerate() {
// Mark this job as running
run.jobs[job_idx].status = CiJobStatus::Running;
run.jobs[job_idx].started_at = Some(OffsetDateTime::now_utc());
save_job(&run, ci_root).await?;
println!(
"CI: Running job '{}' ({}/{})",
workflow_job.name,
job_idx + 1,
request.workflow.jobs.len()
);
// Run all steps in a single container
let exit_code = run_job_in_container(
&mut run,
job_idx,
&request.workflow,
workflow_job,
work_dir,
ci_root,
)
.await?;
// Update job result
run.jobs[job_idx].exit_code = Some(exit_code);
run.jobs[job_idx].finished_at = Some(OffsetDateTime::now_utc());
run.jobs[job_idx].status = if exit_code == 0 {
CiJobStatus::Success
} else {
CiJobStatus::Failed
};
save_job(&run, ci_root).await?;
if exit_code != 0 {
overall_success = false;
println!(
"CI: Job '{}' failed with exit code {}",
workflow_job.name, exit_code
);
break; // Stop on first failure
}
}
// Update overall status
run.status = if overall_success {
CiJobStatus::Success
} else {
CiJobStatus::Failed
};
run.exit_code = run.jobs.iter().rev().filter_map(|j| j.exit_code).next();
run.finished_at = Some(OffsetDateTime::now_utc());
save_job(&run, ci_root).await?;
println!("CI: Run {} finished with status {:?}", run.id, run.status);
Ok(())
}
// Named podman volumes for nix - shared across all CI runs, users, and repos.
// NOTE(review): both volumes are mounted read-write into every job container
// (no `:ro` option), so one workflow can alter the nix store that later
// workflows from other users see — confirm this trust model is intended.
/// Volume mounted at `/nix` (the nix store) in job containers.
const NIX_STORE_VOLUME: &str = "rubhub-nix-store";
/// Volume mounted at `/home/anon/.cache/nix` (nix channel/expression cache) in job containers.
const NIX_CACHE_VOLUME: &str = "rubhub-nix-cache";
/// Run a job (all its steps) in a single container
async fn run_job_in_container(
run: &mut CiJob,
job_idx: usize,
workflow: &CiWorkflow,
job: &WorkflowJob,
work_dir: &Path,
ci_root: &Path,
) -> Result<i32> {
// Generate the job script with all steps
let inner_script = generate_inner_script(job);
// Write scripts to work directory
let inner_script_path = work_dir.join(".rubhub-ci-inner.sh");
let script_path = work_dir.join(".rubhub-ci-script.sh");
std::fs::write(&inner_script_path, &inner_script)?;
let script_content = if workflow.packages.is_empty() {
// No packages - just run the inner script directly
"#!/bin/sh\nsh /home/anon/workspace/.rubhub-ci-inner.sh\n".to_string()
} else {
// Wrap with nix-shell
let packages = workflow.packages.join(" ");
format!(
"#!/bin/sh\nnix-shell -I nixpkgs=channel:nixpkgs-unstable --option sandbox false -p {} --run 'sh /home/anon/workspace/.rubhub-ci-inner.sh'\n",
packages
)
};
std::fs::write(&script_path, &script_content)?;
// Run podman (30 min timeout)
// Use named volumes for /nix and ~/.cache/nix to cache:
// - /nix: the nix store (downloaded packages)
// - ~/.cache/nix: channel expressions (nixexprs)
let mut child = Command::new("podman")
.args([
"run",
"--rm",
"--timeout=1800",
"-e",
"HOME=/home/anon",
"-e",
"USER=anon",
"-v",
&format!("{}:/home/anon/workspace", work_dir.display()),
"-v",
&format!("{}:/nix", NIX_STORE_VOLUME),
"-v",
&format!("{}:/home/anon/.cache/nix", NIX_CACHE_VOLUME),
"-w",
"/home/anon/workspace",
PODMAN_IMAGE,
"sh",
"/home/anon/workspace/.rubhub-ci-script.sh",
])
.stdout(Stdio::piped())
.stderr(Stdio::piped())
.spawn()
.context("Failed to spawn podman")?;
// Get stdout and stderr handles
let stdout = child.stdout.take().context("Failed to get stdout")?;
let stderr = child.stderr.take().context("Failed to get stderr")?;
let log_path = run.job_log_file(ci_root, &job.name);
// Stream both stdout and stderr to a single file, parsing step markers from stderr
let run_clone = run.clone();
let ci_root_clone = ci_root.to_path_buf();
let stream_task = tokio::spawn(stream_combined_with_markers(
stdout,
stderr,
log_path,
run_clone,
job_idx,
ci_root_clone,
));
// Wait for process to complete
let status = child.wait().await.context("Failed to wait for podman")?;
// Wait for log streaming to finish and get updated run state
if let Ok(updated_run) = stream_task.await {
*run = updated_run;
}
Ok(status.code().unwrap_or(1))
}
/// Generate the inner shell script that runs every step of `job` in order.
///
/// Each step's `run` text executes in a `( ... )` subshell, bracketed by two
/// lines on stderr:
/// - before the step: `STEP_START_MARKER<name>MARKER_SUFFIX`;
/// - after the step: `STEP_END_MARKER<name>:<exit code>MARKER_SUFFIX`.
///
/// The script exits with the exit code of the first failing step, or 0 when
/// every step succeeds.
///
/// Markers are printed with `printf '%s\n'` and the step name is POSIX
/// single-quote escaped. A name containing `'` or a backslash is therefore
/// emitted verbatim, instead of breaking the script's quoting or being altered
/// by `echo` escape handling. A name containing a newline still yields a split,
/// unparseable marker.
fn generate_inner_script(job: &WorkflowJob) -> String {
    /// Quote `s` as one POSIX shell word: wrap it in single quotes and turn
    /// every embedded `'` into `'\''`.
    fn sh_quote(s: &str) -> String {
        format!("'{}'", s.replace('\'', r"'\''"))
    }

    let mut script = String::from("#!/bin/sh\n\n");
    for step in &job.steps {
        // Emit start marker to stderr
        let start_marker = format!("{}{}{}", STEP_START_MARKER, step.name, MARKER_SUFFIX);
        script.push_str(&format!("printf '%s\\n' {} >&2\n", sh_quote(&start_marker)));

        // Run the step in a subshell to capture exit code
        script.push_str("(\n");
        script.push_str(&step.run);
        if !step.run.ends_with('\n') {
            script.push('\n');
        }
        script.push_str(")\n");
        script.push_str("__exit=$?\n");

        // Emit end marker with exit code to stderr. The quoted prefix, the
        // double-quoted "$__exit" and the quoted suffix form a single word.
        let end_prefix = format!("{}{}:", STEP_END_MARKER, step.name);
        script.push_str(&format!(
            "printf '%s\\n' {}\"$__exit\"{} >&2\n",
            sh_quote(&end_prefix),
            sh_quote(MARKER_SUFFIX)
        ));

        // Exit if step failed
        script.push_str("[ $__exit -ne 0 ] && exit $__exit\n\n");
    }
    // Explicit success exit - without this, the script would exit with 1
    // because the last `[ $__exit -ne 0 ]` test fails when __exit is 0
    script.push_str("exit 0\n");
    script
}
/// Stream both stdout and stderr to a single file, parsing step markers from stderr
async fn stream_combined_with_markers(
stdout: impl tokio::io::AsyncRead + Unpin,
stderr: impl tokio::io::AsyncRead + Unpin,
path: PathBuf,
mut run: CiJob,
job_idx: usize,
ci_root: PathBuf,
) -> CiJob {
let mut stdout_lines = BufReader::new(stdout).lines();
let mut stderr_lines = BufReader::new(stderr).lines();
let Ok(mut file) = fs::File::create(&path).await else {
return run;
};
// Track when each stream is done
let mut stdout_done = false;
let mut stderr_done = false;
loop {
if stdout_done && stderr_done {
break;
}
tokio::select! {
result = stdout_lines.next_line(), if !stdout_done => {
match result {
Ok(Some(line)) => {
let _ = file.write_all(line.as_bytes()).await;
let _ = file.write_all(b"\n").await;
}
_ => stdout_done = true,
}
}
result = stderr_lines.next_line(), if !stderr_done => {
match result {
Ok(Some(line)) => {
// Check for step markers
if line.starts_with(STEP_START_MARKER) && line.ends_with(MARKER_SUFFIX) {
let step_name = &line[STEP_START_MARKER.len()..line.len() - MARKER_SUFFIX.len()];
if let Some(step) = run.jobs[job_idx]
.steps
.iter_mut()
.find(|s| s.name == step_name)
{
step.status = CiJobStatus::Running;
let _ = save_job(&run, &ci_root).await;
}
} else if line.starts_with(STEP_END_MARKER) && line.ends_with(MARKER_SUFFIX) {
let content = &line[STEP_END_MARKER.len()..line.len() - MARKER_SUFFIX.len()];
if let Some((step_name, exit_code_str)) = content.rsplit_once(':') {
let exit_code = exit_code_str.parse::<i32>().unwrap_or(1);
if let Some(step) = run.jobs[job_idx]
.steps
.iter_mut()
.find(|s| s.name == step_name)
{
step.exit_code = Some(exit_code);
step.status = if exit_code == 0 {
CiJobStatus::Success
} else {
CiJobStatus::Failed
};
let _ = save_job(&run, &ci_root).await;
}
}
} else {
// Regular stderr line - write to file
let _ = file.write_all(line.as_bytes()).await;
let _ = file.write_all(b"\n").await;
}
}
_ => stderr_done = true,
}
}
}
}
run
}
/// Persist run metadata to the file returned by `CiJob::job_file`.
///
/// The JSON is first written to a uniquely named temporary file next to the
/// target, then renamed over it. `load_job` and `list_jobs` may read this file
/// while a run is still being updated. Because of the rename they observe either
/// the previous or the new contents, never a truncated, half-written file.
///
/// # Errors
/// Returns an error if serialization, the write, or the rename fails. After a
/// failed rename the temporary file is removed.
async fn save_job(job: &CiJob, ci_root: &Path) -> Result<()> {
    use std::sync::atomic::{AtomicU64, Ordering};

    // Keeps temp names distinct when the same job is saved concurrently.
    static SAVE_SEQ: AtomicU64 = AtomicU64::new(0);

    let job_file = job.job_file(ci_root);
    let content = serde_json::to_string_pretty(job)?;

    let seq = SAVE_SEQ.fetch_add(1, Ordering::Relaxed);
    let tmp_file = job_file.with_extension(format!("json.{seq}.tmp"));

    fs::write(&tmp_file, content)
        .await
        .with_context(|| format!("Failed to write job file: {}", tmp_file.display()))?;

    let renamed = fs::rename(&tmp_file, &job_file).await;
    if renamed.is_err() {
        let _ = fs::remove_file(&tmp_file).await;
    }
    renamed.with_context(|| format!("Failed to replace job file: {}", job_file.display()))?;
    Ok(())
}
/// Load a run record from `<ci_root>/<owner>/<project>/<job_id>/job.json`.
///
/// # Errors
/// Fails if the file cannot be read (the error names the path) or if its
/// contents do not deserialize into a `CiJob`.
pub async fn load_job(ci_root: &Path, owner: &str, project: &str, job_id: &str) -> Result<CiJob> {
    let path = [owner, project, job_id, "job.json"]
        .into_iter()
        .fold(ci_root.to_path_buf(), |acc, part| acc.join(part));

    let raw = fs::read_to_string(&path)
        .await
        .with_context(|| format!("Failed to read job file: {}", path.display()))?;

    Ok(serde_json::from_str::<CiJob>(&raw)?)
}
/// Read `<ci_root>/<owner>/<project>/<run_id>/jobs/<job_name>/output.log`.
///
/// Returns an empty string when the log cannot be read as UTF-8 text (missing,
/// unreadable, or invalid encoding), so callers can always render something.
pub async fn load_job_log(
    ci_root: &Path,
    owner: &str,
    project: &str,
    run_id: &str,
    job_name: &str,
) -> String {
    let log_path = ci_root
        .join(owner)
        .join(project)
        .join(run_id)
        .join("jobs")
        .join(job_name)
        .join("output.log");

    match fs::read_to_string(&log_path).await {
        Ok(text) => text,
        Err(_) => String::new(),
    }
}
/// Load a CI run together with every job's log, shaped for the JSON API.
///
/// Statuses are rendered with `as_str()`. A job whose log cannot be read gets an
/// empty `log` (see `load_job_log`).
///
/// # Errors
/// Fails only if the run record itself cannot be loaded (see `load_job`).
pub async fn load_run_with_all_logs(
    ci_root: &Path,
    owner: &str,
    project: &str,
    run_id: &str,
) -> Result<CiRunStatusResponse> {
    let run = load_job(ci_root, owner, project, run_id).await?;

    let mut jobs = Vec::with_capacity(run.jobs.len());
    for result in run.jobs.iter() {
        let steps = result
            .steps
            .iter()
            .map(|step| CiStepResultResponse {
                name: step.name.clone(),
                status: step.status.as_str().to_string(),
                exit_code: step.exit_code,
            })
            .collect::<Vec<_>>();

        let log = load_job_log(ci_root, owner, project, run_id, &result.name).await;

        jobs.push(CiJobResultResponse {
            name: result.name.clone(),
            status: result.status.as_str().to_string(),
            exit_code: result.exit_code,
            steps,
            log,
        });
    }

    Ok(CiRunStatusResponse {
        id: run.id.to_string(),
        status: run.status.as_str().to_string(),
        jobs,
    })
}
/// List all CI runs recorded for a project, newest first (by `created_at`).
///
/// Scans `<ci_root>/<owner>/<project>/` for entries containing a `job.json`.
/// Entries whose file is missing, unreadable, or fails to parse are skipped. A
/// missing project directory yields an empty list.
///
/// # Errors
/// Returns an error if the project directory exists but cannot be read, or if
/// iterating its entries fails.
pub async fn list_jobs(ci_root: &Path, owner: &str, project: &str) -> Result<Vec<CiJob>> {
    let project_dir = ci_root.join(owner).join(project);

    // Open the directory directly instead of pre-checking `exists()`. This avoids
    // a blocking stat on the async runtime and an exists-then-open race.
    let mut entries = match fs::read_dir(&project_dir).await {
        Ok(entries) => entries,
        Err(e) if e.kind() == std::io::ErrorKind::NotFound => return Ok(vec![]),
        Err(e) => {
            return Err(e).with_context(|| {
                format!("Failed to read CI directory: {}", project_dir.display())
            });
        }
    };

    let mut jobs = Vec::new();
    while let Some(entry) = entries.next_entry().await? {
        // A non-directory entry has no `job.json` beneath it, so the read fails
        // and the entry is skipped. No separate is_dir/exists stat is needed.
        let job_file = entry.path().join("job.json");
        let Ok(content) = fs::read_to_string(&job_file).await else {
            continue;
        };
        if let Ok(job) = serde_json::from_str::<CiJob>(&content) {
            jobs.push(job);
        }
    }

    // Sort by created_at descending (newest first)
    jobs.sort_by(|a, b| b.created_at.cmp(&a.created_at));
    Ok(jobs)
}