Working on queries to get latest metar airports first
This commit is contained in:
@@ -1,5 +1,6 @@
|
||||
use std::collections::HashMap;
|
||||
use std::str::FromStr;
|
||||
use chrono::{DateTime, Utc};
|
||||
use futures_util::try_join;
|
||||
use reqwest::Client;
|
||||
use serde::{Deserialize, Serialize};
|
||||
@@ -121,6 +122,7 @@ struct AirportRow {
|
||||
pub has_tower: Option<bool>,
|
||||
pub has_beacon: Option<bool>,
|
||||
pub public: bool,
|
||||
pub metar_observation_time: Option<DateTime<Utc>>,
|
||||
}
|
||||
|
||||
#[derive(Debug, Deserialize)]
|
||||
@@ -141,6 +143,7 @@ pub struct UpdateAirport {
|
||||
pub runways: Option<Vec<UpdateRunway>>,
|
||||
pub frequencies: Option<Vec<UpdateFrequency>>,
|
||||
pub public: Option<bool>,
|
||||
pub latest_metar_observation: Option<DateTime<Utc>>,
|
||||
}
|
||||
|
||||
impl Into<AirportRow> for Airport {
|
||||
@@ -160,6 +163,10 @@ impl Into<AirportRow> for Airport {
|
||||
has_tower: self.has_tower,
|
||||
has_beacon: self.has_beacon,
|
||||
public: self.public,
|
||||
metar_observation_time: match self.latest_metar {
|
||||
Some(m) => Some(m.observation_time),
|
||||
None => None,
|
||||
},
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -303,7 +310,8 @@ impl Airport {
|
||||
Self::push_condition_bounds(&mut builder, &mut has_where, &query.bounds)?;
|
||||
|
||||
// Order by AircraftCategory
|
||||
builder.push(" ORDER BY CASE category ");
|
||||
builder.push(" ORDER BY (metar_observation_time IS NULL), ");
|
||||
builder.push(" CASE category ");
|
||||
builder.push(" WHEN 'large_airport' THEN 1 ");
|
||||
builder.push(" WHEN 'medium_airport' THEN 2 ");
|
||||
builder.push(" WHEN 'small_airport' THEN 3 ");
|
||||
@@ -516,7 +524,20 @@ impl Airport {
|
||||
}
|
||||
|
||||
// TODO
|
||||
pub async fn update(_icao: &str, _airport: &UpdateAirport) -> ApiResult<()> {
|
||||
pub async fn update(icao: &str, airport: &UpdateAirport) -> ApiResult<()> {
|
||||
let pool = db::pool();
|
||||
|
||||
let mut query_builder: QueryBuilder<Postgres> =
|
||||
QueryBuilder::new(format!("UPDATE {} SET ", TABLE_NAME));
|
||||
if let Some(latest_metar_observation) = airport.latest_metar_observation {
|
||||
query_builder.push("metar_observation_time = ");
|
||||
query_builder.push_bind(latest_metar_observation);
|
||||
}
|
||||
|
||||
query_builder.push(" WHERE icao = ").push_bind(icao);
|
||||
let query = query_builder.build();
|
||||
query.execute(pool).await?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
|
||||
@@ -8,6 +8,7 @@ use std::str::FromStr;
|
||||
use redis::{AsyncCommands, RedisResult};
|
||||
use reqwest::Client;
|
||||
use serde::{Deserialize, Serialize};
|
||||
use crate::airports::{Airport, UpdateAirport};
|
||||
use crate::db::redis_async_connection;
|
||||
|
||||
const TABLE_NAME: &str = "metars";
|
||||
@@ -63,7 +64,7 @@ pub enum ReportModifier {
|
||||
#[serde(rename = "AUTO")]
|
||||
Auto,
|
||||
#[serde(rename = "COR")]
|
||||
Corrected
|
||||
Corrected,
|
||||
}
|
||||
|
||||
impl FromStr for ReportModifier {
|
||||
@@ -72,7 +73,7 @@ impl FromStr for ReportModifier {
|
||||
match s {
|
||||
"AUTO" => Ok(ReportModifier::Auto),
|
||||
"COR" => Ok(ReportModifier::Corrected),
|
||||
_ => Err(Error::new(400, format!("Invalid report modifier '{}'", s)))
|
||||
_ => Err(Error::new(400, format!("Invalid report modifier '{}'", s))),
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -122,7 +123,10 @@ impl FromStr for AutomatedStationType {
|
||||
match s {
|
||||
"AO1" => Ok(AutomatedStationType::WithoutPrecipitationDiscriminator),
|
||||
"AO2" => Ok(AutomatedStationType::WithPrecipitationDiscriminator),
|
||||
_ => Err(Error::new(400, format!("Invalid automated station type '{}'", s)))
|
||||
_ => Err(Error::new(
|
||||
400,
|
||||
format!("Invalid automated station type '{}'", s),
|
||||
)),
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -522,9 +526,7 @@ impl Metar {
|
||||
format!(
|
||||
"P{}",
|
||||
visibility_whole
|
||||
+ (visibility_left[1..visibility_left.len()]
|
||||
.parse::<f64>()?
|
||||
/ visibility_right)
|
||||
+ (visibility_left[1..visibility_left.len()].parse::<f64>()? / visibility_right)
|
||||
)
|
||||
} else {
|
||||
format!(
|
||||
@@ -895,10 +897,7 @@ impl Metar {
|
||||
) -> Vec<String> {
|
||||
let mut missing_metar_icaos: Vec<String> = vec![];
|
||||
let current_time = chrono::Local::now().naive_local().and_utc().timestamp();
|
||||
let db_metars_set: HashSet<&str> = db_metars
|
||||
.iter()
|
||||
.map(|icao| icao.icao.as_str())
|
||||
.collect();
|
||||
let db_metars_set: HashSet<&str> = db_metars.iter().map(|icao| icao.icao.as_str()).collect();
|
||||
let station_icaos_set: HashSet<&str> = station_icaos.iter().map(|s| s.as_str()).collect();
|
||||
for difference in db_metars_set.symmetric_difference(&station_icaos_set) {
|
||||
missing_metar_icaos.push(difference.to_string());
|
||||
@@ -986,17 +985,63 @@ impl Metar {
|
||||
let pool = db::pool();
|
||||
let metar_rows: Vec<MetarRow> = sqlx::query_as::<_, MetarRow>(&format!(
|
||||
r#"
|
||||
SELECT DISTINCT ON (icao) * FROM {} WHERE icao = ANY($1) ORDER BY icao, observation_time DESC
|
||||
"#,
|
||||
SELECT DISTINCT ON (icao) * FROM {}
|
||||
WHERE icao = ANY($1)
|
||||
AND observation_time >= NOW() - INTERVAL '50 minutes'
|
||||
ORDER BY icao, observation_time DESC
|
||||
"#,
|
||||
TABLE_NAME
|
||||
))
|
||||
.bind(icao_list)
|
||||
.fetch_all(pool)
|
||||
.await?;
|
||||
metars = metar_rows
|
||||
.into_iter()
|
||||
.filter_map(|metar_db| Metar::from_db(metar_db).ok())
|
||||
.collect();
|
||||
for metar_row in metar_rows {
|
||||
let metar = match Metar::from_db(metar_row) {
|
||||
Ok(m) => m,
|
||||
Err(err) => {
|
||||
log::error!("{}", err);
|
||||
continue;
|
||||
}
|
||||
};
|
||||
|
||||
let icao = metar.icao.clone();
|
||||
let observation_time = metar.observation_time.clone();
|
||||
tokio::spawn(async move {
|
||||
match Airport::update(
|
||||
&icao,
|
||||
&UpdateAirport {
|
||||
icao: None,
|
||||
iata: None,
|
||||
local: None,
|
||||
name: None,
|
||||
category: None,
|
||||
iso_country: None,
|
||||
iso_region: None,
|
||||
municipality: None,
|
||||
elevation_ft: None,
|
||||
longitude: None,
|
||||
latitude: None,
|
||||
has_tower: None,
|
||||
has_beacon: None,
|
||||
runways: None,
|
||||
frequencies: None,
|
||||
public: None,
|
||||
latest_metar_observation: Some(observation_time),
|
||||
},
|
||||
)
|
||||
.await
|
||||
{
|
||||
Ok(_) => {}
|
||||
Err(err) => log::error!(
|
||||
"Unable to update airport {} with the latest observation time: {}",
|
||||
icao,
|
||||
err
|
||||
),
|
||||
};
|
||||
});
|
||||
|
||||
metars.push(metar);
|
||||
}
|
||||
}
|
||||
|
||||
let mut conn = redis_async_connection().await?;
|
||||
@@ -1011,17 +1056,19 @@ impl Metar {
|
||||
let result: RedisResult<Option<bool>> = conn.get(icao).await;
|
||||
match result {
|
||||
Ok(Some(value)) => {
|
||||
if value {
|
||||
if !value {
|
||||
updated_missing_icao_list.push(icao);
|
||||
}
|
||||
}
|
||||
Ok(None) => {
|
||||
updated_missing_icao_list.push(icao);
|
||||
Ok(None) => updated_missing_icao_list.push(icao),
|
||||
Err(err) => {
|
||||
log::error!("{}", err);
|
||||
return Err(err.into());
|
||||
}
|
||||
Err(err) => return Err(err.into()),
|
||||
}
|
||||
}
|
||||
}
|
||||
dbg!(&updated_missing_icao_list);
|
||||
if !updated_missing_icao_list.is_empty() {
|
||||
log::trace!(
|
||||
"Retrieving missing METAR data for {:?}",
|
||||
|
||||
Reference in New Issue
Block a user