Metar overhaul, added footer, updated schemas

This commit is contained in:
2025-05-19 16:09:02 -04:00
parent 2ecb82ae63
commit ed98140d22
54 changed files with 1084 additions and 4924 deletions

View File

@@ -1,19 +1,33 @@
use crate::airports::{Airport, UpdateAirport};
use crate::db::redis_async_connection;
use crate::error::Error;
use crate::http_client::HttpClient;
use crate::metars::MetarCheck;
use crate::{db, error::ApiResult};
use chrono::{DateTime, Datelike, NaiveDate, Utc};
use redis::{AsyncCommands, RedisResult};
use reqwest::Client;
use serde::{Deserialize, Serialize};
use std::collections::{HashMap, HashSet};
use std::collections::HashSet;
use std::env;
use std::fmt::Display;
use std::io::{Cursor, Read};
use std::str::FromStr;
use std::sync::OnceLock;
use flate2::read::GzDecoder;
use reqwest::header::ETAG;
use utoipa::ToSchema;
static TIME_OFFSET: OnceLock<i64> = OnceLock::new();
const TABLE_NAME: &str = "metars";
const DEFAULT_REFRESH_DURATION: i64 = 3000;
fn time_offset() -> i64 {
*TIME_OFFSET.get_or_init(|| {
env::var("API_METAR_TIME_OFFSET")
.unwrap_or("1800".to_string())
.parse::<i64>()
.unwrap_or(1800)
})
}
#[derive(Debug, Clone, Serialize, Deserialize, ToSchema)]
pub struct Metar {
@@ -292,9 +306,9 @@ impl MetarRow {
impl Metar {
fn parse_multiple(metar_strings: &Vec<&str>) -> ApiResult<Vec<Self>> {
let mut metars: Vec<Metar> = vec![];
let mut metars: Vec<Self> = vec![];
for metar_string in metar_strings {
match Metar::parse(metar_string) {
match Self::parse(metar_string) {
Ok(metar) => metars.push(metar),
Err(e) => {
log::warn!("Failed to parse metar string: {}", e);
@@ -315,7 +329,7 @@ impl Metar {
}
log::trace!("Parsing METAR data: {}", metar_string);
let mut metar: Metar = Metar::default();
let mut metar: Self = Self::default();
metar.raw_text = metar_string.to_owned();
let mut metar_parts: Vec<&str> = metar_string.split_whitespace().collect();
if metar_parts.len() < 4 {
@@ -906,9 +920,14 @@ impl Metar {
observation_day
),
)
})?
.and_hms_opt(observation_hour, observation_minute, 0)
.unwrap();
})?;
let candidate_date = match candidate_date.and_hms_opt(observation_hour, observation_minute, 0) {
Some(date) => date,
None => return Err(Error::new(
500,
format!("Invalid time for time '{}': hour {}, minute {}",
observation_time, observation_hour, observation_minute)))
};
let obs_datetime = if candidate_date > current_time {
// Subtract one month. (Handle year rollover carefully.)
@@ -928,35 +947,74 @@ impl Metar {
),
)
})?;
adjusted_date.and_hms(observation_hour, observation_minute, 0)
adjusted_date
.and_hms_opt(observation_hour, observation_minute, 0)
.unwrap()
} else {
candidate_date
};
Ok(obs_datetime.format("%Y-%m-%dT%H:%M:00Z").to_string())
}
async fn get_remote_metars(client: &Client, icaos: &Vec<String>) -> ApiResult<Vec<Metar>> {
let base_url = env::var("AVIATION_WEATHER_URL").expect("AVIATION_WEATHER_URL must be set");
pub async fn get_cached_remote_metars(client: &HttpClient, etag: Option<String>) -> ApiResult<(Vec<Self>, String)> {
let base_url = env::var("AVIATION_WEATHER_URL")
.expect("AVIATION_WEATHER_URL must be set");
let url = format!("{}/data/cache/metars.cache.csv.gz", base_url);
match client.get(&url, etag.clone()).await {
Ok(r) => {
let new_etag = r
.headers()
.get(ETAG)
.and_then(|h| h.to_str().ok())
.map(|s| s.to_string());
let bytes = r.bytes().await?;
let mut gz = GzDecoder::new(Cursor::new(bytes));
let mut text = String::new();
gz.read_to_string(&mut text)?;
let mut output: Vec<Metar> = Vec::new();
for line in text.lines() {
// Split off first column
let raw_text = line.splitn(2, ',').next().unwrap();
match Metar::parse(raw_text) {
Ok(m) => output.push(m),
Err(err) => {
log::warn!("{}", err);
}
};
}
match new_etag {
Some(etag) => Ok((output, etag)),
None => match etag {
Some(etag) => Ok((output, etag)),
None => Ok((output, String::new()))
}
}
}
Err(err) => Err(err.into()),
}
}
pub async fn get_remote_metars(client: &HttpClient, icaos: &Vec<String>) -> ApiResult<Vec<Self>> {
let base_url = env::var("AVIATION_WEATHER_URL")
.expect("AVIATION_WEATHER_URL must be set");
// Query the remote API for the missing METAR data 10 at a time
let icao_chunks = icaos
.chunks(10)
.map(|chunk| chunk.join(","))
.collect::<Vec<String>>();
let mut metars: Vec<Metar> = vec![];
let mut metars: Vec<Self> = vec![];
for icao_chunk in icao_chunks {
let url = format!(
"{}/metar?ids={}&hours=0&order=id,-obs",
"{}/api/data/metar?ids={}&hours=0&order=id,-obs",
base_url, icao_chunk
);
let mut m = match client.get(url).send().await {
let mut m = match client.get(&url, None).await {
Ok(r) => {
// Check if the status code is 200
if r.status() != 200 {
return Err(Error::new(
500,
format!("Request returned status {}", r.status()),
));
}
match r.text().await {
Ok(r) => {
let metar_chunk = r
@@ -979,22 +1037,22 @@ impl Metar {
Ok(metars)
}
fn from_db(metar_db: MetarRow) -> ApiResult<Metar> {
let metar: Metar = serde_json::from_value(metar_db.data)?;
fn from_row(row: MetarRow) -> ApiResult<Self> {
let metar: Self = serde_json::from_value(row.data)?;
Ok(metar)
}
fn to_db(&self) -> ApiResult<MetarRow> {
fn to_row(&self) -> ApiResult<MetarRow> {
let data = serde_json::to_value(self)?;
Ok(MetarRow {
icao: self.icao.clone(),
icao: self.icao.to_uppercase(),
observation_time: self.observation_time,
raw_text: self.raw_text.clone(),
data,
})
}
pub async fn find_all_distinct(client: &Client, icao_list: &Vec<String>) -> ApiResult<Vec<Self>> {
pub async fn get_all_distinct(icao_list: &Vec<String>) -> ApiResult<Vec<Self>> {
if icao_list.is_empty() {
return Ok(Vec::new());
}
@@ -1011,61 +1069,67 @@ impl Metar {
.bind(icao_list)
.fetch_all(pool)
.await?;
let mut metars = vec![];
for metar_row in metar_rows {
metars.push(Self::from_row(metar_row)?)
}
Ok(metars)
}
pub async fn get_or_update_metars(
client: &HttpClient,
icaos: &Vec<String>,
) -> ApiResult<Vec<Self>> {
let metars = Self::get_all_distinct(&icaos).await?;
let current_time = Utc::now().timestamp();
let time_offset = env::var("API_METAR_TIME_OFFSET")
.unwrap_or("1800".to_string())
.parse::<i64>()
.unwrap_or(1800);
let short_time_offset: i64 = 300;
// Setup metars and missing metar structures
let mut metars: Vec<Metar> = vec![];
let mut updated_metars: Vec<Self> = vec![];
let mut missing_metar_icaos: Vec<String> = vec![];
let mut found_metar_icaos: HashSet<String> = HashSet::new();
let mut requested_icaos: HashSet<String> = HashSet::from_iter(icao_list.clone());
let mut requested_icaos: HashSet<String> = HashSet::from_iter(icaos.clone());
// Iterate over returned database metars
for metar_row in metar_rows {
let icao = metar_row.icao.clone();
// Remove icao from requested icaos
for metar in metars {
let icao = metar.icao.clone();
// Remove found icao from requested ICAOs
requested_icaos.remove(&icao);
// Handle outdated metars
if current_time > (metar_row.observation_time.timestamp() + time_offset) {
// Handle outdated METARs
if current_time > (metar.observation_time.timestamp() + time_offset()) {
// If the METAR has previously been found, get the updated_at time, otherwise default
let refresh_seconds = match MetarCheck::get(&icao).await {
Some(c) => current_time - c.updated_at.timestamp(),
None => short_time_offset,
None => DEFAULT_REFRESH_DURATION,
};
// If the metar was cached more than short_time_offset minutes ago, refresh it
if refresh_seconds >= short_time_offset {
// If the metar is outdated, add it to the refresh list
if refresh_seconds >= DEFAULT_REFRESH_DURATION {
log::trace!("{} METAR data is outdated, marked for refresh", &icao);
missing_metar_icaos.push(icao.clone());
}
// Otherwise return outdated data and wait
// Otherwise return the outdated data (to be checked on the next cycle)
else {
log::trace!(
"{} METAR data is outdated; refreshing in {} seconds",
&icao,
short_time_offset - refresh_seconds
DEFAULT_REFRESH_DURATION - refresh_seconds
);
metars.push(Metar::from_db(metar_row)?)
updated_metars.push(metar);
}
}
// Otherwise add the metar to the vector
// Otherwise add the valid metar to the updated list
else {
found_metar_icaos.insert(icao.clone());
let metar_check = MetarCheck::new(icao, true).await;
metar_check.insert(time_offset as u64).await?;
metars.push(Metar::from_db(metar_row)?);
metar_check.insert().await?;
updated_metars.push(metar);
}
}
// Add all metars that were not in the returned database metars
// Add all METARs that were not in the returned database METARs
for icao in &requested_icaos {
match MetarCheck::get(icao).await {
Some(c) => {
if current_time > (c.updated_at.timestamp() + short_time_offset) {
if current_time > (c.updated_at.timestamp() + DEFAULT_REFRESH_DURATION) {
missing_metar_icaos.push(icao.to_string());
}
}
@@ -1075,6 +1139,7 @@ impl Metar {
}
}
// Retrieve missing METARs
if !missing_metar_icaos.is_empty() {
log::trace!(
"Retrieving missing METAR data for {:?}",
@@ -1087,38 +1152,47 @@ impl Metar {
vec![]
});
// Insert missing METARs
if remote_metars.len() > 0 {
// Insert missing METARs
for remote_metar in remote_metars.clone() {
remote_metar.insert().await?;
found_metar_icaos.insert(remote_metar.icao.to_string());
let mut metar_check = MetarCheck::new(remote_metar.icao.clone(), true).await;
metar_check.last_metar = Some(remote_metar);
metar_check.insert(time_offset as u64).await?;
metar_check.insert().await?;
}
metars.append(&mut remote_metars);
updated_metars.append(&mut remote_metars);
}
// Update still missing metars
// let mut still_missing_metar_icaos: Vec<String> = vec![];
// Update still missing METARs
for difference in found_metar_icaos.symmetric_difference(&requested_icaos) {
// still_missing_metar_icaos.push(difference.to_string());
let metar_check = MetarCheck::new(difference.to_string(), false).await;
metar_check.insert(short_time_offset as u64).await?;
metar_check.insert().await?;
// Only add cached metar data if it's less than 4 hours old
if let Some(last_metar) = metar_check.last_metar {
let four_hours_ago = Utc::now() - chrono::Duration::hours(4);
if last_metar.observation_time < four_hours_ago {
metars.push(last_metar);
updated_metars.push(last_metar);
}
}
}
// if !still_missing_metar_icaos.is_empty() {
// log::trace!("Still missing METAR data from {:?}", still_missing_metar_icaos);
// }
}
Ok(metars)
Ok(updated_metars)
}
pub async fn update_metars(client: &HttpClient, etag: Option<String>) -> ApiResult<String> {
let (remote_metars, etag) = Self::get_cached_remote_metars(client, etag)
.await
.unwrap_or_else(|err| {
log::warn!("Unable to get cached remote METAR data; {}", err);
(vec![], String::new())
});
for remote_metar in remote_metars.clone() {
remote_metar.insert().await?;
}
Ok(etag)
}
pub async fn insert(&self) -> ApiResult<()> {
@@ -1127,7 +1201,7 @@ impl Metar {
self.icao,
self.observation_time
);
let metar: MetarRow = self.to_db()?;
let metar: MetarRow = self.to_row()?;
metar.insert().await?;
Ok(())
}