Updated metars with redis caching on invalid metars

This commit is contained in:
2025-04-10 09:03:38 -04:00
parent d04f5d0414
commit 9bcad27ca5
9 changed files with 414 additions and 72 deletions

View File

@@ -1,6 +1,8 @@
use std::collections::HashMap;
use std::str::FromStr;
use actix_web::web::Json;
use futures_util::try_join;
use moka::future::Cache;
use serde::{Deserialize, Serialize};
use sqlx::{Execute, Postgres, QueryBuilder};
use crate::airports::model::airport_category::AirportCategory;
@@ -172,7 +174,7 @@ impl Airport {
let metar_fut = async {
if metar {
match Metar::find_all(&[icao]).await {
match Metar::find_all(&vec![icao.to_string()]).await {
Ok(m) => Some(m.into_iter().nth(0)),
Err(err) => {
log::error!("{}", err);
@@ -223,7 +225,7 @@ impl Airport {
Some(m) => Some(m),
None => None,
},
None => None
None => None,
};
airport_row.map(|row| {
@@ -281,22 +283,49 @@ impl Airport {
let airport_rows: Vec<AirportRow> = airport_query.fetch_all(pool).await?;
let mut airports: Vec<Airport> = airport_rows.into_iter().map(From::from).collect();
// Bulk update airports with runways and frequencies
if !airports.is_empty() {
let icaos: Vec<String> = airports.iter().map(|a| a.icao.clone()).collect();
let mut runway_map = Runway::select_all_map(icaos.clone()).await?;
let mut frequency_map = Frequency::select_all_map(icaos.clone()).await?;
let mut metar_map: HashMap<String, Metar> = HashMap::new();
if query.metars.unwrap_or_else(|| false) {
let icaos_list: Vec<&str> = icaos.iter().map(|x| &**x).collect();
let metars = Metar::find_all(&icaos_list).await?;
metar_map = metars.into_iter()
.map(|metar| (metar.station_id.clone(), metar))
.collect();
if airports.is_empty() {
return Ok(airports);
}
// Bulk update airport sub-fields
let icaos: Vec<String> = airports.iter().map(|a| a.icao.clone()).collect();
let runway_future = Runway::select_all_map(icaos.clone());
let frequency_future = Frequency::select_all_map(icaos.clone());
let metar_future = if query.metars.unwrap_or(false) {
Some(Metar::find_all(&icaos))
} else {
None
};
let (runway_map, frequency_map, mut metars_opt) = match metar_future {
Some(future_metars) => {
let (runway_map, frequency_map, metars) =
try_join!(runway_future, frequency_future, future_metars)?;
(
runway_map,
frequency_map,
Some(
metars
.into_iter()
.map(|m| (m.station_id.clone(), m))
.collect::<HashMap<_, _>>(),
),
)
}
for airport in airports.iter_mut() {
airport.runways = runway_map.remove(&airport.icao).unwrap_or_default();
airport.frequencies = frequency_map.remove(&airport.icao).unwrap_or_default();
None => {
let (runway_map, frequency_map) = try_join!(runway_future, frequency_future)?;
(runway_map, frequency_map, None)
}
};
for airport in airports.iter_mut() {
airport.runways = runway_map.get(&airport.icao).cloned().unwrap_or_default();
airport.frequencies = frequency_map
.get(&airport.icao)
.cloned()
.unwrap_or_default();
if let Some(ref mut metar_map) = metars_opt {
airport.latest_metar = metar_map.remove(&airport.icao);
}
}

View File

@@ -7,7 +7,7 @@ use crate::error::ApiResult;
const TABLE_NAME: &str = "frequencies";
#[derive(Debug, Serialize, Deserialize)]
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Frequency {
#[serde(rename = "id")]
pub frequency_id: String,

View File

@@ -7,7 +7,7 @@ use crate::error::ApiResult;
const TABLE_NAME: &str = "runways";
#[derive(Debug, Serialize, Deserialize)]
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Runway {
#[serde(rename = "id")]
pub runway_id: String,

View File

@@ -1,8 +1,9 @@
use std::env;
use actix_cors::Cors;
use actix_web::{App, HttpServer, middleware::Logger};
use actix_web::{App, HttpServer, middleware::Logger, web};
use dotenv::from_filename;
use moka::future::Cache;
use crate::auth::hash;
use crate::users::{User, ADMIN_ROLE};

View File

@@ -2,7 +2,10 @@ use crate::error::Error;
use crate::{error::ApiResult, db};
use chrono::{DateTime, Datelike, Utc};
use std::collections::HashSet;
use moka::future::Cache;
use redis::{AsyncCommands, RedisResult};
use serde::{Deserialize, Serialize};
use crate::db::redis_async_connection;
const TABLE_NAME: &str = "metars";
@@ -195,13 +198,39 @@ impl Default for Metar {
}
#[derive(Serialize, Deserialize, sqlx::FromRow, Debug)]
struct MetarDb {
struct MetarRow {
icao: String,
observation_time: DateTime<Utc>,
raw_text: String,
data: serde_json::Value,
}
impl MetarRow {
async fn insert(&self) -> ApiResult<()> {
let pool = db::pool();
sqlx::query(&format!(
r#"
INSERT INTO {} (
icao,
observation_time,
raw_text,
data
)
VALUES ($1, $2, $3, $4, $5)
"#,
TABLE_NAME,
))
.bind(self.icao.clone())
.bind(self.observation_time.clone())
.bind(self.raw_text.clone())
.bind(self.data.clone())
.execute(pool)
.await?;
Ok(())
}
}
impl Metar {
fn parse_multiple(metar_strings: &Vec<&str>) -> ApiResult<Vec<Self>> {
let mut metars: Vec<Metar> = vec![];
@@ -794,14 +823,17 @@ impl Metar {
Ok(metar)
}
fn get_missing_metar_icaos(db_metars: &Vec<Self>, station_icaos: &[&str]) -> Vec<String> {
async fn get_missing_metar_icaos(
db_metars: &Vec<Self>,
station_icaos: &Vec<String>,
) -> Vec<String> {
let mut missing_metar_icaos: Vec<String> = vec![];
let current_time = chrono::Local::now().naive_local().and_utc().timestamp();
let db_metars_set: HashSet<&str> = db_metars
.iter()
.map(|icao| icao.station_id.as_str())
.collect();
let station_icaos_set: HashSet<&str> = station_icaos.to_owned().into_iter().collect();
let station_icaos_set: HashSet<&str> = station_icaos.iter().map(|s| s.as_str()).collect();
for difference in db_metars_set.symmetric_difference(&station_icaos_set) {
missing_metar_icaos.push(difference.to_string());
}
@@ -865,14 +897,14 @@ impl Metar {
Ok(metars)
}
fn from_db(metar_db: MetarDb) -> ApiResult<Metar> {
fn from_db(metar_db: MetarRow) -> ApiResult<Metar> {
let metar: Metar = serde_json::from_value(metar_db.data)?;
Ok(metar)
}
fn to_db(&self) -> ApiResult<MetarDb> {
fn to_db(&self) -> ApiResult<MetarRow> {
let data = serde_json::to_value(self)?;
Ok(MetarDb {
Ok(MetarRow {
icao: self.station_id.clone(),
observation_time: self.observation_time,
raw_text: self.raw_text.clone(),
@@ -880,13 +912,13 @@ impl Metar {
})
}
pub async fn find_all(icao_list: &[&str]) -> ApiResult<Vec<Self>> {
pub async fn find_all(icao_list: &Vec<String>) -> ApiResult<Vec<Self>> {
if icao_list.is_empty() {
return Ok(Vec::new());
}
let pool = db::pool();
let metar_dbs: Vec<MetarDb> = match sqlx::query_as::<_, MetarDb>(&format!(
let metar_rows: Vec<MetarRow> = sqlx::query_as::<_, MetarRow>(&format!(
r#"
SELECT DISTINCT ON (icao) * FROM {} WHERE icao = ANY($1) ORDER BY icao, observation_time DESC
"#,
@@ -894,28 +926,34 @@ impl Metar {
))
.bind(icao_list)
.fetch_all(pool)
.await
{
Ok(m) => m,
Err(err) => {
return Err(Error::new(
500,
format!("Unable to find METARs with input {:?}: {}", icao_list, err),
));
}
};
let mut metars: Vec<Metar> = metar_dbs
.await?;
let mut metars: Vec<Metar> = metar_rows
.into_iter()
.filter_map(|metar_db| Metar::from_db(metar_db).ok())
.collect();
let mut conn = redis_async_connection().await?;
// Check for missing metars
let missing_icao_list = Self::get_missing_metar_icaos(&metars, icao_list);
let missing_icao_list = Self::get_missing_metar_icaos(&metars, icao_list).await;
if !missing_icao_list.is_empty() {
log::trace!("Retrieving missing METAR data for {:?}", missing_icao_list);
let missing_icao_list: Vec<&str> = missing_icao_list.iter().map(|s| s.as_str()).collect();
let mut missing_icao_list = Self::get_remote_metars(&missing_icao_list)
let mut updated_missing_icao_list: Vec<&str> = Vec::new();
for icao in &missing_icao_list {
let result: RedisResult<Option<bool>> = conn.get(icao).await;
match result {
Ok(Some(value)) => {
if value {
updated_missing_icao_list.push(icao);
}
}
Ok(None) => {
updated_missing_icao_list.push(icao);
}
Err(err) => return Err(err.into()),
}
}
let mut missing_icao_list = Self::get_remote_metars(&updated_missing_icao_list)
.await
.unwrap_or_else(|err| {
log::warn!("Unable to get remote METAR data; {}", err);
@@ -925,37 +963,28 @@ impl Metar {
if missing_icao_list.len() > 0 {
// Insert missing METARs
for missing_metar in &missing_icao_list {
let _: RedisResult<()> = conn.set(&missing_metar.station_id, true).await;
missing_metar.insert().await?;
}
metars.append(&mut missing_icao_list)
}
// Invalidate the still missing icaos
let still_missing_icao_list =
Self::get_missing_metar_icaos(&missing_icao_list, icao_list).await;
if !still_missing_icao_list.is_empty() {
for icao in still_missing_icao_list {
let _: RedisResult<()> = conn.set_ex(&icao, false, 3600).await;
}
}
}
Ok(metars)
}
pub async fn insert(&self) -> ApiResult<()> {
let pool = db::pool();
let metar: MetarDb = self.to_db()?;
sqlx::query(&format!(
r#"
INSERT INTO {} (
icao,
observation_time,
raw_text,
data
)
VALUES ($1, $2, $3, $4)
"#,
TABLE_NAME,
))
.bind(metar.icao)
.bind(metar.observation_time)
.bind(metar.raw_text)
.bind(metar.data)
.execute(pool)
.await?;
let metar: MetarRow = self.to_db()?;
metar.insert().await?;
Ok(())
}
}

View File

@@ -17,7 +17,7 @@ async fn find_all(req: HttpRequest) -> HttpResponse {
Some(i) => i,
None => return HttpResponse::UnprocessableEntity().body("Missing icaos parameter"),
};
let icaos: Vec<&str> = icao_string.split(',').collect();
let icaos: Vec<String> = icao_string.split(',').map(|s| s.to_string()).collect();
let metars = match Metar::find_all(&icaos).await {
Ok(a) => a,