Refactor to break out scheduler
This commit is contained in:
24
crates/lib/Cargo.toml
Normal file
24
crates/lib/Cargo.toml
Normal file
@@ -0,0 +1,24 @@
|
||||
[package]
|
||||
name = "lib"
|
||||
version = "0.1.0"
|
||||
edition = "2024"
|
||||
|
||||
[dependencies]
|
||||
argon2 = "0.5.3"
|
||||
chrono = { version = "0.4.42", features = ["serde"] }
|
||||
log = "0.4.28"
|
||||
rand = "0.9.2"
|
||||
rand_chacha = "0.9.0"
|
||||
serde = { version = "1.0.226", features = ["derive"] }
|
||||
serde_json = "1.0.142"
|
||||
sqlx = { version = "0.8.6", features = ["runtime-tokio", "postgres", "chrono", "uuid"] }
|
||||
utoipa = { version = "5.4.0", features = ["chrono", "uuid", "actix_extras"] }
|
||||
uuid = { version = "1.18.1", features = ["serde", "v4"] }
|
||||
futures-util = "0.3.31"
|
||||
flate2 = "1.1.2"
|
||||
reqwest = "0.12.23"
|
||||
regex = "1.11.2"
|
||||
redis = { version = "0.32.5", features = ["tokio-comp", "connection-manager", "r2d2", "json"] }
|
||||
governor = "0.10.1"
|
||||
tokio = { version = "1.47.1", features = ["macros", "rt", "time"] }
|
||||
rust-s3 = "0.37.0"
|
||||
84
crates/lib/migrations/20250513_initial.sql
Normal file
84
crates/lib/migrations/20250513_initial.sql
Normal file
@@ -0,0 +1,84 @@
|
||||
CREATE EXTENSION IF NOT EXISTS postgis;
|
||||
|
||||
CREATE TABLE IF NOT EXISTS airports (
|
||||
icao TEXT PRIMARY KEY NOT NULL,
|
||||
iata TEXT,
|
||||
local TEXT,
|
||||
name TEXT NOT NULL,
|
||||
category TEXT NOT NULL,
|
||||
iso_country TEXT NOT NULL,
|
||||
iso_region TEXT NOT NULL,
|
||||
municipality TEXT NOT NULL,
|
||||
elevation_ft REAL NOT NULL,
|
||||
longitude REAL NOT NULL,
|
||||
latitude REAL NOT NULL,
|
||||
geometry GEOMETRY(POINT, 4326) NOT NULL,
|
||||
has_tower BOOLEAN DEFAULT false,
|
||||
has_beacon BOOLEAN DEFAULT false,
|
||||
public BOOLEAN DEFAULT false,
|
||||
metar_observation_time TIMESTAMPTZ
|
||||
);
|
||||
|
||||
CREATE INDEX ON airports (iata);
|
||||
CREATE INDEX ON airports (local);
|
||||
CREATE INDEX ON airports (name);
|
||||
CREATE INDEX ON airports (category);
|
||||
CREATE INDEX ON airports (iso_country);
|
||||
CREATE INDEX ON airports (iso_region);
|
||||
CREATE INDEX ON airports (municipality);
|
||||
CREATE INDEX ON airports USING GIST(geometry);
|
||||
CREATE INDEX ON airports (metar_observation_time);
|
||||
|
||||
CREATE TABLE IF NOT EXISTS runways (
|
||||
id UUID PRIMARY KEY NOT NULL,
|
||||
icao TEXT NOT NULL REFERENCES airports(icao) ON DELETE CASCADE,
|
||||
runway_id TEXT NOT NULL,
|
||||
length_ft REAL NOT NULL,
|
||||
width_ft REAL NOT NULL,
|
||||
surface TEXT NOT NULL
|
||||
);
|
||||
|
||||
CREATE INDEX ON runways (icao);
|
||||
CREATE INDEX ON runways (runway_id);
|
||||
|
||||
CREATE TABLE IF NOT EXISTS communications (
|
||||
id UUID PRIMARY KEY NOT NULL,
|
||||
icao TEXT NOT NULL REFERENCES airports(icao) ON DELETE CASCADE,
|
||||
frequency_id TEXT NOT NULL,
|
||||
name TEXT,
|
||||
frequencies_mhz REAL[] NOT NULL,
|
||||
phone TEXT
|
||||
);
|
||||
|
||||
CREATE INDEX ON communications (icao);
|
||||
CREATE INDEX ON communications (frequency_id);
|
||||
CREATE INDEX ON communications (name);
|
||||
|
||||
CREATE TABLE IF NOT EXISTS metars (
|
||||
icao TEXT NOT NULL,
|
||||
observation_time TIMESTAMPTZ NOT NULL,
|
||||
raw_text TEXT NOT NULL,
|
||||
data JSONB NOT NULL,
|
||||
PRIMARY KEY(icao, observation_time)
|
||||
);
|
||||
|
||||
CREATE INDEX ON metars (observation_time DESC);
|
||||
|
||||
CREATE TABLE IF NOT EXISTS users (
|
||||
username TEXT PRIMARY KEY NOT NULL,
|
||||
email TEXT,
|
||||
email_verified BOOLEAN NOT NULL DEFAULT false,
|
||||
password_hash TEXT NOT NULL,
|
||||
role TEXT NOT NULL,
|
||||
first_name TEXT NOT NULL,
|
||||
last_name TEXT NOT NULL,
|
||||
avatar TEXT,
|
||||
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
|
||||
updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
|
||||
);
|
||||
|
||||
CREATE TABLE IF NOT EXISTS user_airport_favorites (
|
||||
username TEXT NOT NULL REFERENCES users(username) ON DELETE CASCADE,
|
||||
icao TEXT NOT NULL REFERENCES airports(icao) ON DELETE CASCADE,
|
||||
PRIMARY KEY (username, icao)
|
||||
);
|
||||
9
crates/lib/src/accounts/mod.rs
Normal file
9
crates/lib/src/accounts/mod.rs
Normal file
@@ -0,0 +1,9 @@
|
||||
mod password_requirements;
|
||||
mod user;
|
||||
mod user_favorites;
|
||||
mod utils;
|
||||
|
||||
pub use password_requirements::*;
|
||||
pub use user::*;
|
||||
pub use user_favorites::*;
|
||||
pub use utils::*;
|
||||
24
crates/lib/src/accounts/password_requirements.rs
Normal file
24
crates/lib/src/accounts/password_requirements.rs
Normal file
@@ -0,0 +1,24 @@
|
||||
use serde::{Deserialize, Serialize};
|
||||
|
||||
#[derive(Debug, Clone, Serialize, Deserialize)]
|
||||
pub struct PasswordRequirements {
|
||||
pub max_length: Option<usize>,
|
||||
pub min_length: Option<usize>,
|
||||
pub lowercase_count: Option<usize>,
|
||||
pub uppercase_count: Option<usize>,
|
||||
pub numeric_count: Option<usize>,
|
||||
pub special_count: Option<usize>,
|
||||
}
|
||||
|
||||
impl Default for PasswordRequirements {
|
||||
fn default() -> Self {
|
||||
Self {
|
||||
max_length: Some(128),
|
||||
min_length: Some(6),
|
||||
lowercase_count: None,
|
||||
uppercase_count: None,
|
||||
numeric_count: None,
|
||||
special_count: None,
|
||||
}
|
||||
}
|
||||
}
|
||||
274
crates/lib/src/accounts/user.rs
Normal file
274
crates/lib/src/accounts/user.rs
Normal file
@@ -0,0 +1,274 @@
|
||||
use chrono::{DateTime, Utc};
|
||||
use serde::{Deserialize, Serialize};
|
||||
#[allow(unused_imports)] // Import is used in schema examples
|
||||
use serde_json::json;
|
||||
use sqlx::{Pool, Postgres, QueryBuilder};
|
||||
use utoipa::ToSchema;
|
||||
use crate::accounts::hash;
|
||||
use crate::error::CoreResult;
|
||||
|
||||
pub const ADMIN_ROLE: &str = "ADMIN";
|
||||
pub const USER_ROLE: &str = "USER";
|
||||
const TABLE_NAME: &str = "users";
|
||||
|
||||
#[derive(Debug, Deserialize, ToSchema)]
|
||||
#[schema(
|
||||
example = json!(
|
||||
{
|
||||
"email": "user",
|
||||
"email": "user@example.com",
|
||||
"password": "changeme",
|
||||
"firstName": "firstname",
|
||||
"lastName": "lastname"
|
||||
}
|
||||
)
|
||||
)]
|
||||
pub struct RegisterRequest {
|
||||
pub username: String,
|
||||
pub email: Option<String>,
|
||||
pub password: String,
|
||||
#[serde(rename = "firstName")]
|
||||
pub first_name: String,
|
||||
#[serde(rename = "lastName")]
|
||||
pub last_name: String,
|
||||
}
|
||||
|
||||
impl RegisterRequest {
|
||||
pub fn to_user(self) -> CoreResult<User> {
|
||||
let password_hash = hash(&self.password)?;
|
||||
Ok(User {
|
||||
username: self.username,
|
||||
email: match self.email {
|
||||
Some(email) => Some(email.to_lowercase()),
|
||||
None => None,
|
||||
},
|
||||
email_verified: false,
|
||||
password_hash,
|
||||
role: USER_ROLE.to_string(),
|
||||
first_name: self.first_name,
|
||||
last_name: self.last_name,
|
||||
avatar: None,
|
||||
updated_at: Utc::now(),
|
||||
created_at: Utc::now(),
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, Deserialize, ToSchema)]
|
||||
#[schema(
|
||||
example = json!(
|
||||
{
|
||||
"username": "admin",
|
||||
"password": "changeme"
|
||||
}
|
||||
)
|
||||
)]
|
||||
pub struct LoginRequest {
|
||||
pub username: String,
|
||||
pub password: String,
|
||||
}
|
||||
|
||||
#[derive(Debug, Serialize, ToSchema)]
|
||||
pub struct UserResponse {
|
||||
pub username: String,
|
||||
pub role: String,
|
||||
#[serde(rename = "firstName")]
|
||||
pub first_name: String,
|
||||
#[serde(rename = "lastName")]
|
||||
pub last_name: String,
|
||||
#[serde(skip_serializing_if = "Option::is_none")]
|
||||
pub avatar: Option<String>,
|
||||
#[serde(rename = "emailVerified")]
|
||||
pub email_verified: bool,
|
||||
}
|
||||
|
||||
impl From<User> for UserResponse {
|
||||
fn from(user: User) -> Self {
|
||||
UserResponse {
|
||||
username: user.username,
|
||||
email_verified: user.email_verified,
|
||||
role: user.role,
|
||||
first_name: user.first_name,
|
||||
last_name: user.last_name,
|
||||
avatar: user.avatar,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, Deserialize, sqlx::FromRow, ToSchema)]
|
||||
pub struct UpdateUser {
|
||||
pub email: Option<String>,
|
||||
pub email_verified: Option<bool>,
|
||||
pub password: Option<String>,
|
||||
pub role: Option<String>,
|
||||
pub first_name: Option<String>,
|
||||
pub last_name: Option<String>,
|
||||
pub avatar: Option<String>,
|
||||
}
|
||||
|
||||
impl UpdateUser {
|
||||
pub async fn update(&self, pool: &Pool<Postgres>, username: &str) -> CoreResult<User> {
|
||||
let mut query_builder: QueryBuilder<Postgres> =
|
||||
QueryBuilder::new(&format!("UPDATE {} SET ", TABLE_NAME));
|
||||
|
||||
let mut first_clause = true;
|
||||
|
||||
let mut push_comma = |query_builder: &mut QueryBuilder<Postgres>| {
|
||||
if !first_clause {
|
||||
query_builder.push(", ");
|
||||
} else {
|
||||
first_clause = false;
|
||||
}
|
||||
};
|
||||
|
||||
if let Some(ref email) = self.email {
|
||||
push_comma(&mut query_builder);
|
||||
query_builder.push("email = ");
|
||||
query_builder.push_bind(email);
|
||||
}
|
||||
if let Some(ref email_verified) = self.email_verified {
|
||||
push_comma(&mut query_builder);
|
||||
query_builder.push("email_verified = ");
|
||||
query_builder.push_bind(email_verified);
|
||||
}
|
||||
if let Some(ref password) = self.password {
|
||||
push_comma(&mut query_builder);
|
||||
let password_hash = hash(password)?;
|
||||
query_builder.push("password_hash = ");
|
||||
query_builder.push_bind(password_hash);
|
||||
}
|
||||
if let Some(ref role) = self.role {
|
||||
push_comma(&mut query_builder);
|
||||
query_builder.push("role = ");
|
||||
query_builder.push_bind(role);
|
||||
}
|
||||
if let Some(ref first_name) = self.first_name {
|
||||
push_comma(&mut query_builder);
|
||||
query_builder.push("first_name = ");
|
||||
query_builder.push_bind(first_name);
|
||||
}
|
||||
if let Some(ref last_name) = self.last_name {
|
||||
push_comma(&mut query_builder);
|
||||
query_builder.push("last_name = ");
|
||||
query_builder.push_bind(last_name);
|
||||
}
|
||||
if let Some(ref avatar) = self.avatar {
|
||||
push_comma(&mut query_builder);
|
||||
query_builder.push("avatar = ");
|
||||
query_builder.push_bind(avatar);
|
||||
}
|
||||
push_comma(&mut query_builder);
|
||||
query_builder.push("updated_at = ");
|
||||
query_builder.push_bind(Utc::now());
|
||||
|
||||
query_builder.push(" WHERE username = ");
|
||||
query_builder.push_bind(username);
|
||||
query_builder.push(" RETURNING *");
|
||||
|
||||
let query = query_builder.build_query_as::<User>();
|
||||
let user = query.fetch_one(pool).await?;
|
||||
|
||||
Ok(user)
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, Serialize, Deserialize, sqlx::FromRow)]
|
||||
pub struct User {
|
||||
pub username: String,
|
||||
pub email: Option<String>,
|
||||
pub email_verified: bool,
|
||||
pub password_hash: String,
|
||||
pub role: String,
|
||||
pub first_name: String,
|
||||
pub last_name: String,
|
||||
pub avatar: Option<String>,
|
||||
pub updated_at: DateTime<Utc>,
|
||||
pub created_at: DateTime<Utc>,
|
||||
}
|
||||
|
||||
impl User {
|
||||
pub async fn select(pool: &Pool<Postgres>, username: &str) -> Option<Self> {
|
||||
let user: Option<Self> = sqlx::query_as::<_, Self>(&format!(
|
||||
r#"
|
||||
SELECT * FROM {} WHERE username = $1
|
||||
"#,
|
||||
TABLE_NAME
|
||||
))
|
||||
.bind(username)
|
||||
.fetch_optional(pool)
|
||||
.await
|
||||
.unwrap_or_else(|err| {
|
||||
log::error!("Unable to find user '{}': {}", username, err);
|
||||
None
|
||||
});
|
||||
|
||||
user
|
||||
}
|
||||
|
||||
pub async fn select_by_email(pool: &Pool<Postgres>, email: &str) -> Option<Self> {
|
||||
let user: Option<Self> = sqlx::query_as::<_, Self>(&format!(
|
||||
r#"
|
||||
SELECT * FROM {} WHERE email = $1
|
||||
"#,
|
||||
TABLE_NAME
|
||||
))
|
||||
.bind(email.to_lowercase())
|
||||
.fetch_optional(pool)
|
||||
.await
|
||||
.unwrap_or_else(|err| {
|
||||
log::error!("Unable to find user by email '{}': {}", email, err);
|
||||
None
|
||||
});
|
||||
|
||||
user
|
||||
}
|
||||
|
||||
#[allow(dead_code)]
|
||||
pub async fn count(pool: &Pool<Postgres>) -> i64 {
|
||||
sqlx::query_scalar(&format!(
|
||||
r#"
|
||||
SELECT COUNT(*) FROM {}
|
||||
"#,
|
||||
TABLE_NAME
|
||||
))
|
||||
.fetch_one(pool)
|
||||
.await
|
||||
.unwrap_or_else(|_| 0)
|
||||
}
|
||||
|
||||
pub async fn insert(&self, pool: &Pool<Postgres>) -> CoreResult<User> {
|
||||
let user: User = sqlx::query_as::<_, Self>(&format!(
|
||||
r#"
|
||||
INSERT INTO {} (
|
||||
username,
|
||||
email,
|
||||
email_verified,
|
||||
password_hash,
|
||||
role,
|
||||
first_name,
|
||||
last_name,
|
||||
avatar,
|
||||
created_at,
|
||||
updated_at
|
||||
)
|
||||
VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10)
|
||||
RETURNING *
|
||||
"#,
|
||||
TABLE_NAME,
|
||||
))
|
||||
.bind(&self.username)
|
||||
.bind(&self.email)
|
||||
.bind(&self.email_verified)
|
||||
.bind(&self.password_hash)
|
||||
.bind(&self.role)
|
||||
.bind(&self.first_name)
|
||||
.bind(&self.last_name)
|
||||
.bind(&self.avatar)
|
||||
.bind(self.created_at)
|
||||
.bind(self.updated_at)
|
||||
.fetch_one(pool)
|
||||
.await?;
|
||||
|
||||
Ok(user)
|
||||
}
|
||||
}
|
||||
61
crates/lib/src/accounts/user_favorites.rs
Normal file
61
crates/lib/src/accounts/user_favorites.rs
Normal file
@@ -0,0 +1,61 @@
|
||||
use crate::error::CoreResult;
|
||||
use serde::Deserialize;
|
||||
use sqlx::{Pool, Postgres};
|
||||
|
||||
const TABLE_NAME: &str = "user_airport_favorites";
|
||||
|
||||
#[derive(Debug, Deserialize, sqlx::FromRow)]
|
||||
pub struct UserFavorite {
|
||||
pub username: String,
|
||||
pub icao: String,
|
||||
}
|
||||
|
||||
impl UserFavorite {
|
||||
pub async fn select_all(pool: &Pool<Postgres>, username: &str) -> CoreResult<Vec<String>> {
|
||||
let user_favorites: Vec<UserFavorite> = sqlx::query_as::<_, UserFavorite>(&format!(
|
||||
r#"
|
||||
SELECT * FROM {} WHERE username = $1
|
||||
"#,
|
||||
TABLE_NAME
|
||||
))
|
||||
.bind(username)
|
||||
.fetch_all(pool)
|
||||
.await?;
|
||||
|
||||
let favorites = user_favorites.iter().map(|uf| uf.icao.clone()).collect();
|
||||
|
||||
Ok(favorites)
|
||||
}
|
||||
|
||||
pub async fn insert(pool: &Pool<Postgres>, username: &str, icao: &str) -> CoreResult<()> {
|
||||
sqlx::query(&format!(
|
||||
r#"
|
||||
INSERT INTO {} (
|
||||
username, icao
|
||||
) VALUES ($1, $2)
|
||||
"#,
|
||||
TABLE_NAME
|
||||
))
|
||||
.bind(username)
|
||||
.bind(icao)
|
||||
.execute(pool)
|
||||
.await?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub async fn delete(pool: &Pool<Postgres>, username: &str, icao: &str) -> CoreResult<()> {
|
||||
sqlx::query(&format!(
|
||||
r#"
|
||||
DELETE FROM {} WHERE username = $1 AND icao = $2
|
||||
"#,
|
||||
TABLE_NAME
|
||||
))
|
||||
.bind(username)
|
||||
.bind(icao)
|
||||
.execute(pool)
|
||||
.await?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
56
crates/lib/src/accounts/utils.rs
Normal file
56
crates/lib/src/accounts/utils.rs
Normal file
@@ -0,0 +1,56 @@
|
||||
use argon2::{Argon2, PasswordHash, PasswordHasher, PasswordVerifier};
|
||||
use argon2::password_hash::rand_core::OsRng;
|
||||
use argon2::password_hash::SaltString;
|
||||
use rand::distr::Alphanumeric;
|
||||
use rand::Rng;
|
||||
use rand_chacha::ChaCha20Rng;
|
||||
use rand_chacha::rand_core::SeedableRng;
|
||||
use crate::error::CoreResult;
|
||||
|
||||
pub fn csprng(take: usize) -> String {
|
||||
// Generate a CSPRNG 128-bit (16 byte) ID using alphanumeric characters (a-z, A-Z, 0-9)
|
||||
let rng = ChaCha20Rng::from_os_rng();
|
||||
rng
|
||||
.sample_iter(Alphanumeric)
|
||||
.take(take)
|
||||
.map(char::from)
|
||||
.collect()
|
||||
}
|
||||
|
||||
pub fn hash(string: &str) -> CoreResult<String> {
|
||||
let salt = SaltString::generate(&mut OsRng);
|
||||
let hash = Argon2::default()
|
||||
.hash_password(string.as_bytes(), &salt)?
|
||||
.to_string();
|
||||
Ok(hash)
|
||||
}
|
||||
|
||||
pub fn verify_hash(string: &str, hashed_string: &str) -> bool {
|
||||
let bytes = string.as_bytes();
|
||||
let parsed_hash = match PasswordHash::new(hashed_string) {
|
||||
Ok(h) => h,
|
||||
Err(err) => {
|
||||
log::error!(
|
||||
"Failed to construct PasswordHash from '{}': {}",
|
||||
hashed_string,
|
||||
err
|
||||
);
|
||||
return false;
|
||||
}
|
||||
};
|
||||
Argon2::default()
|
||||
.verify_password(bytes, &parsed_hash)
|
||||
.is_ok()
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
|
||||
#[test]
|
||||
fn test_hash() {
|
||||
let password = hash("password").unwrap();
|
||||
assert!(!verify_hash(&password, "bad_password"));
|
||||
assert!(verify_hash("password", &password));
|
||||
}
|
||||
}
|
||||
715
crates/lib/src/airports/airport.rs
Normal file
715
crates/lib/src/airports/airport.rs
Normal file
@@ -0,0 +1,715 @@
|
||||
use crate::airports::{
|
||||
AirportCategory, Communication, CommunicationRow, Runway, RunwayRow, UpdateCommunication,
|
||||
UpdateRunway,
|
||||
};
|
||||
use chrono::{DateTime, Utc};
|
||||
use futures_util::try_join;
|
||||
use serde::{Deserialize, Serialize};
|
||||
use sqlx::{Pool, Postgres, QueryBuilder};
|
||||
use std::collections::HashMap;
|
||||
use std::str::FromStr;
|
||||
use utoipa::{IntoParams, ToSchema};
|
||||
use crate::error::{CoreError, CoreErrorKind, CoreResult};
|
||||
use crate::metars::Metar;
|
||||
|
||||
const TABLE_NAME: &str = "airports";
|
||||
const DEFAULT_COLUMNS: &str = "icao, iata, local, name, category, iso_country, \
|
||||
iso_region, municipality, elevation_ft, longitude, latitude, has_tower, has_beacon,\
|
||||
public, metar_observation_time";
|
||||
|
||||
#[derive(Debug, Serialize, Deserialize, ToSchema)]
|
||||
pub struct Airport {
|
||||
pub icao: String,
|
||||
#[serde(skip_serializing_if = "Option::is_none")]
|
||||
pub iata: Option<String>,
|
||||
#[serde(skip_serializing_if = "Option::is_none")]
|
||||
pub local: Option<String>,
|
||||
pub name: String,
|
||||
pub category: AirportCategory,
|
||||
pub iso_country: String,
|
||||
pub iso_region: String,
|
||||
pub municipality: String,
|
||||
pub elevation_ft: f32,
|
||||
pub longitude: f32,
|
||||
pub latitude: f32,
|
||||
#[serde(skip_serializing_if = "Option::is_none")]
|
||||
pub has_tower: Option<bool>,
|
||||
#[serde(skip_serializing_if = "Option::is_none")]
|
||||
pub has_beacon: Option<bool>,
|
||||
pub runways: Vec<Runway>,
|
||||
pub communications: Vec<Communication>,
|
||||
pub public: bool,
|
||||
#[serde(skip_serializing_if = "Option::is_none")]
|
||||
pub latest_metar: Option<Metar>,
|
||||
}
|
||||
|
||||
#[derive(Debug, Deserialize, ToSchema, IntoParams)]
|
||||
#[into_params(parameter_in = Query)]
|
||||
pub struct AirportQuery {
|
||||
pub page: Option<u32>,
|
||||
pub limit: Option<u32>,
|
||||
pub icaos: Option<String>,
|
||||
pub iatas: Option<String>,
|
||||
pub locals: Option<String>,
|
||||
pub name: Option<String>,
|
||||
pub categories: Option<String>,
|
||||
pub iso_countries: Option<String>,
|
||||
pub iso_regions: Option<String>,
|
||||
pub municipalities: Option<String>,
|
||||
pub bounds: Option<String>,
|
||||
pub metars: Option<bool>,
|
||||
}
|
||||
|
||||
impl Default for AirportQuery {
|
||||
fn default() -> Self {
|
||||
Self {
|
||||
page: Some(1),
|
||||
limit: Some(1000),
|
||||
icaos: None,
|
||||
iatas: None,
|
||||
locals: None,
|
||||
name: None,
|
||||
categories: None,
|
||||
iso_countries: None,
|
||||
iso_regions: None,
|
||||
municipalities: None,
|
||||
bounds: None,
|
||||
metars: None,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// impl AirportQuery {
|
||||
// pub fn builder() -> AirportQueryBuilder {
|
||||
// AirportQueryBuilder::new()
|
||||
// }
|
||||
// }
|
||||
|
||||
// pub struct AirportQueryBuilder {
|
||||
// inner: AirportQuery,
|
||||
// }
|
||||
//
|
||||
// impl AirportQueryBuilder {
|
||||
// /// start the builder
|
||||
// pub fn new() -> Self {
|
||||
// AirportQueryBuilder {
|
||||
// inner: AirportQuery::default(),
|
||||
// }
|
||||
// }
|
||||
//
|
||||
// pub fn page(mut self, page: u32) -> Self {
|
||||
// self.inner.page = Some(page);
|
||||
// self
|
||||
// }
|
||||
//
|
||||
// pub fn limit(mut self, limit: u32) -> Self {
|
||||
// self.inner.limit = Some(limit);
|
||||
// self
|
||||
// }
|
||||
//
|
||||
// pub fn icaos<T: Into<String>>(mut self, v: T) -> Self {
|
||||
// self.inner.icaos = Some(v.into());
|
||||
// self
|
||||
// }
|
||||
//
|
||||
// pub fn iatas<T: Into<String>>(mut self, v: T) -> Self {
|
||||
// self.inner.iatas = Some(v.into());
|
||||
// self
|
||||
// }
|
||||
//
|
||||
// pub fn metars(mut self, v: bool) -> Self {
|
||||
// self.inner.metars = Some(v);
|
||||
// self
|
||||
// }
|
||||
//
|
||||
// pub fn build(self) -> AirportQuery {
|
||||
// self.inner
|
||||
// }
|
||||
// }
|
||||
|
||||
#[derive(Debug, Deserialize, ToSchema)]
|
||||
pub struct Bounds {
|
||||
pub north_east_lat: f32,
|
||||
pub north_east_lon: f32,
|
||||
pub south_west_lat: f32,
|
||||
pub south_west_lon: f32,
|
||||
}
|
||||
|
||||
impl Bounds {
|
||||
fn parse(input: &str) -> CoreResult<Bounds> {
|
||||
let parts: Vec<&str> = input.split(',').collect();
|
||||
if parts.len() != 4 {
|
||||
return Err(CoreError::new(
|
||||
CoreErrorKind::InvalidInput,
|
||||
format!("Expected 4 fields in bounds but received {}", parts.len()),
|
||||
));
|
||||
}
|
||||
let north_east_lat = parts[0].trim().parse::<f32>()?;
|
||||
let north_east_lon = parts[1].trim().parse::<f32>()?;
|
||||
let south_west_lat = parts[2].trim().parse::<f32>()?;
|
||||
let south_west_lon = parts[3].trim().parse::<f32>()?;
|
||||
|
||||
Ok(Bounds {
|
||||
north_east_lat,
|
||||
north_east_lon,
|
||||
south_west_lat,
|
||||
south_west_lon,
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, Deserialize, sqlx::FromRow)]
|
||||
struct AirportRow {
|
||||
pub icao: String,
|
||||
pub iata: Option<String>,
|
||||
pub local: Option<String>,
|
||||
pub name: String,
|
||||
pub category: String,
|
||||
pub iso_country: String,
|
||||
pub iso_region: String,
|
||||
pub municipality: String,
|
||||
pub elevation_ft: f32,
|
||||
longitude: f32,
|
||||
latitude: f32,
|
||||
pub has_tower: Option<bool>,
|
||||
pub has_beacon: Option<bool>,
|
||||
pub public: bool,
|
||||
pub metar_observation_time: Option<DateTime<Utc>>,
|
||||
}
|
||||
|
||||
#[derive(Debug, Deserialize, ToSchema)]
|
||||
pub struct UpdateAirport {
|
||||
pub icao: Option<String>,
|
||||
pub iata: Option<String>,
|
||||
pub local: Option<String>,
|
||||
pub name: Option<String>,
|
||||
pub category: Option<AirportCategory>,
|
||||
pub iso_country: Option<String>,
|
||||
pub iso_region: Option<String>,
|
||||
pub municipality: Option<String>,
|
||||
pub elevation_ft: Option<f32>,
|
||||
pub longitude: Option<f32>,
|
||||
pub latitude: Option<f32>,
|
||||
pub has_tower: Option<bool>,
|
||||
pub has_beacon: Option<bool>,
|
||||
pub runways: Option<Vec<UpdateRunway>>,
|
||||
pub communications: Option<Vec<UpdateCommunication>>,
|
||||
pub public: Option<bool>,
|
||||
pub latest_metar_observation: Option<DateTime<Utc>>,
|
||||
}
|
||||
|
||||
impl Into<AirportRow> for Airport {
|
||||
fn into(self) -> AirportRow {
|
||||
AirportRow {
|
||||
icao: self.icao.clone(),
|
||||
iata: self.iata.clone(),
|
||||
local: self.local.clone(),
|
||||
name: self.name.clone(),
|
||||
category: self.category.clone().to_string(),
|
||||
iso_country: self.iso_country.clone(),
|
||||
iso_region: self.iso_region.clone(),
|
||||
municipality: self.municipality.clone(),
|
||||
elevation_ft: self.elevation_ft,
|
||||
longitude: self.longitude,
|
||||
latitude: self.latitude,
|
||||
has_tower: self.has_tower,
|
||||
has_beacon: self.has_beacon,
|
||||
public: self.public,
|
||||
metar_observation_time: match self.latest_metar {
|
||||
Some(m) => Some(m.observation_time),
|
||||
None => None,
|
||||
},
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl From<AirportRow> for Airport {
|
||||
fn from(airport: AirportRow) -> Self {
|
||||
Self {
|
||||
icao: airport.icao.clone(),
|
||||
iata: airport.iata.clone(),
|
||||
local: airport.local.clone(),
|
||||
name: airport.name.clone(),
|
||||
category: match AirportCategory::from_str(&airport.category) {
|
||||
Ok(c) => c,
|
||||
Err(_) => {
|
||||
log::error!("Invalid Airport category: {}", airport.category);
|
||||
AirportCategory::Unknown
|
||||
}
|
||||
},
|
||||
iso_country: airport.iso_country.clone(),
|
||||
iso_region: airport.iso_region.clone(),
|
||||
municipality: airport.municipality.clone(),
|
||||
elevation_ft: airport.elevation_ft,
|
||||
longitude: airport.longitude,
|
||||
latitude: airport.latitude,
|
||||
has_tower: airport.has_tower,
|
||||
has_beacon: airport.has_beacon,
|
||||
runways: vec![],
|
||||
communications: vec![],
|
||||
public: airport.public,
|
||||
latest_metar: None,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl Airport {
|
||||
pub async fn select(pool: &Pool<Postgres>, icao: &str, metar: bool) -> Option<Self> {
|
||||
|
||||
let airport_fut = async {
|
||||
sqlx::query_as(&format!(
|
||||
"SELECT {} FROM {} WHERE icao = $1",
|
||||
DEFAULT_COLUMNS, TABLE_NAME
|
||||
))
|
||||
.bind(icao.to_uppercase())
|
||||
.fetch_optional(pool)
|
||||
.await
|
||||
};
|
||||
|
||||
let metar_fut = async {
|
||||
if metar {
|
||||
match Metar::get_all_distinct(pool, &vec![icao.to_uppercase()]).await {
|
||||
Ok(m) => Some(m.into_iter().nth(0)),
|
||||
Err(err) => {
|
||||
log::error!("{}", err);
|
||||
None
|
||||
}
|
||||
}
|
||||
} else {
|
||||
None
|
||||
}
|
||||
};
|
||||
|
||||
let runways_fut = Runway::select_all(pool, icao);
|
||||
let communications_fut = Communication::select_all(pool, icao);
|
||||
|
||||
let (airport_result, runways_result, communications_result, metar_result) =
|
||||
tokio::join!(airport_fut, runways_fut, communications_fut, metar_fut);
|
||||
|
||||
let airport_row: Option<AirportRow> = match airport_result {
|
||||
Ok(opt) => opt,
|
||||
Err(err) => {
|
||||
log::error!("Unable to find airport '{}': {}", icao, err);
|
||||
return None;
|
||||
}
|
||||
};
|
||||
|
||||
let runways: Vec<Runway> = match runways_result {
|
||||
Ok(r) => r,
|
||||
Err(err) => {
|
||||
log::error!("Error retrieving runways for airport '{}': {}", icao, err);
|
||||
vec![]
|
||||
}
|
||||
};
|
||||
|
||||
let communications: Vec<Communication> = match communications_result {
|
||||
Ok(f) => f,
|
||||
Err(err) => {
|
||||
log::error!(
|
||||
"Error retrieving communications for airport '{}': {}",
|
||||
icao,
|
||||
err
|
||||
);
|
||||
vec![]
|
||||
}
|
||||
};
|
||||
|
||||
let metar: Option<Metar> = match metar_result {
|
||||
Some(m_option) => match m_option {
|
||||
Some(m) => Some(m),
|
||||
None => None,
|
||||
},
|
||||
None => None,
|
||||
};
|
||||
|
||||
airport_row.map(|row| {
|
||||
let mut airport: Airport = row.into();
|
||||
airport.runways = runways;
|
||||
airport.communications = communications;
|
||||
airport.latest_metar = metar;
|
||||
airport
|
||||
})
|
||||
}
|
||||
|
||||
pub async fn select_all(pool: &Pool<Postgres>, query: &AirportQuery) -> CoreResult<Vec<Self>> {
|
||||
let mut builder =
|
||||
QueryBuilder::<Postgres>::new(format!("SELECT {} FROM {}", DEFAULT_COLUMNS, TABLE_NAME));
|
||||
|
||||
let mut has_where = false;
|
||||
let icaos = match &query.icaos {
|
||||
Some(icaos) => Some(icaos.to_uppercase()),
|
||||
None => None,
|
||||
};
|
||||
Self::push_condition_array(&mut builder, &mut has_where, "icao", &icaos);
|
||||
let iatas = match &query.iatas {
|
||||
Some(iatas) => Some(iatas.to_uppercase()),
|
||||
None => None,
|
||||
};
|
||||
Self::push_condition_array(&mut builder, &mut has_where, "iata", &iatas);
|
||||
Self::push_condition_array(
|
||||
&mut builder,
|
||||
&mut has_where,
|
||||
"iso_country",
|
||||
&query.iso_countries,
|
||||
);
|
||||
Self::push_condition_array(
|
||||
&mut builder,
|
||||
&mut has_where,
|
||||
"iso_region",
|
||||
&query.iso_regions,
|
||||
);
|
||||
Self::push_condition_array(
|
||||
&mut builder,
|
||||
&mut has_where,
|
||||
"municipality",
|
||||
&query.municipalities,
|
||||
);
|
||||
let locals = match &query.locals {
|
||||
Some(locals) => Some(locals.to_uppercase()),
|
||||
None => None,
|
||||
};
|
||||
Self::push_condition_array(&mut builder, &mut has_where, "local", &locals);
|
||||
Self::push_condition_array(&mut builder, &mut has_where, "category", &query.categories);
|
||||
Self::push_condition_like(&mut builder, &mut has_where, "name", &query.name);
|
||||
Self::push_condition_bounds(&mut builder, &mut has_where, &query.bounds)?;
|
||||
|
||||
builder.push(" ORDER BY (metar_observation_time IS NULL) ASC, ");
|
||||
builder.push(" CASE category ");
|
||||
builder.push(" WHEN 'large_airport' THEN 1 ");
|
||||
builder.push(" WHEN 'medium_airport' THEN 2 ");
|
||||
builder.push(" WHEN 'small_airport' THEN 3 ");
|
||||
builder.push(" WHEN 'seaplane_base' THEN 4 ");
|
||||
builder.push(" WHEN 'heliport' THEN 5 ");
|
||||
builder.push(" WHEN 'balloon_port' THEN 6 ");
|
||||
builder.push(" WHEN 'unknown' THEN 7 ");
|
||||
builder.push(" ELSE 8 END");
|
||||
|
||||
// Apply pagination.
|
||||
if let Some(limit) = query.limit {
|
||||
builder.push(" LIMIT ").push_bind(limit as i64);
|
||||
let offset = if let Some(page) = query.page {
|
||||
(page.saturating_sub(1) * limit) as i64
|
||||
} else {
|
||||
0
|
||||
};
|
||||
builder.push(" OFFSET ").push_bind(offset);
|
||||
}
|
||||
|
||||
let airport_query = builder.build_query_as::<AirportRow>();
|
||||
let airport_rows: Vec<AirportRow> = airport_query.fetch_all(pool).await?;
|
||||
let mut airports: Vec<Airport> = airport_rows.into_iter().map(From::from).collect();
|
||||
|
||||
if airports.is_empty() {
|
||||
return Ok(airports);
|
||||
}
|
||||
|
||||
// Bulk update airport subfields
|
||||
let icaos: Vec<String> = airports.iter().map(|a| a.icao.to_uppercase()).collect();
|
||||
|
||||
let runway_future = Runway::select_all_map(pool, &icaos);
|
||||
let frequency_future = Communication::select_all_map(pool, &icaos);
|
||||
let metar_future = if query.metars.unwrap_or(false) {
|
||||
Some(Metar::get_all_distinct(pool, &icaos))
|
||||
} else {
|
||||
None
|
||||
};
|
||||
|
||||
let (runway_map, frequency_map, mut metars_opt) = match metar_future {
|
||||
Some(future_metars) => {
|
||||
let (runway_map, frequency_map, metars) =
|
||||
try_join!(runway_future, frequency_future, future_metars)?;
|
||||
(
|
||||
runway_map,
|
||||
frequency_map,
|
||||
Some(
|
||||
metars
|
||||
.into_iter()
|
||||
.map(|m| (m.icao.clone(), m))
|
||||
.collect::<HashMap<_, _>>(),
|
||||
),
|
||||
)
|
||||
}
|
||||
None => {
|
||||
let (runway_map, frequency_map) = try_join!(runway_future, frequency_future)?;
|
||||
(runway_map, frequency_map, None)
|
||||
}
|
||||
};
|
||||
|
||||
for airport in airports.iter_mut() {
|
||||
airport.runways = runway_map.get(&airport.icao).cloned().unwrap_or_default();
|
||||
airport.communications = frequency_map
|
||||
.get(&airport.icao)
|
||||
.cloned()
|
||||
.unwrap_or_default();
|
||||
if let Some(ref mut metar_map) = metars_opt {
|
||||
airport.latest_metar = metar_map.remove(&airport.icao);
|
||||
}
|
||||
}
|
||||
|
||||
Ok(airports)
|
||||
}
|
||||
|
||||
pub async fn count(pool: &Pool<Postgres>, query: &AirportQuery) -> i64 {
|
||||
let mut builder = QueryBuilder::<Postgres>::new("SELECT COUNT(*) FROM ");
|
||||
builder.push(TABLE_NAME);
|
||||
|
||||
let mut has_where = false;
|
||||
Self::push_condition_array(&mut builder, &mut has_where, "icao", &query.icaos);
|
||||
Self::push_condition_array(&mut builder, &mut has_where, "iata", &query.iatas);
|
||||
Self::push_condition_array(
|
||||
&mut builder,
|
||||
&mut has_where,
|
||||
"iso_country",
|
||||
&query.iso_countries,
|
||||
);
|
||||
Self::push_condition_array(
|
||||
&mut builder,
|
||||
&mut has_where,
|
||||
"iso_region",
|
||||
&query.iso_regions,
|
||||
);
|
||||
Self::push_condition_array(
|
||||
&mut builder,
|
||||
&mut has_where,
|
||||
"municipality",
|
||||
&query.municipalities,
|
||||
);
|
||||
Self::push_condition_array(&mut builder, &mut has_where, "local", &query.locals);
|
||||
Self::push_condition_array(&mut builder, &mut has_where, "category", &query.categories);
|
||||
Self::push_condition_like(&mut builder, &mut has_where, "name", &query.name);
|
||||
if let Err(err) = Self::push_condition_bounds(&mut builder, &mut has_where, &query.bounds) {
|
||||
log::error!("Error parsing bounds string: {}", err);
|
||||
return 0;
|
||||
}
|
||||
|
||||
let sql_query = builder.build_query_scalar();
|
||||
sql_query.fetch_one(pool).await.unwrap_or_else(|_| 0)
|
||||
}
|
||||
|
||||
pub async fn insert(&self, pool: &Pool<Postgres>) -> CoreResult<Self> {
|
||||
let mut all_runway_rows: Vec<RunwayRow> = Vec::new();
|
||||
let mut all_frequency_rows: Vec<CommunicationRow> = Vec::new();
|
||||
for runway in &self.runways {
|
||||
all_runway_rows.push(Runway::into(runway, &self.icao));
|
||||
}
|
||||
for frequency in &self.communications {
|
||||
all_frequency_rows.push(Communication::into(frequency, &self.icao));
|
||||
}
|
||||
Runway::insert_all(pool, &all_runway_rows).await?;
|
||||
Communication::insert_all(pool, &all_frequency_rows).await?;
|
||||
|
||||
let airport: AirportRow = sqlx::query_as(&format!(
|
||||
r#"
|
||||
INSERT INTO {} (
|
||||
icao, iata, local, name, category, iso_country, iso_region, municipality,
|
||||
elevation_ft, longitude, latitude, geometry, has_tower, has_beacon, public
|
||||
)
|
||||
VALUES (
|
||||
$1, $2, $3, $4, $5, $6, $7, $8,
|
||||
$9, $10, $11,
|
||||
ST_SetSRID(ST_MakePoint($10, $11), 4326),
|
||||
$12, $13, $14
|
||||
)
|
||||
RETURNING {}
|
||||
"#,
|
||||
TABLE_NAME, DEFAULT_COLUMNS
|
||||
))
|
||||
.bind(self.icao.to_string())
|
||||
.bind(&self.iata)
|
||||
.bind(&self.local)
|
||||
.bind(self.name.to_string())
|
||||
.bind(self.category.to_string())
|
||||
.bind(self.iso_country.to_string())
|
||||
.bind(self.iso_region.to_string())
|
||||
.bind(self.municipality.to_string())
|
||||
.bind(self.elevation_ft)
|
||||
.bind(self.longitude)
|
||||
.bind(self.latitude)
|
||||
.bind(self.has_tower)
|
||||
.bind(self.has_beacon)
|
||||
.bind(self.public)
|
||||
.fetch_one(pool)
|
||||
.await?;
|
||||
|
||||
Ok(airport.into())
|
||||
}
|
||||
|
||||
pub async fn insert_all(pool: &Pool<Postgres>, airports: Vec<Self>) -> CoreResult<()> {
|
||||
let chunk_size = 1000;
|
||||
let mut all_runway_rows: Vec<RunwayRow> = Vec::new();
|
||||
let mut all_frequency_rows: Vec<CommunicationRow> = Vec::new();
|
||||
let airport_rows: Vec<AirportRow> = airports
|
||||
.into_iter()
|
||||
.map(|airport| {
|
||||
for runway in &airport.runways {
|
||||
all_runway_rows.push(Runway::into(runway, &airport.icao));
|
||||
}
|
||||
for frequency in &airport.communications {
|
||||
all_frequency_rows.push(Communication::into(frequency, &airport.icao));
|
||||
}
|
||||
airport.into()
|
||||
})
|
||||
.collect();
|
||||
|
||||
for chunk in airport_rows.chunks(chunk_size) {
|
||||
let mut query_builder: QueryBuilder<Postgres> = QueryBuilder::new(format!(
|
||||
"INSERT INTO {} (icao, iata, local, name, category, \
|
||||
iso_country, iso_region, municipality, elevation_ft, \
|
||||
longitude, latitude, geometry, has_tower, has_beacon, public) ",
|
||||
TABLE_NAME
|
||||
));
|
||||
query_builder.push_values(chunk, |mut b, row| {
|
||||
b.push_bind(&row.icao)
|
||||
.push_bind(&row.iata)
|
||||
.push_bind(&row.local)
|
||||
.push_bind(&row.name)
|
||||
.push_bind(&row.category)
|
||||
.push_bind(&row.iso_country)
|
||||
.push_bind(&row.iso_region)
|
||||
.push_bind(&row.municipality)
|
||||
.push_bind(row.elevation_ft)
|
||||
.push_bind(row.longitude)
|
||||
.push_bind(row.latitude)
|
||||
.push_unseparated(", ST_SetSRID(ST_MakePoint(")
|
||||
.push_bind_unseparated(row.longitude)
|
||||
.push_unseparated(", ")
|
||||
.push_bind_unseparated(row.latitude)
|
||||
.push_unseparated("), 4326)")
|
||||
.push_bind(row.has_tower)
|
||||
.push_bind(row.has_beacon)
|
||||
.push_bind(row.public);
|
||||
});
|
||||
|
||||
let query = query_builder.build();
|
||||
query.execute(pool).await?;
|
||||
}
|
||||
|
||||
Runway::insert_all(pool, &all_runway_rows).await?;
|
||||
Communication::insert_all(pool, &all_frequency_rows).await?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
// TODO
|
||||
pub async fn update(pool: &Pool<Postgres>, icao: &str, airport: &UpdateAirport) -> CoreResult<()> {
|
||||
let mut query_builder: QueryBuilder<Postgres> =
|
||||
QueryBuilder::new(format!("UPDATE {} SET ", TABLE_NAME));
|
||||
if let Some(latest_metar_observation) = airport.latest_metar_observation {
|
||||
query_builder.push("metar_observation_time = ");
|
||||
query_builder.push_bind(latest_metar_observation);
|
||||
}
|
||||
|
||||
query_builder.push(" WHERE icao = ").push_bind(icao);
|
||||
let query = query_builder.build();
|
||||
query.execute(pool).await?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub async fn delete(pool: &Pool<Postgres>, icao: &str) -> CoreResult<()> {
|
||||
sqlx::query(&format!(
|
||||
r#"
|
||||
DELETE FROM {} WHERE icao = $1
|
||||
"#,
|
||||
TABLE_NAME
|
||||
))
|
||||
.bind(icao.to_string())
|
||||
.execute(pool)
|
||||
.await?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub async fn delete_all(pool: &Pool<Postgres>) -> CoreResult<()> {
|
||||
sqlx::query(&format!(
|
||||
r#"
|
||||
DELETE FROM {} WHERE true
|
||||
"#,
|
||||
TABLE_NAME
|
||||
))
|
||||
.execute(pool)
|
||||
.await?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn push_condition_array<'a>(
|
||||
builder: &mut QueryBuilder<'a, Postgres>,
|
||||
has_where: &mut bool,
|
||||
column: &str,
|
||||
field: &'a Option<String>,
|
||||
) {
|
||||
if let Some(value_str) = field {
|
||||
// Split on commas, trim whitespace, and drop empties.
|
||||
let values: Vec<&str> = value_str
|
||||
.split(',')
|
||||
.map(str::trim)
|
||||
.filter(|s| !s.is_empty())
|
||||
.collect();
|
||||
if !values.is_empty() {
|
||||
if !*has_where {
|
||||
builder.push(" WHERE ");
|
||||
*has_where = true;
|
||||
} else {
|
||||
builder.push(" AND ");
|
||||
}
|
||||
builder.push(column);
|
||||
builder.push(" = ANY(");
|
||||
builder.push_bind(values);
|
||||
builder.push(")");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn push_condition_like<'a>(
|
||||
builder: &mut QueryBuilder<'a, Postgres>,
|
||||
has_where: &mut bool,
|
||||
column: &str,
|
||||
field: &'a Option<String>,
|
||||
) {
|
||||
// Query column like
|
||||
if let Some(value) = field {
|
||||
if !*has_where {
|
||||
builder.push(" WHERE ");
|
||||
*has_where = true;
|
||||
} else {
|
||||
builder.push(" AND ");
|
||||
}
|
||||
// Using ILIKE with wildcards for partial matching
|
||||
builder
|
||||
.push(column)
|
||||
.push(" ILIKE ")
|
||||
.push_bind(format!("%{}%", value));
|
||||
}
|
||||
}
|
||||
|
||||
fn push_condition_bounds<'a>(
|
||||
builder: &mut QueryBuilder<'a, Postgres>,
|
||||
has_where: &mut bool,
|
||||
field: &'a Option<String>,
|
||||
) -> CoreResult<()> {
|
||||
// Query bounds
|
||||
if let Some(bounds_string) = field {
|
||||
if !*has_where {
|
||||
builder.push(" WHERE ");
|
||||
*has_where = true;
|
||||
} else {
|
||||
builder.push(" AND ");
|
||||
}
|
||||
let bounds = Bounds::parse(bounds_string)?;
|
||||
builder
|
||||
.push("(")
|
||||
.push("geometry && ST_MakeEnvelope(")
|
||||
.push_bind(bounds.south_west_lon)
|
||||
.push(", ")
|
||||
.push_bind(bounds.south_west_lat)
|
||||
.push(", ")
|
||||
.push_bind(bounds.north_east_lon)
|
||||
.push(", ")
|
||||
.push_bind(bounds.north_east_lat)
|
||||
.push(", 4326)")
|
||||
.push(")");
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
55
crates/lib/src/airports/airport_category.rs
Normal file
55
crates/lib/src/airports/airport_category.rs
Normal file
@@ -0,0 +1,55 @@
|
||||
use serde::{Deserialize, Serialize};
|
||||
use std::fmt::Display;
|
||||
use std::str::FromStr;
|
||||
use utoipa::ToSchema;
|
||||
|
||||
#[derive(Debug, Clone, Serialize, Deserialize, ToSchema)]
|
||||
pub enum AirportCategory {
|
||||
#[serde(rename = "small_airport")]
|
||||
Small,
|
||||
#[serde(rename = "medium_airport")]
|
||||
Medium,
|
||||
#[serde(rename = "large_airport")]
|
||||
Large,
|
||||
#[serde(rename = "heliport")]
|
||||
Heliport,
|
||||
#[serde(rename = "closed")]
|
||||
Closed,
|
||||
#[serde(rename = "seaplane_base")]
|
||||
Seaplane,
|
||||
#[serde(rename = "balloon_port")]
|
||||
BalloonPort,
|
||||
#[serde(rename = "unknown")]
|
||||
Unknown,
|
||||
}
|
||||
|
||||
impl FromStr for AirportCategory {
|
||||
type Err = ();
|
||||
fn from_str(s: &str) -> Result<Self, Self::Err> {
|
||||
match s {
|
||||
"small_airport" => Ok(AirportCategory::Small),
|
||||
"medium_airport" => Ok(AirportCategory::Medium),
|
||||
"large_airport" => Ok(AirportCategory::Large),
|
||||
"heliport" => Ok(AirportCategory::Heliport),
|
||||
"closed" => Ok(AirportCategory::Closed),
|
||||
"seaplane_base" => Ok(AirportCategory::Seaplane),
|
||||
"balloon_port" => Ok(AirportCategory::BalloonPort),
|
||||
_ => Ok(AirportCategory::Unknown),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl Display for AirportCategory {
|
||||
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
||||
match self {
|
||||
AirportCategory::Small => write!(f, "small_airport"),
|
||||
AirportCategory::Medium => write!(f, "medium_airport"),
|
||||
AirportCategory::Large => write!(f, "large_airport"),
|
||||
AirportCategory::Heliport => write!(f, "heliport"),
|
||||
AirportCategory::Closed => write!(f, "closed"),
|
||||
AirportCategory::Seaplane => write!(f, "seaplane_base"),
|
||||
AirportCategory::BalloonPort => write!(f, "balloon_port"),
|
||||
AirportCategory::Unknown => write!(f, "unknown"),
|
||||
}
|
||||
}
|
||||
}
|
||||
125
crates/lib/src/airports/communication.rs
Normal file
125
crates/lib/src/airports/communication.rs
Normal file
@@ -0,0 +1,125 @@
|
||||
use serde::{Deserialize, Serialize};
|
||||
use sqlx::{Pool, Postgres, QueryBuilder};
|
||||
use std::collections::HashMap;
|
||||
use utoipa::ToSchema;
|
||||
use uuid::Uuid;
|
||||
use crate::error::CoreResult;
|
||||
|
||||
const TABLE_NAME: &str = "communications";
|
||||
|
||||
#[derive(Debug, Clone, Serialize, Deserialize, ToSchema)]
|
||||
pub struct Communication {
|
||||
pub id: String,
|
||||
#[serde(skip_serializing_if = "Option::is_none")]
|
||||
pub name: Option<String>,
|
||||
pub frequencies_mhz: Vec<f32>,
|
||||
#[serde(skip_serializing_if = "Option::is_none")]
|
||||
pub phone: Option<String>,
|
||||
}
|
||||
|
||||
#[derive(Debug, Deserialize, sqlx::FromRow)]
|
||||
pub struct CommunicationRow {
|
||||
pub id: Uuid,
|
||||
pub icao: String,
|
||||
pub frequency_id: String,
|
||||
pub name: Option<String>,
|
||||
pub frequencies_mhz: Vec<f32>,
|
||||
pub phone: Option<String>,
|
||||
}
|
||||
|
||||
#[derive(Debug, Serialize, Deserialize, ToSchema)]
|
||||
pub struct UpdateCommunication {
|
||||
#[serde(skip_serializing_if = "Option::is_none")]
|
||||
pub icao: Option<String>,
|
||||
#[serde(skip_serializing_if = "Option::is_none")]
|
||||
pub id: Option<String>,
|
||||
#[serde(skip_serializing_if = "Option::is_none")]
|
||||
pub name: Option<String>,
|
||||
#[serde(skip_serializing_if = "Option::is_none")]
|
||||
pub frequencies_mhz: Option<Vec<f32>>,
|
||||
#[serde(skip_serializing_if = "Option::is_none")]
|
||||
pub phone: Option<String>,
|
||||
}
|
||||
|
||||
impl From<CommunicationRow> for Communication {
|
||||
fn from(frequency: CommunicationRow) -> Self {
|
||||
Self {
|
||||
id: frequency.frequency_id.clone(),
|
||||
name: frequency.name.clone(),
|
||||
frequencies_mhz: frequency.frequencies_mhz,
|
||||
phone: frequency.phone.clone(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl Communication {
|
||||
pub fn into(frequency: &Communication, icao: &str) -> CommunicationRow {
|
||||
CommunicationRow {
|
||||
id: Uuid::new_v4(),
|
||||
icao: icao.to_string(),
|
||||
frequency_id: frequency.id.clone(),
|
||||
name: frequency.name.clone(),
|
||||
frequencies_mhz: frequency.frequencies_mhz.clone(),
|
||||
phone: frequency.phone.clone(),
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn select_all_map(pool: &Pool<Postgres>, icaos: &Vec<String>) -> CoreResult<HashMap<String, Vec<Self>>> {
|
||||
let frequency_rows: Vec<CommunicationRow> = sqlx::query_as(&format!(
|
||||
r#"SELECT * FROM {} WHERE icao = ANY($1)"#,
|
||||
TABLE_NAME
|
||||
))
|
||||
.bind(&icaos)
|
||||
.fetch_all(pool)
|
||||
.await?;
|
||||
|
||||
let mut frequency_map: HashMap<String, Vec<Self>> = HashMap::new();
|
||||
for frequency_row in frequency_rows {
|
||||
let icao = frequency_row.icao.clone();
|
||||
let frequency = frequency_row.into();
|
||||
frequency_map
|
||||
.entry(icao.to_string())
|
||||
.or_default()
|
||||
.push(frequency);
|
||||
}
|
||||
|
||||
Ok(frequency_map)
|
||||
}
|
||||
|
||||
pub async fn select_all(pool: &Pool<Postgres>, icao: &str) -> CoreResult<Vec<Self>> {
|
||||
let frequency_row: Vec<CommunicationRow> = sqlx::query_as(&format!(
|
||||
r#"
|
||||
SELECT * FROM {} WHERE icao = $1
|
||||
"#,
|
||||
TABLE_NAME
|
||||
))
|
||||
.bind(icao)
|
||||
.fetch_all(pool)
|
||||
.await?;
|
||||
Ok(frequency_row.into_iter().map(From::from).collect())
|
||||
}
|
||||
|
||||
pub async fn insert_all(pool: &Pool<Postgres>, communications: &Vec<CommunicationRow>) -> CoreResult<()> {
|
||||
let chunk_size = 1000;
|
||||
|
||||
for chunk in communications.chunks(chunk_size) {
|
||||
let mut query_builder: QueryBuilder<Postgres> = QueryBuilder::new(&format!(
|
||||
"INSERT INTO {} (id, icao, frequency_id, name, frequencies_mhz, phone) ",
|
||||
TABLE_NAME
|
||||
));
|
||||
query_builder.push_values(chunk, |mut b, row| {
|
||||
b.push_bind(&row.id)
|
||||
.push_bind(&row.icao)
|
||||
.push_bind(&row.frequency_id)
|
||||
.push_bind(&row.name)
|
||||
.push_bind(&row.frequencies_mhz)
|
||||
.push_bind(&row.phone);
|
||||
});
|
||||
|
||||
let query = query_builder.build();
|
||||
query.execute(pool).await?;
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
9
crates/lib/src/airports/mod.rs
Normal file
9
crates/lib/src/airports/mod.rs
Normal file
@@ -0,0 +1,9 @@
|
||||
mod airport;
|
||||
mod airport_category;
|
||||
mod communication;
|
||||
mod runway;
|
||||
|
||||
pub use airport::*;
|
||||
pub use airport_category::*;
|
||||
pub use communication::*;
|
||||
pub use runway::*;
|
||||
121
crates/lib/src/airports/runway.rs
Normal file
121
crates/lib/src/airports/runway.rs
Normal file
@@ -0,0 +1,121 @@
|
||||
use crate::error::CoreResult;
|
||||
use serde::{Deserialize, Serialize};
|
||||
use sqlx::{Pool, Postgres, QueryBuilder};
|
||||
use std::collections::HashMap;
|
||||
use utoipa::ToSchema;
|
||||
use uuid::Uuid;
|
||||
|
||||
const TABLE_NAME: &str = "runways";
|
||||
|
||||
#[derive(Debug, Clone, Serialize, Deserialize, ToSchema)]
|
||||
pub struct Runway {
|
||||
#[serde(rename = "id")]
|
||||
pub runway_id: String,
|
||||
pub length_ft: f32,
|
||||
pub width_ft: f32,
|
||||
pub surface: String,
|
||||
}
|
||||
|
||||
#[derive(Debug, Deserialize, sqlx::FromRow)]
|
||||
pub struct RunwayRow {
|
||||
pub id: Uuid,
|
||||
pub icao: String,
|
||||
pub runway_id: String,
|
||||
pub length_ft: f32,
|
||||
pub width_ft: f32,
|
||||
pub surface: String,
|
||||
}
|
||||
|
||||
#[derive(Debug, Serialize, Deserialize, ToSchema)]
|
||||
pub struct UpdateRunway {
|
||||
#[serde(skip_serializing_if = "Option::is_none")]
|
||||
pub icao: Option<String>,
|
||||
#[serde(rename = "id", skip_serializing_if = "Option::is_none")]
|
||||
pub frequency_id: Option<String>,
|
||||
#[serde(skip_serializing_if = "Option::is_none")]
|
||||
pub length_ft: Option<f32>,
|
||||
#[serde(skip_serializing_if = "Option::is_none")]
|
||||
pub width_ft: Option<f32>,
|
||||
#[serde(skip_serializing_if = "Option::is_none")]
|
||||
pub surface: Option<String>,
|
||||
}
|
||||
|
||||
impl From<RunwayRow> for Runway {
|
||||
fn from(runway: RunwayRow) -> Self {
|
||||
Self {
|
||||
runway_id: runway.runway_id.clone(),
|
||||
length_ft: runway.length_ft.clone(),
|
||||
width_ft: runway.width_ft.clone(),
|
||||
surface: runway.surface.clone(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl Runway {
|
||||
pub fn into(runway: &Runway, icao: &str) -> RunwayRow {
|
||||
RunwayRow {
|
||||
id: Uuid::new_v4(),
|
||||
icao: icao.to_string(),
|
||||
runway_id: runway.runway_id.clone(),
|
||||
length_ft: runway.length_ft.clone(),
|
||||
width_ft: runway.width_ft.clone(),
|
||||
surface: runway.surface.clone(),
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn select_all_map(pool: &Pool<Postgres>, icaos: &Vec<String>) -> CoreResult<HashMap<String, Vec<Self>>> {
|
||||
let runway_rows: Vec<RunwayRow> = sqlx::query_as(&format!(
|
||||
r#"SELECT * FROM {} WHERE icao = ANY($1)"#,
|
||||
TABLE_NAME
|
||||
))
|
||||
.bind(&icaos)
|
||||
.fetch_all(pool)
|
||||
.await?;
|
||||
|
||||
let mut runway_map: HashMap<String, Vec<Self>> = HashMap::new();
|
||||
for runway_row in runway_rows {
|
||||
let icao = runway_row.icao.clone();
|
||||
let runway = runway_row.into();
|
||||
runway_map.entry(icao.to_string()).or_default().push(runway);
|
||||
}
|
||||
|
||||
Ok(runway_map)
|
||||
}
|
||||
|
||||
pub async fn select_all(pool: &Pool<Postgres>, icao: &str) -> CoreResult<Vec<Self>> {
|
||||
let runway_rows: Vec<RunwayRow> = sqlx::query_as(&format!(
|
||||
r#"
|
||||
SELECT * FROM {} WHERE icao = $1
|
||||
"#,
|
||||
TABLE_NAME
|
||||
))
|
||||
.bind(icao)
|
||||
.fetch_all(pool)
|
||||
.await?;
|
||||
Ok(runway_rows.into_iter().map(From::from).collect())
|
||||
}
|
||||
|
||||
pub async fn insert_all(pool: &Pool<Postgres>, runways: &Vec<RunwayRow>) -> CoreResult<()> {
|
||||
let chunk_size = 1000;
|
||||
|
||||
for chunk in runways.chunks(chunk_size) {
|
||||
let mut query_builder: QueryBuilder<Postgres> = QueryBuilder::new(&format!(
|
||||
"INSERT INTO {} (id, icao, runway_id, length_ft, width_ft, surface) ",
|
||||
TABLE_NAME
|
||||
));
|
||||
query_builder.push_values(chunk, |mut b, row| {
|
||||
b.push_bind(&row.id)
|
||||
.push_bind(&row.icao)
|
||||
.push_bind(&row.runway_id)
|
||||
.push_bind(&row.length_ft)
|
||||
.push_bind(&row.width_ft)
|
||||
.push_bind(&row.surface);
|
||||
});
|
||||
|
||||
let query = query_builder.build();
|
||||
query.execute(pool).await?;
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
220
crates/lib/src/error.rs
Normal file
220
crates/lib/src/error.rs
Normal file
@@ -0,0 +1,220 @@
|
||||
use std::fmt::{Display, Formatter};
|
||||
use std::sync::{MutexGuard, PoisonError};
|
||||
use regex::Regex;
|
||||
use serde::de::StdError;
|
||||
|
||||
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
|
||||
#[non_exhaustive]
|
||||
pub enum CoreErrorKind {
|
||||
NotFound,
|
||||
InvalidInput,
|
||||
Conflict,
|
||||
Unauthorized,
|
||||
Forbidden,
|
||||
PreconditionFailed,
|
||||
Timeout,
|
||||
Cancelled,
|
||||
Unavailable,
|
||||
Internal,
|
||||
External,
|
||||
}
|
||||
|
||||
|
||||
#[derive(Debug)]
|
||||
pub struct CoreError {
|
||||
pub kind: CoreErrorKind,
|
||||
pub message: String,
|
||||
pub context: Vec<(&'static str, String)>,
|
||||
source: Option<Box<dyn StdError>>
|
||||
}
|
||||
|
||||
impl CoreError {
|
||||
pub fn new(kind: CoreErrorKind, message: impl Into<String>) -> Self {
|
||||
Self {
|
||||
kind,
|
||||
message: message.into(),
|
||||
context: vec![],
|
||||
source: None,
|
||||
}
|
||||
}
|
||||
|
||||
pub fn with_source(kind: CoreErrorKind, message: impl Into<String>, source: impl StdError + Send + Sync + 'static) -> Self {
|
||||
Self {
|
||||
kind,
|
||||
message: message.into(),
|
||||
context: vec![],
|
||||
source: Some(Box::new(source)),
|
||||
}
|
||||
}
|
||||
|
||||
pub fn context(mut self, context: Vec<(&'static str, String)>) -> Self {
|
||||
self.context = context;
|
||||
self
|
||||
}
|
||||
}
|
||||
|
||||
impl Display for CoreError {
|
||||
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
|
||||
write!(f, "{:?} - {}", self.kind, self.message)
|
||||
}
|
||||
}
|
||||
|
||||
impl StdError for CoreError {
|
||||
fn source(&self) -> Option<&(dyn StdError + 'static)> {
|
||||
self.source.as_deref()
|
||||
}
|
||||
}
|
||||
|
||||
pub type CoreResult<T> = Result<T, CoreError>;
|
||||
|
||||
pub fn not_found(entity: &'static str, id: impl Into<String>) -> CoreError {
|
||||
CoreError::new(CoreErrorKind::NotFound, format!("{entity} not found: {}", id.into()))
|
||||
}
|
||||
|
||||
|
||||
impl From<argon2::password_hash::Error> for CoreError {
|
||||
fn from(error: argon2::password_hash::Error) -> Self {
|
||||
Self::new(CoreErrorKind::External, error.to_string())
|
||||
}
|
||||
}
|
||||
|
||||
impl From<std::io::Error> for CoreError {
|
||||
fn from(error: std::io::Error) -> Self {
|
||||
Self::new(CoreErrorKind::External, format!("Unknown IO error: {:?}", error))
|
||||
}
|
||||
}
|
||||
|
||||
impl From<redis::RedisError> for CoreError {
|
||||
fn from(error: redis::RedisError) -> Self {
|
||||
Self::new(CoreErrorKind::External, format!("Unknown redis error: {:?}", error))
|
||||
}
|
||||
}
|
||||
|
||||
impl From<sqlx::Error> for CoreError {
|
||||
fn from(error: sqlx::Error) -> Self {
|
||||
match error {
|
||||
sqlx::Error::RowNotFound => CoreError::new(CoreErrorKind::NotFound, "Not found".to_string()),
|
||||
sqlx::Error::ColumnIndexOutOfBounds { .. } => CoreError::new(CoreErrorKind::InvalidInput, error.to_string()),
|
||||
sqlx::Error::ColumnNotFound { .. } => CoreError::new(CoreErrorKind::NotFound, error.to_string()),
|
||||
sqlx::Error::ColumnDecode { .. } => CoreError::new(CoreErrorKind::External, error.to_string()),
|
||||
sqlx::Error::Decode(_) => CoreError::new(CoreErrorKind::External, error.to_string()),
|
||||
sqlx::Error::PoolTimedOut => CoreError::new(CoreErrorKind::External, error.to_string()),
|
||||
sqlx::Error::PoolClosed => CoreError::new(CoreErrorKind::External, error.to_string()),
|
||||
sqlx::Error::Tls(_) => CoreError::new(CoreErrorKind::External, error.to_string()),
|
||||
sqlx::Error::Io(_) => CoreError::new(CoreErrorKind::External, error.to_string()),
|
||||
sqlx::Error::Protocol(_) => CoreError::new(CoreErrorKind::External, error.to_string()),
|
||||
sqlx::Error::Configuration(_) => CoreError::new(CoreErrorKind::External, error.to_string()),
|
||||
sqlx::Error::AnyDriverError(_) => CoreError::new(CoreErrorKind::External, error.to_string()),
|
||||
sqlx::Error::Database(err) => {
|
||||
if let Some(code) = err.code() {
|
||||
match code.trim() {
|
||||
// Unique violation
|
||||
"23505" => return CoreError::new(CoreErrorKind::External, err.to_string()),
|
||||
_ => (),
|
||||
}
|
||||
}
|
||||
CoreError::new(CoreErrorKind::External, err.to_string())
|
||||
}
|
||||
sqlx::Error::Migrate(_) => CoreError::new(CoreErrorKind::External, error.to_string()),
|
||||
sqlx::Error::TypeNotFound { type_name } => {
|
||||
CoreError::new(CoreErrorKind::External, format!("Type not found: {}", type_name))
|
||||
}
|
||||
sqlx::Error::WorkerCrashed => CoreError::new(CoreErrorKind::External, error.to_string()),
|
||||
_ => CoreError::new(CoreErrorKind::External, error.to_string()),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl From<reqwest::Error> for CoreError {
|
||||
fn from(error: reqwest::Error) -> Self {
|
||||
match error.status() {
|
||||
Some(status_code) => {
|
||||
if status_code.is_client_error() {
|
||||
Self::new(CoreErrorKind::External, format!("Client reqwest error: {:?}", error))
|
||||
} else if status_code.is_server_error() {
|
||||
Self::new(CoreErrorKind::External, format!("Server reqwest error: {:?}", error))
|
||||
} else {
|
||||
Self::new(CoreErrorKind::External, format!("Unknown reqwest error: {:?}", error))
|
||||
}
|
||||
}
|
||||
_ => Self::new(CoreErrorKind::External, format!("Unknown reqwest error: {:?}", error)),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl From<s3::error::S3Error> for CoreError {
|
||||
fn from(error: s3::error::S3Error) -> Self {
|
||||
match error {
|
||||
s3::error::S3Error::Credentials(err) => {
|
||||
Self::new(CoreErrorKind::External, format!("Unknown s3 credentials error: {:?}", err))
|
||||
}
|
||||
s3::error::S3Error::FromUtf8(err) => {
|
||||
Self::new(CoreErrorKind::External, format!("Unknown s3 from utf8 error: {:?}", err))
|
||||
}
|
||||
s3::error::S3Error::FmtError(err) => {
|
||||
Self::new(CoreErrorKind::External, format!("Unknown s3 fmt error: {:?}", err))
|
||||
}
|
||||
s3::error::S3Error::HmacInvalidLength(err) => Self::new(
|
||||
CoreErrorKind::External,
|
||||
format!("Unknown s3 hmac invalid length error: {:?}", err),
|
||||
),
|
||||
_ => {
|
||||
let re = Regex::new(r"HTTP (\d{3})").unwrap();
|
||||
// Apply the regex to the input string
|
||||
if let Some(captures) = re.captures(&error.to_string()) {
|
||||
if let Some(http_code_str) = captures.get(1) {
|
||||
if let Ok(http_code) = http_code_str.as_str().parse::<u16>() {
|
||||
return Self::new(CoreErrorKind::External, error.to_string()).context(vec![("http_code", http_code.to_string())]);
|
||||
}
|
||||
}
|
||||
}
|
||||
Self::new(CoreErrorKind::External, format!("Unknown s3 error: {:?}", error))
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl From<std::env::VarError> for CoreError {
|
||||
fn from(error: std::env::VarError) -> Self {
|
||||
Self::new(
|
||||
CoreErrorKind::External,
|
||||
format!("Unknown environment variable error: {:?}", error),
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
impl From<serde_json::Error> for CoreError {
|
||||
fn from(error: serde_json::Error) -> Self {
|
||||
Self::new(CoreErrorKind::External, format!("Unknown serde_json error: {:?}", error))
|
||||
}
|
||||
}
|
||||
|
||||
impl<'a, T> From<PoisonError<MutexGuard<'a, T>>> for CoreError {
|
||||
fn from(_: PoisonError<MutexGuard<'a, T>>) -> Self {
|
||||
Self::new(CoreErrorKind::External, "Failed to acquire lock".to_string())
|
||||
}
|
||||
}
|
||||
|
||||
impl From<core::num::ParseIntError> for CoreError {
|
||||
fn from(error: core::num::ParseIntError) -> Self {
|
||||
Self::new(CoreErrorKind::External, format!("Integer parse error: {:?}", error))
|
||||
}
|
||||
}
|
||||
|
||||
impl From<core::num::ParseFloatError> for CoreError {
|
||||
fn from(error: core::num::ParseFloatError) -> Self {
|
||||
Self::new(CoreErrorKind::External, format!("Float parse error: {:?}", error))
|
||||
}
|
||||
}
|
||||
|
||||
impl From<regex::Error> for CoreError {
|
||||
fn from(error: regex::Error) -> Self {
|
||||
Self::new(CoreErrorKind::External, error.to_string())
|
||||
}
|
||||
}
|
||||
|
||||
impl From<chrono::ParseError> for CoreError {
|
||||
fn from(error: chrono::ParseError) -> Self {
|
||||
Self::new(CoreErrorKind::External, format!("Chrono parse error: {:?}", error))
|
||||
}
|
||||
}
|
||||
92
crates/lib/src/http_client.rs
Normal file
92
crates/lib/src/http_client.rs
Normal file
@@ -0,0 +1,92 @@
|
||||
use crate::error::{CoreResult, CoreError, CoreErrorKind};
|
||||
use governor::clock::DefaultClock;
|
||||
use governor::state::{InMemoryState, NotKeyed};
|
||||
use governor::{Quota, RateLimiter};
|
||||
use reqwest::header::{IF_NONE_MATCH, RETRY_AFTER};
|
||||
use reqwest::{Certificate, Client, Response, StatusCode};
|
||||
use std::env;
|
||||
use std::num::NonZeroU32;
|
||||
use std::sync::Arc;
|
||||
use std::time::Duration;
|
||||
use tokio::time::sleep;
|
||||
|
||||
#[derive(Debug, Clone)]
|
||||
pub struct HttpClient {
|
||||
client: Client,
|
||||
limiter: Arc<RateLimiter<NotKeyed, InMemoryState, DefaultClock>>,
|
||||
pub default_retry_after: u64,
|
||||
}
|
||||
|
||||
impl HttpClient {
|
||||
pub fn new(default_retry_after: u64) -> CoreResult<Self> {
|
||||
let mut client_builder = Client::builder()
|
||||
.timeout(Duration::from_secs(10))
|
||||
.tls_built_in_root_certs(true);
|
||||
|
||||
if let Ok(val) = env::var("NGINX_SSL_ENABLED") {
|
||||
if val == "true" {
|
||||
let certificate_path = env::var("SSL_CA_PATH")?;
|
||||
let certificate_data = std::fs::read(certificate_path)?;
|
||||
let certificate = Certificate::from_pem(&certificate_data)?;
|
||||
client_builder = client_builder.add_root_certificate(certificate);
|
||||
}
|
||||
}
|
||||
|
||||
let client = client_builder.build()?;
|
||||
|
||||
let quota = Quota::per_second(NonZeroU32::new(15).unwrap());
|
||||
let limiter = RateLimiter::direct(quota);
|
||||
let limiter = Arc::new(limiter);
|
||||
|
||||
Ok(Self {
|
||||
client,
|
||||
limiter,
|
||||
default_retry_after,
|
||||
})
|
||||
}
|
||||
|
||||
pub fn default() -> CoreResult<Self> {
|
||||
Self::new(60)
|
||||
}
|
||||
|
||||
pub async fn get(&self, url: &str, etag: Option<String>) -> CoreResult<Response> {
|
||||
self.limiter.until_ready().await;
|
||||
|
||||
let mut request = self.client.get(url);
|
||||
if let Some(ref etag) = etag {
|
||||
request = request.header(IF_NONE_MATCH, etag);
|
||||
}
|
||||
|
||||
let mut response = request.send().await?;
|
||||
|
||||
// Handle too many requests
|
||||
if response.status() == StatusCode::TOO_MANY_REQUESTS {
|
||||
let retry_after = response
|
||||
.headers()
|
||||
.get(RETRY_AFTER)
|
||||
.and_then(|hdr| hdr.to_str().ok())
|
||||
.and_then(|s| s.parse::<u64>().ok())
|
||||
.unwrap_or(self.default_retry_after);
|
||||
|
||||
log::warn!(
|
||||
"Received 429 Too Many Requests, retrying after {}s",
|
||||
retry_after
|
||||
);
|
||||
sleep(Duration::from_secs(retry_after)).await;
|
||||
|
||||
// Retry once more
|
||||
response = self.client.get(url).send().await?;
|
||||
} else if response.status() == StatusCode::NOT_MODIFIED {
|
||||
log::warn!("Received 304 Not modified")
|
||||
}
|
||||
|
||||
if response.status() != 200 {
|
||||
return Err(CoreError::new(
|
||||
CoreErrorKind::External,
|
||||
format!("Request returned status {}", response.status()),
|
||||
));
|
||||
}
|
||||
|
||||
Ok(response)
|
||||
}
|
||||
}
|
||||
6
crates/lib/src/lib.rs
Normal file
6
crates/lib/src/lib.rs
Normal file
@@ -0,0 +1,6 @@
|
||||
pub mod accounts;
|
||||
pub mod airports;
|
||||
pub mod metars;
|
||||
pub mod http_client;
|
||||
pub mod state;
|
||||
pub mod error;
|
||||
57
crates/lib/src/metars/metar_check.rs
Normal file
57
crates/lib/src/metars/metar_check.rs
Normal file
@@ -0,0 +1,57 @@
|
||||
use chrono::{DateTime, Utc};
|
||||
use serde::{Deserialize, Serialize};
|
||||
use crate::error::CoreResult;
|
||||
use crate::metars::model::Metar;
|
||||
use crate::state::AppState;
|
||||
|
||||
#[derive(Debug, Serialize, Deserialize)]
|
||||
pub struct MetarCheck {
|
||||
pub icao: String,
|
||||
pub status: bool,
|
||||
pub updated_at: DateTime<Utc>,
|
||||
pub last_metar: Option<Metar>,
|
||||
}
|
||||
|
||||
impl MetarCheck {
|
||||
pub async fn new(state: &AppState, icao: String, status: bool) -> Self {
|
||||
match Self::get(state, &icao).await {
|
||||
Some(c) => Self {
|
||||
icao,
|
||||
status,
|
||||
updated_at: Utc::now(),
|
||||
last_metar: c.last_metar,
|
||||
},
|
||||
None => Self {
|
||||
icao,
|
||||
status,
|
||||
updated_at: Utc::now(),
|
||||
last_metar: None,
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn get(state: &AppState, icao: &str) -> Option<MetarCheck> {
|
||||
let result: CoreResult<Option<String>> = state.get(icao).await;
|
||||
match result {
|
||||
Ok(Some(value)) => match serde_json::from_str(&value) {
|
||||
Ok(result) => Some(result),
|
||||
Err(err) => {
|
||||
log::error!("Unable to get MetarCheck for ICAO {}: {}", icao, err);
|
||||
None
|
||||
}
|
||||
},
|
||||
Ok(None) => None,
|
||||
Err(err) => {
|
||||
log::error!("Error getting MetarCheck for ICAO {}: {}", icao, err);
|
||||
None
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn insert(&self, state: &AppState) -> CoreResult<()> {
|
||||
let value = serde_json::to_string(&self)?;
|
||||
state.set(self.icao.as_str(), &value).await?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
7
crates/lib/src/metars/mod.rs
Normal file
7
crates/lib/src/metars/mod.rs
Normal file
@@ -0,0 +1,7 @@
|
||||
mod metar_check;
|
||||
mod model;
|
||||
mod utils;
|
||||
|
||||
pub use metar_check::*;
|
||||
pub use model::*;
|
||||
pub use utils::*;
|
||||
1287
crates/lib/src/metars/model.rs
Normal file
1287
crates/lib/src/metars/model.rs
Normal file
File diff suppressed because it is too large
Load Diff
113
crates/lib/src/metars/utils.rs
Normal file
113
crates/lib/src/metars/utils.rs
Normal file
@@ -0,0 +1,113 @@
|
||||
use crate::error::{CoreError, CoreErrorKind, CoreResult};
|
||||
use chrono::{Datelike, NaiveDate, Utc};
|
||||
|
||||
pub fn parse_metar_time(observation_time: &str) -> CoreResult<String> {
|
||||
if observation_time.len() != 7 {
|
||||
return Err(CoreError::new(
|
||||
CoreErrorKind::InvalidInput,
|
||||
format!("Unable to parse observation time in {}", observation_time),
|
||||
));
|
||||
}
|
||||
let observation_day = match observation_time[0..2].parse::<u32>() {
|
||||
Ok(day) => day,
|
||||
Err(err) => return Err(err.into()),
|
||||
};
|
||||
let observation_hour = match observation_time[2..4].parse::<u32>() {
|
||||
Ok(hour) => hour,
|
||||
Err(err) => return Err(err.into()),
|
||||
};
|
||||
let observation_minute = match observation_time[4..6].parse::<u32>() {
|
||||
Ok(minute) => minute,
|
||||
Err(err) => return Err(err.into()),
|
||||
};
|
||||
let current_time = Utc::now().naive_utc();
|
||||
let current_year = current_time.year();
|
||||
let current_month = current_time.month();
|
||||
let candidate_date = NaiveDate::from_ymd_opt(current_year, current_month, observation_day)
|
||||
.ok_or_else(|| {
|
||||
CoreError::new(
|
||||
CoreErrorKind::InvalidInput,
|
||||
format!(
|
||||
"Invalid date with day {} for current month",
|
||||
observation_day
|
||||
),
|
||||
)
|
||||
})?;
|
||||
let candidate_date = match candidate_date.and_hms_opt(observation_hour, observation_minute, 0) {
|
||||
Some(date) => date,
|
||||
None => {
|
||||
return Err(CoreError::new(
|
||||
CoreErrorKind::InvalidInput,
|
||||
format!(
|
||||
"Invalid time for time '{}': hour {}, minute {}",
|
||||
observation_time, observation_hour, observation_minute
|
||||
),
|
||||
));
|
||||
}
|
||||
};
|
||||
|
||||
let obs_datetime = if candidate_date > current_time {
|
||||
// Subtract one month. (Handle year rollover carefully.)
|
||||
let (month, year) = if current_month == 1 {
|
||||
(12, current_year - 1)
|
||||
} else {
|
||||
(current_month - 1, current_year)
|
||||
};
|
||||
|
||||
let adjusted_date = NaiveDate::from_ymd_opt(year, month, observation_day).ok_or_else(|| {
|
||||
CoreError::new(
|
||||
CoreErrorKind::InvalidInput,
|
||||
format!(
|
||||
"Invalid date with day {} for month {}",
|
||||
observation_day, month
|
||||
),
|
||||
)
|
||||
})?;
|
||||
adjusted_date
|
||||
.and_hms_opt(observation_hour, observation_minute, 0)
|
||||
.unwrap()
|
||||
} else {
|
||||
candidate_date
|
||||
};
|
||||
Ok(obs_datetime.format("%Y-%m-%dT%H:%M:00Z").to_string())
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
use chrono::NaiveDateTime;
|
||||
|
||||
#[test]
|
||||
fn test_parse_metar_time() {
|
||||
for day in 1..=31 {
|
||||
for hour in 0..24 {
|
||||
for minute in 0..60 {
|
||||
// METAR form "DDHHMMZ"
|
||||
let obs_time = format!("{:02}{:02}{:02}Z", day, hour, minute);
|
||||
let result = parse_metar_time(&obs_time);
|
||||
match result {
|
||||
Ok(datetime_str) => {
|
||||
// "YYYY-MM-DDTHH:MM:00Z"
|
||||
assert_eq!(
|
||||
datetime_str.len(),
|
||||
20,
|
||||
"Unexpected length for input {} yielded {}",
|
||||
obs_time,
|
||||
datetime_str
|
||||
);
|
||||
// Remove the trailing 'Z' and parse
|
||||
let trimmed = &datetime_str[..19];
|
||||
NaiveDateTime::parse_from_str(trimmed, "%Y-%m-%dT%H:%M:%S").unwrap_or_else(|e| {
|
||||
panic!(
|
||||
"Parsing '{}' from input {} failed: {}",
|
||||
trimmed, obs_time, e
|
||||
)
|
||||
});
|
||||
}
|
||||
Err(_err) => {}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
167
crates/lib/src/state.rs
Normal file
167
crates/lib/src/state.rs
Normal file
@@ -0,0 +1,167 @@
|
||||
use std::env;
|
||||
use std::sync::{Arc, Mutex};
|
||||
use std::time::Duration;
|
||||
use redis::aio::ConnectionManager;
|
||||
use redis::AsyncTypedCommands;
|
||||
use s3::{Bucket, BucketConfiguration, Region};
|
||||
use s3::creds::Credentials;
|
||||
use sqlx::{Pool, Postgres};
|
||||
use sqlx::postgres::PgPoolOptions;
|
||||
use crate::error::CoreResult;
|
||||
use crate::http_client::HttpClient;
|
||||
|
||||
#[derive(Clone)]
|
||||
pub struct AppState {
|
||||
pub client: HttpClient,
|
||||
pub pool: Pool<Postgres>,
|
||||
pub connection_manager: Arc<Mutex<ConnectionManager>>,
|
||||
pub bucket: Box<Bucket>,
|
||||
}
|
||||
|
||||
impl AppState {
|
||||
pub async fn new() -> CoreResult<Self> {
|
||||
let client = HttpClient::default()?;
|
||||
|
||||
let pool: Pool<Postgres> = {
|
||||
let user = env::var("POSTGRES_USER").unwrap_or("raac".to_string());
|
||||
let password = env::var("POSTGRES_PASSWORD").expect("POSTGRES_PASSWORD must be set");
|
||||
let host: String = env::var("POSTGRES_HOST").expect("POSTGRES_HOST must be set");
|
||||
let port = env::var("POSTGRES_PORT").unwrap_or("5432".to_string());
|
||||
let name = env::var("POSTGRES_DB").unwrap_or("raac".to_string());
|
||||
|
||||
let url = format!(
|
||||
"postgres://{}:{}@{}:{}/{}",
|
||||
&user, &password, &host, &port, &name
|
||||
);
|
||||
|
||||
log::info!(
|
||||
"Connecting to database at postgres://{}:*****@{}:{}/{}...",
|
||||
&user,
|
||||
&host,
|
||||
&port,
|
||||
&name
|
||||
);
|
||||
|
||||
let connections = env::var("POSTGRES_CONNECTIONS")
|
||||
.unwrap_or("5".to_string())
|
||||
.parse::<u32>()
|
||||
.unwrap_or(5);
|
||||
let timeout = env::var("POSTGRES_TIMEOUT")
|
||||
.unwrap_or("30".to_string())
|
||||
.parse::<u64>()
|
||||
.unwrap_or(30);
|
||||
|
||||
PgPoolOptions::new()
|
||||
.max_connections(connections)
|
||||
.acquire_timeout(Duration::from_secs(timeout))
|
||||
.connect(&url)
|
||||
.await
|
||||
.expect("Failed to create postgres pool")
|
||||
};
|
||||
|
||||
let run_migrations = env::var("POSTGRES_MIGRATE")
|
||||
.unwrap_or("true".to_string())
|
||||
.parse::<bool>()
|
||||
.unwrap_or(true);
|
||||
|
||||
if run_migrations {
|
||||
log::debug!("Running database migrations...");
|
||||
match sqlx::migrate!().run(&pool).await {
|
||||
Ok(_) => log::debug!("Database migrations completed"),
|
||||
Err(err) => log::error!("Failed to run database migrations: {}", err),
|
||||
};
|
||||
}
|
||||
|
||||
let connection_manager: ConnectionManager = {
|
||||
let host = env::var("VALKEY_HOST").unwrap_or("localhost".to_string());
|
||||
let port = env::var("VALKEY_PORT").unwrap_or("6379".to_string());
|
||||
let url = format!("redis://{}:{}", host, port);
|
||||
log::info!("Connecting to in-memory datastore at {}...", &url);
|
||||
let client = redis::Client::open(url).expect("Failed to create in-memory datastore client");
|
||||
ConnectionManager::new(client)
|
||||
.await
|
||||
.expect("Failed to create in-memory datastore connection manager")
|
||||
};
|
||||
|
||||
// Setup Bucket connection
|
||||
let bucket = {
|
||||
let protocol = std::env::var("MINIO_PROTOCOL").unwrap_or("http".to_string());
|
||||
let host = std::env::var("MINIO_HOST").unwrap_or("localhost".to_string());
|
||||
let port = std::env::var("MINIO_PORT").unwrap_or("9000".to_string());
|
||||
let user = std::env::var("MINIO_ROOT_USER").expect("MINIO_ROOT_USER is not set");
|
||||
let password = std::env::var("MINIO_ROOT_PASSWORD").expect("MINIO_ROOT_PASSWORD is not set");
|
||||
let bucket_name = std::env::var("MINIO_BUCKET").unwrap_or("aviation".to_string());
|
||||
let url = format!("{}://{}:{}", protocol, host, port);
|
||||
|
||||
let region = Region::Custom {
|
||||
region: "".to_string(),
|
||||
endpoint: url.to_string(),
|
||||
};
|
||||
|
||||
let credentials = Credentials {
|
||||
access_key: Some(user),
|
||||
secret_key: Some(password),
|
||||
security_token: None,
|
||||
session_token: None,
|
||||
expiration: None,
|
||||
};
|
||||
|
||||
let bucket = Bucket::new(&bucket_name, region.clone(), credentials.clone())?.with_path_style();
|
||||
log::info!("Checking for object in bucket at {}", ®ion.endpoint());
|
||||
match bucket.head_object("/").await {
|
||||
Ok(_) => bucket,
|
||||
Err(_) => {
|
||||
log::debug!("Creating '{}' bucket", &bucket_name);
|
||||
let response = match Bucket::create_with_path_style(
|
||||
&bucket_name,
|
||||
region,
|
||||
credentials,
|
||||
BucketConfiguration::default(),
|
||||
)
|
||||
.await
|
||||
{
|
||||
Ok(response) => response,
|
||||
Err(err) => {
|
||||
log::error!("Failed to create bucket '{}': {}", &bucket_name, err);
|
||||
return Err(err.into());
|
||||
}
|
||||
};
|
||||
response.bucket
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
Ok(Self {
|
||||
client,
|
||||
pool,
|
||||
connection_manager: Arc::new(Mutex::new(connection_manager)),
|
||||
bucket,
|
||||
})
|
||||
}
|
||||
|
||||
pub async fn set(&self, key: &str, value: &str) -> CoreResult<()> {
|
||||
let mut connection_manager = self.connection_manager.lock()?;
|
||||
connection_manager.set(key, value).await?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub async fn set_ex(&self, key: &str, value: &str, seconds: u64) -> CoreResult<()> {
|
||||
let mut connection_manager = self.connection_manager.lock()?;
|
||||
connection_manager.set_ex(key, value, seconds).await?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub async fn get(&self, key: &str) -> CoreResult<Option<String>> {
|
||||
let mut connection_manager = self.connection_manager.lock()?;
|
||||
match connection_manager.get(key).await {
|
||||
Ok(value) => Ok(value),
|
||||
Err(_) => Ok(None),
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn del(&self, key: &str) -> CoreResult<()> {
|
||||
let mut connection_manager = self.connection_manager.lock()?;
|
||||
connection_manager.del(key).await?;
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user