Parsing, querying, inserting airports

This commit is contained in:
2025-04-09 20:38:01 -04:00
parent 240ed741f9
commit 4aa3190783
12 changed files with 453 additions and 104221 deletions

View File

@@ -1,11 +1,13 @@
use std::collections::HashMap;
use std::str::FromStr;
use actix_web::web::Json;
use serde::{Deserialize, Serialize};
use sqlx::{Execute, Postgres, QueryBuilder};
use crate::airports::model::airport_category::AirportCategory;
use crate::airports::{Frequency, Runway, UpdateFrequency, UpdateRunway};
use crate::airports::{Frequency, FrequencyRow, Runway, RunwayRow, UpdateFrequency, UpdateRunway};
use crate::db;
use crate::error::ApiResult;
use crate::error::{ApiResult, Error};
use crate::metars::Metar;
const TABLE_NAME: &str = "airports";
@@ -31,6 +33,8 @@ pub struct Airport {
pub runways: Vec<Runway>,
pub frequencies: Vec<Frequency>,
pub public: bool,
#[serde(skip_serializing_if = "Option::is_none")]
pub latest_metar: Option<Metar>,
}
#[derive(Debug, Deserialize)]
@@ -45,6 +49,7 @@ pub struct AirportQuery {
pub iso_countries: Option<String>,
pub iso_regions: Option<String>,
pub municipalities: Option<String>,
pub metars: Option<bool>,
}
impl Default for AirportQuery {
@@ -60,6 +65,7 @@ impl Default for AirportQuery {
iso_countries: None,
iso_regions: None,
municipalities: None,
metars: None,
}
}
}
@@ -125,7 +131,7 @@ impl Into<AirportRow> for Airport {
impl From<AirportRow> for Airport {
fn from(airport: AirportRow) -> Self {
Airport {
Self {
icao: airport.icao.clone(),
iata: airport.iata.clone(),
local: airport.local.clone(),
@@ -148,6 +154,7 @@ impl From<AirportRow> for Airport {
runways: vec![],
frequencies: vec![],
public: airport.public,
latest_metar: None,
}
}
}
@@ -156,24 +163,53 @@ impl Airport {
pub async fn select(icao: &str) -> Option<Self> {
let pool = db::pool();
let airport: Option<AirportRow> = sqlx::query_as(&format!(
r#"
SELECT * FROM {} WHERE icao = $1
"#,
TABLE_NAME
))
.bind(icao)
.fetch_optional(pool)
.await
.unwrap_or_else(|err| {
log::error!("Unable to find airport '{}'", icao);
None
});
let airport_fut = async {
sqlx::query_as(&format!("SELECT * FROM {} WHERE icao = $1", TABLE_NAME))
.bind(icao)
.fetch_optional(pool)
.await
};
match airport {
Some(a) => Some(a.into()),
None => None,
}
let runways_fut = Runway::select_all(icao);
let frequencies_fut = Frequency::select_all(icao);
let (airport_result, runways_result, frequencies_result) =
tokio::join!(airport_fut, runways_fut, frequencies_fut);
let airport_row: Option<AirportRow> = match airport_result {
Ok(opt) => opt,
Err(err) => {
log::error!("Unable to find airport '{}': {}", icao, err);
return None;
}
};
let runways: Vec<Runway> = match runways_result {
Ok(r) => r,
Err(err) => {
log::error!("Error retrieving runways for airport '{}': {}", icao, err);
vec![]
}
};
let frequencies: Vec<Frequency> = match frequencies_result {
Ok(f) => f,
Err(err) => {
log::error!(
"Error retrieving frequencies for airport '{}': {}",
icao,
err
);
vec![]
}
};
airport_row.map(|row| {
let mut airport: Airport = row.into();
airport.runways = runways;
airport.frequencies = frequencies;
airport
})
}
pub async fn select_all(query: &AirportQuery) -> ApiResult<Vec<Self>> {
@@ -183,31 +219,34 @@ impl Airport {
builder.push(TABLE_NAME);
let mut has_where = false;
macro_rules! push_condition {
($field:expr, $value:expr) => {
if let Some(ref val) = $value {
if !has_where {
builder.push(" WHERE ");
has_where = true;
} else {
builder.push(" AND ");
}
builder.push($field).push(" = ").push_bind(val);
}
};
}
// push_condition!("icao", query.icaos);
// push_condition!("iata", query.iata);
// push_condition!("iso_country", query.iso_country);
// push_condition!("iso_region", query.iso_region);
// push_condition!("municipality", query.municipality);
Self::push_condition_array(&mut builder, &mut has_where, "icao", &query.icaos);
Self::push_condition_array(&mut builder, &mut has_where, "iata", &query.iatas);
Self::push_condition_array(
&mut builder,
&mut has_where,
"iso_country",
&query.iso_countries,
);
Self::push_condition_array(
&mut builder,
&mut has_where,
"iso_region",
&query.iso_regions,
);
Self::push_condition_array(
&mut builder,
&mut has_where,
"municipality",
&query.municipalities,
);
Self::push_condition_array(&mut builder, &mut has_where, "local", &query.locals);
Self::push_condition_array(&mut builder, &mut has_where, "name", &query.names);
Self::push_condition_array(&mut builder, &mut has_where, "category", &query.categories);
// Apply pagination.
if let Some(limit) = query.limit {
builder.push(" LIMIT ").push_bind(limit as i64);
let offset = if let Some(page) = query.page {
// Calculate offset (page is 1-based).
(page.saturating_sub(1) * limit) as i64
} else {
0
@@ -215,9 +254,22 @@ impl Airport {
builder.push(" OFFSET ").push_bind(offset);
}
let query = builder.build_query_as();
let airport_rows: Vec<AirportRow> = query.fetch_all(pool).await?;
Ok(airport_rows.into_iter().map(From::from).collect())
let airport_query = builder.build_query_as::<AirportRow>();
let airport_rows: Vec<AirportRow> = airport_query.fetch_all(pool).await?;
let mut airports: Vec<Airport> = airport_rows.into_iter().map(From::from).collect();
// Bulk update airports with runways and frequencies
if !airports.is_empty() {
let icaos: Vec<String> = airports.iter().map(|a| a.icao.clone()).collect();
let mut runway_map = Runway::select_all_map(icaos.clone()).await?;
let mut frequency_map = Frequency::select_all_map(icaos).await?;
for airport in airports.iter_mut() {
airport.runways = runway_map.remove(&airport.icao).unwrap_or_default();
airport.frequencies = frequency_map.remove(&airport.icao).unwrap_or_default();
}
}
Ok(airports)
}
pub async fn count(query: &AirportQuery) -> i64 {
@@ -227,49 +279,48 @@ impl Airport {
builder.push(TABLE_NAME);
let mut has_where = false;
macro_rules! push_condition_array {
($column:expr, $field:expr) => {
if let Some(ref value_str) = $field {
// split on commas, trim whitespace, and drop empties
let values: Vec<&str> = value_str
.split(',')
.map(|s| s.trim())
.filter(|s| !s.is_empty())
.collect();
if !values.is_empty() {
if !has_where {
builder.push(" WHERE ");
has_where = true;
} else {
builder.push(" AND ");
}
dbg!(&values);
builder.push($column);
builder.push(" = ANY(");
builder.push_bind(values);
builder.push(")");
}
}
};
}
push_condition_array!("icao", query.icaos);
push_condition_array!("iata", query.iatas);
push_condition_array!("iso_country", query.iso_countries);
push_condition_array!("iso_region", query.iso_regions);
push_condition_array!("municipality", query.municipalities);
push_condition_array!("local", query.locals);
push_condition_array!("name", query.names);
push_condition_array!("category", query.categories);
Self::push_condition_array(&mut builder, &mut has_where, "icao", &query.icaos);
Self::push_condition_array(&mut builder, &mut has_where, "iata", &query.iatas);
Self::push_condition_array(
&mut builder,
&mut has_where,
"iso_country",
&query.iso_countries,
);
Self::push_condition_array(
&mut builder,
&mut has_where,
"iso_region",
&query.iso_regions,
);
Self::push_condition_array(
&mut builder,
&mut has_where,
"municipality",
&query.municipalities,
);
Self::push_condition_array(&mut builder, &mut has_where, "local", &query.locals);
Self::push_condition_array(&mut builder, &mut has_where, "name", &query.names);
Self::push_condition_array(&mut builder, &mut has_where, "category", &query.categories);
let sql_query = builder.build_query_scalar();
dbg!(&sql_query.sql());
sql_query.fetch_one(pool).await.unwrap_or_else(|_| 0)
}
pub async fn insert(&self) -> ApiResult<Self> {
let pool = db::pool();
let mut all_runway_rows: Vec<RunwayRow> = Vec::new();
let mut all_frequency_rows: Vec<FrequencyRow> = Vec::new();
for runway in &self.runways {
all_runway_rows.push(Runway::into(runway, &self.icao));
}
for frequency in &self.frequencies {
all_frequency_rows.push(Frequency::into(frequency, &self.icao));
}
Runway::insert_all(&all_runway_rows).await?;
Frequency::insert_all(&all_frequency_rows).await?;
let airport: AirportRow = sqlx::query_as(&format!(
r#"
INSERT INTO {} (
@@ -306,12 +357,25 @@ impl Airport {
pub async fn insert_all(airports: Vec<Self>) -> ApiResult<()> {
let pool = db::pool();
let airport_rows: Vec<AirportRow> = airports.into_iter().map(Into::into).collect();
// Define the maximum size of a single insertion batch.
let chunk_size = 1000;
let mut all_runway_rows: Vec<RunwayRow> = Vec::new();
let mut all_frequency_rows: Vec<FrequencyRow> = Vec::new();
let airport_rows: Vec<AirportRow> = airports
.into_iter()
.map(|airport| {
for runway in &airport.runways {
all_runway_rows.push(Runway::into(runway, &airport.icao));
}
for frequency in &airport.frequencies {
all_frequency_rows.push(Frequency::into(frequency, &airport.icao));
}
airport.into()
})
.collect();
Runway::insert_all(&all_runway_rows).await?;
Frequency::insert_all(&all_frequency_rows).await?;
for chunk in airport_rows.chunks(chunk_size) {
// Build a dynamic query for batch insertion.
let mut query_builder: QueryBuilder<Postgres> = QueryBuilder::new(
"INSERT INTO airports (icao, iata, local, name, category, \
iso_country, iso_region, municipality, elevation_ft, \
@@ -376,4 +440,32 @@ impl Airport {
Ok(())
}
fn push_condition_array<'a>(
builder: &mut QueryBuilder<'a, Postgres>,
has_where: &mut bool,
column: &str,
field: &'a Option<String>,
) {
if let Some(ref value_str) = field {
// Split on commas, trim whitespace, and drop empties.
let values: Vec<&str> = value_str
.split(',')
.map(str::trim)
.filter(|s| !s.is_empty())
.collect();
if !values.is_empty() {
if !*has_where {
builder.push(" WHERE ");
*has_where = true;
} else {
builder.push(" AND ");
}
builder.push(column);
builder.push(" = ANY(");
builder.push_bind(values);
builder.push(")");
}
}
}
}

View File

@@ -1,15 +1,115 @@
use std::collections::HashMap;
use serde::{Deserialize, Serialize};
use sqlx::{Postgres, QueryBuilder};
use uuid::Uuid;
use crate::db;
use crate::error::ApiResult;
const TABLE_NAME: &str = "frequencies";
#[derive(Debug, Serialize, Deserialize)]
pub struct Frequency {
pub id: String,
#[serde(rename = "id")]
pub frequency_id: String,
pub frequency_mhz: f32,
}
#[derive(Debug, Deserialize, sqlx::FromRow)]
pub struct FrequencyRow {
pub id: Uuid,
pub icao: String,
pub frequency_id: String,
pub frequency_mhz: f32,
}
#[derive(Debug, Serialize, Deserialize)]
pub struct UpdateFrequency {
#[serde(skip_serializing_if = "Option::is_none")]
pub id: Option<String>,
pub icao: Option<String>,
#[serde(rename = "id", skip_serializing_if = "Option::is_none")]
pub frequency_id: Option<String>,
#[serde(skip_serializing_if = "Option::is_none")]
pub frequency_mhz: Option<f32>,
}
impl From<FrequencyRow> for Frequency {
fn from(frequency: FrequencyRow) -> Self {
Self {
frequency_id: frequency.frequency_id.clone(),
frequency_mhz: frequency.frequency_mhz,
}
}
}
impl Frequency {
pub fn into(frequency: &Frequency, icao: &str) -> FrequencyRow {
FrequencyRow {
id: Uuid::new_v4(),
icao: icao.to_string(),
frequency_id: frequency.frequency_id.clone(),
frequency_mhz: frequency.frequency_mhz.clone(),
}
}
pub async fn select_all_map(icaos: Vec<String>) -> ApiResult<HashMap<String, Vec<Self>>> {
let pool = db::pool();
let frequency_rows: Vec<FrequencyRow> = sqlx::query_as(&format!(
r#"SELECT * FROM {} WHERE icao = ANY($1)"#,
TABLE_NAME
))
.bind(&icaos)
.fetch_all(pool)
.await?;
let mut frequency_map: HashMap<String, Vec<Self>> = HashMap::new();
for frequency_row in frequency_rows {
let icao = frequency_row.icao.clone();
let frequency = frequency_row.into();
frequency_map
.entry(icao.to_string())
.or_default()
.push(frequency);
}
Ok(frequency_map)
}
pub async fn select_all(icao: &str) -> ApiResult<Vec<Self>> {
let pool = db::pool();
let frequency_row: Vec<FrequencyRow> = sqlx::query_as(&format!(
r#"
SELECT * FROM {} WHERE icao = $1
"#,
TABLE_NAME
))
.bind(icao)
.fetch_all(pool)
.await?;
Ok(frequency_row.into_iter().map(From::from).collect())
}
pub async fn insert_all(frequencies: &Vec<FrequencyRow>) -> ApiResult<()> {
let pool = db::pool();
let chunk_size = 1000;
for chunk in frequencies.chunks(chunk_size) {
let mut query_builder: QueryBuilder<Postgres> = QueryBuilder::new(&format!(
"INSERT INTO {} (id, icao, frequency_id, frequency_mhz) ",
TABLE_NAME
));
query_builder.push_values(chunk, |mut b, row| {
b.push_bind(&row.id)
.push_bind(&row.icao)
.push_bind(&row.frequency_id)
.push_bind(&row.frequency_mhz);
});
let query = query_builder.build();
query.execute(pool).await?;
}
Ok(())
}
}

View File

@@ -1,8 +1,26 @@
use std::collections::HashMap;
use serde::{Deserialize, Serialize};
use sqlx::{Postgres, QueryBuilder};
use uuid::Uuid;
use crate::db;
use crate::error::ApiResult;
const TABLE_NAME: &str = "runways";
#[derive(Debug, Serialize, Deserialize)]
pub struct Runway {
pub id: String,
#[serde(rename = "id")]
pub runway_id: String,
pub length_ft: f32,
pub width_ft: f32,
pub surface: String,
}
#[derive(Debug, Deserialize, sqlx::FromRow)]
pub struct RunwayRow {
pub id: Uuid,
pub icao: String,
pub runway_id: String,
pub length_ft: f32,
pub width_ft: f32,
pub surface: String,
@@ -11,7 +29,9 @@ pub struct Runway {
#[derive(Debug, Serialize, Deserialize)]
pub struct UpdateRunway {
#[serde(skip_serializing_if = "Option::is_none")]
pub id: Option<String>,
pub icao: Option<String>,
#[serde(rename = "id", skip_serializing_if = "Option::is_none")]
pub frequency_id: Option<String>,
#[serde(skip_serializing_if = "Option::is_none")]
pub length_ft: Option<f32>,
#[serde(skip_serializing_if = "Option::is_none")]
@@ -19,3 +39,88 @@ pub struct UpdateRunway {
#[serde(skip_serializing_if = "Option::is_none")]
pub surface: Option<String>,
}
impl From<RunwayRow> for Runway {
fn from(runway: RunwayRow) -> Self {
Self {
runway_id: runway.runway_id.clone(),
length_ft: runway.length_ft.clone(),
width_ft: runway.width_ft.clone(),
surface: runway.surface.clone(),
}
}
}
impl Runway {
pub fn into(runway: &Runway, icao: &str) -> RunwayRow {
RunwayRow {
id: Uuid::new_v4(),
icao: icao.to_string(),
runway_id: runway.runway_id.clone(),
length_ft: runway.length_ft.clone(),
width_ft: runway.width_ft.clone(),
surface: runway.surface.clone(),
}
}
pub async fn select_all_map(icaos: Vec<String>) -> ApiResult<HashMap<String, Vec<Self>>> {
let pool = db::pool();
let runway_rows: Vec<RunwayRow> = sqlx::query_as(&format!(
r#"SELECT * FROM {} WHERE icao = ANY($1)"#,
TABLE_NAME
))
.bind(&icaos)
.fetch_all(pool)
.await?;
let mut runway_map: HashMap<String, Vec<Self>> = HashMap::new();
for runway_row in runway_rows {
let icao = runway_row.icao.clone();
let runway = runway_row.into();
runway_map.entry(icao.to_string()).or_default().push(runway);
}
Ok(runway_map)
}
pub async fn select_all(icao: &str) -> ApiResult<Vec<Self>> {
let pool = db::pool();
let runway_rows: Vec<RunwayRow> = sqlx::query_as(&format!(
r#"
SELECT * FROM {} WHERE icao = $1
"#,
TABLE_NAME
))
.bind(icao)
.fetch_all(pool)
.await?;
Ok(runway_rows.into_iter().map(From::from).collect())
}
pub async fn insert_all(runways: &Vec<RunwayRow>) -> ApiResult<()> {
let pool = db::pool();
let chunk_size = 1000;
for chunk in runways.chunks(chunk_size) {
let mut query_builder: QueryBuilder<Postgres> = QueryBuilder::new(&format!(
"INSERT INTO {} (id, icao, runway_id, length_ft, width_ft, surface) ",
TABLE_NAME
));
query_builder.push_values(chunk, |mut b, row| {
b.push_bind(&row.id)
.push_bind(&row.icao)
.push_bind(&row.runway_id)
.push_bind(&row.length_ft)
.push_bind(&row.width_ft)
.push_bind(&row.surface);
});
let query = query_builder.build();
query.execute(pool).await?;
}
Ok(())
}
}