diff --git a/service/Cargo.lock b/service/Cargo.lock index 78b9968..5bf3786 100644 --- a/service/Cargo.lock +++ b/service/Cargo.lock @@ -628,6 +628,7 @@ dependencies = [ "itoa", "pq-sys", "r2d2", + "serde_json", "uuid", ] @@ -1569,9 +1570,9 @@ dependencies = [ [[package]] name = "regex" -version = "1.9.5" +version = "1.10.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "697061221ea1b4a94a624f67d0ae2bfe4e22b8a17b6a192afb11046542cc8c47" +checksum = "380b951a9c5e80ddfd6136919eef32310721aa4aacd4889a8d39124b026ab343" dependencies = [ "aho-corasick", "memchr", @@ -1581,9 +1582,9 @@ dependencies = [ [[package]] name = "regex-automata" -version = "0.3.8" +version = "0.4.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c2f401f4955220693b56f8ec66ee9c78abffd8d1c4f23dc41a23839eb88f0795" +checksum = "5f804c7828047e88b2d32e2d7fe5a105da8ee3264f01902f796c8e067dc2483f" dependencies = [ "aho-corasick", "memchr", @@ -1592,9 +1593,9 @@ dependencies = [ [[package]] name = "regex-syntax" -version = "0.7.5" +version = "0.8.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "dbb5fb1acd8a1a18b3dd5be62d25485eb770e05afb408a9627d14d451bae12da" +checksum = "c08c74e62047bb2de4ff487b251e4a92e24f48745648451635cec7d591162d9f" [[package]] name = "reqwest" @@ -1817,6 +1818,7 @@ dependencies = [ "quick-xml", "r2d2", "redis", + "regex", "reqwest", "rustix", "serde", diff --git a/service/Cargo.toml b/service/Cargo.toml index 385bf54..9d6a043 100644 --- a/service/Cargo.toml +++ b/service/Cargo.toml @@ -16,7 +16,7 @@ actix-web-httpauth = "0.8.1" actix-multipart = "0.6.1" chrono = { version = "0.4.31", features = ["serde"] } dotenv = "0.15.0" -diesel = { version = "2.1.2", features = ["postgres", "r2d2", "uuid", "chrono"] } +diesel = { version = "2.1.2", features = ["postgres", "r2d2", "uuid", "chrono", "serde_json"] } postgis_diesel = { version = "2.2.1", features = ["serde"] } diesel_migrations = { version = "2.1.0", features = ["postgres"] } env_logger = "0.10.0" @@ -33,3 +33,4 @@ argon2 = "0.5.2" jsonwebtoken = "9.0.0" redis = { version = "0.23.3", features = ["tokio-comp", "connection-manager", "r2d2"] } rustix = "0.38.19" # https://github.com/imsnif/bandwhich/issues/284 +regex = "1.10.2" diff --git a/service/migrations/000001_metars/up.sql b/service/migrations/000001_metars/up.sql index b786ebc..8b0fa36 100644 --- a/service/migrations/000001_metars/up.sql +++ b/service/migrations/000001_metars/up.sql @@ -1,26 +1,7 @@ CREATE TABLE IF NOT EXISTS metars ( - id INTEGER PRIMARY KEY GENERATED ALWAYS AS IDENTITY, - raw_text TEXT NOT NULL, - station_id TEXT NOT NULL, - observation_time TEXT NOT NULL, - latitude DOUBLE PRECISION NOT NULL, - longitude DOUBLE PRECISION NOT NULL, - temp_c DOUBLE PRECISION, - dewpoint_c DOUBLE PRECISION, - wind_dir_degrees TEXT, - wind_speed_kt INTEGER, - visibility_statute_mi TEXT, - altim_in_hg DOUBLE PRECISION, - sea_level_pressure_mb DOUBLE PRECISION, - qcf_auto BOOLEAN, - qcf_auto_station BOOLEAN, - wx_string TEXT, - sky_condition TEXT[], - flight_category TEXT, - three_hr_pressure_tendency_mb DOUBLE PRECISION, - metar_type TEXT NOT NULL, - max_t_c DOUBLE PRECISION, - min_t_c DOUBLE PRECISION, - precip_in DOUBLE PRECISION, - elevation_m INTEGER NOT NULL + id INTEGER PRIMARY KEY GENERATED ALWAYS AS IDENTITY, + station_id TEXT NOT NULL, + observation_time TIMESTAMP NOT NULL, + raw_text TEXT NOT NULL, + data JSONB NOT NULL ); \ No newline at end of file diff --git a/service/src/db/schema.rs b/service/src/db/schema.rs index 419f9cd..3d61a0f 100644 --- a/service/src/db/schema.rs +++ b/service/src/db/schema.rs @@ -21,29 +21,10 @@ diesel::table! { diesel::table! { metars (id) { id -> Integer, - raw_text -> Text, station_id -> Text, - observation_time -> Text, - latitude -> Double, - longitude -> Double, - temp_c -> Nullable, - dewpoint_c -> Nullable, - wind_dir_degrees -> Nullable, - wind_speed_kt -> Nullable, - visibility_statute_mi -> Nullable, - altim_in_hg -> Nullable, - sea_level_pressure_mb -> Nullable, - qcf_auto -> Nullable, - qcf_auto_station -> Nullable, - wx_string -> Nullable, - sky_condition -> Nullable>, - flight_category -> Text, - three_hr_pressure_tendency_mb -> Nullable, - metar_type -> Text, - max_t_c -> Nullable, - min_t_c -> Nullable, - precip_in -> Nullable, - elevation_m -> Integer, + observation_time -> Timestamp, + raw_text -> Text, + data -> Jsonb, } } diff --git a/service/src/metars/model.rs b/service/src/metars/model.rs index c03b626..ce84b35 100644 --- a/service/src/metars/model.rs +++ b/service/src/metars/model.rs @@ -1,384 +1,553 @@ use crate::{error_handler::ServiceError, db}; use crate::db::schema::metars::{self}; +use chrono::Datelike; use diesel::{prelude::*, sql_query}; use log::{warn, trace}; use std::collections::HashSet; -use std::io::BufRead; -use quick_xml::{Reader, events::{Event, BytesStart}, Writer, de::Deserializer}; use serde::{Deserialize, Serialize}; #[derive(Serialize, Deserialize, Debug)] pub struct QualityControlFlags { - pub auto: Option, - pub auto_station: Option + pub auto: Option, + pub auto_station: Option, + pub maintenance_indicator_on: Option, + pub corrected: Option +} + +impl Default for QualityControlFlags { + fn default() -> Self { + QualityControlFlags { + auto: None, + auto_station: None, + maintenance_indicator_on: None, + corrected: None, + } + } } #[derive(Serialize, Deserialize, Debug)] pub struct SkyCondition { - #[serde(rename(deserialize = "@sky_cover"))] - pub sky_cover: String, - #[serde(rename(deserialize = "@cloud_base_ft_agl"))] - pub cloud_base_ft_agl: Option + pub sky_cover: String, + pub cloud_base_ft_agl: Option +} + +impl Default for SkyCondition { + fn default() -> Self { + SkyCondition { + sky_cover: "".to_string(), + cloud_base_ft_agl: None + } + } +} + +#[derive(Serialize, Deserialize, Debug)] +pub struct RunwayVisualRange { + pub runway: String, + pub visibility_ft: Option, + pub variable_visibility_high_ft: Option, + pub variable_visibility_low_ft: Option +} + +impl Default for RunwayVisualRange { + fn default() -> Self { + RunwayVisualRange { + runway: "".to_string(), + visibility_ft: None, + variable_visibility_high_ft: None, + variable_visibility_low_ft: None + } + } +} + +#[derive(Serialize, Deserialize, Debug)] +pub enum FlightCategory { + VFR, + MVFR, + LIFR, + IFR, + UNKN } #[derive(Serialize, Deserialize, Debug)] pub struct Metar { - pub raw_text: String, - pub station_id: String, - pub observation_time: String, - pub latitude: f64, - pub longitude: f64, - pub temp_c: Option, - pub dewpoint_c: Option, - pub wind_dir_degrees: Option, - pub wind_speed_kt: Option, - pub visibility_statute_mi: Option, - pub altim_in_hg: Option, - pub sea_level_pressure_mb: Option, - pub quality_control_flags: Option, - pub wx_string: Option, - pub sky_condition: Option>, - pub flight_category: String, - pub three_hr_pressure_tendency_mb: Option, - pub metar_type: String, - #[serde(rename = "maxT_c")] - pub max_t_c: Option, - #[serde(rename = "minT_c")] - pub min_t_c: Option, - pub precip_in: Option, - pub elevation_m: i32 + pub raw_text: String, + pub station_id: String, + pub observation_time: chrono::NaiveDateTime, + pub temp_c: Option, + pub dewpoint_c: Option, + pub wind_dir_degrees: Option, + pub wind_speed_kt: Option, + pub wind_gust_kt: Option, + pub variable_wind_dir_degrees: Option, + pub visibility_statute_mi: Option, + pub runway_visual_range: Vec, + pub altim_in_hg: Option, + pub sea_level_pressure_mb: Option, + pub quality_control_flags: QualityControlFlags, + pub weather_phenomena: Vec, + pub sky_condition: Vec, + pub flight_category: FlightCategory, + pub three_hr_pressure_tendency_mb: Option, + pub max_t_c: Option, + pub min_t_c: Option, + pub precip_in: Option, + pub elevation_m: i32 +} + +impl Default for Metar { + fn default() -> Self { + Metar { + raw_text: "".to_string(), + station_id: "".to_string(), + observation_time: chrono::NaiveDateTime::parse_from_str("1970-01-01T00:00:00", "%Y-%m-%dT%H:%M:%S").unwrap(), + temp_c: None, + dewpoint_c: None, + wind_dir_degrees: None, + wind_speed_kt: None, + wind_gust_kt: None, + variable_wind_dir_degrees: None, + visibility_statute_mi: None, + runway_visual_range: vec![], + altim_in_hg: None, + sea_level_pressure_mb: None, + quality_control_flags: QualityControlFlags::default(), + weather_phenomena: vec![], + sky_condition: vec![], + flight_category: FlightCategory::UNKN, + three_hr_pressure_tendency_mb: None, + max_t_c: None, + min_t_c: None, + precip_in: None, + elevation_m: 0 + } + } } impl Metar { - fn parse(input: String) -> Result, ServiceError> { - if input.is_empty() { - return Err(ServiceError::new(500, "Input is empty".to_string())) - } - - let mut reader = Reader::from_str(&input); - let mut buf = Vec::new(); - let mut junk_buf: Vec = Vec::new(); - let mut metars: Vec = vec![]; - - loop { - match reader.read_event_into(&mut buf) { - Err(e) => panic!("Error at position: {}: {:?}", reader.buffer_position(), e), - Ok(Event::Eof) => break, - Ok(Event::Start(e)) => { - match e.name().as_ref() { - b"METAR" => { - let metar_bytes = Self::read_to_end_into_buffer(&mut reader, &e, &mut junk_buf).unwrap(); - let str = std::str::from_utf8(&metar_bytes).unwrap(); - let mut deserializer = Deserializer::from_str(str); - match Self::deserialize(&mut deserializer) { - Ok(m) => metars.push(m), - Err(err) => warn!("Error deserializing; {}", err) - }; - }, - _ => () - } - }, - _ => () - } - } - - return Ok(metars) - } - - // https://capnfabs.net/posts/parsing-huge-xml-quickxml-rust-serde/ - fn read_to_end_into_buffer(reader: &mut Reader, start_tag: &BytesStart, junk_buf: &mut Vec) -> Result, quick_xml::Error> { - let mut depth = 0; - let mut output_buf: Vec = Vec::new(); - let mut w = Writer::new(&mut output_buf); - let tag_name = start_tag.name(); - w.write_event(Event::Start(start_tag.clone()))?; - loop { - junk_buf.clear(); - let event = reader.read_event_into(junk_buf)?; - w.write_event(&event)?; + fn parse(metar_strings: Vec<&str>) -> Result, ServiceError> { + // Parse a metar in the format of: KSMF 211653Z 01004KT 10SM BKN250 11/06 A3041 RMK AO2 SLP296 T01060061 + let mut metars: Vec = vec![]; + for metar_string in metar_strings { + trace!("Parsing METAR data: {}", metar_string); + let mut metar: Metar = Metar::default(); + metar.raw_text = metar_string.to_owned(); + let mut metar_parts: Vec<&str> = metar_string.split_whitespace().collect(); + if metar_parts.len() < 4 { + warn!("Unable to parse METAR data in an unexpected format: {}", metar_string); + continue; + } - match event { - Event::Start(e) if e.name() == tag_name => depth += 1, - Event::End(e) if e.name() == tag_name => { - if depth == 0 { - return Ok(output_buf); - } - depth -= 1; - } - Event::Eof => { - panic!("EOF") - } - _ => {} - } - } - } + // Station Identifier + metar.station_id = metar_parts[0].to_string(); + metar_parts.remove(0); + + // Date/Time + let observation_time = metar_parts[0]; + metar_parts.remove(0); + let observation_time_day = &observation_time[0..2]; + let observation_time_hour = &observation_time[2..4]; + let observation_time_minute = &observation_time[4..6]; + let current_time = chrono::Utc::now().naive_utc(); + // Check if the observation time is from the previous month + let observation_time_month = if current_time.day() > observation_time_day.parse::().unwrap() { + current_time.month() - 1 + } else { + current_time.month() + }; + // Check if the observation time is from the previous year + let observation_time_year = if current_time.month() > observation_time_month { + current_time.year() - 1 + } else { + current_time.year() + }; + // Handle Daylight Savings Time + let observation_time_hour = if observation_time_month == 3 && observation_time_day.parse::().unwrap() < 14 { + observation_time_hour.parse::().unwrap() - 1 + } else { + observation_time_hour.parse::().unwrap() + }; + let observation_time = format!("{}-{}-{}T{}:{}:00Z", observation_time_year, observation_time_month, observation_time_day, observation_time_hour, observation_time_minute); + metar.observation_time = chrono::NaiveDateTime::parse_from_str(&observation_time, "%Y-%m-%dT%H:%M:%SZ").unwrap(); - fn get_missing_metar_icaos(db_metars: &Vec, station_icaos: &Vec<&str>) -> Vec { - let mut missing_metar_icaos: Vec = vec![]; - let current_time = chrono::Local::now().naive_local().timestamp(); - let db_metars_set: HashSet<&str> = db_metars.iter().map(|icao| icao.station_id.as_str()).collect(); - let station_icaos_set: HashSet<&str> = station_icaos.to_owned().into_iter().collect(); - for difference in db_metars_set.symmetric_difference(&station_icaos_set) { - missing_metar_icaos.push(difference.to_string()); - } - for metar in db_metars { - match chrono::NaiveDateTime::parse_and_remainder(&metar.observation_time, "%Y-%m-%dT%H:%M:%S") { - Ok((time, _)) => { - if current_time > (time.timestamp() + 3600) { - trace!("{} METAR data is outdated", metar.station_id); - missing_metar_icaos.push(metar.station_id.to_string()); - } - }, - Err(err) => { - warn!("Parsing METAR timestamp failed; {}", err); - missing_metar_icaos.push(metar.station_id.to_string()); - } - }; - } - return missing_metar_icaos; - } + // Report Modifiers + if metar_parts[0] == "AUTO" { + metar.quality_control_flags.auto = Some(true); + metar_parts.remove(0); + } else if metar_parts[0] == "COR" { + metar.quality_control_flags.corrected = Some(true); + metar_parts.remove(0); + } - async fn get_remote_metars(icaos: String) -> Vec { - let gov_api_url = std::env::var("GOV_API_URL").expect("GOV_API_URL must be set"); - let url = format!("{}/metar.php?ids={}&format=xml", gov_api_url, icaos); - match reqwest::get(url).await { - Ok(r) => match r.text().await { - Ok(r) => { - match Metar::parse(r) { - Ok(m) => m, - Err(err) => { - warn!("{}", err); - vec![] - } - } - }, - Err(err) => { - warn!("Unable to parse METAR request: {}", err); - vec![] - } - }, + // Wind Direction and Speed + let wind_re = regex::Regex::new(r"^(?:[0-9]{3}|VRB)[0-9]{2}KT$").unwrap(); + let wind_gust_re = regex::Regex::new(r"^(?:[0-9]{3}|VRB)[0-9]{2}G[0-9]{2}KT$").unwrap(); + if wind_re.is_match(metar_parts[0]) { + let wind = metar_parts[0]; + metar_parts.remove(0); + let wind_dir_degrees = &wind[0..3]; + let wind_speed_kt = &wind[3..5]; + metar.wind_dir_degrees = Some(wind_dir_degrees.to_string()); + metar.wind_speed_kt = Some(wind_speed_kt.parse::().unwrap()); + } else if wind_gust_re.is_match(metar_parts[0]) { + let wind = metar_parts[0]; + metar_parts.remove(0); + let wind_dir_degrees = &wind[0..3]; + let wind_speed_kt = &wind[3..5]; + metar.wind_dir_degrees = Some(wind_dir_degrees.to_string()); + metar.wind_speed_kt = Some(wind_speed_kt.parse::().unwrap()); + // Gust + let wind_gust_kt = &wind[6..8]; + metar.wind_gust_kt = Some(wind_gust_kt.parse::().unwrap()); + } + + // Variable Wind Direction + let variable_wind_re = regex::Regex::new(r"^[0-9]{3}V[0-9]{3}$").unwrap(); + if variable_wind_re.is_match(metar_parts[0]) { + metar.variable_wind_dir_degrees = Some(metar_parts[0].to_string()); + metar_parts.remove(0); + } + + // Visibility + let visibility_re = regex::Regex::new(r"^M?(?:[0-9]+|[0-9]+/[0-9]+)SM").unwrap(); + if visibility_re.is_match(metar_parts[0]) { + let visibility_str = &metar_parts[0][0..metar_parts[0].len() - 2]; + metar_parts.remove(0); + let visibility: String = if visibility_str.contains("/") { + let visibility_parts: Vec<&str> = visibility_str.split("/").collect(); + let visibility_left = visibility_parts[0]; + let visibility_right = visibility_parts[1].parse::().unwrap(); + if visibility_left.starts_with("M") { + format!("M{}", visibility_left[1..visibility_left.len()].parse::().unwrap() / visibility_right) + } else if visibility_left.starts_with("P") { + format!("P{}", visibility_left[1..visibility_left.len()].parse::().unwrap() / visibility_right) + } else { + format!("{}", visibility_left.parse::().unwrap() / visibility_right) + } + } else { + visibility_str.to_string() + }; + metar.visibility_statute_mi = Some(visibility); + } else if metar_parts[0].parse::().is_ok() && metar_parts.len() > 1 && visibility_re.is_match(metar_parts[1]) { + let visibility_whole = metar_parts[0].parse::().unwrap(); + metar_parts.remove(0); + let visibility_parts: Vec<&str> = metar_parts[0].split("/").collect(); + metar_parts.remove(0); + let visibility_left = visibility_parts[0]; + let visibility_right = visibility_parts[1][0..visibility_parts[1].len() - 2].parse::().unwrap(); + let visibility = if visibility_left.starts_with("M") { + format!("M{}", visibility_whole + (visibility_left[1..visibility_left.len()].parse::().unwrap() / visibility_right)) + } else if visibility_left.starts_with("P") { + format!("P{}", visibility_whole + (visibility_left[1..visibility_left.len()].parse::().unwrap() / visibility_right)) + } else { + format!("{}", visibility_whole + (visibility_left.parse::().unwrap() / visibility_right)) + }; + metar.visibility_statute_mi = Some(visibility); + } + + // Runway Visual Range + let rvr_re = regex::Regex::new(r"^R[0-9]{1,3}(?:L|R)?/[PM]?[0-9]{4}FT$").unwrap(); + let variable_rvr_re = regex::Regex::new(r"^R[0-9]{1,3}(?:L|R)?/[PM]?[0-9]{4}V[PM]?[0-9]{4}FT$").unwrap(); + while rvr_re.is_match(metar_parts[0]) || variable_rvr_re.is_match(metar_parts[0]) { + let rvr_string = metar_parts[0]; + metar_parts.remove(0); + let mut rvr = RunwayVisualRange::default(); + let rvr_parts: Vec<&str> = rvr_string.split("/").collect(); + rvr.runway = rvr_parts[0].to_string(); + if rvr_re.is_match(rvr_string) { + rvr.visibility_ft = Some(rvr_parts[1].to_string()); + } else { + let rvr_variable_parts: Vec<&str> = rvr_parts[1].split("V").collect(); + if rvr_variable_parts.len() != 2 { + warn!("Unable to parse runway visual range in {}: {}", rvr_string, metar_string); + } else { + rvr.variable_visibility_low_ft = Some(rvr_variable_parts[0].to_string()); + rvr.variable_visibility_high_ft = Some(rvr_variable_parts[1].to_string()); + } + } + } + + // Weather Phenomena + let wx_re = regex::Regex::new(r"^[+-]?(?:RA|SN|UP|FG|FZFG|BR|HZ|SQ|FC|TS|GR|GS|FZRA|VA|DZ)$").unwrap(); + while wx_re.is_match(metar_parts[0]) { + metar.weather_phenomena.push(metar_parts[0].to_string()); + metar_parts.remove(0); + } + + // Sky Condition + let sky_condition_re = regex::Regex::new(r"^(?:CLR|SKC|(?:FEW|SCT|BKN|OVC|VV)([0-9]{3})?)$").unwrap(); + while sky_condition_re.is_match(metar_parts[0]) { + let sky_condition_string = metar_parts[0]; + metar_parts.remove(0); + let mut sky_condition = SkyCondition::default(); + let sky_cover = &sky_condition_string[0..3]; + sky_condition.sky_cover = sky_cover.to_string(); + if sky_condition_string.len() > 3 { + sky_condition.cloud_base_ft_agl = Some(sky_condition_string[3..sky_condition_string.len()].parse::().unwrap() * 100); + } + metar.sky_condition.push(sky_condition); + } + + // Temperature and Dewpoint + let temp_re = regex::Regex::new(r"^(?:M?[0-9]{2})?/(?:M?[0-9]{2})?$").unwrap(); + if temp_re.is_match(metar_parts[0]) { + let temp_string = metar_parts[0]; + metar_parts.remove(0); + let temp_parts: Vec<&str> = temp_string.split("/").collect(); + let mut temp_c = ""; + let mut dewpoint_c = ""; + if temp_parts.len() != 2 { + if temp_string.ends_with("/") { + temp_c = temp_parts[0]; + } else { + dewpoint_c = temp_parts[0]; + } + } else { + temp_c = temp_parts[0]; + dewpoint_c = temp_parts[1]; + } + if temp_c.starts_with("M") { + metar.temp_c = Some(temp_c[1..temp_c.len()].parse::().unwrap() * -1.0); + } else if !temp_c.is_empty() { + metar.temp_c = match temp_c.parse::() { + Ok(t) => Some(t), Err(err) => { - warn!("Unable to get METAR request: {}", err); + warn!("Unable to parse temperature in {}: {}", temp_c, err); + None + } + }; + } + if dewpoint_c.starts_with("M") { + metar.dewpoint_c = Some(dewpoint_c[1..dewpoint_c.len()].parse::().unwrap() * -1.0); + } else if !dewpoint_c.is_empty() { + metar.dewpoint_c = match dewpoint_c.parse::() { + Ok(d) => Some(d), + Err(err) => { + warn!("Unable to parse dewpoint in {}: {}", dewpoint_c, err); + None + } + }; + } + } + + // Altimeter + let altim_re = regex::Regex::new(r"^A[0-9]{4}$").unwrap(); + if altim_re.is_match(metar_parts[0]) { + let altim = metar_parts[0]; + metar_parts.remove(0); + metar.altim_in_hg = Some(altim[1..altim.len()].parse::().unwrap() / 100.0); + } + + // Remarks + if !metar_parts.is_empty() { + if metar_parts[0] == "RMK" { + metar_parts.remove(0); + } else { + warn!("Unexpected field found, skipping METAR: '{}' ({})", metar_parts[0], metar_string); + continue; + } + } + loop { + if metar_parts.is_empty() { + break; + } + let remark = metar_parts[0]; + metar_parts.remove(0); + if remark == "AO2" { + metar.quality_control_flags.auto_station = Some(true); + } + } + + // Flight Category + // VFR: Visibility >= 5 miles, Ceiling >= 3000 ft + // MVFR: Visibility >= 3 miles, Ceiling >= 1000 ft + // IFR: Visibility >= 1 mile, Ceiling >= 500 ft + // LIFR: Visibility < 1 mile, Ceiling < 500 ft + // UNKN: Visibility or Ceiling is missing + if metar.visibility_statute_mi.is_none() || metar.sky_condition.is_empty() { + metar.flight_category = FlightCategory::UNKN; + } else { + let visibility = match &metar.visibility_statute_mi { + Some(v) => { + if v.starts_with("M") || v.starts_with("P") { + v[1..v.len()].parse::().unwrap() + } else { + v.parse::().unwrap() + } + } + None => 0.0 + }; + let ceiling = match metar.sky_condition.first() { + Some(s) => { + if s.sky_cover == "CLR" || s.sky_cover == "SKC" { + 3000.0 + } else if s.sky_cover == "VV" { + 0.0 + } else { + match s.cloud_base_ft_agl { + Some(c) => c as f64, + None => 0.0 + } + } + }, + None => 3000.0 // Assume VFR if no sky condition is present + }; + if visibility >= 5.0 && ceiling >= 3000.0 { + metar.flight_category = FlightCategory::VFR; + } else if visibility >= 3.0 && ceiling >= 1000.0 { + metar.flight_category = FlightCategory::MVFR; + } else if visibility >= 1.0 && ceiling >= 500.0 { + metar.flight_category = FlightCategory::IFR; + } else { + metar.flight_category = FlightCategory::LIFR; + } + } + + metars.push(metar); + } + return Ok(metars) + } + + fn get_missing_metar_icaos(db_metars: &Vec, station_icaos: &Vec<&str>) -> Vec { + let mut missing_metar_icaos: Vec = vec![]; + let current_time = chrono::Local::now().naive_local().timestamp(); + let db_metars_set: HashSet<&str> = db_metars.iter().map(|icao| icao.station_id.as_str()).collect(); + let station_icaos_set: HashSet<&str> = station_icaos.to_owned().into_iter().collect(); + for difference in db_metars_set.symmetric_difference(&station_icaos_set) { + missing_metar_icaos.push(difference.to_string()); + } + for metar in db_metars { + if current_time > (metar.observation_time.timestamp() + 3600) { + trace!("{} METAR data is outdated", metar.station_id); + missing_metar_icaos.push(metar.station_id.to_string()); + } + } + return missing_metar_icaos; + } + + async fn get_remote_metars(icaos: String) -> Vec { + let gov_api_url = std::env::var("GOV_API_URL").expect("GOV_API_URL must be set"); + let url = format!("{}/metar.php?ids={}", gov_api_url, icaos); + match reqwest::get(url).await { + Ok(r) => match r.text().await { + Ok(r) => { + let metar_strings = r.trim().split("\n").filter(|m| !m.trim().is_empty()).collect(); + match Metar::parse(metar_strings) { + Ok(m) => m, + Err(err) => { + warn!("{}", err); vec![] } + } + }, + Err(err) => { + warn!("Unable to parse METAR request: {}", err); + vec![] } + }, + Err(err) => { + warn!("Unable to get METAR request: {}", err); + vec![] + } + } + } + + fn from_query(query_metars: Vec) -> Vec { + let mut metars: Vec = vec![]; + for metar in query_metars { + let mut metar: Metar = serde_json::from_value(metar.data).unwrap(); + metar.raw_text = metar.raw_text.to_string(); + metar.station_id = metar.station_id.to_string(); + metars.push(metar); + } + return metars; + } + + fn to_insert(metars: &Vec) -> Vec { + let mut insert_metars: Vec = vec![]; + for metar in metars { + insert_metars.push(InsertMetar { + station_id: metar.station_id.to_string(), + observation_time: metar.observation_time, + raw_text: metar.raw_text.to_string(), + data: serde_json::to_value(metar).unwrap() + }); + } + return insert_metars; + } + + pub async fn get_all(icao_string: String) -> Result, ServiceError> { + if icao_string.is_empty() { + return Ok(vec![]); } - fn from_query(query_metars: Vec) -> Vec { - let mut metars: Vec = vec![]; - for metar in query_metars { - let quality_control_flags = Some(QualityControlFlags { - auto: metar.qcf_auto, - auto_station: metar.qcf_auto_station - }); - let sky_condition = match metar.sky_condition { - Some(s) => { - let mut sc: Vec = vec![]; - for string in s { - let split: Vec<&str> = string.split_whitespace().collect(); - if split.len() == 1 { - sc.push(SkyCondition { sky_cover: split[0].to_string(), cloud_base_ft_agl: None }) - } else if split.len() == 2 { - let cloud_base = split[1].parse::().unwrap(); - sc.push(SkyCondition { sky_cover: split[0].to_string(), cloud_base_ft_agl: Some(cloud_base) }) - } - } - Some(sc) - }, - None => None - }; - metars.push(Metar { - raw_text: metar.raw_text, - station_id: metar.station_id, - observation_time: metar.observation_time, - latitude: metar.latitude, - longitude: metar.longitude, - temp_c: metar.temp_c, - dewpoint_c: metar.dewpoint_c, - wind_dir_degrees: metar.wind_dir_degrees, - wind_speed_kt: metar.wind_speed_kt, - visibility_statute_mi: metar.visibility_statute_mi, - altim_in_hg: metar.altim_in_hg, - sea_level_pressure_mb: metar.sea_level_pressure_mb, - quality_control_flags, - wx_string: metar.wx_string, - sky_condition, - flight_category: metar.flight_category, - three_hr_pressure_tendency_mb: metar.three_hr_pressure_tendency_mb, - metar_type: metar.metar_type, - max_t_c: metar.max_t_c, - min_t_c: metar.min_t_c, - precip_in: metar.precip_in, - elevation_m: metar.elevation_m - }) - } - return metars; + let icaos: Vec<&str> = icao_string.split(",").collect(); + + let mut db_metars = match QueryMetar::get_all(&icaos) { + Ok(m) => Self::from_query(m), + Err(err) => return Err(err) + }; + + let missing_icaos = Self::get_missing_metar_icaos(&db_metars, &icaos); + if missing_icaos.is_empty() { + return Ok(db_metars); } - - fn to_insert(metars: &Vec) -> Vec { - let mut insert_metars: Vec = vec![]; - for metar in metars { - insert_metars.push(InsertMetar { - raw_text: metar.raw_text.to_string(), - station_id: metar.station_id.to_string(), - observation_time: metar.observation_time.to_string(), - latitude: metar.latitude, - longitude: metar.longitude, - temp_c: metar.temp_c, - dewpoint_c: metar.dewpoint_c, - wind_dir_degrees: match &metar.wind_dir_degrees { - Some(m) => Some(m.to_string()), - None => None - }, - wind_speed_kt: metar.wind_speed_kt, - visibility_statute_mi: match &metar.visibility_statute_mi { - Some(m) => Some(m.to_string()), - None => None - }, - altim_in_hg: metar.altim_in_hg, - sea_level_pressure_mb: metar.sea_level_pressure_mb, - qcf_auto: match &metar.quality_control_flags { - Some(m) => m.auto, - None => None - }, - qcf_auto_station: match &metar.quality_control_flags { - Some(m) => m.auto_station, - None => None - }, - wx_string: match &metar.wx_string { - Some(m) => Some(m.to_string()), - None => None - }, - sky_condition: match &metar.sky_condition { - Some(s) => { - let mut sc: Vec = vec![]; - for condition in s { - if let Some(cloud_base) = condition.cloud_base_ft_agl { - sc.push(format!("{} {}", condition.sky_cover, cloud_base)); - } else { - sc.push(format!("{}", condition.sky_cover)); - } - } - Some(sc) - }, - None => None - }, - flight_category: metar.flight_category.to_string(), - three_hr_pressure_tendency_mb: metar.three_hr_pressure_tendency_mb, - metar_type: metar.metar_type.to_string(), - max_t_c: metar.max_t_c, - min_t_c: metar.min_t_c, - precip_in: metar.precip_in, - elevation_m: metar.elevation_m - }); - } - return insert_metars; - } - - pub async fn get_all(icao_string: String) -> Result, ServiceError> { - if icao_string.is_empty() { - return Ok(vec![]); - } - - let icaos: Vec<&str> = icao_string.split(",").collect(); - - let mut db_metars = match QueryMetar::get_all(&icaos) { - Ok(m) => Self::from_query(m), - Err(err) => return Err(err) - }; - - let missing_icaos = Self::get_missing_metar_icaos(&db_metars, &icaos); - if missing_icaos.is_empty() { - return Ok(db_metars); - } - trace!("Retrieving missing METAR data for {:?}", missing_icaos); - let missing_icaos_string: Vec = missing_icaos.iter().map(|icao| format!("{}", icao.to_string())).collect(); - let mut missing_metars = Self::get_remote_metars(missing_icaos_string.join(",")).await; - if missing_metars.len() > 0 { - let insert_metars = Self::to_insert(&missing_metars); - let mut conn = db::connection()?; - match diesel::insert_into(metars::table).values(&insert_metars).execute(&mut conn) { - Ok(rows) => trace!("Inserted {} metar rows", rows), - Err(err) => warn!("Unable to insert metar data; {}", err) - }; - } - let mut metars: Vec = vec![]; - metars.append(&mut missing_metars); - metars.append(&mut db_metars); - Ok(metars) + trace!("Retrieving missing METAR data for {:?}", missing_icaos); + let missing_icaos_string: Vec = missing_icaos.iter().map(|icao| format!("{}", icao.to_string())).collect(); + let mut missing_metars = Self::get_remote_metars(missing_icaos_string.join(",")).await; + if missing_metars.len() > 0 { + let insert_metars = Self::to_insert(&missing_metars); + match InsertMetar::insert(&insert_metars) { + Ok(rows) => trace!("Inserted {} metar rows", rows), + Err(err) => warn!("Unable to insert metar data; {}", err) + }; } + let mut metars: Vec = vec![]; + metars.append(&mut missing_metars); + metars.append(&mut db_metars); + Ok(metars) + } } #[derive(Serialize, Deserialize, AsChangeset, Insertable)] #[diesel(table_name = metars)] struct InsertMetar { - raw_text: String, - station_id: String, - observation_time: String, - latitude: f64, - longitude: f64, - temp_c: Option, - dewpoint_c: Option, - wind_dir_degrees: Option, - wind_speed_kt: Option, - visibility_statute_mi: Option, - altim_in_hg: Option, - sea_level_pressure_mb: Option, - qcf_auto: Option, - qcf_auto_station: Option, - wx_string: Option, - sky_condition: Option>, - flight_category: String, - three_hr_pressure_tendency_mb: Option, - metar_type: String, - #[serde(rename = "maxT_c")] - max_t_c: Option, - #[serde(rename = "minT_c")] - min_t_c: Option, - precip_in: Option, - elevation_m: i32 + station_id: String, + observation_time: chrono::NaiveDateTime, + raw_text: String, + data: serde_json::Value +} + +impl InsertMetar { + fn insert(metars: &Vec) -> Result { + let mut conn = db::connection()?; + match diesel::insert_into(metars::table).values(metars).execute(&mut conn) { + Ok(rows) => Ok(rows), + Err(err) => Err(ServiceError { status: 500, message: format!("{}", err) }) + } + } } #[derive(Serialize, Deserialize, Queryable, QueryableByName)] #[diesel(table_name = metars)] struct QueryMetar { - id: i32, - raw_text: String, - station_id: String, - observation_time: String, - latitude: f64, - longitude: f64, - temp_c: Option, - dewpoint_c: Option, - wind_dir_degrees: Option, - wind_speed_kt: Option, - visibility_statute_mi: Option, - altim_in_hg: Option, - sea_level_pressure_mb: Option, - qcf_auto: Option, - qcf_auto_station: Option, - wx_string: Option, - sky_condition: Option>, - flight_category: String, - three_hr_pressure_tendency_mb: Option, - metar_type: String, - #[serde(rename = "maxT_c")] - max_t_c: Option, - #[serde(rename = "minT_c")] - min_t_c: Option, - precip_in: Option, - elevation_m: i32 + id: i32, + station_id: String, + observation_time: chrono::NaiveDateTime, + raw_text: String, + data: serde_json::Value } impl QueryMetar { - fn get_all(icaos: &Vec<&str>) -> Result, ServiceError> { - let station_query: Vec = icaos.iter().map(|icao| format!("'{}'", icao.to_string())).collect(); - - let mut conn = db::connection()?; - let db_metars: Vec = match sql_query(format!("SELECT DISTINCT ON (station_id) * FROM metars WHERE station_id IN ({}) ORDER BY station_id, observation_time DESC", station_query.join(","))).load(&mut conn) { - Ok(m) => m, - Err(err) => return Err(ServiceError { status: 500, message: format!("{}", err) }) - }; - return Ok(db_metars); - } + fn get_all(icaos: &Vec<&str>) -> Result, ServiceError> { + let station_query: Vec = icaos.iter().map(|icao| format!("'{}'", icao.to_string())).collect(); + + let mut conn = db::connection()?; + let db_metars: Vec = match sql_query( + format!("SELECT DISTINCT ON (station_id) * FROM metars WHERE station_id IN ({}) ORDER BY station_id, observation_time DESC", station_query.join(",")) + ).load(&mut conn) { + Ok(m) => m, + Err(err) => return Err(ServiceError { status: 500, message: format!("{}", err) }) + }; + return Ok(db_metars); + } } diff --git a/service/src/scheduler.rs b/service/src/scheduler.rs index e6c7dc4..e991bd8 100644 --- a/service/src/scheduler.rs +++ b/service/src/scheduler.rs @@ -33,12 +33,20 @@ pub fn update_airports() { let airport_icaos: Vec = airports.iter().map(|a| a.icao.to_string()).collect(); let mut peekable = airport_icaos.into_iter().peekable(); + let mut observation_time = 0; + while peekable.peek().is_some() { let chunk: Vec = peekable.by_ref().take(limit as usize).collect(); let icao_string = chunk.join(","); trace!("Updating METARS for: {}", icao_string); match Metar::get_all(icao_string).await { - Ok(_) => { + Ok(metars) => { + // Find the oldest observation time + for metar in metars { + if metar.observation_time.timestamp() < observation_time { + observation_time = metar.observation_time.timestamp(); + } + } sleep(Duration::from_millis(100)).await; }, Err(err) => { @@ -47,7 +55,11 @@ pub fn update_airports() { } } debug!("METAR update complete"); - sleep(Duration::from_secs(60 * 60)).await; + // Sleep until the observation time is 1 hour old + let now = chrono::Utc::now().timestamp(); + let sleep_time = (observation_time + (60 * 60)) - now; + debug!("Next update in {} seconds", sleep_time); + sleep(Duration::from_secs(sleep_time as u64)).await; } }); } \ No newline at end of file