This commit is contained in:
2025-09-19 19:33:53 -04:00
parent 8844ee75fe
commit 84312d0b50
36 changed files with 799 additions and 694 deletions

View File

@@ -1,21 +1,22 @@
use crate::airports::{Airport, UpdateAirport};
use crate::error::Error;
use crate::http_client::HttpClient;
use crate::metars::MetarCheck;
use crate::metars::utils::parse_metar_time;
use crate::{db, error::ApiResult};
use crate::error::ApiResult;
use chrono::{DateTime, Utc};
use flate2::read::GzDecoder;
use reqwest::header::ETAG;
use serde::{Deserialize, Serialize};
use sqlx::{Postgres, QueryBuilder};
use sqlx::{Pool, Postgres, QueryBuilder};
use std::collections::HashSet;
use std::env;
use std::fmt::Display;
use std::io::{Cursor, Read};
use std::str::FromStr;
use std::sync::OnceLock;
use regex::Regex;
use utoipa::ToSchema;
use crate::state::AppState;
static TIME_OFFSET: OnceLock<i64> = OnceLock::new();
@@ -278,8 +279,7 @@ struct MetarRow {
}
impl MetarRow {
async fn insert(&self) -> ApiResult<()> {
let pool = db::pool();
async fn insert(&self, pool: &Pool<Postgres>) -> ApiResult<()> {
sqlx::query(&format!(
r#"
INSERT INTO {} (
@@ -305,8 +305,7 @@ impl MetarRow {
Ok(())
}
async fn insert_all(metars: Vec<Metar>) -> ApiResult<()> {
let pool = db::pool();
async fn insert_all(pool: &Pool<Postgres>, metars: Vec<Metar>) -> ApiResult<()> {
let chunk_size = 1000;
for chunk in metars.chunks(chunk_size) {
@@ -342,10 +341,10 @@ impl MetarRow {
}
impl Metar {
fn parse_multiple(metar_strings: &Vec<&str>) -> ApiResult<Vec<Self>> {
fn parse_multiple(pool: &Pool<Postgres>, metar_strings: &Vec<&str>) -> ApiResult<Vec<Self>> {
let mut metars: Vec<Self> = vec![];
for metar_string in metar_strings {
match Self::parse(metar_string) {
match Self::parse(pool, metar_string) {
Ok(metar) => metars.push(metar),
Err(e) => {
log::warn!("Failed to parse metar string: {}", e);
@@ -357,7 +356,7 @@ impl Metar {
Ok(metars)
}
fn parse(metar_string: &str) -> ApiResult<Self> {
fn parse(pool: &Pool<Postgres>, metar_string: &str) -> ApiResult<Self> {
if metar_string.is_empty() {
return Err(Error::new(
404,
@@ -368,7 +367,11 @@ impl Metar {
log::trace!("Parsing METAR data: {}", metar_string);
let mut metar: Self = Self::default();
metar.raw_text = metar_string.to_owned();
let mut metar_parts: Vec<&str> = metar_string.split_whitespace().collect();
let mut metar_parts: Vec<&str> = metar_string
.trim()
.trim_matches(|c| c == '"' || c == '\'' || c == '“' || c == '”' || c == '' || c == '')
.trim()
.split_whitespace().collect();
if metar_parts.len() < 4 {
return Err(Error::new(
500,
@@ -380,8 +383,14 @@ impl Metar {
}
// Remove METAR at the start of the text
if metar_parts[0].to_string() == "METAR".to_string() {
let metar_re: Regex = Regex::new(r"(?i)^[\p{P}\s]*METAR[\p{P}\s]*$")?;
let speci_re: Regex = Regex::new(r"(?i)^[\p{P}\s]*SPECI[\p{P}\s]*$")?;
let token = metar_parts[0].trim();
if metar_re.is_match(token) {
metar_parts.remove(0);
} else if speci_re.is_match(token) {
return Err(Error::new(500, format!("Unable to parse SPECI data: {}", metar_string)));
}
// Station Identifier
@@ -880,8 +889,10 @@ impl Metar {
// Update the airport's metar observation time
let icao = metar.icao.clone();
let observation_time = metar.observation_time.clone();
let pool = pool.clone();
tokio::spawn(async move {
match Airport::update(
&pool.clone(),
&icao,
&UpdateAirport {
icao: None,
@@ -982,13 +993,13 @@ impl Metar {
}
pub async fn get_cached_remote_metars(
client: &HttpClient,
state: &AppState,
etag: Option<String>,
) -> ApiResult<(Vec<Self>, String)> {
let base_url = env::var("AVIATION_WEATHER_URL").expect("AVIATION_WEATHER_URL must be set");
let url = format!("{}/data/cache/metars.cache.csv.gz", base_url);
match client.get(&url, etag.clone()).await {
match state.client.get(&url, etag.clone()).await {
Ok(r) => {
let new_etag = r
.headers()
@@ -1006,7 +1017,7 @@ impl Metar {
for line in text.lines() {
// Split off the first column
let raw_text = line.splitn(2, ',').next().unwrap();
match Metar::parse(raw_text) {
match Metar::parse(&state.pool, raw_text) {
Ok(m) => output.push(m),
Err(err) => {
log::warn!("{}", err);
@@ -1026,7 +1037,7 @@ impl Metar {
}
}
pub async fn get_remote_metars(client: &HttpClient, icaos: &Vec<String>) -> ApiResult<Vec<Self>> {
pub async fn get_remote_metars(state: &AppState, icaos: &Vec<String>) -> ApiResult<Vec<Self>> {
let base_url = env::var("AVIATION_WEATHER_URL").expect("AVIATION_WEATHER_URL must be set");
// Query the remote API for the missing METAR data 10 at a time
let icao_chunks = icaos
@@ -1039,7 +1050,7 @@ impl Metar {
"{}/api/data/metar?ids={}&hours=0&order=id,-obs",
base_url, icao_chunk
);
let mut m = match client.get(&url, None).await {
let mut m = match state.client.get(&url, None).await {
Ok(r) => match r.text().await {
Ok(r) => {
let metar_chunk = r
@@ -1047,7 +1058,7 @@ impl Metar {
.split("\n")
.filter(|m| !m.trim().is_empty())
.collect();
match Self::parse_multiple(&metar_chunk) {
match Self::parse_multiple(&state.pool, &metar_chunk) {
Ok(m) => m,
Err(err) => return Err(err),
}
@@ -1076,12 +1087,11 @@ impl Metar {
})
}
pub async fn get_all_distinct(icao_list: &Vec<String>) -> ApiResult<Vec<Self>> {
pub async fn get_all_distinct(pool: &Pool<Postgres>, icao_list: &Vec<String>) -> ApiResult<Vec<Self>> {
if icao_list.is_empty() {
return Ok(Vec::new());
}
let pool = db::pool();
let metar_rows: Vec<MetarRow> = sqlx::query_as::<_, MetarRow>(&format!(
r#"
SELECT DISTINCT ON (icao) * FROM {}
@@ -1101,10 +1111,10 @@ impl Metar {
}
pub async fn get_or_update_metars(
client: &HttpClient,
state: &AppState,
icaos: &Vec<String>,
) -> ApiResult<Vec<Self>> {
let metars = Self::get_all_distinct(&icaos).await?;
let metars = Self::get_all_distinct(&state.pool, &icaos).await?;
let current_time = Utc::now().timestamp();
let mut updated_metars: Vec<Self> = vec![];
@@ -1120,7 +1130,7 @@ impl Metar {
// Handle outdated METARs
if current_time > (metar.observation_time.timestamp() + time_offset()) {
// If the METAR has previously been found, get the updated_at time, otherwise default
let refresh_seconds = match MetarCheck::get(&icao).await {
let refresh_seconds = match MetarCheck::get(state, &icao).await {
Some(c) => current_time - c.updated_at.timestamp(),
None => DEFAULT_REFRESH_DURATION,
};
@@ -1143,15 +1153,15 @@ impl Metar {
// Otherwise add the valid metar to the updated list
else {
found_metar_icaos.insert(icao.clone());
let metar_check = MetarCheck::new(icao, true).await;
metar_check.insert().await?;
let metar_check = MetarCheck::new(state, icao, true).await;
metar_check.insert(state).await?;
updated_metars.push(metar);
}
}
// Add all METARs that were not in the returned database METARs
for icao in &requested_icaos {
match MetarCheck::get(icao).await {
match MetarCheck::get(state, icao).await {
Some(c) => {
if current_time > (c.updated_at.timestamp() + DEFAULT_REFRESH_DURATION) {
missing_metar_icaos.push(icao.to_string());
@@ -1169,7 +1179,7 @@ impl Metar {
"Retrieving missing METAR data for {:?}",
missing_metar_icaos
);
let mut remote_metars = Self::get_remote_metars(client, &missing_metar_icaos)
let mut remote_metars = Self::get_remote_metars(&state, &missing_metar_icaos)
.await
.unwrap_or_else(|err| {
log::warn!("Unable to get remote METAR data; {}", err);
@@ -1179,19 +1189,19 @@ impl Metar {
// Insert missing METARs
if remote_metars.len() > 0 {
for remote_metar in remote_metars.clone() {
remote_metar.insert().await?;
remote_metar.insert(&state.pool).await?;
found_metar_icaos.insert(remote_metar.icao.to_string());
let mut metar_check = MetarCheck::new(remote_metar.icao.clone(), true).await;
let mut metar_check = MetarCheck::new(state, remote_metar.icao.clone(), true).await;
metar_check.last_metar = Some(remote_metar);
metar_check.insert().await?;
metar_check.insert(state).await?;
}
updated_metars.append(&mut remote_metars);
}
// Update still missing METARs
for difference in found_metar_icaos.symmetric_difference(&requested_icaos) {
let metar_check = MetarCheck::new(difference.to_string(), false).await;
metar_check.insert().await?;
let metar_check = MetarCheck::new(state, difference.to_string(), false).await;
metar_check.insert(state).await?;
// Only add cached metar data if it's less than 4 hours old
if let Some(last_metar) = metar_check.last_metar {
let four_hours_ago = Utc::now() - chrono::Duration::hours(4);
@@ -1205,26 +1215,26 @@ impl Metar {
Ok(updated_metars)
}
pub async fn update_metars(client: &HttpClient, etag: Option<String>) -> ApiResult<String> {
let (remote_metars, etag) = Self::get_cached_remote_metars(client, etag)
pub async fn update_metars(state: &AppState, etag: Option<String>) -> ApiResult<String> {
let (remote_metars, etag) = Self::get_cached_remote_metars(state, etag)
.await
.unwrap_or_else(|err| {
log::warn!("Unable to get cached remote METAR data; {}", err);
(vec![], String::new())
});
MetarRow::insert_all(remote_metars).await?;
MetarRow::insert_all(&state.pool, remote_metars).await?;
Ok(etag)
}
pub async fn insert(&self) -> ApiResult<()> {
pub async fn insert(&self, pool: &Pool<Postgres>) -> ApiResult<()> {
log::trace!(
"Inserting metar {} with observation time {}",
self.icao,
self.observation_time
);
let metar: MetarRow = self.to_row()?;
metar.insert().await?;
metar.insert(pool).await?;
Ok(())
}
}
@@ -1235,43 +1245,45 @@ mod tests {
#[tokio::test]
async fn test_metar_parse() {
let state = AppState::new().await.unwrap();
let mut metar_string = "METAR KABC 121755Z AUTO 21016G24KT 180V240 1SM R11/P6000FT \
-RA BR BKN015 OVC025 06/04 A2990 RMK AO2 PK WND 20032/25 WSHFT 1715 VIS 3/4V1 1/2 VIS 3/4 \
RWY11 RAB07 CIG 013V017 CIG 017 RWY11 PRESFR SLP125 P0003 60009 T00640036 10066 21012 58033 \
TSNO $"
.to_string();
let metar = Metar::parse(&metar_string).unwrap();
let metar = Metar::parse(&state.pool, &metar_string).unwrap();
dbg!(&metar.observation_time);
metar_string = "KMIA 090053Z 33004KT 10SM FEW015 FEW024 SCT075 SCT250 25/22 A2990 RMK AO2 \
SLP126 T02500217 $"
.to_string();
let metar = Metar::parse(&metar_string).unwrap();
let metar = Metar::parse(&state.pool, &metar_string).unwrap();
dbg!(&metar.observation_time);
metar_string =
"KMRB 082253Z 30014G23KT 10SM CLR 05/M12 A3002 RMK AO2 PK WND 30028/2157 SLP168 T00501117"
.to_string();
let metar = Metar::parse(&metar_string).unwrap();
let metar = Metar::parse(&state.pool, &metar_string).unwrap();
dbg!(&metar.observation_time);
metar_string = "KHEF 092356Z 13009KT 10SM CLR 08/M03 A3022 RMK AO2 SLP239 6//// T00831033 \
10133 20078 53002 PNO $"
.to_string();
let metar = Metar::parse(&metar_string).unwrap();
let metar = Metar::parse(&state.pool, &metar_string).unwrap();
dbg!(&metar.observation_time);
metar_string = "KSLK 162351Z AUTO VRB03KT 1SM -SN BR FEW007 OVC014 00/M02 A2974 RMK AO2 \
SLP090 P0001 60004 T00001017 10000 21011 53026"
.to_string();
let metar = Metar::parse(&metar_string).unwrap();
let metar = Metar::parse(&state.pool, &metar_string).unwrap();
dbg!(&metar.observation_time);
metar_string = "KABC 121755Z AUTO 21016G24KT 180V240 1SM R11/P6000FT -RA BR BKN015 OVC025 \
SCTCB FEW123TCU 06/04 A2990 RMK AO2 PK WND 20032/25 WSHFT 1715 VIS 3/4V1 1/2 VIS 3/4 RWY11 \
RAB07 CIG 013V017 CIG 017 RWY11 PRESFR SLP125 P0003 60009 T00640036 10066 21012 58033 TSNO $"
.to_string();
let metar = Metar::parse(&metar_string).unwrap();
let metar = Metar::parse(&state.pool, &metar_string).unwrap();
dbg!(&metar.observation_time);
dbg!(&metar.sky_condition);
}