init: newsboard — Rust Axum MQTT->SSE notification board

This commit is contained in:
akn
2026-06-09 14:03:08 +09:00
commit e59f268460
5 changed files with 3078 additions and 0 deletions
+1
View File
@@ -0,0 +1 @@
target/
Generated
+2526
View File
File diff suppressed because it is too large Load Diff
+18
View File
@@ -0,0 +1,18 @@
[package]
name = "newsboard"
version = "0.1.0"
edition = "2021"
[dependencies]
axum = { version = "0.7", features = ["macros"] }
tokio = { version = "1", features = ["full"] }
tokio-stream = { version = "0.1", features = ["sync"] }
futures = "0.3"
rumqttc = "0.24"
sqlx = { version = "0.8", features = ["runtime-tokio", "sqlite", "chrono"] }
serde = { version = "1", features = ["derive"] }
serde_json = "1"
chrono = { version = "0.4", features = ["serde"] }
tracing = "0.1"
tracing-subscriber = { version = "0.3", features = ["env-filter"] }
rust-embed = "8"
+303
View File
@@ -0,0 +1,303 @@
<!DOCTYPE html>
<html lang="zh">
<head>
<meta charset="UTF-8">
<meta name="viewport" content="width=device-width, initial-scale=1.0">
<title>NewsBoard</title>
<style>
* { margin: 0; padding: 0; box-sizing: border-box; }
body {
background: #1e1f2c;
color: #cdd6f4;
font-family: 'SF Mono', 'Cascadia Code', 'Fira Code', monospace;
padding: 2rem;
max-width: 900px;
margin: 0 auto;
}
h1 {
font-size: 1.4rem;
font-weight: 600;
color: #f5e0dc;
margin-bottom: 0.5rem;
letter-spacing: 0.05em;
}
.subtitle {
color: #6c7086;
font-size: 0.8rem;
margin-bottom: 2rem;
}
#notifications {
display: flex;
flex-direction: column;
gap: 0.5rem;
}
.card {
background: #262837;
border-radius: 8px;
padding: 0.8rem 1rem;
border-left: 4px solid #6c7086;
animation: slideIn 0.3s ease;
position: relative;
}
@keyframes slideIn {
from { opacity: 0; transform: translateY(-8px); }
to { opacity: 1; transform: translateY(0); }
}
.card.critical { border-left-color: #f38ba8; background: #2a1e24; }
.card.high { border-left-color: #fab387; background: #2a241e; }
.card.medium { border-left-color: #f9e2af; background: #2a281e; }
.card.info { border-left-color: #89b4fa; background: #1e242a; }
.card .meta {
display: flex;
align-items: center;
gap: 0.6rem;
margin-bottom: 0.3rem;
}
.card .badge {
font-size: 0.65rem;
font-weight: 700;
padding: 0.15rem 0.5rem;
border-radius: 4px;
text-transform: uppercase;
letter-spacing: 0.08em;
}
.badge.critical { background: #f38ba8; color: #1e1f2c; }
.badge.high { background: #fab387; color: #1e1f2c; }
.badge.medium { background: #f9e2af; color: #1e1f2c; }
.badge.info { background: #89b4fa; color: #1e1f2c; }
.card .topic {
color: #6c7086;
font-size: 0.7rem;
}
.card .time {
color: #585b70;
font-size: 0.65rem;
margin-left: auto;
}
.card .payload {
color: #cdd6f4;
font-size: 0.85rem;
line-height: 1.5;
word-break: break-word;
}
.card .payload .key {
color: #89b4fa;
}
.card .payload .str {
color: #a6e3a1;
}
.card .payload .num {
color: #fab387;
}
.card .payload .bool {
color: #cba6f7;
}
.empty {
color: #6c7086;
text-align: center;
padding: 3rem;
font-size: 0.9rem;
}
.new-highlight {
animation: pulse 0.6s ease;
}
@keyframes pulse {
0% { box-shadow: 0 0 0 0 rgba(137, 180, 250, 0.3); }
70% { box-shadow: 0 0 0 8px rgba(137, 180, 250, 0); }
100% { box-shadow: 0 0 0 0 rgba(137, 180, 250, 0); }
}
.status {
display: flex;
align-items: center;
gap: 0.5rem;
margin-bottom: 1.5rem;
font-size: 0.75rem;
color: #6c7086;
}
.status-dot {
width: 8px;
height: 8px;
border-radius: 50%;
background: #6c7086;
}
.status-dot.connected { background: #a6e3a1; }
.legend {
display: flex;
gap: 1rem;
margin-bottom: 1.5rem;
font-size: 0.7rem;
color: #6c7086;
flex-wrap: wrap;
}
.legend-item {
display: flex;
align-items: center;
gap: 0.3rem;
}
.legend-dot {
width: 10px;
height: 10px;
border-radius: 50%;
}
</style>
</head>
<body>
<h1>📬 NewsBoard</h1>
<div class="subtitle">notification center · sugarsource.club</div>
<div class="status">
<span class="status-dot" id="statusDot"></span>
<span id="statusText">connecting...</span>
</div>
<div class="legend">
<span class="legend-item"><span class="legend-dot" style="background:#f38ba8"></span> Critical</span>
<span class="legend-item"><span class="legend-dot" style="background:#fab387"></span> High</span>
<span class="legend-item"><span class="legend-dot" style="background:#f9e2af"></span> Medium</span>
<span class="legend-item"><span class="legend-dot" style="background:#89b4fa"></span> Info</span>
</div>
<div id="notifications">
<div class="empty">waiting for notifications...</div>
</div>
<script>
const el = document.getElementById('notifications');
const statusDot = document.getElementById('statusDot');
const statusText = document.getElementById('statusText');
const MAX_VISIBLE = 200;
let hasHistory = false;
function severityClass(s) {
const map = { critical: 'critical', high: 'high', medium: 'medium' };
return map[s] || 'info';
}
function formatTime(iso) {
const d = new Date(iso);
return d.toLocaleString('zh-CN', {
month: '2-digit', day: '2-digit',
hour: '2-digit', minute: '2-digit', second: '2-digit',
hour12: false
});
}
function renderHighlightedJson(payload) {
try {
const parsed = JSON.parse(payload);
return JSON.stringify(parsed, null, 2)
.replace(/&/g, '&amp;').replace(/</g, '&lt;').replace(/>/g, '&gt;')
.replace(/"([^"]+)":/g, '<span class="key">"$1"</span>:')
.replace(/: "([^"]+)"/g, ': <span class="str">"$1"</span>')
.replace(/: (\d+\.?\d*)/g, ': <span class="num">$1</span>')
.replace(/: (true|false)/g, ': <span class="bool">$1</span>');
} catch {
return escapeHtml(payload);
}
}
function escapeHtml(text) {
const d = document.createElement('div');
d.textContent = text;
return d.innerHTML;
}
function addCard(notif, animate) {
const card = document.createElement('div');
card.className = `card ${severityClass(notif.severity)}`;
if (animate) card.classList.add('new-highlight');
const topicShort = notif.topic.replace(/^notify\//, '');
card.innerHTML = `
<div class="meta">
<span class="badge ${severityClass(notif.severity)}">${notif.severity}</span>
<span class="topic">${escapeHtml(topicShort)}</span>
<span class="time">${formatTime(notif.created_at)}</span>
</div>
<div class="payload">${renderHighlightedJson(notif.payload)}</div>
`;
el.insertBefore(card, el.firstChild);
while (el.children.length > MAX_VISIBLE) {
el.removeChild(el.lastChild);
}
}
function clearEmpty() {
const empty = el.querySelector('.empty');
if (empty) empty.remove();
}
async function loadHistory() {
try {
const res = await fetch('/history');
const data = await res.json();
if (data.length > 0) {
clearEmpty();
data.reverse().forEach(n => addCard(n, false));
}
hasHistory = true;
} catch (e) {
console.error('history fetch failed:', e);
}
}
function connectSSE() {
const evtSource = new EventSource('/events');
evtSource.onopen = () => {
statusDot.className = 'status-dot connected';
statusText.textContent = 'connected';
};
evtSource.onmessage = (e) => {
try {
const notif = JSON.parse(e.data);
clearEmpty();
addCard(notif, true);
} catch (err) {
console.error('SSE parse error:', err);
}
};
evtSource.onerror = () => {
statusDot.className = 'status-dot';
statusText.textContent = 'disconnected, retrying...';
};
}
loadHistory();
connectSSE();
</script>
</body>
</html>
+230
View File
@@ -0,0 +1,230 @@
use axum::{
extract::State,
response::{
sse::{Event, KeepAlive, Sse},
IntoResponse,
},
routing::get,
Router,
};
use chrono::Utc;
use rumqttc::{AsyncClient, MqttOptions, Packet, QoS};
use rust_embed::RustEmbed;
use serde::{Deserialize, Serialize};
use sqlx::sqlite::SqlitePoolOptions;
use sqlx::SqlitePool;
use std::convert::Infallible;
use std::sync::Arc;
use tokio::sync::broadcast;
use tokio_stream::StreamExt;
use tokio_stream::wrappers::BroadcastStream;
use tracing::{error, info, warn};
#[derive(RustEmbed)]
#[folder = "src/"]
struct Assets;
#[derive(Clone, Serialize, Deserialize, Debug, sqlx::FromRow)]
struct Notification {
id: i64,
topic: String,
payload: String,
severity: String,
created_at: String,
}
#[derive(Clone)]
struct AppState {
pool: SqlitePool,
tx: broadcast::Sender<String>,
}
async fn init_db(pool: &SqlitePool) -> sqlx::Result<()> {
sqlx::query(
"CREATE TABLE IF NOT EXISTS notifications (
id INTEGER PRIMARY KEY AUTOINCREMENT,
topic TEXT NOT NULL,
payload TEXT NOT NULL,
severity TEXT NOT NULL DEFAULT 'info',
created_at TEXT NOT NULL
)",
)
.execute(pool)
.await?;
// keep only latest 1000
sqlx::query(
"DELETE FROM notifications WHERE id NOT IN (
SELECT id FROM notifications ORDER BY id DESC LIMIT 1000
)",
)
.execute(pool)
.await?;
Ok(())
}
fn extract_severity(payload: &str) -> String {
serde_json::from_str::<serde_json::Value>(payload)
.ok()
.and_then(|v| v.get("severity").and_then(|s| s.as_str().map(String::from)))
.unwrap_or_else(|| "info".into())
}
async fn insert_notification(
pool: &SqlitePool,
topic: &str,
payload: &str,
) -> sqlx::Result<i64> {
let severity = extract_severity(payload);
let now = Utc::now().to_rfc3339();
sqlx::query(
"INSERT INTO notifications (topic, payload, severity, created_at) VALUES (?, ?, ?, ?)",
)
.bind(topic)
.bind(payload)
.bind(&severity)
.bind(&now)
.execute(pool)
.await?;
let id: (i64,) = sqlx::query_as("SELECT last_insert_rowid()")
.fetch_one(pool)
.await?;
Ok(id.0)
}
async fn run_mqtt(pool: SqlitePool, tx: broadcast::Sender<String>) {
let pass = std::fs::read_to_string("/run/agenix/mqtt-notif-passwd")
.unwrap_or_else(|e| {
error!("Cannot read MQTT password: {e}");
String::new()
})
.trim()
.to_string();
let mut opts = MqttOptions::new("newsboard", "localhost", 1883);
opts.set_credentials("notif-reader", &pass);
opts.set_clean_session(true);
let (client, mut eventloop) = AsyncClient::new(opts, 100);
client.subscribe("notify/#", QoS::AtMostOnce).await.unwrap();
info!("MQTT subscribed to notify/#");
let pool_clone = pool.clone();
let tx_clone = tx.clone();
tokio::spawn(async move {
loop {
match eventloop.poll().await {
Ok(rumqttc::Event::Incoming(Packet::Publish(publish))) => {
let topic = publish.topic.clone();
let payload = String::from_utf8_lossy(&publish.payload).to_string();
match insert_notification(&pool_clone, &topic, &payload).await {
Ok(id) => {
let notif = Notification {
id,
topic: topic.clone(),
payload: payload.clone(),
severity: extract_severity(&payload),
created_at: Utc::now().to_rfc3339(),
};
let json = serde_json::to_string(&notif).unwrap();
let _ = tx_clone.send(json);
info!(topic = %topic, "notification stored");
}
Err(e) => error!("DB insert failed: {e}"),
}
}
Ok(rumqttc::Event::Incoming(Packet::ConnAck(_))) => {
info!("MQTT connected");
}
Ok(_) => {}
Err(e) => {
warn!("MQTT error: {e}");
tokio::time::sleep(std::time::Duration::from_secs(5)).await;
// reconnect loop
}
}
}
});
}
async fn handle_root() -> impl IntoResponse {
let content = Assets::get("index.html")
.map(|f| f.data.to_vec())
.unwrap_or_default();
axum::response::Html(content)
}
async fn handle_history(State(state): State<Arc<AppState>>) -> impl IntoResponse {
let rows = sqlx::query_as::<_, Notification>(
"SELECT id, topic, payload, severity, created_at FROM notifications ORDER BY id DESC LIMIT 200",
)
.fetch_all(&state.pool)
.await
.unwrap_or_default();
axum::Json(rows)
}
async fn handle_events(
State(state): State<Arc<AppState>>,
) -> Sse<impl tokio_stream::Stream<Item = Result<Event, Infallible>>> {
let rx = state.tx.subscribe();
let stream = BroadcastStream::new(rx).filter_map(|result| match result {
Ok(msg) => Some(Ok(Event::default().data(msg))),
Err(_) => None,
});
Sse::new(stream).keep_alive(KeepAlive::default())
}
#[tokio::main]
async fn main() {
tracing_subscriber::fmt()
.with_env_filter("newsboard=info")
.init();
let db_path = std::env::var("NEWSBOARD_DB")
.unwrap_or_else(|_| "/var/lib/newsboard/news.db".into());
let pool = SqlitePoolOptions::new()
.max_connections(5)
.connect(&format!("sqlite:{db_path}"))
.await
.expect("Failed to connect to SQLite");
init_db(&pool).await.expect("Failed to init DB");
let (tx, _) = broadcast::channel::<String>(256);
run_mqtt(pool.clone(), tx.clone()).await;
let state = Arc::new(AppState { pool, tx });
let app = Router::new()
.route("/", get(handle_root))
.route("/events", get(handle_events))
.route("/history", get(handle_history))
.with_state(state);
let port: u16 = std::env::var("NEWSBOARD_PORT")
.ok()
.and_then(|p| p.parse().ok())
.unwrap_or(3800);
let addr = format!("127.0.0.1:{port}");
info!("Listening on {addr}");
let listener = tokio::net::TcpListener::bind(&addr)
.await
.expect("Failed to bind");
axum::serve(listener, app)
.await
.expect("Server error");
}