text/x-rust
•
4.15 KB
•
136 lines
//! CI job execution
//!
//! Handles dispatching jobs to local or remote executors.
pub mod local;
pub mod remote;
use std::path::Path;
use std::sync::Arc;
use std::sync::atomic::{AtomicBool, Ordering};
use anyhow::Result;
use time::OffsetDateTime;
use tokio::fs;
use tokio::sync::mpsc;
use crate::models::{CiJob, CiJobRequest, CiJobResult, CiJobStatus, CiStepResult};
use crate::storage::save_job;
use rubhub_state::RunnerRegistry;
/// Configuration for the CI executor
///
/// Cloned once per incoming job by the worker, so every field is behind an
/// `Arc` and cloning only bumps reference counts.
#[derive(Clone)]
pub struct ExecutorConfig {
/// Root directory handed to `CiJob::job_dir`, `CiJob::job_result_dir` and
/// `save_job` when run state and per-job log directories are created.
pub ci_root: Arc<Path>,
/// NOTE(review): presumably the root directory of the hosted git
/// repositories; it is not read anywhere in this module — confirm against
/// the `local` / `remote` executors.
pub git_root: Arc<Path>,
/// Flag read (with `Ordering::Relaxed`) when dispatching a job to decide
/// whether local execution is an option.
pub local_podman_available: Arc<AtomicBool>,
/// Registry passed to `remote::find_available_runner` and
/// `remote::assign_job_to_runner` to select and hand work to a remote runner.
pub runner_registry: Arc<RunnerRegistry>,
}
/// Start the background CI worker that drains the job queue.
///
/// A single long-lived task receives requests from `rx`; every request is
/// handed to its own detached task, so jobs are dispatched concurrently and a
/// slow dispatch never blocks the queue. The worker stops once every sender
/// for `rx` has been dropped.
///
/// Dispatch errors are reported on stderr and otherwise ignored; they never
/// terminate the worker.
///
/// Uses `tokio::spawn`, so it must be called from within a Tokio runtime.
pub fn spawn_worker(config: ExecutorConfig, mut rx: mpsc::Receiver<CiJobRequest>) {
    let worker = async move {
        loop {
            // `None` means the channel is closed and fully drained.
            let job_request = match rx.recv().await {
                Some(job_request) => job_request,
                None => break,
            };

            // Each job task needs its own owned handle to the shared config.
            let job_config = config.clone();
            tokio::spawn(async move {
                match dispatch_job(&job_config, job_request).await {
                    Ok(()) => {}
                    Err(err) => eprintln!("CI: Job dispatch error: {}", err),
                }
            });
        }
    };

    tokio::spawn(worker);
}
/// Dispatch a job to either a remote runner or local execution
async fn dispatch_job(config: &ExecutorConfig, request: CiJobRequest) -> Result<()> {
// Check for available remote runners first (priority)
if let Some(runner_id) = remote::find_available_runner(&config.runner_registry).await {
// Create the job record
let ci_root = &config.ci_root;
let mut run = CiJob::new(
request.owner.clone(),
request.project.clone(),
request.workflow_name.clone(),
request.branch.clone(),
request.commit_hash.clone(),
);
// Initialize job results from workflow jobs
run.jobs = request
.workflow
.jobs
.iter()
.map(|j| {
let steps = j
.steps
.iter()
.map(|s| CiStepResult::new(s.name.clone()))
.collect();
CiJobResult::new(j.name.clone(), steps)
})
.collect();
// Generate job token for runner authentication
let job_token = uuid::Uuid::new_v4().to_string();
run.job_token = Some(job_token.clone());
// Ensure run directory exists
let run_dir = run.job_dir(ci_root);
fs::create_dir_all(&run_dir).await?;
// Create directories for each job's logs
for workflow_job in &request.workflow.jobs {
let job_result_dir = run.job_result_dir(ci_root, &workflow_job.name);
fs::create_dir_all(&job_result_dir).await?;
}
// Save initial run state
save_job(&run, ci_root).await?;
// Serialize workflow to JSON for the runner
let workflow_json = serde_json::to_value(&request.workflow)?;
// Mark as running
run.status = CiJobStatus::Running;
run.started_at = Some(OffsetDateTime::now_utc());
save_job(&run, ci_root).await?;
// Assign to remote runner
if let Err(e) = remote::assign_job_to_runner(
&config.runner_registry,
runner_id,
&run,
workflow_json,
job_token,
)
.await
{
eprintln!(
"CI: Failed to assign job to runner: {}, falling back to local",
e
);
// Clean up the job directory we created before falling back
let _ = fs::remove_dir_all(&run_dir).await;
return local::process_job(config, request).await;
}
println!(
"CI: Dispatched run {} to remote runner {}",
run.id, runner_id
);
return Ok(());
}
// No remote runners available, check for local podman
if config.local_podman_available.load(Ordering::Relaxed) {
return local::process_job(config, request).await;
}
// No CI backend available
eprintln!(
"CI: No CI backend available for {}/{} (no remote runners or local podman)",
request.owner, request.project
);
Ok(())
}