Login
4 branches 0 tags
Ben (Desktop/Arch) Allow for tags in issues d1f00f7 12 days ago 249 Commits
rubhub / src / lib.rs
use std::future::Future;
use std::io;
use std::net::SocketAddr;
use std::sync::atomic::Ordering;
use std::time::Duration;

use sd_notify::NotifyState;
use sd_notify::notify;
use tokio::net::{TcpListener, TcpSocket};
use tokio::runtime::Builder;

mod controllers;
mod extractors;
mod http;
mod models;
mod services;
mod ssh;
mod views;

pub use models::{Project, ProjectSummary, UserModel};
pub use rubhub_auth_store::User;
pub use rubhub_state::{AccessType, AppConfig, ContentPage, GlobalState, RepoEvent, RepoEventInfo};
use tokio::time::interval;

/// Create and bind a TCP listener with appropriate socket options
pub fn create_listener(addr: SocketAddr, reuse_port: bool) -> io::Result<TcpListener> {
    let socket = if addr.is_ipv4() {
        TcpSocket::new_v4()?
    } else {
        TcpSocket::new_v6()?
    };

    socket.set_reuseaddr(true)?;

    #[cfg(target_os = "linux")]
    socket.set_reuseport(reuse_port)?;

    socket.bind(addr)?;
    socket.listen(1024)
}

/// Create both HTTP and SSH listeners, returning listeners and their bound addresses
pub fn create_listeners(
    config: &AppConfig,
) -> io::Result<(TcpListener, SocketAddr, TcpListener, SocketAddr)> {
    let http_listener = create_listener(config.http_bind_addr, config.reuse_port)?;
    let http_addr = http_listener.local_addr()?;

    let ssh_listener = create_listener(config.ssh_bind_addr, config.reuse_port)?;
    let ssh_addr = ssh_listener.local_addr()?;

    Ok((http_listener, http_addr, ssh_listener, ssh_addr))
}

fn systemd_integration() {
    // Tell systemd we are ready (no-op if not under systemd)
    let _ = notify(false, &[NotifyState::Ready]);

    // WATCHDOG_USEC is only set if watchdog is enabled *and* systemd manages us
    let watchdog_usec = match std::env::var("WATCHDOG_USEC") {
        Ok(v) => v.parse::<u64>().ok(),
        Err(_) => None,
    };

    let watchdog_usec = match watchdog_usec {
        Some(v) if v > 0 => v,
        _ => return, // no watchdog → nothing to do
    };

    // systemd recommends pinging at least every WatchdogSec / 2
    // we use /3 for extra margin
    let interval_duration = Duration::from_micros(watchdog_usec / 3);

    tokio::spawn(async move {
        let mut ticker = interval(interval_duration);

        loop {
            ticker.tick().await;
            let _ = notify(false, &[NotifyState::Watchdog]);
        }
    });
}

pub async fn run<T: Future>(
    state: GlobalState,
    http_listener: TcpListener,
    ssh_listener: TcpListener,
    kill: T,
) {
    println!("[{:?}] - RubHub started", state.process_start.elapsed());

    // Validate and start CI system
    if state.config.local_podman_ci {
        match services::ci::validate_podman().await {
            Ok(version) => {
                state
                    .local_podman_ci_available
                    .store(true, Ordering::Relaxed);
                println!(
                    "[{:?}] - CI backend: {}",
                    state.process_start.elapsed(),
                    version
                );
            }
            Err(e) => {
                eprintln!(
                    "[{:?}] - Warning: Local Podman CI disabled - {}",
                    state.process_start.elapsed(),
                    e
                );
            }
        }
    }

    if state.ci_available() {
        let (ci_tx, ci_rx) = tokio::sync::mpsc::channel(256);
        state.set_ci_sender(Box::new(ci_tx.clone()));
        services::ci::spawn_ci_listener(state.clone(), ci_tx);
        services::ci::spawn_ci_worker(state.clone(), ci_rx);
        println!("[{:?}] - CI system started", state.process_start.elapsed());
    } else {
        println!("[{:?}] - CI system disabled", state.process_start.elapsed());
    }

    // Start remote runner management if runner tokens are configured
    if !state.config.runner_tokens.is_empty() {
        services::runner::spawn_heartbeat_checker(state.clone());
        services::runner::spawn_server_heartbeat_sender(state.clone());
        println!(
            "[{:?}] - Remote CI runner system started ({} tokens configured)",
            state.process_start.elapsed(),
            state.config.runner_tokens.len()
        );
    }

    // Start runner client if configured to connect to a master instance
    if let Some(ref runner_config) = state.config.runner {
        services::runner_client::spawn_runner_client(state.clone(), runner_config.clone());
        println!(
            "[{:?}] - Runner client connecting to {}",
            state.process_start.elapsed(),
            runner_config.master_url
        );
    }

    #[cfg(unix)]
    let terminate = async {
        tokio::signal::unix::signal(tokio::signal::unix::SignalKind::terminate())
            .expect("failed to install signal handler")
            .recv()
            .await;
    };

    #[cfg(not(unix))]
    let terminate = std::future::pending::<()>();

    let http_server = http::http_server(state.clone(), http_listener)
        .await
        .expect("Couldn't start http_server");
    let ssh_server = ssh::ssh_server(state.clone(), ssh_listener)
        .await
        .expect("Couldn't start ssh_server");

    println!("[{:?}] - RubHub ready", state.process_start.elapsed());

    systemd_integration();

    tokio::select! {
        http_res = http_server => {
            eprintln!("HTTP server stopped: {:?}", http_res);
        }
        ssh_res = ssh_server => {
            eprintln!("SSH server stopped: {:?}", ssh_res);
        }
        signal_res = tokio::signal::ctrl_c() => {
            eprintln!("Received Signal: {:?}", signal_res);
        }
        term_res = terminate => {
            eprintln!("Received Terminate Signal: {:?}", term_res);
        }
        _ = kill => {
            eprintln!("Received Kill!");
        }
    }
}

pub fn run_multi_thread(config: AppConfig, process_start: std::time::Instant) {
    let runtime = Builder::new_multi_thread()
        .enable_all()
        .build()
        .expect("Couldn't start tokio runtime");

    runtime.block_on(async {
        // Create listeners inside the runtime
        let (http_listener, http_addr, ssh_listener, ssh_addr) =
            create_listeners(&config).expect("Failed to create listeners");

        // Update config with actual addresses
        let config = config.update_bound_addresses(http_addr, ssh_addr);

        let state = GlobalState::new(config, process_start)
            .expect("Error creating GlobalState from AppConfig");

        let kill = std::future::pending::<()>();
        run(state, http_listener, ssh_listener, kill).await
    });
}