text/x-rust
•
6.29 KB
•
195 lines
//! Remote CI runner management
//!
//! Handles heartbeat checking, job dispatch, and runner lifecycle management.
use std::path::Path;
use std::sync::Arc;
use std::time::Duration;
use time::OffsetDateTime;
use tokio::time::interval;
use crate::models::{CiJob, CiJobStatus};
use rubhub_state::{CurrentJob, RemoteJobAssignment, RunnerJobEvent, RunnerRegistry};
/// Heartbeat timeout in seconds.
///
/// A runner whose last recorded heartbeat is older than this is treated as
/// timed out by the heartbeat checker. Signed because the value is passed to
/// `time::Duration::seconds`.
const HEARTBEAT_TIMEOUT_SECS: i64 = 60;
/// Heartbeat check interval in seconds.
///
/// Period of the background task that scans all registered runners for
/// heartbeat timeouts.
const HEARTBEAT_CHECK_INTERVAL_SECS: u64 = 10;
/// Server heartbeat interval in seconds (how often we send heartbeats to runners).
///
/// Period of the background task that sends a `RunnerJobEvent::Heartbeat` to
/// every registered runner.
const SERVER_HEARTBEAT_INTERVAL_SECS: u64 = 30;
/// Start the detached background task that evicts runners with stale heartbeats.
///
/// Every `HEARTBEAT_CHECK_INTERVAL_SECS` seconds the task walks all runner ids
/// known to `runner_registry`. A runner whose `last_heartbeat` is more than
/// `HEARTBEAT_TIMEOUT_SECS` seconds in the past is handed to
/// `handle_runner_disconnect`, together with a copy of the job it was running
/// (if any). The task loops forever; its join handle is discarded.
///
/// `ci_root` is forwarded to the disconnect handler, which uses it to locate
/// the metadata of the job that has to be marked as failed.
pub fn spawn_heartbeat_checker(
    runner_registry: Arc<RunnerRegistry>,
    ci_root: Arc<Path>,
) {
    tokio::spawn(async move {
        let stale_after = time::Duration::seconds(HEARTBEAT_TIMEOUT_SECS);
        let mut ticker = interval(Duration::from_secs(HEARTBEAT_CHECK_INTERVAL_SECS));

        loop {
            ticker.tick().await;
            // One reference instant per sweep so every runner is judged against
            // the same clock reading.
            let now = OffsetDateTime::now_utc();

            for runner_id in runner_registry.runner_ids() {
                // The runner may have been removed since the id list was taken.
                let Some(runner) = runner_registry.get(runner_id) else {
                    continue;
                };

                // Snapshot what we need under the read lock and release it
                // before doing any disconnect work.
                let (timed_out, current_job) = {
                    let guard = runner.read().await;
                    let since_last_beat = now - guard.last_heartbeat;
                    (since_last_beat > stale_after, guard.current_job.clone())
                };

                if !timed_out {
                    continue;
                }

                // Runner timed out - handle disconnect
                handle_runner_disconnect(&runner_registry, &ci_root, runner_id, current_job)
                    .await;
            }
        }
    });
}
/// Start the detached background task that sends server heartbeats to runners.
///
/// Every `SERVER_HEARTBEAT_INTERVAL_SECS` seconds the task builds one
/// `RunnerJobEvent::Heartbeat` stamped with the current UTC time and sends a
/// clone of it through the `event_tx` of every runner in `runner_registry`.
/// Send failures are ignored. The task loops forever; its join handle is
/// discarded.
pub fn spawn_server_heartbeat_sender(runner_registry: Arc<RunnerRegistry>) {
    tokio::spawn(async move {
        let mut ticker = interval(Duration::from_secs(SERVER_HEARTBEAT_INTERVAL_SECS));

        loop {
            ticker.tick().await;

            // A single event per round; each runner gets its own clone.
            let event = RunnerJobEvent::Heartbeat {
                timestamp: OffsetDateTime::now_utc(),
            };

            for runner_id in runner_registry.runner_ids() {
                // The runner may have been removed since the id list was taken.
                let Some(runner) = runner_registry.get(runner_id) else {
                    continue;
                };

                // Clone the sender inside a single statement so the read lock
                // is released before the send is awaited.
                let sender = runner.read().await.event_tx.clone();

                // Best effort: the runner might be disconnecting, so a failed
                // send is deliberately not treated as an error.
                // NOTE(review): runners are served one after another, so a send
                // that waits delays the heartbeat for the remaining runners -
                // confirm this is acceptable for the channel type in use.
                let _ = sender.send(event.clone()).await;
            }
        }
    });
}
/// Tear down a runner that is considered gone (timeout or explicit disconnect).
///
/// Steps, in order:
/// 1. Log the disconnect (the message always reports a timeout).
/// 2. If `current_job` is `Some`, try to mark that job as failed on disk with
///    the reason "Runner disconnected"; a failure to do so is only logged.
/// 3. Unregister `runner_id` from `runner_registry`.
///
/// The runner is unregistered even when the job could not be marked as failed.
async fn handle_runner_disconnect(
    runner_registry: &RunnerRegistry,
    ci_root: &Path,
    runner_id: uuid::Uuid,
    current_job: Option<CurrentJob>,
) {
    println!("Runner {} timed out, disconnecting", runner_id);

    // A job that was in flight on this runner can no longer complete.
    if let Some(job) = current_job {
        let outcome = mark_job_failed(ci_root, &job, "Runner disconnected").await;
        if let Err(e) = outcome {
            eprintln!("Failed to mark job {} as failed: {}", job.job_id, e);
        }
    }

    // Drop the runner from the registry regardless of the outcome above.
    runner_registry.unregister(runner_id).await;
}
/// Mark a job as failed in its on-disk metadata.
///
/// The metadata file is `<ci_root>/<owner>/<project>/<job_id>/job.json`, with
/// the path components taken from `current_job`. The file is loaded, the
/// top-level status is set to `Failed`, every entry in `jobs` that is still
/// `Running` or `Pending` is set to `Failed` as well, and all of them receive
/// the same `finished_at` timestamp. Entries that already reached another
/// status are left untouched.
///
/// The updated metadata is written to `job.json.tmp` next to the original and
/// then renamed over it, so readers never observe a partially written file.
///
/// `reason` is only used in the log line printed on success.
///
/// # Errors
///
/// Returns an error if the metadata file cannot be read or parsed, if the
/// updated metadata cannot be serialized, or if writing or renaming the
/// temporary file fails.
async fn mark_job_failed(
    ci_root: &Path,
    current_job: &CurrentJob,
    reason: &str,
) -> anyhow::Result<()> {
    // Construct job directory directly from CurrentJob
    let job_dir = ci_root
        .join(&current_job.owner)
        .join(&current_job.project)
        .join(current_job.job_id.to_string());

    // Load the job metadata
    let job_file = job_dir.join("job.json");
    let content = tokio::fs::read_to_string(&job_file).await?;
    let mut job: CiJob = serde_json::from_str(&content)?;

    // Take the timestamp once so the run and all of its entries agree on
    // when they finished.
    let finished_at = OffsetDateTime::now_utc();

    // Update job status
    job.status = CiJobStatus::Failed;
    job.finished_at = Some(finished_at);

    // Fail every entry of the run that has not reached a final state yet
    // (still running or still pending).
    for job_result in &mut job.jobs {
        if matches!(
            job_result.status,
            CiJobStatus::Running | CiJobStatus::Pending
        ) {
            job_result.status = CiJobStatus::Failed;
            job_result.finished_at = Some(finished_at);
        }
    }

    // Save updated job metadata atomically: write a sibling temp file, then
    // rename it over the real one.
    let content = serde_json::to_string_pretty(&job)?;
    let tmp_file = job_file.with_extension("json.tmp");
    tokio::fs::write(&tmp_file, &content).await?;
    if let Err(e) = tokio::fs::rename(&tmp_file, &job_file).await {
        // Best-effort cleanup so a failed rename does not leave a stray temp
        // file behind; the rename error is what gets reported.
        let _ = tokio::fs::remove_file(&tmp_file).await;
        return Err(e.into());
    }

    println!("Job {} marked as failed: {}", current_job.job_id, reason);
    Ok(())
}
/// Assign a job to a remote runner.
///
/// Looks up `runner_id` in `runner_registry`, records `job` as the runner's
/// current job, and sends a `RunnerJobEvent::Job` carrying the assignment
/// (job identity, branch, commit, `workflow_json` and `job_token`) through the
/// runner's event channel.
///
/// The current job is recorded *before* the event is sent so the runner is
/// already marked as busy by the time it can react to the assignment. If the
/// send fails, that record is rolled back; otherwise the runner would stay
/// marked as busy with a job it never received, and a later disconnect would
/// mark that job as failed.
///
/// # Errors
///
/// Returns an error if no runner with `runner_id` is registered, or if the
/// assignment could not be sent to the runner.
pub async fn assign_job_to_runner(
    runner_registry: &RunnerRegistry,
    runner_id: uuid::Uuid,
    job: &CiJob,
    workflow_json: serde_json::Value,
    job_token: String,
) -> anyhow::Result<()> {
    let runner = runner_registry
        .get(runner_id)
        .ok_or_else(|| anyhow::anyhow!("Runner not found"))?;

    // Create job assignment
    let assignment = RemoteJobAssignment {
        job_id: job.id,
        owner: job.owner.clone(),
        project: job.project.clone(),
        branch: job.branch.clone(),
        commit_hash: job.commit_hash.clone(),
        workflow: workflow_json,
        job_token,
    };

    // Record the job on the runner and grab its sender; the write lock is
    // released at the end of this block, before the send is awaited.
    let tx = {
        let mut guard = runner.write().await;
        guard.current_job = Some(CurrentJob {
            job_id: job.id,
            owner: job.owner.clone(),
            project: job.project.clone(),
        });
        guard.event_tx.clone()
    };

    // Send job to runner
    if tx.send(RunnerJobEvent::Job(assignment)).await.is_err() {
        // Roll back the bookkeeping, but only if the slot still refers to this
        // job, so a different assignment made in the meantime is not clobbered.
        let mut guard = runner.write().await;
        if guard
            .current_job
            .as_ref()
            .is_some_and(|current| current.job_id == job.id)
        {
            guard.current_job = None;
        }
        return Err(anyhow::anyhow!("Failed to send job to runner"));
    }

    println!("Assigned job {} to runner {}", job.id, runner_id);
    Ok(())
}
/// Find an available remote runner for a job.
///
/// Thin wrapper that asks `runner_registry` for an idle runner via
/// `find_idle_runner` and returns that call's result unchanged: the id of a
/// runner, or `None` when the lookup yields nothing.
pub async fn find_available_runner(runner_registry: &RunnerRegistry) -> Option<uuid::Uuid> {
// For now, find any idle runner
// In the future, this could match based on workflow requirements
// NOTE(review): both arguments are passed as `None`; presumably they are
// optional selection filters - confirm against `find_idle_runner`.
runner_registry.find_idle_runner(None, None).await
}