Updated metar checking
This commit is contained in:
@@ -1,7 +1,5 @@
|
||||
use crate::error::ApiResult;
|
||||
use redis::{
|
||||
Client as RedisClient, aio::MultiplexedConnection as RedisConnection, RedisResult,
|
||||
};
|
||||
use redis::{Client as RedisClient, aio::MultiplexedConnection as RedisConnection, RedisResult};
|
||||
use s3::{Bucket, Region, creds::Credentials, BucketConfiguration, request::ResponseData};
|
||||
use serde::{Deserialize, Serialize};
|
||||
use std::sync::OnceLock;
|
||||
|
||||
57
api/src/metars/metar_check.rs
Normal file
57
api/src/metars/metar_check.rs
Normal file
@@ -0,0 +1,57 @@
|
||||
use chrono::{DateTime, Utc};
|
||||
use redis::{AsyncCommands, RedisResult};
|
||||
use serde::{Deserialize, Serialize};
|
||||
use crate::db::redis_async_connection;
|
||||
use crate::error::ApiResult;
|
||||
|
||||
#[derive(Debug, Serialize, Deserialize)]
|
||||
pub struct MetarCheck {
|
||||
pub icao: String,
|
||||
pub status: bool,
|
||||
pub updated_at: DateTime<Utc>,
|
||||
}
|
||||
|
||||
impl MetarCheck {
|
||||
pub fn new(icao: String, status: bool) -> Self {
|
||||
Self {
|
||||
icao,
|
||||
status,
|
||||
updated_at: Utc::now(),
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn get(icao: &str) -> Option<MetarCheck> {
|
||||
let mut conn = match redis_async_connection().await {
|
||||
Ok(conn) => conn,
|
||||
Err(err) => {
|
||||
log::error!("{}", err);
|
||||
return None;
|
||||
}
|
||||
};
|
||||
let result: RedisResult<Option<String>> = conn.get(icao).await;
|
||||
match result {
|
||||
Ok(Some(value)) => match serde_json::from_str(&value) {
|
||||
Ok(result) => Some(result),
|
||||
Err(err) => {
|
||||
log::error!("{}", err);
|
||||
None
|
||||
}
|
||||
},
|
||||
Ok(None) => None,
|
||||
Err(err) => {
|
||||
log::error!("{}", err);
|
||||
None
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn insert(&self, seconds: u64) -> ApiResult<()> {
|
||||
let mut conn = redis_async_connection().await?;
|
||||
let value = serde_json::to_string(&self)?;
|
||||
conn
|
||||
.set_ex::<_, _, ()>(self.icao.as_str(), value, seconds)
|
||||
.await?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
@@ -1,5 +1,7 @@
|
||||
mod metar_check;
|
||||
mod model;
|
||||
mod routes;
|
||||
|
||||
pub use model::*;
|
||||
pub use metar_check::*;
|
||||
pub use routes::init_routes;
|
||||
|
||||
@@ -1,7 +1,7 @@
|
||||
use crate::error::Error;
|
||||
use crate::{error::ApiResult, db};
|
||||
use chrono::{DateTime, Datelike, NaiveDate, Utc};
|
||||
use std::collections::HashSet;
|
||||
use std::collections::{HashMap, HashSet};
|
||||
use std::env;
|
||||
use std::fmt::Display;
|
||||
use std::str::FromStr;
|
||||
@@ -10,6 +10,7 @@ use reqwest::Client;
|
||||
use serde::{Deserialize, Serialize};
|
||||
use crate::airports::{Airport, UpdateAirport};
|
||||
use crate::db::redis_async_connection;
|
||||
use crate::metars::MetarCheck;
|
||||
|
||||
const TABLE_NAME: &str = "metars";
|
||||
|
||||
@@ -431,28 +432,19 @@ impl Metar {
|
||||
let visibility: String = if visibility_str.contains("/") {
|
||||
let visibility_parts: Vec<&str> = visibility_str.split("/").collect();
|
||||
let visibility_left = visibility_parts[0];
|
||||
let visibility_right = visibility_parts[1].parse::<f64>().unwrap();
|
||||
let visibility_right = visibility_parts[1].parse::<f64>()?;
|
||||
if visibility_left.starts_with("M") {
|
||||
format!(
|
||||
"M{}",
|
||||
visibility_left[1..visibility_left.len()]
|
||||
.parse::<f64>()
|
||||
.unwrap()
|
||||
/ visibility_right
|
||||
visibility_left[1..visibility_left.len()].parse::<f64>()? / visibility_right
|
||||
)
|
||||
} else if visibility_left.starts_with("P") {
|
||||
format!(
|
||||
"P{}",
|
||||
visibility_left[1..visibility_left.len()]
|
||||
.parse::<f64>()
|
||||
.unwrap()
|
||||
/ visibility_right
|
||||
visibility_left[1..visibility_left.len()].parse::<f64>()? / visibility_right
|
||||
)
|
||||
} else {
|
||||
format!(
|
||||
"{}",
|
||||
visibility_left.parse::<f64>().unwrap() / visibility_right
|
||||
)
|
||||
format!("{}", visibility_left.parse::<f64>()? / visibility_right)
|
||||
}
|
||||
} else {
|
||||
visibility_str.to_string()
|
||||
@@ -463,22 +455,18 @@ impl Metar {
|
||||
&& metar_parts.len() > 1
|
||||
&& visibility_re.is_match(metar_parts[1])
|
||||
{
|
||||
let visibility_whole = metar_parts[0].parse::<f64>().unwrap();
|
||||
let visibility_whole = metar_parts[0].parse::<f64>()?;
|
||||
metar_parts.remove(0);
|
||||
let visibility_parts: Vec<&str> = metar_parts[0].split("/").collect();
|
||||
metar_parts.remove(0);
|
||||
let visibility_left = visibility_parts[0];
|
||||
let visibility_right = visibility_parts[1][0..visibility_parts[1].len() - 2]
|
||||
.parse::<f64>()
|
||||
.unwrap();
|
||||
let visibility_right =
|
||||
visibility_parts[1][0..visibility_parts[1].len() - 2].parse::<f64>()?;
|
||||
let visibility = if visibility_left.starts_with("M") {
|
||||
format!(
|
||||
"M{}",
|
||||
visibility_whole
|
||||
+ (visibility_left[1..visibility_left.len()]
|
||||
.parse::<f64>()
|
||||
.unwrap()
|
||||
/ visibility_right)
|
||||
+ (visibility_left[1..visibility_left.len()].parse::<f64>()? / visibility_right)
|
||||
)
|
||||
} else if visibility_left.starts_with("P") {
|
||||
format!(
|
||||
@@ -909,8 +897,17 @@ impl Metar {
|
||||
let current_year = current_time.year();
|
||||
let current_month = current_time.month();
|
||||
let candidate_date = NaiveDate::from_ymd_opt(current_year, current_month, observation_day)
|
||||
.ok_or_else(|| Error::new(500, format!("Invalid date with day {} for current month", observation_day)))?
|
||||
.and_hms_opt(observation_hour, observation_minute, 0).unwrap();
|
||||
.ok_or_else(|| {
|
||||
Error::new(
|
||||
500,
|
||||
format!(
|
||||
"Invalid date with day {} for current month",
|
||||
observation_day
|
||||
),
|
||||
)
|
||||
})?
|
||||
.and_hms_opt(observation_hour, observation_minute, 0)
|
||||
.unwrap();
|
||||
|
||||
let obs_datetime = if candidate_date > current_time {
|
||||
// Subtract one month. (Handle year rollover carefully.)
|
||||
@@ -920,8 +917,16 @@ impl Metar {
|
||||
(current_month - 1, current_year)
|
||||
};
|
||||
|
||||
let adjusted_date = NaiveDate::from_ymd_opt(year, month, observation_day)
|
||||
.ok_or_else(|| Error::new(500, format!("Invalid date with day {} for month {}", observation_day, month)))?;
|
||||
let adjusted_date =
|
||||
NaiveDate::from_ymd_opt(year, month, observation_day).ok_or_else(|| {
|
||||
Error::new(
|
||||
500,
|
||||
format!(
|
||||
"Invalid date with day {} for month {}",
|
||||
observation_day, month
|
||||
),
|
||||
)
|
||||
})?;
|
||||
adjusted_date.and_hms(observation_hour, observation_minute, 0)
|
||||
} else {
|
||||
candidate_date
|
||||
@@ -929,32 +934,8 @@ impl Metar {
|
||||
Ok(obs_datetime.format("%Y-%m-%dT%H:%M:00Z").to_string())
|
||||
}
|
||||
|
||||
async fn get_missing_metar_icaos(
|
||||
db_metars: &Vec<Self>,
|
||||
station_icaos: &Vec<String>,
|
||||
) -> Vec<String> {
|
||||
let mut missing_metar_icaos: Vec<String> = vec![];
|
||||
let current_time = Utc::now().timestamp();
|
||||
let db_metars_set: HashSet<&str> = db_metars.iter().map(|icao| icao.icao.as_str()).collect();
|
||||
let station_icaos_set: HashSet<&str> = station_icaos.iter().map(|s| s.as_str()).collect();
|
||||
for difference in db_metars_set.symmetric_difference(&station_icaos_set) {
|
||||
missing_metar_icaos.push(difference.to_string());
|
||||
}
|
||||
let time_offset = env::var("API_METAR_TIME_OFFSET")
|
||||
.unwrap_or("3000".to_string())
|
||||
.parse::<i64>()
|
||||
.unwrap_or(3000);
|
||||
for metar in db_metars {
|
||||
if current_time > (metar.observation_time.timestamp() + time_offset) {
|
||||
log::trace!("{} METAR data is outdated", metar.icao);
|
||||
missing_metar_icaos.push(metar.icao.to_string());
|
||||
}
|
||||
}
|
||||
missing_metar_icaos
|
||||
}
|
||||
|
||||
async fn get_remote_metars(client: &Client, icaos: &[&str]) -> ApiResult<Vec<Metar>> {
|
||||
let base_url = std::env::var("AVIATION_WEATHER_URL").expect("AVIATION_WEATHER_URL must be set");
|
||||
async fn get_remote_metars(client: &Client, icaos: &Vec<String>) -> ApiResult<Vec<Metar>> {
|
||||
let base_url = env::var("AVIATION_WEATHER_URL").expect("AVIATION_WEATHER_URL must be set");
|
||||
// Query the remote API for the missing METAR data 10 at a time
|
||||
let icao_chunks = icaos
|
||||
.chunks(10)
|
||||
@@ -962,7 +943,10 @@ impl Metar {
|
||||
.collect::<Vec<String>>();
|
||||
let mut metars: Vec<Metar> = vec![];
|
||||
for icao_chunk in icao_chunks {
|
||||
let url = format!("{}/metar?ids={}&hours=0&order=id,-obs", base_url, icao_chunk);
|
||||
let url = format!(
|
||||
"{}/metar?ids={}&hours=0&order=id,-obs",
|
||||
base_url, icao_chunk
|
||||
);
|
||||
let mut m = match client.get(url).send().await {
|
||||
Ok(r) => {
|
||||
// Check if the status code is 200
|
||||
@@ -1012,100 +996,132 @@ impl Metar {
|
||||
pub async fn find_all(
|
||||
client: &Client,
|
||||
icao_list: &Vec<String>,
|
||||
force: &bool,
|
||||
_force: &bool,
|
||||
) -> ApiResult<Vec<Self>> {
|
||||
if icao_list.is_empty() {
|
||||
return Ok(Vec::new());
|
||||
}
|
||||
|
||||
let pool = db::pool();
|
||||
let metar_rows: Vec<MetarRow> = sqlx::query_as::<_, MetarRow>(&format!(
|
||||
r#"
|
||||
SELECT DISTINCT ON (icao) * FROM {}
|
||||
WHERE icao = ANY($1)
|
||||
ORDER BY icao, observation_time DESC
|
||||
"#,
|
||||
TABLE_NAME
|
||||
))
|
||||
.bind(icao_list)
|
||||
.fetch_all(pool)
|
||||
.await?;
|
||||
|
||||
let current_time = Utc::now().timestamp();
|
||||
let time_offset = env::var("API_METAR_TIME_OFFSET")
|
||||
.unwrap_or("3000".to_string())
|
||||
.parse::<i64>()
|
||||
.unwrap_or(3000);
|
||||
let short_time_offset: i64 = 300;
|
||||
|
||||
// Setup metars and missing metar structures
|
||||
let mut metars: Vec<Metar> = vec![];
|
||||
if !*force {
|
||||
let pool = db::pool();
|
||||
let metar_rows: Vec<MetarRow> = sqlx::query_as::<_, MetarRow>(&format!(
|
||||
r#"
|
||||
SELECT DISTINCT ON (icao) * FROM {}
|
||||
WHERE icao = ANY($1)
|
||||
ORDER BY icao, observation_time DESC
|
||||
"#,
|
||||
TABLE_NAME
|
||||
))
|
||||
.bind(icao_list)
|
||||
.fetch_all(pool)
|
||||
.await?;
|
||||
metars = metar_rows
|
||||
.into_iter()
|
||||
.map(|m| Metar::from_db(m).unwrap())
|
||||
.collect();
|
||||
let mut missing_metar_icaos: Vec<String> = vec![];
|
||||
let mut found_metar_icaos: HashSet<String> = HashSet::new();
|
||||
let mut requested_icaos: HashSet<String> = HashSet::from_iter(icao_list.clone());
|
||||
|
||||
// Iterate over returned database metars
|
||||
for metar_row in metar_rows {
|
||||
let icao = metar_row.icao.clone();
|
||||
// Remove icao from requested icaos
|
||||
requested_icaos.remove(&icao);
|
||||
|
||||
// Handle outdated metars
|
||||
if current_time > (metar_row.observation_time.timestamp() + time_offset) {
|
||||
let refresh_seconds = match MetarCheck::get(&icao).await {
|
||||
Some(c) => current_time - c.updated_at.timestamp(),
|
||||
None => short_time_offset,
|
||||
};
|
||||
// If the metar was cached more than short_time_offset minutes ago, refresh it
|
||||
if refresh_seconds >= short_time_offset {
|
||||
log::trace!("{} METAR data is outdated, refreshing now", &icao);
|
||||
missing_metar_icaos.push(icao);
|
||||
}
|
||||
// Otherwise return outdated data and wait
|
||||
else {
|
||||
log::trace!(
|
||||
"{} METAR data is outdated; refreshing in {} seconds",
|
||||
&icao,
|
||||
short_time_offset - refresh_seconds
|
||||
);
|
||||
metars.push(Metar::from_db(metar_row)?)
|
||||
}
|
||||
}
|
||||
// Otherwise add the metar to the vector
|
||||
else {
|
||||
found_metar_icaos.insert(icao.clone());
|
||||
let metar_check = MetarCheck::new(icao, true);
|
||||
metar_check.insert(time_offset as u64).await?;
|
||||
metars.push(Metar::from_db(metar_row)?);
|
||||
}
|
||||
}
|
||||
|
||||
let mut conn = redis_async_connection().await?;
|
||||
// Check for missing metars
|
||||
let missing_icao_list = Self::get_missing_metar_icaos(&metars, icao_list).await;
|
||||
if !missing_icao_list.is_empty() {
|
||||
let mut updated_missing_icao_list: Vec<&str> = Vec::new();
|
||||
for icao in &missing_icao_list {
|
||||
if *force {
|
||||
updated_missing_icao_list.push(icao);
|
||||
} else {
|
||||
let result: RedisResult<Option<bool>> = conn.get(icao).await;
|
||||
match result {
|
||||
Ok(Some(value)) => {
|
||||
if value {
|
||||
updated_missing_icao_list.push(icao);
|
||||
}
|
||||
}
|
||||
Ok(None) => updated_missing_icao_list.push(icao),
|
||||
Err(err) => {
|
||||
log::error!("{}", err);
|
||||
return Err(err.into());
|
||||
}
|
||||
// Add all metars that were not in the returned database metars
|
||||
for icao in &requested_icaos {
|
||||
match MetarCheck::get(icao).await {
|
||||
Some(c) => {
|
||||
if current_time > (c.updated_at.timestamp() + short_time_offset) {
|
||||
missing_metar_icaos.push(icao.to_string());
|
||||
}
|
||||
}
|
||||
None => {
|
||||
missing_metar_icaos.push(icao.to_string());
|
||||
}
|
||||
}
|
||||
if !updated_missing_icao_list.is_empty() {
|
||||
log::trace!(
|
||||
"Retrieving missing METAR data for {:?}",
|
||||
updated_missing_icao_list
|
||||
);
|
||||
let mut missing_icao_list = Self::get_remote_metars(client, &updated_missing_icao_list)
|
||||
.await
|
||||
.unwrap_or_else(|err| {
|
||||
log::warn!("Unable to get remote METAR data; {}", err);
|
||||
vec![]
|
||||
});
|
||||
}
|
||||
|
||||
if missing_icao_list.len() > 0 {
|
||||
// Insert missing METARs
|
||||
for missing_metar in &missing_icao_list {
|
||||
let _: RedisResult<()> = conn.set(&missing_metar.icao, true).await;
|
||||
missing_metar.insert().await?;
|
||||
}
|
||||
metars.append(&mut missing_icao_list)
|
||||
}
|
||||
if !missing_metar_icaos.is_empty() {
|
||||
log::trace!(
|
||||
"Retrieving missing METAR data for {:?}",
|
||||
missing_metar_icaos
|
||||
);
|
||||
let mut remote_metars = Self::get_remote_metars(client, &missing_metar_icaos)
|
||||
.await
|
||||
.unwrap_or_else(|err| {
|
||||
log::warn!("Unable to get remote METAR data; {}", err);
|
||||
vec![]
|
||||
});
|
||||
|
||||
// Invalidate the still missing icaos
|
||||
let still_missing_icao_list =
|
||||
Self::get_missing_metar_icaos(&missing_icao_list, icao_list).await;
|
||||
if !still_missing_icao_list.is_empty() {
|
||||
for icao in still_missing_icao_list {
|
||||
// Skip values if they've been set to true in the past
|
||||
let result: RedisResult<Option<bool>> = conn.get(&icao).await;
|
||||
if let Ok(Some(v)) = result {
|
||||
if v {
|
||||
continue;
|
||||
}
|
||||
}
|
||||
let _: RedisResult<()> = conn.set_ex(&icao, false, 3600).await;
|
||||
}
|
||||
if remote_metars.len() > 0 {
|
||||
// Insert missing METARs
|
||||
for remote_metar in &remote_metars {
|
||||
found_metar_icaos.insert(remote_metar.icao.to_string());
|
||||
let metar_check = MetarCheck::new(remote_metar.icao.clone(), true);
|
||||
metar_check.insert(time_offset as u64).await?;
|
||||
remote_metar.insert().await?;
|
||||
}
|
||||
metars.append(&mut remote_metars)
|
||||
}
|
||||
|
||||
// Update still missing metars
|
||||
let mut still_missing_metar_icaos: Vec<String> = vec![];
|
||||
for difference in found_metar_icaos.symmetric_difference(&requested_icaos) {
|
||||
still_missing_metar_icaos.push(difference.to_string());
|
||||
let metar_check = MetarCheck::new(difference.to_string(), false);
|
||||
metar_check.insert(short_time_offset as u64).await?
|
||||
}
|
||||
// if !still_missing_metar_icaos.is_empty() {
|
||||
// log::trace!("Still missing METAR data from {:?}", still_missing_metar_icaos);
|
||||
// }
|
||||
}
|
||||
|
||||
Ok(metars)
|
||||
}
|
||||
|
||||
pub async fn insert(&self) -> ApiResult<()> {
|
||||
log::trace!("Inserting metar {} with observation time {}", self.icao, self.observation_time);
|
||||
log::trace!(
|
||||
"Inserting metar {} with observation time {}",
|
||||
self.icao,
|
||||
self.observation_time
|
||||
);
|
||||
let metar: MetarRow = self.to_db()?;
|
||||
metar.insert().await?;
|
||||
Ok(())
|
||||
@@ -1137,13 +1153,12 @@ mod tests {
|
||||
);
|
||||
// Remove the trailing 'Z' and parse
|
||||
let trimmed = &datetime_str[..19];
|
||||
NaiveDateTime::parse_from_str(trimmed, "%Y-%m-%dT%H:%M:%S")
|
||||
.unwrap_or_else(|e| {
|
||||
panic!(
|
||||
"Parsing '{}' from input {} failed: {}",
|
||||
trimmed, obs_time, e
|
||||
)
|
||||
});
|
||||
NaiveDateTime::parse_from_str(trimmed, "%Y-%m-%dT%H:%M:%S").unwrap_or_else(|e| {
|
||||
panic!(
|
||||
"Parsing '{}' from input {} failed: {}",
|
||||
trimmed, obs_time, e
|
||||
)
|
||||
});
|
||||
}
|
||||
Err(_err) => {}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user