diff --git a/weather-service/src/metars/model.rs b/weather-service/src/metars/model.rs index b2e573d..89240c5 100644 --- a/weather-service/src/metars/model.rs +++ b/weather-service/src/metars/model.rs @@ -1,5 +1,5 @@ use crate::{error_handler::ServiceError, db}; -use crate::schema::metars; +use crate::schema::metars::{self}; use diesel::{prelude::*, sql_query}; use log::{warn, trace}; use std::collections::HashSet; @@ -18,11 +18,11 @@ pub struct SkyCondition { #[serde(rename = "@sky_cover")] pub sky_cover: String, #[serde(rename = "@cloud_base_ft_agl")] - pub cloud_base_ft_agl: String + pub cloud_base_ft_agl: Option } #[derive(Serialize, Deserialize, Debug)] -pub struct RetrievedMetar { +pub struct Metar { pub raw_text: String, pub station_id: String, pub observation_time: String, @@ -37,20 +37,20 @@ pub struct RetrievedMetar { pub sea_level_pressure_mb: Option, pub quality_control_flags: Option, pub wx_string: Option, - pub sky_condition: Option>, // TODO work on attributes + pub sky_condition: Option>, pub flight_category: String, pub three_hr_pressure_tendency_mb: Option, pub metar_type: String, #[serde(rename = "maxT_c")] pub max_t_c: Option, - #[serde(rename = " ")] + #[serde(rename = "minT_c")] pub min_t_c: Option, pub precip_in: Option, pub elevation_m: i32 } -impl RetrievedMetar { - pub fn parse(input: String) -> Result, ServiceError> { +impl Metar { + fn parse(input: String) -> Result, ServiceError> { if input.is_empty() { return Err(ServiceError::new(500, "Input is empty".to_string())) } @@ -86,7 +86,7 @@ impl RetrievedMetar { } // https://capnfabs.net/posts/parsing-huge-xml-quickxml-rust-serde/ - pub fn read_to_end_into_buffer(reader: &mut Reader, start_tag: &BytesStart, junk_buf: &mut Vec) -> Result, quick_xml::Error> { + fn read_to_end_into_buffer(reader: &mut Reader, start_tag: &BytesStart, junk_buf: &mut Vec) -> Result, quick_xml::Error> { let mut depth = 0; let mut output_buf: Vec = Vec::new(); let mut w = Writer::new(&mut output_buf); @@ -112,75 +112,12 @@ impl RetrievedMetar { } } } -} -#[derive(Serialize, Deserialize, AsChangeset, Insertable)] -#[diesel(table_name = metars)] -pub struct InsertMetar { - pub raw_text: String, - pub station_id: String, - pub observation_time: String, - pub latitude: f64, - pub longitude: f64, - pub temp_c: Option, - pub dewpoint_c: Option, - pub wind_dir_degrees: Option, - pub wind_speed_kt: Option, - pub visibility_statute_mi: Option, - pub altim_in_hg: Option, - pub sea_level_pressure_mb: Option, - pub qcf_auto: Option, - pub qcf_auto_station: Option, - pub wx_string: Option, - pub sky_condition: Option>, - pub flight_category: String, - pub three_hr_pressure_tendency_mb: Option, - pub metar_type: String, - #[serde(rename = "maxT_c")] - pub max_t_c: Option, - #[serde(rename = " ")] - pub min_t_c: Option, - pub precip_in: Option, - pub elevation_m: i32 -} - -#[derive(Serialize, Deserialize, Queryable, QueryableByName)] -#[diesel(table_name = metars)] -pub struct QueryMetar { - pub id: i32, - pub raw_text: String, - pub station_id: String, - pub observation_time: String, - pub latitude: f64, - pub longitude: f64, - pub temp_c: Option, - pub dewpoint_c: Option, - pub wind_dir_degrees: Option, - pub wind_speed_kt: Option, - pub visibility_statute_mi: Option, - pub altim_in_hg: Option, - pub sea_level_pressure_mb: Option, - pub qcf_auto: Option, - pub qcf_auto_station: Option, - pub wx_string: Option, - pub sky_condition: Option>, - pub flight_category: String, - pub three_hr_pressure_tendency_mb: Option, - pub metar_type: String, - #[serde(rename = "maxT_c")] - pub max_t_c: Option, - #[serde(rename = "minT_c")] - pub min_t_c: Option, - pub precip_in: Option, - pub elevation_m: i32 -} - -impl QueryMetar { - fn get_missing_metar_icaos(db_metars: &Vec, station_icaos: Vec<&str>) -> Vec { + fn get_missing_metar_icaos(db_metars: &Vec, station_icaos: &Vec<&str>) -> Vec { let mut missing_metar_icaos: Vec = vec![]; let current_time = chrono::Local::now().naive_local().timestamp(); let db_metars_set: HashSet<&str> = db_metars.iter().map(|icao| icao.station_id.as_str()).collect(); - let station_icaos_set: HashSet<&str> = station_icaos.into_iter().collect(); + let station_icaos_set: HashSet<&str> = station_icaos.to_owned().into_iter().collect(); for difference in db_metars_set.symmetric_difference(&station_icaos_set) { missing_metar_icaos.push(difference.to_string()); } @@ -201,12 +138,12 @@ impl QueryMetar { return missing_metar_icaos; } - async fn get_remote_metars(icaos: String) -> Vec { + async fn get_remote_metars(icaos: String) -> Vec { let url = format!("https://beta.aviationweather.gov/cgi-bin/data/metar.php?ids={}&format=xml", icaos); - let retrieved_metars: Vec = match reqwest::get(url).await { + match reqwest::get(url).await { Ok(r) => match r.text().await { Ok(r) => { - match RetrievedMetar::parse(r) { + match Metar::parse(r) { Ok(m) => m, Err(err) => { warn!("{}", err); @@ -223,11 +160,33 @@ impl QueryMetar { warn!("Unable to get METAR request: {}", err); vec![] } - }; - let mut metars: Vec = vec![]; - for metar in retrieved_metars { - println!("{:?}", metar); - metars.push(InsertMetar { + } + } + + fn from_query(query_metars: Vec) -> Vec { + let mut metars: Vec = vec![]; + for metar in query_metars { + let quality_control_flags = Some(QualityControlFlags { + auto: metar.qcf_auto, + auto_station: metar.qcf_auto_station + }); + let sky_condition = match metar.sky_condition { + Some(s) => { + let mut sc: Vec = vec![]; + for string in s { + let split: Vec<&str> = string.split_whitespace().collect(); + if split.len() == 1 { + sc.push(SkyCondition { sky_cover: split[0].to_string(), cloud_base_ft_agl: None }) + } else if split.len() == 2 { + let cloud_base = split[1].parse::().unwrap(); + sc.push(SkyCondition { sky_cover: split[0].to_string(), cloud_base_ft_agl: Some(cloud_base) }) + } + } + Some(sc) + }, + None => None + }; + metars.push(Metar { raw_text: metar.raw_text, station_id: metar.station_id, observation_time: metar.observation_time, @@ -240,25 +199,9 @@ impl QueryMetar { visibility_statute_mi: metar.visibility_statute_mi, altim_in_hg: metar.altim_in_hg, sea_level_pressure_mb: metar.sea_level_pressure_mb, - qcf_auto: match &metar.quality_control_flags { - Some(q) => q.auto, - None => None - }, - qcf_auto_station: match &metar.quality_control_flags { - Some(q) => q.auto_station, - None => None - }, + quality_control_flags, wx_string: metar.wx_string, - sky_condition: match &metar.sky_condition { - Some(s) => { - let mut sc: Vec = vec![]; - for condition in s { - sc.push(format!("{} {}", condition.sky_cover, condition.cloud_base_ft_agl)); - } - Some(sc) - }, - None => None - }, + sky_condition, flight_category: metar.flight_category, three_hr_pressure_tendency_mb: metar.three_hr_pressure_tendency_mb, metar_type: metar.metar_type, @@ -271,38 +214,10 @@ impl QueryMetar { return metars; } - pub async fn get_all(icaos: String) -> Result, ServiceError> { - if icaos.is_empty() { - return Ok(vec![]); - } - let station_icaos: Vec<&str> = icaos.split(',').collect(); - let station_query: Vec = station_icaos.iter().map(|icao| format!("'{}'", icao.to_string())).collect(); - - let mut conn = db::connection()?; - let mut db_metars: Vec = match sql_query(format!("SELECT DISTINCT ON (station_id) * FROM metars WHERE station_id IN ({}) ORDER BY station_id, observation_time DESC", station_query.join(","))).load(&mut conn) { - Ok(m) => m, - Err(err) => return Err(ServiceError { error_status_code: 500, error_message: format!("{}", err) }) - }; - - let missing_icaos = Self::get_missing_metar_icaos(&db_metars, station_icaos); - if missing_icaos.is_empty() { - return Ok(db_metars); - } - trace!("Retrieving missing METAR data for {:?}", missing_icaos); - let missing_icaos_string: Vec = missing_icaos.iter().map(|icao| format!("'{}'", icao.to_string())).collect(); - - let metars = Self::get_remote_metars(missing_icaos_string.join(",")).await; - - if metars.len() > 0 { - match diesel::insert_into(metars::table).values(&metars).execute(&mut conn) { - Ok(rows) => trace!("Inserted {} metar rows", rows), - Err(err) => warn!("Unable to insert metar data; {}", err) - }; - } - let mut returned_metars: Vec = vec![]; - for metar in &metars { - returned_metars.push(Self { - id: 0, + fn to_insert(metars: &Vec) -> Vec { + let mut insert_metars: Vec = vec![]; + for metar in metars { + insert_metars.push(InsertMetar { raw_text: metar.raw_text.to_string(), station_id: metar.station_id.to_string(), observation_time: metar.observation_time.to_string(), @@ -311,33 +226,157 @@ impl QueryMetar { temp_c: metar.temp_c, dewpoint_c: metar.dewpoint_c, wind_dir_degrees: match &metar.wind_dir_degrees { - Some(d) => Some(d.to_string()), + Some(m) => Some(m.to_string()), None => None }, wind_speed_kt: metar.wind_speed_kt, visibility_statute_mi: match &metar.visibility_statute_mi { - Some(d) => Some(d.to_string()), + Some(m) => Some(m.to_string()), None => None }, altim_in_hg: metar.altim_in_hg, sea_level_pressure_mb: metar.sea_level_pressure_mb, - qcf_auto: metar.qcf_auto, - qcf_auto_station: metar.qcf_auto_station, - wx_string: match &metar.wx_string { - Some(d) => Some(d.to_string()), + qcf_auto: match &metar.quality_control_flags { + Some(m) => m.auto, + None => None + }, + qcf_auto_station: match &metar.quality_control_flags { + Some(m) => m.auto_station, + None => None + }, + wx_string: match &metar.wx_string { + Some(m) => Some(m.to_string()), + None => None + }, + sky_condition: match &metar.sky_condition { + Some(s) => { + let mut sc: Vec = vec![]; + for condition in s { + if let Some(cloud_base) = condition.cloud_base_ft_agl { + sc.push(format!("{} {}", condition.sky_cover, cloud_base)); + } else { + sc.push(format!("{}", condition.sky_cover)); + } + } + Some(sc) + }, None => None }, - sky_condition: metar.sky_condition.to_owned(), flight_category: metar.flight_category.to_string(), three_hr_pressure_tendency_mb: metar.three_hr_pressure_tendency_mb, metar_type: metar.metar_type.to_string(), max_t_c: metar.max_t_c, min_t_c: metar.min_t_c, precip_in: metar.precip_in, - elevation_m: metar.elevation_m, - }) + elevation_m: metar.elevation_m + }); } - returned_metars.append(&mut db_metars); - Ok(returned_metars) + return insert_metars; } -} \ No newline at end of file + + pub async fn get_all(icaos: String) -> Result, ServiceError> { + if icaos.is_empty() { + return Ok(vec![]); + } + + let station_icaos: Vec<&str> = icaos.split(',').collect(); + let mut db_metars = match QueryMetar::get_all(&station_icaos) { + Ok(m) => Self::from_query(m), + Err(err) => return Err(err) + }; + + let missing_icaos = Self::get_missing_metar_icaos(&db_metars, &station_icaos); + if missing_icaos.is_empty() { + return Ok(db_metars); + } + trace!("Retrieving missing METAR data for {:?}", missing_icaos); + let missing_icaos_string: Vec = missing_icaos.iter().map(|icao| format!("'{}'", icao.to_string())).collect(); + let mut missing_metars = Self::get_remote_metars(missing_icaos_string.join(",")).await; + if missing_metars.len() > 0 { + let insert_metars = Self::to_insert(&missing_metars); + let mut conn = db::connection()?; + match diesel::insert_into(metars::table).values(&insert_metars).execute(&mut conn) { + Ok(rows) => trace!("Inserted {} metar rows", rows), + Err(err) => warn!("Unable to insert metar data; {}", err) + }; + } + let mut metars: Vec = vec![]; + metars.append(&mut missing_metars); + metars.append(&mut db_metars); + Ok(metars) + } +} + +#[derive(Serialize, Deserialize, AsChangeset, Insertable)] +#[diesel(table_name = metars)] +struct InsertMetar { + raw_text: String, + station_id: String, + observation_time: String, + latitude: f64, + longitude: f64, + temp_c: Option, + dewpoint_c: Option, + wind_dir_degrees: Option, + wind_speed_kt: Option, + visibility_statute_mi: Option, + altim_in_hg: Option, + sea_level_pressure_mb: Option, + qcf_auto: Option, + qcf_auto_station: Option, + wx_string: Option, + sky_condition: Option>, + flight_category: String, + three_hr_pressure_tendency_mb: Option, + metar_type: String, + #[serde(rename = "maxT_c")] + max_t_c: Option, + #[serde(rename = "minT_c")] + min_t_c: Option, + precip_in: Option, + elevation_m: i32 +} + +#[derive(Serialize, Deserialize, Queryable, QueryableByName)] +#[diesel(table_name = metars)] +struct QueryMetar { + id: i32, + raw_text: String, + station_id: String, + observation_time: String, + latitude: f64, + longitude: f64, + temp_c: Option, + dewpoint_c: Option, + wind_dir_degrees: Option, + wind_speed_kt: Option, + visibility_statute_mi: Option, + altim_in_hg: Option, + sea_level_pressure_mb: Option, + qcf_auto: Option, + qcf_auto_station: Option, + wx_string: Option, + sky_condition: Option>, + flight_category: String, + three_hr_pressure_tendency_mb: Option, + metar_type: String, + #[serde(rename = "maxT_c")] + max_t_c: Option, + #[serde(rename = "minT_c")] + min_t_c: Option, + precip_in: Option, + elevation_m: i32 +} + +impl QueryMetar { + fn get_all(icaos: &Vec<&str>) -> Result, ServiceError> { + let station_query: Vec = icaos.iter().map(|icao| format!("'{}'", icao.to_string())).collect(); + + let mut conn = db::connection()?; + let db_metars: Vec = match sql_query(format!("SELECT DISTINCT ON (station_id) * FROM metars WHERE station_id IN ({}) ORDER BY station_id, observation_time DESC", station_query.join(","))).load(&mut conn) { + Ok(m) => m, + Err(err) => return Err(ServiceError { error_status_code: 500, error_message: format!("{}", err) }) + }; + return Ok(db_metars); + } +} diff --git a/weather-service/src/metars/routes.rs b/weather-service/src/metars/routes.rs index 7dfe3ad..c6b54f4 100644 --- a/weather-service/src/metars/routes.rs +++ b/weather-service/src/metars/routes.rs @@ -1,18 +1,18 @@ use crate::{error_handler::ServiceError, db::Metadata}; -use crate::metars::QueryMetar; +use crate::metars::Metar; use actix_web::{get, web, HttpResponse, Responder}; use log::error; use serde::{Deserialize, Serialize}; #[derive(Serialize, Deserialize)] pub struct MetarsResponse { - pub data: Vec, + pub data: Vec, pub meta: Metadata } #[get("metars/{ids}")] async fn get_all(ids: web::Path) -> impl Responder { - let airports = match web::block(|| Ok::<_, ServiceError>(async {QueryMetar::get_all(ids.into_inner()).await})) + let airports = match web::block(|| Ok::<_, ServiceError>(async {Metar::get_all(ids.into_inner()).await})) .await .unwrap() .unwrap()