use std::collections::HashMap; use std::str::FromStr; use chrono::{DateTime, Utc}; use futures_util::try_join; use reqwest::Client; use serde::{Deserialize, Serialize}; use sqlx::{Postgres, QueryBuilder}; use crate::airports::{ AirportCategory, Frequency, FrequencyRow, Runway, RunwayRow, UpdateFrequency, UpdateRunway, }; use crate::db; use crate::error::{ApiResult, Error}; use crate::metars::Metar; const TABLE_NAME: &str = "airports"; #[derive(Debug, Serialize, Deserialize)] pub struct Airport { pub icao: String, #[serde(skip_serializing_if = "Option::is_none")] pub iata: Option, #[serde(skip_serializing_if = "Option::is_none")] pub local: Option, pub name: String, pub category: AirportCategory, pub iso_country: String, pub iso_region: String, pub municipality: String, pub elevation_ft: f32, pub longitude: f32, pub latitude: f32, #[serde(skip_serializing_if = "Option::is_none")] pub has_tower: Option, #[serde(skip_serializing_if = "Option::is_none")] pub has_beacon: Option, pub runways: Vec, pub frequencies: Vec, pub public: bool, #[serde(skip_serializing_if = "Option::is_none")] pub latest_metar: Option, } #[derive(Debug, Deserialize)] pub struct AirportQuery { pub page: Option, pub limit: Option, pub icaos: Option, pub iatas: Option, pub locals: Option, pub name: Option, pub categories: Option, pub iso_countries: Option, pub iso_regions: Option, pub municipalities: Option, pub bounds: Option, pub metars: Option, } impl Default for AirportQuery { fn default() -> Self { Self { page: Some(1), limit: Some(1000), icaos: None, iatas: None, locals: None, name: None, categories: None, iso_countries: None, iso_regions: None, municipalities: None, bounds: None, metars: None, } } } #[derive(Debug, Deserialize)] pub struct Bounds { pub north_east_lat: f32, pub north_east_lon: f32, pub south_west_lat: f32, pub south_west_lon: f32, } impl Bounds { fn parse(input: &str) -> ApiResult { let parts: Vec<&str> = input.split(',').collect(); if parts.len() != 4 { return Err(Error::new( 400, format!("Expected 4 fields in bounds but received {}", parts.len()), )); } let north_east_lat = parts[0].trim().parse::()?; let north_east_lon = parts[1].trim().parse::()?; let south_west_lat = parts[2].trim().parse::()?; let south_west_lon = parts[3].trim().parse::()?; Ok(Bounds { north_east_lat, north_east_lon, south_west_lat, south_west_lon, }) } } #[derive(Debug, Deserialize, sqlx::FromRow)] struct AirportRow { pub icao: String, pub iata: Option, pub local: Option, pub name: String, pub category: String, pub iso_country: String, pub iso_region: String, pub municipality: String, pub elevation_ft: f32, longitude: f32, latitude: f32, pub has_tower: Option, pub has_beacon: Option, pub public: bool, pub metar_observation_time: Option>, } #[derive(Debug, Deserialize)] pub struct UpdateAirport { pub icao: Option, pub iata: Option, pub local: Option, pub name: Option, pub category: Option, pub iso_country: Option, pub iso_region: Option, pub municipality: Option, pub elevation_ft: Option, pub longitude: Option, pub latitude: Option, pub has_tower: Option, pub has_beacon: Option, pub runways: Option>, pub frequencies: Option>, pub public: Option, pub latest_metar_observation: Option>, } impl Into for Airport { fn into(self) -> AirportRow { AirportRow { icao: self.icao.clone(), iata: self.iata.clone(), local: self.local.clone(), name: self.name.clone(), category: self.category.clone().to_string(), iso_country: self.iso_country.clone(), iso_region: self.iso_region.clone(), municipality: self.municipality.clone(), elevation_ft: self.elevation_ft, longitude: self.longitude, latitude: self.latitude, has_tower: self.has_tower, has_beacon: self.has_beacon, public: self.public, metar_observation_time: match self.latest_metar { Some(m) => Some(m.observation_time), None => None, }, } } } impl From for Airport { fn from(airport: AirportRow) -> Self { Self { icao: airport.icao.clone(), iata: airport.iata.clone(), local: airport.local.clone(), name: airport.name.clone(), category: match AirportCategory::from_str(&airport.category) { Ok(c) => c, Err(_) => { log::error!("Invalid Airport category: {}", airport.category); AirportCategory::Unknown } }, iso_country: airport.iso_country.clone(), iso_region: airport.iso_region.clone(), municipality: airport.municipality.clone(), elevation_ft: airport.elevation_ft, longitude: airport.longitude, latitude: airport.latitude, has_tower: airport.has_tower, has_beacon: airport.has_beacon, runways: vec![], frequencies: vec![], public: airport.public, latest_metar: None, } } } impl Airport { pub async fn select(client: &Client, icao: &str, metar: bool) -> Option { let pool = db::pool(); let airport_fut = async { sqlx::query_as(&format!("SELECT * FROM {} WHERE icao = $1", TABLE_NAME)) .bind(icao) .fetch_optional(pool) .await }; let metar_fut = async { if metar { match Metar::find_all_distinct(client, &vec![icao.to_string()]).await { Ok(m) => Some(m.into_iter().nth(0)), Err(err) => { log::error!("{}", err); None } } } else { None } }; let runways_fut = Runway::select_all(icao); let frequencies_fut = Frequency::select_all(icao); let (airport_result, runways_result, frequencies_result, metar_result) = tokio::join!(airport_fut, runways_fut, frequencies_fut, metar_fut); let airport_row: Option = match airport_result { Ok(opt) => opt, Err(err) => { log::error!("Unable to find airport '{}': {}", icao, err); return None; } }; let runways: Vec = match runways_result { Ok(r) => r, Err(err) => { log::error!("Error retrieving runways for airport '{}': {}", icao, err); vec![] } }; let frequencies: Vec = match frequencies_result { Ok(f) => f, Err(err) => { log::error!( "Error retrieving frequencies for airport '{}': {}", icao, err ); vec![] } }; let metar: Option = match metar_result { Some(m_option) => match m_option { Some(m) => Some(m), None => None, }, None => None, }; airport_row.map(|row| { let mut airport: Airport = row.into(); airport.runways = runways; airport.frequencies = frequencies; airport.latest_metar = metar; airport }) } pub async fn select_all(client: &Client, query: &AirportQuery) -> ApiResult> { let pool = db::pool(); let mut builder = QueryBuilder::::new("SELECT * FROM "); builder.push(TABLE_NAME); let mut has_where = false; Self::push_condition_array(&mut builder, &mut has_where, "icao", &query.icaos); Self::push_condition_array(&mut builder, &mut has_where, "iata", &query.iatas); Self::push_condition_array( &mut builder, &mut has_where, "iso_country", &query.iso_countries, ); Self::push_condition_array( &mut builder, &mut has_where, "iso_region", &query.iso_regions, ); Self::push_condition_array( &mut builder, &mut has_where, "municipality", &query.municipalities, ); Self::push_condition_array(&mut builder, &mut has_where, "local", &query.locals); Self::push_condition_array(&mut builder, &mut has_where, "category", &query.categories); Self::push_condition_like(&mut builder, &mut has_where, "name", &query.name); Self::push_condition_bounds(&mut builder, &mut has_where, &query.bounds)?; builder.push(" ORDER BY (metar_observation_time IS NULL) ASC, "); builder.push(" CASE category "); builder.push(" WHEN 'large_airport' THEN 1 "); builder.push(" WHEN 'medium_airport' THEN 2 "); builder.push(" WHEN 'small_airport' THEN 3 "); builder.push(" WHEN 'seaplane_base' THEN 4 "); builder.push(" WHEN 'heliport' THEN 5 "); builder.push(" WHEN 'balloon_port' THEN 6 "); builder.push(" WHEN 'unknown' THEN 7 "); builder.push(" ELSE 8 END"); // Apply pagination. if let Some(limit) = query.limit { builder.push(" LIMIT ").push_bind(limit as i64); let offset = if let Some(page) = query.page { (page.saturating_sub(1) * limit) as i64 } else { 0 }; builder.push(" OFFSET ").push_bind(offset); } let airport_query = builder.build_query_as::(); let airport_rows: Vec = airport_query.fetch_all(pool).await?; let mut airports: Vec = airport_rows.into_iter().map(From::from).collect(); if airports.is_empty() { return Ok(airports); } // Bulk update airport sub-fields let icaos: Vec = airports.iter().map(|a| a.icao.clone()).collect(); let runway_future = Runway::select_all_map(icaos.clone()); let frequency_future = Frequency::select_all_map(icaos.clone()); let metar_future = if query.metars.unwrap_or(false) { Some(Metar::find_all_distinct(client, &icaos)) } else { None }; let (runway_map, frequency_map, mut metars_opt) = match metar_future { Some(future_metars) => { let (runway_map, frequency_map, metars) = try_join!(runway_future, frequency_future, future_metars)?; ( runway_map, frequency_map, Some( metars .into_iter() .map(|m| (m.icao.clone(), m)) .collect::>(), ), ) } None => { let (runway_map, frequency_map) = try_join!(runway_future, frequency_future)?; (runway_map, frequency_map, None) } }; for airport in airports.iter_mut() { airport.runways = runway_map.get(&airport.icao).cloned().unwrap_or_default(); airport.frequencies = frequency_map .get(&airport.icao) .cloned() .unwrap_or_default(); if let Some(ref mut metar_map) = metars_opt { airport.latest_metar = metar_map.remove(&airport.icao); } } Ok(airports) } pub async fn count(query: &AirportQuery) -> i64 { let pool = db::pool(); let mut builder = QueryBuilder::::new("SELECT COUNT(*) FROM "); builder.push(TABLE_NAME); let mut has_where = false; Self::push_condition_array(&mut builder, &mut has_where, "icao", &query.icaos); Self::push_condition_array(&mut builder, &mut has_where, "iata", &query.iatas); Self::push_condition_array( &mut builder, &mut has_where, "iso_country", &query.iso_countries, ); Self::push_condition_array( &mut builder, &mut has_where, "iso_region", &query.iso_regions, ); Self::push_condition_array( &mut builder, &mut has_where, "municipality", &query.municipalities, ); Self::push_condition_array(&mut builder, &mut has_where, "local", &query.locals); Self::push_condition_array(&mut builder, &mut has_where, "category", &query.categories); Self::push_condition_like(&mut builder, &mut has_where, "name", &query.name); if let Err(err) = Self::push_condition_bounds(&mut builder, &mut has_where, &query.bounds) { log::error!("Error parsing bounds string: {}", err); return 0; } let sql_query = builder.build_query_scalar(); sql_query.fetch_one(pool).await.unwrap_or_else(|_| 0) } pub async fn insert(&self) -> ApiResult { let pool = db::pool(); let mut all_runway_rows: Vec = Vec::new(); let mut all_frequency_rows: Vec = Vec::new(); for runway in &self.runways { all_runway_rows.push(Runway::into(runway, &self.icao)); } for frequency in &self.frequencies { all_frequency_rows.push(Frequency::into(frequency, &self.icao)); } Runway::insert_all(&all_runway_rows).await?; Frequency::insert_all(&all_frequency_rows).await?; let airport: AirportRow = sqlx::query_as(&format!( r#" INSERT INTO {} ( icao, iata, local, name, category, iso_country, iso_region, municipality, elevation_ft, longitude, latitude, has_tower, has_beacon, public ) VALUES ( $1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14 ) RETURNING * "#, TABLE_NAME, )) .bind(self.icao.to_string()) .bind(&self.iata) .bind(&self.local) .bind(self.name.to_string()) .bind(self.category.to_string()) .bind(self.iso_country.to_string()) .bind(self.iso_region.to_string()) .bind(self.municipality.to_string()) .bind(self.elevation_ft) .bind(self.longitude) .bind(self.latitude) .bind(self.has_tower) .bind(self.has_beacon) .bind(self.public) .fetch_one(pool) .await?; Ok(airport.into()) } pub async fn insert_all(airports: Vec) -> ApiResult<()> { let pool = db::pool(); let chunk_size = 1000; let mut all_runway_rows: Vec = Vec::new(); let mut all_frequency_rows: Vec = Vec::new(); let airport_rows: Vec = airports .into_iter() .map(|airport| { for runway in &airport.runways { all_runway_rows.push(Runway::into(runway, &airport.icao)); } for frequency in &airport.frequencies { all_frequency_rows.push(Frequency::into(frequency, &airport.icao)); } airport.into() }) .collect(); for chunk in airport_rows.chunks(chunk_size) { let mut query_builder: QueryBuilder = QueryBuilder::new( "INSERT INTO airports (icao, iata, local, name, category, \ iso_country, iso_region, municipality, elevation_ft, \ longitude, latitude, has_tower, has_beacon, public) ", ); query_builder.push_values(chunk, |mut b, row| { b.push_bind(&row.icao) .push_bind(&row.iata) .push_bind(&row.local) .push_bind(&row.name) .push_bind(&row.category) .push_bind(&row.iso_country) .push_bind(&row.iso_region) .push_bind(&row.municipality) .push_bind(row.elevation_ft) .push_bind(row.longitude) .push_bind(row.latitude) .push_bind(row.has_tower) .push_bind(row.has_beacon) .push_bind(row.public); }); let query = query_builder.build(); query.execute(pool).await?; } Runway::insert_all(&all_runway_rows).await?; Frequency::insert_all(&all_frequency_rows).await?; Ok(()) } // TODO pub async fn update(icao: &str, airport: &UpdateAirport) -> ApiResult<()> { let pool = db::pool(); let mut query_builder: QueryBuilder = QueryBuilder::new(format!("UPDATE {} SET ", TABLE_NAME)); if let Some(latest_metar_observation) = airport.latest_metar_observation { query_builder.push("metar_observation_time = "); query_builder.push_bind(latest_metar_observation); } query_builder.push(" WHERE icao = ").push_bind(icao); let query = query_builder.build(); query.execute(pool).await?; Ok(()) } pub async fn delete(icao: &str) -> ApiResult<()> { let pool = db::pool(); sqlx::query(&format!( r#" DELETE FROM {} WHERE icao = $1 "#, TABLE_NAME )) .bind(icao.to_string()) .execute(pool) .await?; Ok(()) } pub async fn delete_all() -> ApiResult<()> { let pool = db::pool(); sqlx::query(&format!( r#" DELETE FROM {} WHERE true "#, TABLE_NAME )) .execute(pool) .await?; Ok(()) } fn push_condition_array<'a>( builder: &mut QueryBuilder<'a, Postgres>, has_where: &mut bool, column: &str, field: &'a Option, ) { if let Some(value_str) = field { // Split on commas, trim whitespace, and drop empties. let values: Vec<&str> = value_str .split(',') .map(str::trim) .filter(|s| !s.is_empty()) .collect(); if !values.is_empty() { if !*has_where { builder.push(" WHERE "); *has_where = true; } else { builder.push(" AND "); } builder.push(column); builder.push(" = ANY("); builder.push_bind(values); builder.push(")"); } } } fn push_condition_like<'a>( builder: &mut QueryBuilder<'a, Postgres>, has_where: &mut bool, column: &str, field: &'a Option, ) { // Query column like if let Some(value) = field { if !*has_where { builder.push(" WHERE "); *has_where = true; } else { builder.push(" AND "); } // Using ILIKE with wildcards for partial matching builder .push(column) .push(" ILIKE ") .push_bind(format!("%{}%", value)); } } fn push_condition_bounds<'a>( builder: &mut QueryBuilder<'a, Postgres>, has_where: &mut bool, field: &'a Option, ) -> ApiResult<()> { // Query bounds if let Some(bounds_string) = field { if !*has_where { builder.push(" WHERE "); *has_where = true; } else { builder.push(" AND "); } let bounds = Bounds::parse(bounds_string)?; builder .push("(") .push("latitude BETWEEN ") .push_bind(bounds.south_west_lat) .push(" AND ") .push_bind(bounds.north_east_lat) .push(" AND ") .push("longitude BETWEEN ") .push_bind(bounds.south_west_lon) .push(" AND ") .push_bind(bounds.north_east_lon) .push(")"); } Ok(()) } }