Initial nix-ota implementation

Self-hostable OTA update system for NixOS fleets: a control server,
device agent, publisher CLI, and NixOS modules that ship prebuilt
system closures from a binary cache to devices that don't have the
flake.

- crates/common: signed manifest types (ed25519), store-path validator
- crates/server: axum + sqlite + HTMX dashboard, channel/device API
- crates/agent: poll, verify signature + revision, nix copy, switch,
  health check, magic-rollback on failure
- crates/publisher: keygen + sign + publish CLI for operators/CI
- nix/modules: NixOS modules for server and agent
- nix/tests/ota.nix: end-to-end VM test exercising publish A -> B ->
  broken C -> rollback to B (passes)

The control server never holds the signing key; manifests are signed
offline and verified against a pinned public key on each device.
This commit is contained in:
0m.ax 2026-05-25 14:58:42 +02:00
commit 42b2ce4d1d
19 changed files with 4745 additions and 0 deletions

22
crates/agent/Cargo.toml Normal file
View file

@ -0,0 +1,22 @@
[package]
name = "nix-ota-agent"
version.workspace = true
edition.workspace = true
license.workspace = true
[[bin]]
name = "nix-ota-agent"
path = "src/main.rs"
[dependencies]
nix-ota-common = { path = "../common" }
anyhow.workspace = true
serde.workspace = true
serde_json.workspace = true
tokio.workspace = true
tracing.workspace = true
tracing-subscriber.workspace = true
clap.workspace = true
reqwest.workspace = true
ed25519-dalek.workspace = true
time.workspace = true

261
crates/agent/src/main.rs Normal file
View file

@ -0,0 +1,261 @@
//! `nix-ota-agent` — runs on each device.
//!
//! Lifecycle on every poll:
//! 1. Fetch `/channels/<channel>/current`.
//! 2. Verify ed25519 signature against the device's pinned public key.
//! 3. Reject manifests with a revision <= last applied (replay defense).
//! 4. `nix copy --from <substituter> <storePath>` — Nix itself verifies
//! the per-path signatures against the cache's public key, so a
//! compromised control server cannot inject store contents.
//! 5. `nix-env -p /nix/var/nix/profiles/system --set <storePath>`
//! then `<storePath>/bin/switch-to-configuration switch`.
//! 6. Run the configured health check. On failure, roll back by
//! switching to the previous system profile generation.
//! 7. Check in with the control server.
//!
//! The agent stores small bits of state (last applied revision and
//! previous store path for rollback) under `--state-dir`, defaulting
//! to /var/lib/nix-ota.
use anyhow::{anyhow, bail, Context, Result};
use clap::Parser;
use nix_ota_common as common;
use serde::{Deserialize, Serialize};
use std::{path::{Path, PathBuf}, time::Duration};
use tokio::process::Command;
// Crate version captured at compile time; sent in the HTTP User-Agent
// header and in every check-in payload.
const AGENT_VERSION: &str = env!("CARGO_PKG_VERSION");
// Command-line configuration of the agent. Every option can also be
// supplied through the `NIX_OTA_*` environment variable named in its
// `#[arg(env = ...)]` attribute.
// NOTE: clap's derive turns the `///` comments on the fields into `--help`
// text, so they are user-facing strings rather than plain documentation.
#[derive(Parser, Debug, Clone)]
#[command(version, about = "nix-ota device agent")]
struct Args {
/// Control server base URL, e.g. https://ota.example.com
#[arg(long, env = "NIX_OTA_SERVER")]
server: String,
/// Channel name to follow (e.g. prod, canary).
#[arg(long, env = "NIX_OTA_CHANNEL", default_value = "prod")]
channel: String,
/// Device identifier (must be unique within a deployment).
#[arg(long, env = "NIX_OTA_DEVICE_ID")]
device_id: String,
/// Path to a file containing the base64-encoded ed25519 public key
/// used to verify manifest signatures.
#[arg(long, env = "NIX_OTA_PUBLIC_KEY_FILE")]
public_key_file: PathBuf,
/// Poll interval seconds. If `--once` is set, this is ignored.
#[arg(long, env = "NIX_OTA_INTERVAL", default_value_t = 60)]
interval: u64,
/// Run a single poll and exit (used by systemd timer).
#[arg(long, env = "NIX_OTA_ONCE")]
once: bool,
/// Persistent state directory.
#[arg(long, env = "NIX_OTA_STATE_DIR", default_value = "/var/lib/nix-ota")]
state_dir: PathBuf,
/// Optional health-check command. If exit code != 0 after switch,
/// the agent rolls back.
#[arg(long, env = "NIX_OTA_HEALTH_CMD")]
health_cmd: Option<String>,
/// Dry-run: log what would happen, don't execute nix or switch.
#[arg(long, env = "NIX_OTA_DRY_RUN")]
dry_run: bool,
}
/// Agent state persisted between polls as `state.json` inside the state
/// directory. `Default` (revision 0, no paths) is what a device that has
/// never applied a manifest starts from.
#[derive(Debug, Default, Serialize, Deserialize)]
struct State {
/// Highest manifest revision already processed; manifests whose revision
/// is less than or equal to this value are ignored.
last_revision: u64,
/// Store path of the closure most recently applied successfully, if any.
last_store_path: Option<String>,
/// Store path that was recorded as applied before `last_store_path`.
previous_store_path: Option<String>,
}
impl State {
fn path(dir: &Path) -> PathBuf { dir.join("state.json") }
fn load(dir: &Path) -> Result<Self> {
let p = Self::path(dir);
if !p.exists() { return Ok(Self::default()); }
Ok(serde_json::from_slice(&std::fs::read(p)?)?)
}
fn save(&self, dir: &Path) -> Result<()> {
std::fs::create_dir_all(dir).ok();
let tmp = dir.join("state.json.tmp");
std::fs::write(&tmp, serde_json::to_vec_pretty(self)?)?;
std::fs::rename(tmp, Self::path(dir))?;
Ok(())
}
}
#[tokio::main]
async fn main() -> Result<()> {
tracing_subscriber::fmt()
.with_env_filter(
tracing_subscriber::EnvFilter::try_from_default_env()
.unwrap_or_else(|_| "info".into()),
)
.init();
let args = Args::parse();
let vk_b64 = std::fs::read_to_string(&args.public_key_file)
.with_context(|| format!("reading public key {}", args.public_key_file.display()))?;
let vk = common::decode_verifying_key(vk_b64.trim())?;
let client = reqwest::Client::builder()
.user_agent(format!("nix-ota-agent/{AGENT_VERSION}"))
.timeout(Duration::from_secs(30))
.build()?;
loop {
match run_once(&args, &vk, &client).await {
Ok(_) => {}
Err(e) => tracing::error!("poll failed: {e:#}"),
}
if args.once { break; }
tokio::time::sleep(Duration::from_secs(args.interval)).await;
}
Ok(())
}
async fn run_once(args: &Args, vk: &ed25519_dalek::VerifyingKey, client: &reqwest::Client) -> Result<()>
where
ed25519_dalek::VerifyingKey: Sized,
{
let mut state = State::load(&args.state_dir)?;
let url = format!("{}/channels/{}/current", args.server.trim_end_matches('/'), args.channel);
let resp = client.get(&url).send().await?;
if !resp.status().is_success() {
// Still report a check-in so the dashboard knows we're alive.
checkin(args, client, &state, common::Health::Ok, Some(format!("no manifest: {}", resp.status()))).await.ok();
bail!("server returned {}", resp.status());
}
let manifest: common::Manifest = resp.json().await?;
common::verify_manifest(vk, &manifest)
.context("manifest signature verification failed")?;
if manifest.body.revision <= state.last_revision {
tracing::debug!(rev = manifest.body.revision, "no new revision");
checkin(args, client, &state, common::Health::Ok, None).await.ok();
return Ok(());
}
if Some(&manifest.body.store_path) == state.last_store_path.as_ref() {
// Same path, bumped revision (e.g. publish-rollback). Just record.
state.last_revision = manifest.body.revision;
state.save(&args.state_dir)?;
checkin(args, client, &state, common::Health::Ok, None).await.ok();
return Ok(());
}
tracing::info!(target = %manifest.body.store_path, rev = manifest.body.revision, "applying new closure");
checkin(args, client, &state, common::Health::Updating,
Some(format!("copying {}", manifest.body.store_path))).await.ok();
// 1. Copy from cache.
if !args.dry_run {
nix_copy(&manifest.body.substituter, &manifest.body.store_path).await?;
}
// 2. Switch.
let previous = state.last_store_path.clone();
if !args.dry_run {
nix_set_profile(&manifest.body.store_path).await?;
switch_to_configuration(&manifest.body.store_path, "switch").await?;
}
// 3. Health check.
let healthy = run_health_check(args.health_cmd.as_deref()).await;
if !healthy {
tracing::error!("health check failed, rolling back");
if let Some(prev) = previous.as_deref() {
if !args.dry_run {
if let Err(e) = rollback(prev).await {
tracing::error!("rollback failed: {e:#}");
checkin(args, client, &state, common::Health::Failed,
Some(format!("rollback failed: {e}"))).await.ok();
bail!("rollback failed");
}
}
checkin(args, client, &state, common::Health::RolledBack,
Some(format!("rolled back to {prev}"))).await.ok();
} else {
checkin(args, client, &state, common::Health::Failed,
Some("no previous generation to roll back to".into())).await.ok();
}
// Do NOT record success; intentionally leave last_revision so we
// retry the next poll only if a *new* revision is published.
state.last_revision = manifest.body.revision;
state.save(&args.state_dir)?;
bail!("health check failed");
}
state.previous_store_path = previous;
state.last_store_path = Some(manifest.body.store_path.clone());
state.last_revision = manifest.body.revision;
state.save(&args.state_dir)?;
checkin(args, client, &state, common::Health::Ok, Some("applied".into())).await.ok();
Ok(())
}
/// Run `nix copy --from <substituter> <path>` and fail unless it exits
/// successfully.
async fn nix_copy(substituter: &str, path: &str) -> Result<()> {
    let mut cmd = Command::new("nix");
    cmd.arg("copy").arg("--from").arg(substituter).arg(path);
    let exit = cmd.status().await.context("running `nix copy`")?;
    if exit.success() {
        Ok(())
    } else {
        bail!("nix copy exited {exit}")
    }
}
/// Point the system profile (`/nix/var/nix/profiles/system`) at `path` via
/// `nix-env --set`, failing unless the command exits successfully.
async fn nix_set_profile(path: &str) -> Result<()> {
    let mut cmd = Command::new("nix-env");
    cmd.arg("-p")
        .arg("/nix/var/nix/profiles/system")
        .arg("--set")
        .arg(path);
    let exit = cmd.status().await.context("running `nix-env --set`")?;
    if exit.success() {
        Ok(())
    } else {
        bail!("nix-env exited {exit}")
    }
}
/// Execute `<store_path>/bin/switch-to-configuration <action>` and fail
/// unless it exits successfully.
async fn switch_to_configuration(store_path: &str, action: &str) -> Result<()> {
    let program = format!("{store_path}/bin/switch-to-configuration");
    let exit = Command::new(&program)
        .arg(action)
        .status()
        .await
        .with_context(|| format!("running {program}"))?;
    if exit.success() {
        Ok(())
    } else {
        bail!("switch-to-configuration exited {exit}")
    }
}
/// Re-activate `previous_store_path`: set it as the system profile, then
/// run its `switch-to-configuration switch`. Stops at the first failure.
async fn rollback(previous_store_path: &str) -> Result<()> {
    nix_set_profile(previous_store_path).await?;
    switch_to_configuration(previous_store_path, "switch").await
}
/// Run the optional health-check command through `sh -c`.
///
/// Returns `true` when no command is configured or the command exits
/// successfully; `false` when it exits unsuccessfully or cannot be started.
async fn run_health_check(cmd: Option<&str>) -> bool {
    let script = match cmd {
        Some(script) => script,
        None => return true,
    };
    let outcome = Command::new("sh").arg("-c").arg(script).status().await;
    match outcome {
        Err(e) => {
            tracing::error!("health check exec failed: {e}");
            false
        }
        Ok(exit) => exit.success(),
    }
}
/// POST a check-in for this device to `<server>/devices/<device_id>/checkin`.
///
/// The payload carries the given `health` and `message` together with the
/// store path recorded in `state`. Returns an error if the request fails or
/// the server answers with a non-success status.
///
/// NOTE(review): `target_store_path` is always reported equal to
/// `current_store_path`, even while an update is in flight — confirm
/// whether the dashboard expects the manifest's path there.
async fn checkin(
    args: &Args,
    client: &reqwest::Client,
    state: &State,
    health: common::Health,
    message: Option<String>,
) -> Result<()> {
    let base = args.server.trim_end_matches('/');
    let endpoint = format!("{base}/devices/{}/checkin", args.device_id);

    let reported_path = state.last_store_path.clone();
    let payload = common::CheckIn {
        device_id: args.device_id.clone(),
        channel: args.channel.clone(),
        current_store_path: reported_path.clone(),
        target_store_path: reported_path,
        health,
        agent_version: AGENT_VERSION.into(),
        message,
    };

    let response = client.post(&endpoint).json(&payload).send().await?;
    let status = response.status();
    if status.is_success() {
        Ok(())
    } else {
        Err(anyhow!("checkin status {status}"))
    }
}

18
crates/common/Cargo.toml Normal file
View file

@ -0,0 +1,18 @@
[package]
name = "nix-ota-common"
version.workspace = true
edition.workspace = true
license.workspace = true
[lib]
path = "src/lib.rs"
[dependencies]
serde.workspace = true
serde_json.workspace = true
ed25519-dalek.workspace = true
base64.workspace = true
rand.workspace = true
time.workspace = true
thiserror.workspace = true
sha2.workspace = true

234
crates/common/src/lib.rs Normal file
View file

@ -0,0 +1,234 @@
//! Shared types and crypto for nix-ota.
//!
//! The central object is a signed [`Manifest`]: a small JSON document
//! pointing at a NixOS system closure store path together with the
//! substituter to fetch it from. Manifests are signed by an offline
//! ed25519 key; the agent verifies them on every poll.
//!
//! The signature covers the canonical serialization of [`ManifestBody`]
//! (the manifest without its own signature): the compact `serde_json`
//! encoding of the struct, whose keys appear in field-declaration order
//! (see [`canonical_body`]). That order is fixed, so the bytes are
//! deterministic.
use base64::{engine::general_purpose::STANDARD as B64, Engine as _};
use ed25519_dalek::{Signature, Signer, SigningKey, Verifier, VerifyingKey};
use rand::rngs::OsRng;
use serde::{Deserialize, Serialize};
use thiserror::Error;
use time::OffsetDateTime;
/// Prefix every accepted store path must start with; checked by
/// [`validate_store_path`].
pub const STORE_PATH_PREFIX: &str = "/nix/store/";
/// Errors produced by the manifest, key and store-path helpers in this crate.
#[derive(Debug, Error)]
pub enum Error {
/// A key or signature string was not valid base64.
#[error("invalid base64: {0}")]
Base64(#[from] base64::DecodeError),
/// The signature bytes were malformed or did not verify against the body.
#[error("invalid signature")]
Signature,
/// Key material had the wrong length or was rejected by ed25519.
#[error("invalid key: {0}")]
Key(String),
/// The store path failed [`validate_store_path`]; the payload says why.
#[error("invalid store path: {0}")]
StorePath(String),
/// JSON (de)serialization failed.
#[error("serialization: {0}")]
Serde(#[from] serde_json::Error),
/// The manifest's `key_id` does not match the verifying key in use.
#[error("manifest signed by unexpected key")]
KeyMismatch,
}
/// The signed payload of a manifest.
///
/// `key_id` is the first 8 bytes (hex) of the SHA-256 of the verifying key,
/// to help operators rotate keys and to give clear errors when a device
/// is configured with the wrong key.
///
/// The field order below is part of the signed format: [`canonical_body`]
/// serializes the struct as-is, so reordering, renaming or adding fields
/// changes the signed bytes and invalidates existing signatures.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct ManifestBody {
/// Name of the channel this manifest was published for.
pub channel: String,
/// Absolute Nix store path of the system closure top-level
/// (e.g. `/nix/store/...-nixos-system-foo-24.05.toplevel`).
pub store_path: String,
/// Substituter URL the agent should `nix copy --from`.
pub substituter: String,
/// Unix timestamp seconds.
pub timestamp: i64,
/// Monotonically increasing revision for this channel. Used by agents
/// to ignore replays of older manifests.
pub revision: u64,
/// Identifier of the signing key; [`sign_manifest`] overwrites whatever
/// the caller put here.
pub key_id: String,
}
/// A [`ManifestBody`] together with its detached signature.
///
/// Because of `#[serde(flatten)]` the body's fields sit at the top level of
/// the JSON object, next to `signature`.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct Manifest {
#[serde(flatten)]
pub body: ManifestBody,
/// base64(ed25519 signature over canonical JSON of `body`).
pub signature: String,
}
/// Validate that a string looks like a Nix store path. This is intentionally
/// strict to avoid an attacker tricking the agent into running arbitrary paths.
pub fn validate_store_path(p: &str) -> Result<(), Error> {
if !p.starts_with(STORE_PATH_PREFIX) {
return Err(Error::StorePath(format!("must start with {STORE_PATH_PREFIX}")));
}
let rest = &p[STORE_PATH_PREFIX.len()..];
if rest.is_empty() || rest.contains('/') || rest.contains("..") {
return Err(Error::StorePath("must be a single store object".into()));
}
// hash-name format: 32 base32 chars, '-', name
let dash = rest.find('-').ok_or_else(|| Error::StorePath("missing -".into()))?;
if dash != 32 {
return Err(Error::StorePath("hash must be 32 chars".into()));
}
for c in rest.chars() {
if !(c.is_ascii_alphanumeric() || matches!(c, '-' | '_' | '.' | '+' | '?' | '=')) {
return Err(Error::StorePath(format!("invalid char {c:?}")));
}
}
Ok(())
}
/// Short identifier for a verifying key: the first 8 bytes of the SHA-256
/// of the raw key bytes, rendered as 16 lowercase hex characters.
pub fn key_id(vk: &VerifyingKey) -> String {
    use sha2::{Digest, Sha256};
    let digest = Sha256::digest(vk.as_bytes());
    hex_short(&digest[..8])
}
/// Render `bytes` as lowercase hexadecimal, two characters per byte.
fn hex_short(bytes: &[u8]) -> String {
    use std::fmt::Write as _;
    bytes
        .iter()
        .fold(String::with_capacity(bytes.len() * 2), |mut hex, byte| {
            // Formatting into a `String` cannot fail.
            let _ = write!(hex, "{byte:02x}");
            hex
        })
}
/// Bytes that are signed and verified for a manifest body.
///
/// The v1 canonical form is simply the compact `serde_json` encoding of
/// [`ManifestBody`]; serde emits the keys in struct-declaration order, which
/// is stable.
pub fn canonical_body(body: &ManifestBody) -> Result<Vec<u8>, Error> {
    let encoded = serde_json::to_vec(body)?;
    Ok(encoded)
}
/// Sign `body` with `sk` and wrap it into a [`Manifest`].
///
/// `body.key_id` is overwritten with the id of `sk`'s verifying key before
/// signing, so callers may leave it empty. The signature is stored as
/// base64.
pub fn sign_manifest(sk: &SigningKey, mut body: ManifestBody) -> Result<Manifest, Error> {
    body.key_id = key_id(&sk.verifying_key());
    let payload = canonical_body(&body)?;
    let raw_signature: Signature = sk.sign(&payload);
    let signature = B64.encode(raw_signature.to_bytes());
    Ok(Manifest { body, signature })
}
pub fn verify_manifest(vk: &VerifyingKey, m: &Manifest) -> Result<(), Error> {
if m.body.key_id != key_id(vk) {
return Err(Error::KeyMismatch);
}
let sig_bytes = B64.decode(m.signature.as_bytes())?;
let sig = Signature::from_slice(&sig_bytes).map_err(|_| Error::Signature)?;
let bytes = canonical_body(&m.body)?;
vk.verify(&bytes, &sig).map_err(|_| Error::Signature)?;
validate_store_path(&m.body.store_path)?;
Ok(())
}
/// Generate a fresh ed25519 signing key from the operating system's RNG.
pub fn generate_keypair() -> SigningKey {
    let mut rng = OsRng;
    SigningKey::generate(&mut rng)
}
/// Encode the signing key's raw bytes as standard base64.
pub fn encode_signing_key(sk: &SigningKey) -> String {
    let secret_bytes = sk.to_bytes();
    B64.encode(secret_bytes)
}
pub fn decode_signing_key(s: &str) -> Result<SigningKey, Error> {
let raw = B64.decode(s.trim().as_bytes())?;
let arr: [u8; 32] = raw.as_slice().try_into().map_err(|_| Error::Key("len".into()))?;
Ok(SigningKey::from_bytes(&arr))
}
/// Encode the verifying (public) key's raw bytes as standard base64.
pub fn encode_verifying_key(vk: &VerifyingKey) -> String {
    let public_bytes = vk.to_bytes();
    B64.encode(public_bytes)
}
pub fn decode_verifying_key(s: &str) -> Result<VerifyingKey, Error> {
let raw = B64.decode(s.trim().as_bytes())?;
let arr: [u8; 32] = raw.as_slice().try_into().map_err(|_| Error::Key("len".into()))?;
VerifyingKey::from_bytes(&arr).map_err(|e| Error::Key(e.to_string()))
}
// --- check-in API types ---
/// Payload an agent POSTs to `/devices/<id>/checkin`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct CheckIn {
/// Identifier of the reporting device; the server rejects the request if
/// it differs from the id in the URL.
pub device_id: String,
/// Channel the device follows.
pub channel: String,
/// Store path the device reports as currently applied, if any.
pub current_store_path: Option<String>,
/// Store path the device reports as its target, if any.
pub target_store_path: Option<String>,
/// Health state reported by the device.
pub health: Health,
/// Version string of the reporting agent.
pub agent_version: String,
/// Optional free-form status message.
pub message: Option<String>,
}
/// Device health reported in a [`CheckIn`]. Serialized in snake_case
/// (`"ok"`, `"updating"`, `"failed"`, `"rolled_back"`).
#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq)]
#[serde(rename_all = "snake_case")]
pub enum Health {
/// Nothing to report, or the last update was applied successfully.
Ok,
/// The device is in the middle of applying a new closure.
Updating,
/// The update failed and could not be rolled back.
Failed,
/// The update failed and the device rolled back.
RolledBack,
}
/// Response body of a check-in request.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct CheckInAck {
/// Server clock as Unix seconds at the time the check-in was processed.
pub server_time: i64,
}
/// Unsigned publish parameters: a store path plus the substituter that
/// serves it.
// NOTE(review): no user of this type is visible in this part of the
// workspace — confirm it is still needed.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PublishRequest {
pub store_path: String,
pub substituter: String,
}
/// Current UTC time as Unix seconds.
pub fn now() -> i64 {
    let current = OffsetDateTime::now_utc();
    current.unix_timestamp()
}
#[cfg(test)]
mod tests {
    use super::*;

    /// A syntactically valid store path shared by the tests.
    const GOOD_PATH: &str = "/nix/store/aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa-system";

    /// Unsigned manifest body used as the starting point of each test.
    fn sample_body() -> ManifestBody {
        ManifestBody {
            channel: "prod".into(),
            store_path: GOOD_PATH.into(),
            substituter: "https://cache.example.com".into(),
            timestamp: 1234,
            revision: 1,
            key_id: String::new(),
        }
    }

    #[test]
    fn sign_and_verify_roundtrip() {
        let sk = generate_keypair();
        let signed = sign_manifest(&sk, sample_body()).unwrap();
        verify_manifest(&sk.verifying_key(), &signed).unwrap();
    }

    #[test]
    fn rejects_tamper() {
        let sk = generate_keypair();
        let mut signed = sign_manifest(&sk, sample_body()).unwrap();
        signed.body.store_path = "/nix/store/bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb-evil".into();
        assert!(verify_manifest(&sk.verifying_key(), &signed).is_err());
    }

    #[test]
    fn rejects_bad_store_path() {
        let rejected = [
            "/etc/passwd",
            "/nix/store/short-name",
            "/nix/store/aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa-system/../x",
        ];
        for path in rejected {
            assert!(validate_store_path(path).is_err());
        }
        assert!(validate_store_path(GOOD_PATH).is_ok());
    }
}

View file

@ -0,0 +1,22 @@
[package]
name = "nix-ota-publisher"
version.workspace = true
edition.workspace = true
license.workspace = true
[[bin]]
name = "nix-ota"
path = "src/main.rs"
[dependencies]
nix-ota-common = { path = "../common" }
anyhow.workspace = true
serde.workspace = true
serde_json.workspace = true
tokio.workspace = true
tracing.workspace = true
tracing-subscriber.workspace = true
clap.workspace = true
reqwest = { version = "0.12", features = ["json", "rustls-tls", "blocking"], default-features = false }
ed25519-dalek.workspace = true
base64.workspace = true

View file

@ -0,0 +1,144 @@
//! `nix-ota` — operator/CI CLI.
//!
//! Subcommands:
//! keygen Generate an ed25519 keypair for manifest signing.
//! sign Sign a manifest body and print it to stdout.
//! publish Sign + POST a manifest to the control server.
//! show-pubkey Derive and print the verifying key from a signing key.
//!
//! The signing key never leaves the operator's machine (or the CI secret
//! store); the control server only sees signed manifests.
use anyhow::{Context, Result};
use clap::{Parser, Subcommand};
use nix_ota_common as common;
use std::path::PathBuf;
// Top-level command line of the `nix-ota` operator CLI; all functionality
// lives in the subcommands of `Cmd`.
#[derive(Parser, Debug)]
#[command(version, about = "nix-ota operator CLI")]
struct Cli {
#[command(subcommand)]
cmd: Cmd,
}
// Subcommands of the operator CLI.
// NOTE: clap's derive turns the `///` comments below into `--help` text, so
// they are user-facing strings rather than plain documentation.
#[derive(Subcommand, Debug)]
enum Cmd {
/// Generate a new ed25519 signing key. Writes private key to
/// `--out` and prints public key to stdout.
Keygen {
#[arg(long)]
out: PathBuf,
},
/// Print the public key derived from a signing key file.
ShowPubkey {
#[arg(long)]
key: PathBuf,
},
/// Build a signed manifest from arguments and print to stdout.
Sign {
#[arg(long)]
key: PathBuf,
#[arg(long)]
channel: String,
#[arg(long)]
store_path: String,
#[arg(long)]
substituter: String,
#[arg(long)]
revision: u64,
},
/// Sign and publish a manifest to a control server.
Publish {
#[arg(long, env = "NIX_OTA_SERVER")]
server: String,
#[arg(long, env = "NIX_OTA_PUBLISH_TOKEN")]
token: String,
#[arg(long, env = "NIX_OTA_SIGNING_KEY_FILE")]
key: PathBuf,
#[arg(long)]
channel: String,
#[arg(long)]
store_path: String,
#[arg(long)]
substituter: String,
/// If unset, the publisher fetches the current revision and uses N+1.
#[arg(long)]
revision: Option<u64>,
},
}
/// Entry point of the operator CLI: parse the command line and run the
/// selected subcommand.
///
/// * `keygen` writes a new private key (owner-only on Unix) and prints the
///   public key.
/// * `show-pubkey` prints the public key of an existing signing key.
/// * `sign` prints a signed manifest as pretty JSON.
/// * `publish` signs a manifest and POSTs it to the control server, using
///   the server's current revision + 1 unless `--revision` is given.
fn main() -> Result<()> {
    tracing_subscriber::fmt().with_env_filter("info").init();
    let cli = Cli::parse();
    match cli.cmd {
        Cmd::Keygen { out } => {
            let sk = common::generate_keypair();
            write_private_key(&out, &common::encode_signing_key(&sk))
                .with_context(|| format!("writing {}", out.display()))?;
            println!("{}", common::encode_verifying_key(&sk.verifying_key()));
        }
        Cmd::ShowPubkey { key } => {
            let sk = load_key(&key)?;
            println!("{}", common::encode_verifying_key(&sk.verifying_key()));
        }
        Cmd::Sign { key, channel, store_path, substituter, revision } => {
            let sk = load_key(&key)?;
            common::validate_store_path(&store_path)?;
            let m = common::sign_manifest(&sk, common::ManifestBody {
                channel, store_path, substituter,
                timestamp: common::now(), revision, key_id: String::new(),
            })?;
            println!("{}", serde_json::to_string_pretty(&m)?);
        }
        Cmd::Publish { server, token, key, channel, store_path, substituter, revision } => {
            let sk = load_key(&key)?;
            common::validate_store_path(&store_path)?;
            let client = reqwest::blocking::Client::new();
            let rev = match revision {
                Some(r) => r,
                None => next_revision(&client, &server, &channel)?,
            };
            let m = common::sign_manifest(&sk, common::ManifestBody {
                channel: channel.clone(),
                store_path,
                substituter,
                timestamp: common::now(),
                revision: rev,
                key_id: String::new(),
            })?;
            let url = format!("{}/channels/{}/publish",
                server.trim_end_matches('/'), channel);
            let resp = client.post(&url)
                .bearer_auth(&token)
                .json(&m)
                .send()?;
            let status = resp.status();
            let body = resp.text().unwrap_or_default();
            if !status.is_success() {
                anyhow::bail!("publish failed: {status} {body}");
            }
            println!("{body}");
        }
    }
    Ok(())
}

/// Write secret key material to `path` without ever exposing it to other
/// users.
///
/// On Unix the file is created with mode 0600 and, because that mode only
/// applies to newly created files, the permissions of a pre-existing file
/// are tightened *before* the secret is written. Failures are reported
/// instead of being ignored.
fn write_private_key(path: &std::path::Path, contents: &str) -> std::io::Result<()> {
    use std::io::Write as _;

    let mut options = std::fs::OpenOptions::new();
    options.write(true).create(true).truncate(true);
    #[cfg(unix)]
    {
        use std::os::unix::fs::OpenOptionsExt as _;
        options.mode(0o600);
    }
    let mut file = options.open(path)?;
    #[cfg(unix)]
    {
        use std::os::unix::fs::PermissionsExt as _;
        file.set_permissions(std::fs::Permissions::from_mode(0o600))?;
    }
    file.write_all(contents.as_bytes())
}
/// Read a base64-encoded ed25519 signing key from the file at `p`.
///
/// Takes `&Path` rather than `&PathBuf`; existing `load_key(&path_buf)`
/// call sites keep working through deref coercion.
///
/// # Errors
/// Fails if the file cannot be read or does not contain a valid key.
fn load_key(p: &std::path::Path) -> Result<ed25519_dalek::SigningKey> {
    let s = std::fs::read_to_string(p)
        .with_context(|| format!("reading key {}", p.display()))?;
    Ok(common::decode_signing_key(s.trim())?)
}
/// Ask the control server for the channel's current manifest and return the
/// revision a new publish should use.
///
/// Returns 1 when the server answers 404 (nothing published yet), otherwise
/// the current manifest's revision plus one. Any other non-success status is
/// an error.
fn next_revision(client: &reqwest::blocking::Client, server: &str, channel: &str) -> Result<u64> {
    let base = server.trim_end_matches('/');
    let endpoint = format!("{base}/channels/{channel}/current");
    let response = client.get(&endpoint).send()?;
    let status = response.status();
    if status == reqwest::StatusCode::NOT_FOUND {
        return Ok(1);
    }
    if !status.is_success() {
        anyhow::bail!("fetch current: {}", status);
    }
    let current: common::Manifest = response.json()?;
    Ok(current.body.revision + 1)
}

25
crates/server/Cargo.toml Normal file
View file

@ -0,0 +1,25 @@
[package]
name = "nix-ota-server"
version.workspace = true
edition.workspace = true
license.workspace = true
[[bin]]
name = "nix-ota-server"
path = "src/main.rs"
[dependencies]
nix-ota-common = { path = "../common" }
anyhow.workspace = true
serde.workspace = true
serde_json.workspace = true
tokio.workspace = true
tracing.workspace = true
tracing-subscriber.workspace = true
clap.workspace = true
axum.workspace = true
tower.workspace = true
tower-http.workspace = true
sqlx.workspace = true
time.workspace = true
ulid.workspace = true

316
crates/server/src/main.rs Normal file
View file

@ -0,0 +1,316 @@
//! `nix-ota-server` — control plane.
//!
//! Single static binary that:
//! * serves the REST API consumed by the agent and publisher,
//! * persists channel/device state in SQLite,
//! * renders an HTMX-based dashboard from embedded templates.
//!
//! The server never holds the manifest signing key. Operators sign
//! manifests on a workstation (or in CI with a sealed secret) and POST
//! the already-signed manifest to `/channels/:name/publish`. The server's
//! job is purely to fan signed manifests out to devices and to record
//! check-ins, so a server compromise cannot push arbitrary closures.
use anyhow::Result;
use axum::{
extract::{Path, State},
http::{header, HeaderMap, StatusCode},
response::IntoResponse,
routing::{get, post},
Json, Router,
};
use clap::Parser;
use nix_ota_common as common;
use serde::{Deserialize, Serialize};
use sqlx::{sqlite::SqlitePoolOptions, SqlitePool};
use std::{net::SocketAddr, sync::Arc};
use tower_http::trace::TraceLayer;
mod ui;
// Command-line configuration of the control server. Every option can also
// be supplied through the `NIX_OTA_*` environment variable named in its
// `#[arg(env = ...)]` attribute.
// NOTE: clap's derive turns the `///` comments on the fields into `--help`
// text, so they are user-facing strings rather than plain documentation.
#[derive(Parser, Debug)]
#[command(version, about = "nix-ota control server")]
struct Args {
/// Listen address.
#[arg(long, env = "NIX_OTA_LISTEN", default_value = "0.0.0.0:8080")]
listen: SocketAddr,
/// Path to SQLite database.
#[arg(long, env = "NIX_OTA_DB", default_value = "nix-ota.db")]
db: String,
/// Bearer token required for /publish endpoints. If unset, a random
/// token is generated and printed at startup.
#[arg(long, env = "NIX_OTA_PUBLISH_TOKEN")]
publish_token: Option<String>,
}
/// State shared with every request handler through axum's `State` extractor.
#[derive(Clone)]
pub struct AppState {
/// SQLite connection pool.
pub db: SqlitePool,
/// Bearer token that publish requests must present.
pub publish_token: Arc<String>,
}
#[tokio::main]
async fn main() -> Result<()> {
tracing_subscriber::fmt()
.with_env_filter(
tracing_subscriber::EnvFilter::try_from_default_env()
.unwrap_or_else(|_| "info,sqlx=warn".into()),
)
.init();
let args = Args::parse();
let publish_token = args.publish_token.clone().unwrap_or_else(|| {
let t = ulid::Ulid::new().to_string();
tracing::warn!("no --publish-token set; generated ephemeral token: {t}");
t
});
let db_url = format!("sqlite://{}?mode=rwc", args.db);
let db = SqlitePoolOptions::new()
.max_connections(8)
.connect(&db_url)
.await?;
migrate(&db).await?;
let state = AppState {
db,
publish_token: Arc::new(publish_token),
};
let app = Router::new()
.route("/healthz", get(|| async { "ok" }))
.route("/channels/:name/current", get(get_current))
.route("/channels/:name/publish", post(publish))
.route("/devices/:id/checkin", post(checkin))
.route("/", get(ui::index))
.route("/ui/channels/:name", get(ui::channel_detail))
.route("/ui/devices/:id", get(ui::device_detail))
.with_state(state)
.layer(TraceLayer::new_for_http());
tracing::info!("listening on {}", args.listen);
let listener = tokio::net::TcpListener::bind(args.listen).await?;
axum::serve(listener, app).await?;
Ok(())
}
/// Create the `channels`, `channel_history`, `devices` and `device_history`
/// tables if they do not exist yet. Statements run in that order and the
/// first failure aborts the migration.
async fn migrate(db: &SqlitePool) -> Result<()> {
    const SCHEMA: [&str; 4] = [
        r#"
CREATE TABLE IF NOT EXISTS channels (
name TEXT PRIMARY KEY,
current_manifest TEXT,
revision INTEGER NOT NULL DEFAULT 0,
updated_at INTEGER NOT NULL DEFAULT 0
);
"#,
        r#"
CREATE TABLE IF NOT EXISTS channel_history (
id INTEGER PRIMARY KEY AUTOINCREMENT,
channel TEXT NOT NULL,
manifest TEXT NOT NULL,
revision INTEGER NOT NULL,
published_at INTEGER NOT NULL
);
"#,
        r#"
CREATE TABLE IF NOT EXISTS devices (
id TEXT PRIMARY KEY,
channel TEXT NOT NULL,
current_store_path TEXT,
target_store_path TEXT,
health TEXT NOT NULL DEFAULT 'ok',
last_message TEXT,
agent_version TEXT,
last_seen INTEGER NOT NULL DEFAULT 0
);
"#,
        r#"
CREATE TABLE IF NOT EXISTS device_history (
id INTEGER PRIMARY KEY AUTOINCREMENT,
device_id TEXT NOT NULL,
store_path TEXT,
health TEXT NOT NULL,
message TEXT,
at INTEGER NOT NULL
);
"#,
    ];
    for statement in SCHEMA {
        sqlx::query(statement).execute(db).await?;
    }
    Ok(())
}
// ---------- API handlers ----------
/// `GET /channels/:name/current` — return the channel's current signed
/// manifest.
///
/// Responds 404 when the channel is unknown or has no manifest yet. A stored
/// manifest that no longer parses is a server-side fault and is reported as
/// an internal error rather than blamed on the client as a bad request.
async fn get_current(
    State(s): State<AppState>,
    Path(name): Path<String>,
) -> Result<Json<common::Manifest>, ApiError> {
    let row: Option<(Option<String>,)> =
        sqlx::query_as("SELECT current_manifest FROM channels WHERE name = ?")
            .bind(&name)
            .fetch_optional(&s.db)
            .await?;
    let manifest_json = row.and_then(|r| r.0).ok_or(ApiError::NotFound)?;
    let manifest: common::Manifest = serde_json::from_str(&manifest_json).map_err(|e| {
        ApiError::Internal(format!("stored manifest for channel {name:?} is corrupt: {e}"))
    })?;
    Ok(Json(manifest))
}
/// `POST /channels/:name/publish` — store an already-signed manifest as the
/// channel's current one.
///
/// Requires the publish bearer token. The manifest must name the same
/// channel as the URL, carry a well-formed store path, and use exactly the
/// channel's next revision (stored revision + 1, or 1 for a new channel).
/// The channel row and its history entry are written in one transaction.
async fn publish(
    State(s): State<AppState>,
    Path(name): Path<String>,
    headers: HeaderMap,
    Json(manifest): Json<common::Manifest>,
) -> Result<Json<serde_json::Value>, ApiError> {
    require_token(&headers, &s.publish_token)?;
    if name != manifest.body.channel {
        return Err(ApiError::BadRequest("channel mismatch".into()));
    }
    if let Err(e) = common::validate_store_path(&manifest.body.store_path) {
        return Err(ApiError::BadRequest(e.to_string()));
    }
    // We do NOT verify the signature against any key here — the server is
    // intentionally key-agnostic. Devices verify against their pinned key.
    let published_at = common::now();
    let manifest_json = serde_json::to_string(&manifest)?;

    let mut tx = s.db.begin().await?;
    // Bump revision atomically.
    let stored: Option<(i64,)> = sqlx::query_as("SELECT revision FROM channels WHERE name = ?")
        .bind(&name)
        .fetch_optional(&mut *tx)
        .await?;
    let next_rev = stored.map_or(0, |(revision,)| revision) + 1;
    // A revision that does not fit into i64 can never equal `next_rev`.
    let offered = i64::try_from(manifest.body.revision).ok();
    if offered != Some(next_rev) {
        // Dropping `tx` here rolls the transaction back.
        return Err(ApiError::BadRequest(format!(
            "manifest revision must be {next_rev}, got {}",
            manifest.body.revision
        )));
    }

    sqlx::query(
        "INSERT INTO channels(name, current_manifest, revision, updated_at) VALUES(?,?,?,?)
ON CONFLICT(name) DO UPDATE SET current_manifest=excluded.current_manifest,
revision=excluded.revision, updated_at=excluded.updated_at",
    )
    .bind(&name)
    .bind(&manifest_json)
    .bind(next_rev)
    .bind(published_at)
    .execute(&mut *tx)
    .await?;

    sqlx::query(
        "INSERT INTO channel_history(channel, manifest, revision, published_at) VALUES(?,?,?,?)",
    )
    .bind(&name)
    .bind(&manifest_json)
    .bind(next_rev)
    .bind(published_at)
    .execute(&mut *tx)
    .await?;

    tx.commit().await?;
    Ok(Json(serde_json::json!({"ok": true, "revision": next_rev})))
}
/// `POST /devices/:id/checkin` — record a device's status report.
///
/// Rejects the request when the id in the payload differs from the id in
/// the URL. The device row is upserted and a history row appended inside a
/// single transaction, so the two tables cannot drift apart if the second
/// statement fails. Replies with the server's current time.
async fn checkin(
    State(s): State<AppState>,
    Path(id): Path<String>,
    Json(ci): Json<common::CheckIn>,
) -> Result<Json<common::CheckInAck>, ApiError> {
    if ci.device_id != id {
        return Err(ApiError::BadRequest("device id mismatch".into()));
    }
    let now = common::now();
    // `Health` serializes to a quoted JSON string; store it without quotes.
    let health = serde_json::to_string(&ci.health)?.trim_matches('"').to_string();

    let mut tx = s.db.begin().await?;
    sqlx::query(
        "INSERT INTO devices(id, channel, current_store_path, target_store_path, health,
last_message, agent_version, last_seen)
VALUES(?,?,?,?,?,?,?,?)
ON CONFLICT(id) DO UPDATE SET
channel=excluded.channel,
current_store_path=excluded.current_store_path,
target_store_path=excluded.target_store_path,
health=excluded.health,
last_message=excluded.last_message,
agent_version=excluded.agent_version,
last_seen=excluded.last_seen",
    )
    .bind(&ci.device_id)
    .bind(&ci.channel)
    .bind(&ci.current_store_path)
    .bind(&ci.target_store_path)
    .bind(&health)
    .bind(&ci.message)
    .bind(&ci.agent_version)
    .bind(now)
    .execute(&mut *tx)
    .await?;
    sqlx::query(
        "INSERT INTO device_history(device_id, store_path, health, message, at) VALUES(?,?,?,?,?)",
    )
    .bind(&ci.device_id)
    .bind(&ci.current_store_path)
    .bind(&health)
    .bind(&ci.message)
    .bind(now)
    .execute(&mut *tx)
    .await?;
    tx.commit().await?;
    Ok(Json(common::CheckInAck { server_time: now }))
}
// ---------- helpers ----------
/// Check that the request carries `Authorization: Bearer <expected>`.
///
/// A missing header, a non-`Bearer` scheme, an empty token or a wrong token
/// all yield [`ApiError::Unauthorized`]. The token is compared without an
/// early exit so the response time does not reveal how many leading bytes of
/// a guess were correct.
fn require_token(headers: &HeaderMap, expected: &str) -> Result<(), ApiError> {
    let presented = headers
        .get(header::AUTHORIZATION)
        .and_then(|h| h.to_str().ok())
        .and_then(|v| v.strip_prefix("Bearer "))
        .unwrap_or("");
    if presented.is_empty() || !constant_time_eq(presented.as_bytes(), expected.as_bytes()) {
        return Err(ApiError::Unauthorized);
    }
    Ok(())
}

/// Compare two byte strings, always inspecting every byte when the lengths
/// match. Only the length of the secret can leak through timing.
fn constant_time_eq(a: &[u8], b: &[u8]) -> bool {
    if a.len() != b.len() {
        return false;
    }
    a.iter().zip(b).fold(0u8, |diff, (x, y)| diff | (x ^ y)) == 0
}
/// JSON body sent with every error response: `{"error": "<message>"}`.
#[derive(Debug, Serialize, Deserialize)]
pub struct ApiErrorBody {
/// Human-readable description of what went wrong.
pub error: String,
}
/// Error type returned by the HTTP handlers; its `IntoResponse` impl maps
/// each variant to a status code and an [`ApiErrorBody`].
#[derive(Debug)]
pub enum ApiError {
/// 404 — the requested resource does not exist.
NotFound,
/// 400 — the request was rejected; the payload is the message sent back.
BadRequest(String),
/// 401 — missing or wrong bearer token.
Unauthorized,
/// 500 — server-side failure; the payload is the message sent back.
Internal(String),
}
impl From<sqlx::Error> for ApiError {
fn from(e: sqlx::Error) -> Self { Self::Internal(e.to_string()) }
}
impl From<serde_json::Error> for ApiError {
fn from(e: serde_json::Error) -> Self { Self::BadRequest(e.to_string()) }
}
impl IntoResponse for ApiError {
fn into_response(self) -> axum::response::Response {
let (code, msg) = match self {
ApiError::NotFound => (StatusCode::NOT_FOUND, "not found".to_string()),
ApiError::BadRequest(m) => (StatusCode::BAD_REQUEST, m),
ApiError::Unauthorized => (StatusCode::UNAUTHORIZED, "unauthorized".to_string()),
ApiError::Internal(m) => (StatusCode::INTERNAL_SERVER_ERROR, m),
};
(code, Json(ApiErrorBody { error: msg })).into_response()
}
}

145
crates/server/src/ui.rs Normal file
View file

@ -0,0 +1,145 @@
//! Minimal HTMX-flavored dashboard rendered with plain string formatting.
//!
//! Kept dependency-free on purpose; the UI is intentionally tiny so the
//! whole server stays a single static binary with no asset pipeline.
use crate::{ApiError, AppState};
use axum::{
extract::{Path, State},
response::Html,
};
/// Opening markup shared by every dashboard page: doctype, `<head>` with the
/// inline stylesheet, and the opening `<body>` tag. Each page starts its
/// output from this string and appends its own closing `</body></html>`.
/// The `.ok` / `.failed` / `.updating` / `.rolled_back` rules colour the
/// health values that the pages emit as CSS class names.
const HEAD: &str = r#"<!doctype html>
<html><head><meta charset="utf-8"><title>nix-ota</title>
<style>
body{font-family:ui-monospace,Menlo,monospace;max-width:1100px;margin:2em auto;padding:0 1em;color:#222}
table{border-collapse:collapse;width:100%}th,td{border-bottom:1px solid #ddd;padding:.4em .6em;text-align:left;font-size:13px}
th{background:#f5f5f5}
.ok{color:#197a19}.failed{color:#b00}.updating{color:#b58900}.rolled_back{color:#b58900}
h1,h2{font-weight:600}
a{color:#06c;text-decoration:none}a:hover{text-decoration:underline}
code{background:#f0f0f0;padding:1px 4px;border-radius:3px}
</style></head><body>"#;
/// Abbreviates an optional store path for table display.
///
/// `None` yields an empty string. For `/nix/store/<hash>-name` the result is
/// the first eight bytes of `<hash>-name`. Anything else — no `/nix/store/`
/// prefix, or fewer than eight bytes after it — is returned unchanged.
fn short(p: &Option<String>) -> String {
    let full = match p.as_deref() {
        Some(path) => path,
        None => return String::new(),
    };
    let prefix = full
        .strip_prefix("/nix/store/")
        .and_then(|rest| rest.get(..8));
    match prefix {
        Some(hash) => hash.to_owned(),
        None => full.to_owned(),
    }
}
/// Formats the time elapsed since `ts` as a coarse relative label.
///
/// A `ts` of `0` is rendered as "never". Otherwise the difference between
/// `nix_ota_common::now()` and `ts` is clamped to be non-negative and shown
/// in the largest whole unit that fits: seconds, minutes, hours, or days.
fn ago(ts: i64) -> String {
    if ts == 0 {
        return "never".into();
    }
    let elapsed = (nix_ota_common::now() - ts).max(0);
    match elapsed {
        0..=59 => format!("{elapsed}s ago"),
        60..=3599 => format!("{}m ago", elapsed / 60),
        3600..=86399 => format!("{}h ago", elapsed / 3600),
        _ => format!("{}d ago", elapsed / 86400),
    }
}
/// Escapes `raw` so it can be interpolated into HTML element content or a
/// double-quoted attribute value without being interpreted as markup.
fn escape_html(raw: &str) -> String {
    let mut out = String::with_capacity(raw.len());
    for c in raw.chars() {
        match c {
            '&' => out.push_str("&amp;"),
            '<' => out.push_str("&lt;"),
            '>' => out.push_str("&gt;"),
            '"' => out.push_str("&quot;"),
            '\'' => out.push_str("&#39;"),
            _ => out.push(c),
        }
    }
    out
}

/// Dashboard landing page: one table of channels and one table of devices.
///
/// Every database-sourced string is HTML-escaped before being written into
/// the page, because device ids, channel names, health values and store
/// paths are supplied by clients and must not be able to inject markup.
///
/// # Errors
/// Returns an [`ApiError`] if either database query fails.
pub async fn index(State(s): State<AppState>) -> Result<Html<String>, ApiError> {
    let chans: Vec<(String, i64, i64, Option<String>)> = sqlx::query_as(
        "SELECT name, revision, updated_at, current_manifest FROM channels ORDER BY name"
    ).fetch_all(&s.db).await?;
    let devs: Vec<(String, String, Option<String>, Option<String>, String, i64)> = sqlx::query_as(
        "SELECT id, channel, current_store_path, target_store_path, health, last_seen
         FROM devices ORDER BY id"
    ).fetch_all(&s.db).await?;

    let mut html = String::from(HEAD);
    html.push_str("<h1>nix-ota</h1>");

    html.push_str("<h2>Channels</h2><table><tr><th>name</th><th>rev</th><th>target</th><th>updated</th></tr>");
    for (name, rev, updated, mj) in &chans {
        // A channel with no manifest, or one whose JSON no longer parses,
        // simply shows an empty target cell.
        let target = mj.as_ref()
            .and_then(|j| serde_json::from_str::<nix_ota_common::Manifest>(j).ok())
            .map(|m| short(&Some(m.body.store_path)))
            .unwrap_or_default();
        let name = escape_html(name);
        let target = escape_html(&target);
        html.push_str(&format!(
            "<tr><td><a href=\"/ui/channels/{name}\">{name}</a></td><td>{rev}</td><td><code>{target}</code></td><td>{}</td></tr>",
            ago(*updated)
        ));
    }
    html.push_str("</table>");

    html.push_str("<h2>Devices</h2><table><tr><th>id</th><th>channel</th><th>current</th><th>target</th><th>health</th><th>last seen</th></tr>");
    for (id, channel, cur, tgt, health, last) in &devs {
        let id = escape_html(id);
        let channel = escape_html(channel);
        // `health` doubles as a CSS class name, so it lands in an attribute too.
        let health = escape_html(health);
        html.push_str(&format!(
            "<tr><td><a href=\"/ui/devices/{id}\">{id}</a></td><td>{channel}</td><td><code>{}</code></td><td><code>{}</code></td><td class=\"{health}\">{health}</td><td>{}</td></tr>",
            escape_html(&short(cur)), escape_html(&short(tgt)), ago(*last)
        ));
    }
    html.push_str("</table></body></html>");
    Ok(Html(html))
}

/// Channel page: the 50 highest-revision manifests published to `name`,
/// newest revision first.
///
/// History rows whose stored manifest JSON fails to parse are skipped. The
/// channel name comes straight from the URL path, so it is HTML-escaped
/// before being echoed into the heading, as are the manifest fields.
///
/// # Errors
/// Returns an [`ApiError`] if the database query fails.
pub async fn channel_detail(
    State(s): State<AppState>,
    Path(name): Path<String>,
) -> Result<Html<String>, ApiError> {
    let hist: Vec<(i64, String, i64)> = sqlx::query_as(
        "SELECT revision, manifest, published_at FROM channel_history
         WHERE channel = ? ORDER BY revision DESC LIMIT 50"
    ).bind(&name).fetch_all(&s.db).await?;

    let mut html = String::from(HEAD);
    html.push_str(&format!(
        "<h1>channel: {}</h1><p><a href=\"/\">← back</a></p>",
        escape_html(&name)
    ));
    html.push_str("<h2>History</h2><table><tr><th>rev</th><th>store path</th><th>substituter</th><th>published</th></tr>");
    for (rev, mj, at) in &hist {
        if let Ok(m) = serde_json::from_str::<nix_ota_common::Manifest>(mj) {
            html.push_str(&format!(
                "<tr><td>{rev}</td><td><code>{}</code></td><td>{}</td><td>{}</td></tr>",
                escape_html(&m.body.store_path),
                escape_html(&m.body.substituter),
                ago(*at)
            ));
        }
    }
    html.push_str("</table></body></html>");
    Ok(Html(html))
}

/// Device page: current status of device `id` (when a row for it exists)
/// followed by up to 100 history entries, highest history row id first.
///
/// The device id comes straight from the URL path, and the remaining fields
/// were reported by the device itself, so every string is HTML-escaped
/// before being written into the page.
///
/// # Errors
/// Returns an [`ApiError`] if either database query fails.
pub async fn device_detail(
    State(s): State<AppState>,
    Path(id): Path<String>,
) -> Result<Html<String>, ApiError> {
    let dev: Option<(String, String, Option<String>, Option<String>, String, Option<String>, Option<String>, i64)> =
        sqlx::query_as(
            "SELECT id, channel, current_store_path, target_store_path, health,
             last_message, agent_version, last_seen
             FROM devices WHERE id = ?"
        ).bind(&id).fetch_optional(&s.db).await?;
    let hist: Vec<(Option<String>, String, Option<String>, i64)> = sqlx::query_as(
        "SELECT store_path, health, message, at FROM device_history
         WHERE device_id = ? ORDER BY id DESC LIMIT 100"
    ).bind(&id).fetch_all(&s.db).await?;

    let mut html = String::from(HEAD);
    html.push_str(&format!(
        "<h1>device: {}</h1><p><a href=\"/\">← back</a></p>",
        escape_html(&id)
    ));
    if let Some((_, channel, cur, tgt, health, msg, ver, last)) = &dev {
        let channel = escape_html(channel);
        let health = escape_html(health);
        html.push_str(&format!(
            "<p>channel: <b>{channel}</b><br>agent: {}<br>health: <span class=\"{health}\">{health}</span><br>\
             current: <code>{}</code><br>target: <code>{}</code><br>last seen: {}<br>last message: {}</p>",
            escape_html(ver.as_deref().unwrap_or("?")),
            escape_html(cur.as_deref().unwrap_or("")),
            escape_html(tgt.as_deref().unwrap_or("")),
            ago(*last),
            escape_html(msg.as_deref().unwrap_or(""))
        ));
    }
    html.push_str("<h2>History</h2><table><tr><th>at</th><th>health</th><th>store path</th><th>message</th></tr>");
    for (sp, h, msg, at) in &hist {
        let h = escape_html(h);
        html.push_str(&format!(
            "<tr><td>{}</td><td class=\"{h}\">{h}</td><td><code>{}</code></td><td>{}</td></tr>",
            ago(*at),
            escape_html(sp.as_deref().unwrap_or("")),
            escape_html(msg.as_deref().unwrap_or(""))
        ));
    }
    html.push_str("</table></body></html>");
    Ok(Html(html))
}