Login
4 branches 0 tags
Ben (T14/NixOS) Improved flake 41eb128 11 days ago 252 Commits
rubhub / src / lib.rs
use std::future::Future;
use std::io;
use std::net::SocketAddr;
use std::sync::Arc;
use std::sync::atomic::Ordering;
use std::time::Duration;

use sd_notify::NotifyState;
use sd_notify::notify;
use tokio::net::{TcpListener, TcpSocket};
use tokio::runtime::Builder;
use tokio_stream::StreamExt;
use tokio_stream::wrappers::BroadcastStream;

mod controllers;
mod extractors;
mod http;
mod models;
mod services;
mod ssh;
mod views;

pub use models::{Project, ProjectSummary, UserModel};
pub use rubhub_auth_store::User;
pub use rubhub_state::{AccessType, AppConfig, ContentPage, GlobalState, RepoEvent, RepoEventInfo};
use tokio::time::interval;

/// Build a listening TCP socket bound to `addr`.
///
/// The socket family (IPv4 or IPv6) is chosen from `addr`. Address reuse is
/// always enabled via `set_reuseaddr(true)`; on Linux, port reuse is set to
/// `reuse_port` (on other targets the flag is not applied). The listen
/// backlog is 1024.
///
/// # Errors
///
/// Returns the underlying I/O error if creating the socket, setting an
/// option, binding, or listening fails.
pub fn create_listener(addr: SocketAddr, reuse_port: bool) -> io::Result<TcpListener> {
    let sock = match addr {
        SocketAddr::V4(_) => TcpSocket::new_v4(),
        SocketAddr::V6(_) => TcpSocket::new_v6(),
    }?;

    sock.set_reuseaddr(true)?;

    #[cfg(target_os = "linux")]
    sock.set_reuseport(reuse_port)?;

    sock.bind(addr)?;
    let listener = sock.listen(1024)?;
    Ok(listener)
}

/// Bind the HTTP and SSH listeners described by `config`.
///
/// Returns `(http_listener, http_addr, ssh_listener, ssh_addr)`, where each
/// address is the listener's `local_addr()` after binding. The HTTP listener
/// is created before the SSH listener; both use `config.reuse_port`.
///
/// # Errors
///
/// Returns the first I/O error hit while creating a listener or querying its
/// local address.
pub fn create_listeners(
    config: &AppConfig,
) -> io::Result<(TcpListener, SocketAddr, TcpListener, SocketAddr)> {
    // Bind one listener and report the address it actually ended up on.
    let bind = |addr: SocketAddr| -> io::Result<(TcpListener, SocketAddr)> {
        let listener = create_listener(addr, config.reuse_port)?;
        let bound = listener.local_addr()?;
        Ok((listener, bound))
    };

    let (http_listener, http_addr) = bind(config.http_bind_addr)?;
    let (ssh_listener, ssh_addr) = bind(config.ssh_bind_addr)?;

    Ok((http_listener, http_addr, ssh_listener, ssh_addr))
}

/// Report readiness to systemd and, when a watchdog is configured, keep it fed.
///
/// Sends `NotifyState::Ready` once, then reads the `WATCHDOG_USEC` environment
/// variable. If it holds a positive integer, a background task is spawned that
/// sends `NotifyState::Watchdog` every `WATCHDOG_USEC / 3` microseconds.
/// Errors from `notify` are deliberately ignored (best effort).
///
/// # Panics
///
/// Panics if a watchdog is configured and this is called outside a Tokio
/// runtime, because the ping task is started with `tokio::spawn`.
fn systemd_integration() {
    // Tell systemd we are ready (no-op if not under systemd)
    let _ = notify(false, &[NotifyState::Ready]);

    // WATCHDOG_USEC is only set if watchdog is enabled *and* systemd manages us.
    // Missing, unparsable, or zero values all mean "no watchdog".
    let watchdog_usec = std::env::var("WATCHDOG_USEC")
        .ok()
        .and_then(|raw| raw.parse::<u64>().ok())
        .filter(|&usec| usec > 0);

    let Some(watchdog_usec) = watchdog_usec else {
        return; // no watchdog → nothing to do
    };

    // systemd recommends pinging at least every WatchdogSec / 2;
    // we use /3 for extra margin.
    //
    // Clamp to at least 1µs: `tokio::time::interval` panics on a zero period,
    // which the integer division would produce for WATCHDOG_USEC values of 1
    // or 2, killing the ping task and starving the watchdog.
    let interval_duration = Duration::from_micros((watchdog_usec / 3).max(1));

    tokio::spawn(async move {
        let mut ticker = interval(interval_duration);

        loop {
            ticker.tick().await;
            let _ = notify(false, &[NotifyState::Watchdog]);
        }
    });
}

pub async fn run<T: Future>(
    state: GlobalState,
    http_listener: TcpListener,
    ssh_listener: TcpListener,
    kill: T,
) {
    println!("[{:?}] - RubHub started", state.process_start.elapsed());

    // Validate and start CI system
    if state.config.local_podman_ci {
        match rubhub_ci::validate_podman().await {
            Ok(version) => {
                state
                    .local_podman_ci_available
                    .store(true, Ordering::Relaxed);
                println!(
                    "[{:?}] - CI backend: {}",
                    state.process_start.elapsed(),
                    version
                );
            }
            Err(e) => {
                eprintln!(
                    "[{:?}] - Warning: Local Podman CI disabled - {}",
                    state.process_start.elapsed(),
                    e
                );
            }
        }
    }

    if state.ci_available() {
        // Start the CI system with the new crate
        let ci_config = rubhub_ci::CiConfig {
            ci_root: Arc::from(state.config.ci_root.as_path()),
            git_root: Arc::from(state.config.git_root.as_path()),
            local_podman_available: state.local_podman_ci_available.clone(),
            runner_registry: state.runner_registry.clone(),
        };
        let ci_handle = rubhub_ci::start(ci_config);
        state.set_ci_sender(Box::new(ci_handle.sender().clone()));

        // Spawn the event listener (stays in main crate since it needs repository service)
        spawn_ci_event_listener(state.clone(), ci_handle);

        println!("[{:?}] - CI system started", state.process_start.elapsed());
    } else {
        println!("[{:?}] - CI system disabled", state.process_start.elapsed());
    }

    // Start remote runner management if runner tokens are configured
    if !state.config.runner_tokens.is_empty() {
        rubhub_ci::start_runner_management(
            state.runner_registry.clone(),
            Arc::from(state.config.ci_root.as_path()),
        );
        println!(
            "[{:?}] - Remote CI runner system started ({} tokens configured)",
            state.process_start.elapsed(),
            state.config.runner_tokens.len()
        );
    }

    // Start runner client if configured to connect to a master instance
    if let Some(ref runner_config) = state.config.runner {
        rubhub_ci::start_runner_client(runner_config.clone());
        println!(
            "[{:?}] - Runner client connecting to {}",
            state.process_start.elapsed(),
            runner_config.master_url
        );
    }

    #[cfg(unix)]
    let terminate = async {
        tokio::signal::unix::signal(tokio::signal::unix::SignalKind::terminate())
            .expect("failed to install signal handler")
            .recv()
            .await;
    };

    #[cfg(not(unix))]
    let terminate = std::future::pending::<()>();

    let http_server = http::http_server(state.clone(), http_listener)
        .await
        .expect("Couldn't start http_server");
    let ssh_server = ssh::ssh_server(state.clone(), ssh_listener)
        .await
        .expect("Couldn't start ssh_server");

    println!("[{:?}] - RubHub ready", state.process_start.elapsed());

    systemd_integration();

    tokio::select! {
        http_res = http_server => {
            eprintln!("HTTP server stopped: {:?}", http_res);
        }
        ssh_res = ssh_server => {
            eprintln!("SSH server stopped: {:?}", ssh_res);
        }
        signal_res = tokio::signal::ctrl_c() => {
            eprintln!("Received Signal: {:?}", signal_res);
        }
        term_res = terminate => {
            eprintln!("Received Terminate Signal: {:?}", term_res);
        }
        _ = kill => {
            eprintln!("Received Kill!");
        }
    }
}

/// Build a multi-threaded Tokio runtime and block on [`run`] until it returns.
///
/// Inside the runtime this creates the HTTP and SSH listeners, updates the
/// config with the addresses that were actually bound, builds the
/// `GlobalState`, and calls `run` with a `kill` future that never resolves.
///
/// # Panics
///
/// Panics if the runtime cannot be built, the listeners cannot be created, or
/// `GlobalState::new` fails.
pub fn run_multi_thread(config: AppConfig, process_start: std::time::Instant) {
    let rt = Builder::new_multi_thread()
        .enable_all()
        .build()
        .expect("Couldn't start tokio runtime");

    let main_task = async {
        // Listeners are created inside the runtime.
        let (http, http_bound, ssh, ssh_bound) =
            create_listeners(&config).expect("Failed to create listeners");

        // Record the addresses that were actually bound.
        let config = config.update_bound_addresses(http_bound, ssh_bound);

        let state = GlobalState::new(config, process_start)
            .expect("Error creating GlobalState from AppConfig");

        // No external kill switch here: pass a future that never completes.
        run(state, http, ssh, std::future::pending::<()>()).await
    };

    rt.block_on(main_task);
}

/// Path inside a repository that is searched for CI workflow files on branch updates.
const WORKFLOW_DIR: &str = ".rubhub/workflows";

/// Spawn the CI event listener that watches for `BranchUpdated` events.
///
/// Subscribes to `state.event_tx` and starts a background task that calls
/// `handle_branch_update` for every `RepoEvent::BranchUpdated`; all other
/// events are ignored. Errors returned by the handler are logged to stderr.
///
/// If this subscriber falls behind and the broadcast stream reports an error
/// (skipped events), the error is logged and the listener keeps running. The
/// task only ends when the stream itself ends.
fn spawn_ci_event_listener(state: GlobalState, ci_handle: rubhub_ci::CiHandle) {
    // Subscribe before spawning so no event sent after this call is missed.
    let rx = state.event_tx.subscribe();

    tokio::spawn(async move {
        let mut stream = BroadcastStream::new(rx);

        while let Some(item) = stream.next().await {
            let event = match item {
                Ok(event) => event,
                Err(e) => {
                    // An `Err` item means events were skipped, not that the
                    // channel closed. Stopping here would silently disable CI
                    // for the rest of the process lifetime.
                    eprintln!("CI: Event listener missed events: {}", e);
                    continue;
                }
            };

            if let RepoEvent::BranchUpdated { info, branch } = event
                && let Err(e) = handle_branch_update(
                    &state,
                    &ci_handle,
                    &info.owner,
                    &info.project,
                    &branch,
                    &info.commit_hash,
                )
                .await
            {
                eprintln!(
                    "CI: Error handling branch update for {}/{}: {}",
                    info.owner, info.project, e
                );
            }
        }
    });
}

/// React to a branch update by queueing a CI job for each applicable workflow.
///
/// Lists `WORKFLOW_DIR` on `branch` of `owner/project`, keeps entries whose
/// filename ends in `.yaml` or `.yml`, and for each one reads the file, parses
/// it as a `rubhub_ci::CiWorkflow`, and submits a `CiJobRequest` if the
/// workflow says it should run on `branch`.
///
/// Per-file problems (unreadable file, invalid UTF-8, parse failure, failed
/// submission) are logged to stderr and do not stop processing of the
/// remaining files. A failure to list the directory is treated as "no
/// workflows" and is not logged.
///
/// # Errors
///
/// As written, every path returns `Ok(())`; failures are logged, not propagated.
async fn handle_branch_update(
    state: &GlobalState,
    ci_handle: &rubhub_ci::CiHandle,
    owner: &str,
    project: &str,
    branch: &str,
    commit_hash: &str,
) -> anyhow::Result<()> {
    // A failed listing is taken to mean there is no workflows directory.
    let Ok(tree) = state
        .repo
        .get_git_tree(owner, project, branch, WORKFLOW_DIR)
        .await
    else {
        return Ok(());
    };

    // Only YAML files count as workflow definitions.
    let yaml_entries: Vec<_> = tree
        .iter()
        .filter(|entry| {
            let name = &entry.filename;
            name.ends_with(".yaml") || name.ends_with(".yml")
        })
        .collect();

    for entry in yaml_entries {
        let path = format!("{}/{}", WORKFLOW_DIR, entry.filename);

        // Fetch the raw workflow file from the repository.
        let raw = match state
            .repo
            .get_git_file(owner, project, branch, &path)
            .await
        {
            Err(e) => {
                eprintln!("CI: Failed to read workflow file {}: {}", path, e);
                continue;
            }
            Ok(bytes) => bytes,
        };

        // Workflow definitions must be valid UTF-8 text.
        let Ok(text) = String::from_utf8(raw) else {
            eprintln!("CI: Workflow file {} is not valid UTF-8", path);
            continue;
        };

        let workflow = match serde_yaml::from_str::<rubhub_ci::CiWorkflow>(&text) {
            Err(e) => {
                eprintln!("CI: Failed to parse workflow {}: {}", path, e);
                continue;
            }
            Ok(parsed) => parsed,
        };

        // Skip workflows that do not apply to this branch.
        if !workflow.should_run_on_branch(branch) {
            continue;
        }

        // Hand the job to the CI system; a failed submission is only logged.
        let job = rubhub_ci::CiJobRequest {
            owner: owner.to_string(),
            project: project.to_string(),
            branch: branch.to_string(),
            commit_hash: commit_hash.to_string(),
            workflow_name: workflow.name.clone(),
            workflow,
        };

        if let Err(e) = ci_handle.submit(job).await {
            eprintln!("CI: Failed to queue job: {}", e);
        }
    }

    Ok(())
}