use std::future::Future;
use std::io;
use std::net::SocketAddr;
use std::sync::Arc;
use std::sync::atomic::Ordering;
use std::time::Duration;
use sd_notify::NotifyState;
use sd_notify::notify;
use tokio::net::{TcpListener, TcpSocket};
use tokio::runtime::Builder;
use tokio::time::interval;
use tokio_stream::StreamExt;
use tokio_stream::wrappers::BroadcastStream;
mod controllers;
mod extractors;
mod http;
mod models;
mod services;
mod ssh;
mod views;
pub use models::{Project, ProjectSummary, UserModel};
pub use rubhub_auth_store::User;
pub use rubhub_state::{AccessType, AppConfig, ContentPage, GlobalState, RepoEvent, RepoEventInfo};
/// Create and bind a TCP listener with appropriate socket options
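///
/// `SO_REUSEADDR` is always set; `SO_REUSEPORT` is applied on Linux only,
/// where it lets several processes bind the same address (useful for
/// zero-downtime restarts). Converting the socket into a `TcpListener`
/// registers it with the Tokio reactor, so this must run inside a runtime.
///
/// A usage sketch (marked `ignore` because it depends on the surrounding
/// crate's public path):
///
/// ```ignore
/// let addr: std::net::SocketAddr = "127.0.0.1:0".parse().unwrap();
/// let listener = create_listener(addr, false)?;
/// println!("bound to {}", listener.local_addr()?);
/// ```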
pub fn create_listener(addr: SocketAddr, reuse_port: bool) -> io::Result<TcpListener> {
let socket = if addr.is_ipv4() {
TcpSocket::new_v4()?
} else {
TcpSocket::new_v6()?
};
socket.set_reuseaddr(true)?;
#[cfg(target_os = "linux")]
socket.set_reuseport(reuse_port)?;
socket.bind(addr)?;
socket.listen(1024)
}
/// Create both HTTP and SSH listeners, returning listeners and their bound addresses
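///
/// Binding to port 0 in `AppConfig` yields OS-assigned ports, which is why
/// the actually-bound addresses are returned alongside the listeners and
/// threaded back into the config via `update_bound_addresses` in
/// `run_multi_thread`.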
pub fn create_listeners(
config: &AppConfig,
) -> io::Result<(TcpListener, SocketAddr, TcpListener, SocketAddr)> {
let http_listener = create_listener(config.http_bind_addr, config.reuse_port)?;
let http_addr = http_listener.local_addr()?;
let ssh_listener = create_listener(config.ssh_bind_addr, config.reuse_port)?;
let ssh_addr = ssh_listener.local_addr()?;
Ok((http_listener, http_addr, ssh_listener, ssh_addr))
}
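/// Report readiness to systemd and, when `WATCHDOG_USEC` is set, spawn a
/// background task that pings the watchdog periodically. Both notifications
/// are no-ops when the process is not managed by systemd.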
fn systemd_integration() {
// Tell systemd we are ready (no-op if not under systemd)
let _ = notify(false, &[NotifyState::Ready]);
// WATCHDOG_USEC is only set if watchdog is enabled *and* systemd manages us
    let watchdog_usec = std::env::var("WATCHDOG_USEC")
        .ok()
        .and_then(|v| v.parse::<u64>().ok());
let watchdog_usec = match watchdog_usec {
Some(v) if v > 0 => v,
_ => return, // no watchdog → nothing to do
};
// systemd recommends pinging at least every WatchdogSec / 2
// we use /3 for extra margin
let interval_duration = Duration::from_micros(watchdog_usec / 3);
tokio::spawn(async move {
let mut ticker = interval(interval_duration);
loop {
ticker.tick().await;
let _ = notify(false, &[NotifyState::Watchdog]);
}
});
}
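/// Drive the full server: start the CI subsystems, then run the HTTP and SSH
/// servers until either one stops, a shutdown signal (ctrl-c or SIGTERM)
/// arrives, or the caller-supplied `kill` future resolves.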
pub async fn run<T: Future>(
state: GlobalState,
http_listener: TcpListener,
ssh_listener: TcpListener,
kill: T,
) {
println!("[{:?}] - RubHub started", state.process_start.elapsed());
    // Validate the local Podman backend before enabling CI on it
if state.config.local_podman_ci {
match rubhub_ci::validate_podman().await {
Ok(version) => {
state
.local_podman_ci_available
.store(true, Ordering::Relaxed);
println!(
"[{:?}] - CI backend: {}",
state.process_start.elapsed(),
version
);
}
Err(e) => {
eprintln!(
"[{:?}] - Warning: Local Podman CI disabled - {}",
state.process_start.elapsed(),
e
);
}
}
}
if state.ci_available() {
        // Start the CI system via the rubhub_ci crate
let ci_config = rubhub_ci::CiConfig {
ci_root: Arc::from(state.config.ci_root.as_path()),
git_root: Arc::from(state.config.git_root.as_path()),
local_podman_available: state.local_podman_ci_available.clone(),
runner_registry: state.runner_registry.clone(),
};
let ci_handle = rubhub_ci::start(ci_config);
state.set_ci_sender(Box::new(ci_handle.sender().clone()));
        // Spawn the event listener (it stays in this crate because it needs the repository service)
spawn_ci_event_listener(state.clone(), ci_handle);
println!("[{:?}] - CI system started", state.process_start.elapsed());
} else {
println!("[{:?}] - CI system disabled", state.process_start.elapsed());
}
// Start remote runner management if runner tokens are configured
if !state.config.runner_tokens.is_empty() {
rubhub_ci::start_runner_management(
state.runner_registry.clone(),
Arc::from(state.config.ci_root.as_path()),
);
println!(
"[{:?}] - Remote CI runner system started ({} tokens configured)",
state.process_start.elapsed(),
state.config.runner_tokens.len()
);
}
// Start runner client if configured to connect to a master instance
if let Some(ref runner_config) = state.config.runner {
rubhub_ci::start_runner_client(runner_config.clone());
println!(
"[{:?}] - Runner client connecting to {}",
state.process_start.elapsed(),
runner_config.master_url
);
}
#[cfg(unix)]
let terminate = async {
tokio::signal::unix::signal(tokio::signal::unix::SignalKind::terminate())
.expect("failed to install signal handler")
.recv()
.await;
};
#[cfg(not(unix))]
let terminate = std::future::pending::<()>();
let http_server = http::http_server(state.clone(), http_listener)
.await
.expect("Couldn't start http_server");
let ssh_server = ssh::ssh_server(state.clone(), ssh_listener)
.await
.expect("Couldn't start ssh_server");
println!("[{:?}] - RubHub ready", state.process_start.elapsed());
systemd_integration();
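    // Race both servers against the shutdown triggers; whichever completes
    // first ends the process.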
tokio::select! {
http_res = http_server => {
eprintln!("HTTP server stopped: {:?}", http_res);
}
ssh_res = ssh_server => {
eprintln!("SSH server stopped: {:?}", ssh_res);
}
signal_res = tokio::signal::ctrl_c() => {
eprintln!("Received Signal: {:?}", signal_res);
}
        _ = terminate => {
            eprintln!("Received Terminate Signal");
        }
_ = kill => {
eprintln!("Received Kill!");
}
}
}
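/// Entry point for normal operation: build a multi-threaded Tokio runtime,
/// bind the listeners inside it (socket registration needs a reactor), and
/// block until `run` returns.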
pub fn run_multi_thread(config: AppConfig, process_start: std::time::Instant) {
let runtime = Builder::new_multi_thread()
.enable_all()
.build()
.expect("Couldn't start tokio runtime");
runtime.block_on(async {
// Create listeners inside the runtime
let (http_listener, http_addr, ssh_listener, ssh_addr) =
create_listeners(&config).expect("Failed to create listeners");
// Update config with actual addresses
let config = config.update_bound_addresses(http_addr, ssh_addr);
let state = GlobalState::new(config, process_start)
.expect("Error creating GlobalState from AppConfig");
let kill = std::future::pending::<()>();
run(state, http_listener, ssh_listener, kill).await
});
}
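/// Repository-relative directory that is scanned for CI workflow files.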
const WORKFLOW_DIR: &str = ".rubhub/workflows";
/// Spawn the CI event listener that watches for BranchUpdated events
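///
/// Each call to `subscribe` creates an independent broadcast receiver, so
/// this listener observes events without consuming them from other
/// subscribers.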
fn spawn_ci_event_listener(state: GlobalState, ci_handle: rubhub_ci::CiHandle) {
let rx = state.event_tx.subscribe();
tokio::spawn(async move {
let mut stream = BroadcastStream::new(rx);
        while let Some(result) = stream.next().await {
            // Broadcast receivers yield Err(Lagged) when they fall behind;
            // skip those instead of silently shutting the listener down,
            // which is what `while let Some(Ok(..))` would do.
            let Ok(event) = result else { continue };
            if let RepoEvent::BranchUpdated { info, branch } = event
                && let Err(e) = handle_branch_update(
                    &state,
                    &ci_handle,
                    &info.owner,
                    &info.project,
                    &branch,
                    &info.commit_hash,
                )
                .await
            {
                eprintln!(
                    "CI: Error handling branch update for {}/{}: {}",
                    info.owner, info.project, e
                );
            }
        }
});
}
/// Handle a branch update event - check for workflows and queue jobs
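///
/// A missing workflow directory means "no CI configured" and is not an
/// error; unreadable or unparsable workflow files are logged and skipped so
/// that one bad file cannot block the others.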
async fn handle_branch_update(
state: &GlobalState,
ci_handle: &rubhub_ci::CiHandle,
owner: &str,
project: &str,
branch: &str,
commit_hash: &str,
) -> anyhow::Result<()> {
// Try to list workflow files in .rubhub/workflows/
let tree = match state
.repo
.get_git_tree(owner, project, branch, WORKFLOW_DIR)
.await
{
Ok(entries) => entries,
Err(_) => return Ok(()), // No workflows directory, nothing to do
};
// Filter for .yaml/.yml files
let workflow_files: Vec<_> = tree
.iter()
.filter(|e| e.filename.ends_with(".yaml") || e.filename.ends_with(".yml"))
.collect();
if workflow_files.is_empty() {
return Ok(());
}
// Process each workflow file
for entry in workflow_files {
let file_path = format!("{}/{}", WORKFLOW_DIR, entry.filename);
// Read workflow file
let content = match state
.repo
.get_git_file(owner, project, branch, &file_path)
.await
{
Ok(bytes) => bytes,
Err(e) => {
eprintln!("CI: Failed to read workflow file {}: {}", file_path, e);
continue;
}
};
// Parse workflow
let content_str = match String::from_utf8(content) {
Ok(s) => s,
Err(_) => {
eprintln!("CI: Workflow file {} is not valid UTF-8", file_path);
continue;
}
};
let workflow: rubhub_ci::CiWorkflow = match serde_yaml::from_str(&content_str) {
Ok(w) => w,
Err(e) => {
eprintln!("CI: Failed to parse workflow {}: {}", file_path, e);
continue;
}
};
// Check if workflow should run on this branch
if !workflow.should_run_on_branch(branch) {
continue;
}
// Queue job
let request = rubhub_ci::CiJobRequest {
owner: owner.to_string(),
project: project.to_string(),
branch: branch.to_string(),
commit_hash: commit_hash.to_string(),
workflow_name: workflow.name.clone(),
workflow,
};
if let Err(e) = ci_handle.submit(request).await {
eprintln!("CI: Failed to queue job: {}", e);
}
}
Ok(())
}