Overhaul refactor. Still things in progress
This commit is contained in:
@@ -1,112 +1,129 @@
|
||||
use crate::error::{ApiError, ApiResult};
|
||||
use diesel::{r2d2::ConnectionManager, PgConnection};
|
||||
use redis::{Client as RedisClient, aio::MultiplexedConnection as RedisConnection};
|
||||
use crate::error::ApiResult;
|
||||
use redis::{Client as RedisClient, aio::MultiplexedConnection as RedisConnection, RedisResult};
|
||||
use s3::{
|
||||
Bucket, Region, creds::Credentials, BucketConfiguration, request::ResponseData,
|
||||
bucket_ops::CreateBucketResponse,
|
||||
};
|
||||
use serde::{Deserialize, Serialize};
|
||||
use crate::diesel_migrations::MigrationHarness;
|
||||
use lazy_static::lazy_static;
|
||||
use log::{error, info, warn};
|
||||
use r2d2;
|
||||
use std::env;
|
||||
use std::sync::OnceLock;
|
||||
use std::time::Duration;
|
||||
use sqlx::{Pool, Postgres};
|
||||
use sqlx::postgres::PgPoolOptions;
|
||||
|
||||
pub mod schema;
|
||||
static POOL: OnceLock<Pool<Postgres>> = OnceLock::new();
|
||||
static REDIS: OnceLock<RedisClient> = OnceLock::new();
|
||||
static BUCKET: OnceLock<Bucket> = OnceLock::new();
|
||||
|
||||
type Pool = r2d2::Pool<ConnectionManager<PgConnection>>;
|
||||
pub type DbConnection = r2d2::PooledConnection<ConnectionManager<PgConnection>>;
|
||||
pub async fn initialize() -> ApiResult<()> {
|
||||
log::info!("Initializing database...");
|
||||
let db_user = std::env::var("POSTGRES_USER").unwrap_or("siren".to_string());
|
||||
let db_password = std::env::var("POSTGRES_PASSWORD").expect("POSTGRES_PASSWORD must be set");
|
||||
let db_host: String = std::env::var("POSTGRES_HOST").expect("POSTGRES_HOST must be set");
|
||||
let db_port = std::env::var("POSTGRES_PORT").unwrap_or("5432".to_string());
|
||||
let db_name = std::env::var("POSTGRES_NAME").unwrap_or("siren".to_string());
|
||||
|
||||
pub const MIGRATIONS: diesel_migrations::EmbeddedMigrations = embed_migrations!();
|
||||
|
||||
lazy_static! {
|
||||
static ref POOL: Pool = {
|
||||
let username = env::var("DATABASE_USER").expect("Database username is not set");
|
||||
let password = env::var("DATABASE_PASSWORD").expect("Database password is not set");
|
||||
let host = env::var("DATABASE_HOST").unwrap_or("localhost".to_string());
|
||||
let name = env::var("DATABASE_NAME").expect("Database name is not set");
|
||||
let port = env::var("DATABASE_PORT").unwrap_or("5432".to_string());
|
||||
let url = format!(
|
||||
// Setup Postgres pool connection
|
||||
let pool = PgPoolOptions::new()
|
||||
.max_connections(5)
|
||||
.acquire_timeout(Duration::from_secs(30))
|
||||
.connect(&format!(
|
||||
"postgres://{}:{}@{}:{}/{}",
|
||||
username, password, host, port, name
|
||||
);
|
||||
let manager = ConnectionManager::<PgConnection>::new(url);
|
||||
Pool::builder()
|
||||
.test_on_check_out(true)
|
||||
.build(manager)
|
||||
.expect("Failed to create db pool")
|
||||
};
|
||||
static ref REDIS: RedisClient = {
|
||||
let host = env::var("REDIS_HOST").unwrap_or("localhost".to_string());
|
||||
let port = env::var("REDIS_PORT").unwrap_or("6379".to_string());
|
||||
db_user, db_password, db_host, db_port, db_name
|
||||
))
|
||||
.await?;
|
||||
match POOL.set(pool) {
|
||||
Ok(_) => {}
|
||||
Err(_) => {
|
||||
log::warn!("Database pool already initialized");
|
||||
}
|
||||
}
|
||||
|
||||
// Setup Redis connection
|
||||
let redis = {
|
||||
let host = std::env::var("REDIS_HOST").unwrap_or("localhost".to_string());
|
||||
let port = std::env::var("REDIS_PORT").unwrap_or("6379".to_string());
|
||||
let url = format!("redis://{}:{}", host, port);
|
||||
RedisClient::open(url).expect("Failed to create redis client")
|
||||
};
|
||||
static ref BUCKET: Bucket = {
|
||||
let url = env::var("MINIO_HOST").unwrap_or("localhost".to_string());
|
||||
let port = env::var("MINIO_PORT").unwrap_or("9000".to_string());
|
||||
let user = env::var("MINIO_ROOT_USER").expect("MINIO_ROOT_USER is not set");
|
||||
let password = env::var("MINIO_ROOT_PASSWORD").expect("MINIO_ROOT_PASSWORD is not set");
|
||||
let base_url = format!("http://{}:{}", url, port);
|
||||
match REDIS.set(redis) {
|
||||
Ok(_) => {}
|
||||
Err(_) => {
|
||||
log::warn!("Redis client already initialized");
|
||||
}
|
||||
}
|
||||
|
||||
let region = Region::Custom {
|
||||
region: "".to_string(),
|
||||
endpoint: base_url,
|
||||
};
|
||||
let schema = std::env::var("MINIO_SCHEMA").unwrap_or("http".to_string());
|
||||
let url = std::env::var("MINIO_HOST").unwrap_or("localhost".to_string());
|
||||
let port = std::env::var("MINIO_PORT").unwrap_or("9000".to_string());
|
||||
let user = std::env::var("MINIO_ROOT_USER").expect("MINIO_ROOT_USER is not set");
|
||||
let password = std::env::var("MINIO_ROOT_PASSWORD").expect("MINIO_ROOT_PASSWORD is not set");
|
||||
let base_url = format!("{}://{}:{}", schema, url, port);
|
||||
|
||||
let credentials = Credentials {
|
||||
access_key: Some(user),
|
||||
secret_key: Some(password),
|
||||
security_token: None,
|
||||
session_token: None,
|
||||
expiration: None,
|
||||
};
|
||||
|
||||
*Bucket::new("aviation", region.clone(), credentials.clone())
|
||||
.expect("Failed to create S3 Bucket")
|
||||
.with_path_style()
|
||||
let region = Region::Custom {
|
||||
region: "".to_string(),
|
||||
endpoint: base_url,
|
||||
};
|
||||
|
||||
let credentials = Credentials {
|
||||
access_key: Some(user),
|
||||
secret_key: Some(password),
|
||||
security_token: None,
|
||||
session_token: None,
|
||||
expiration: None,
|
||||
};
|
||||
|
||||
let bucket = Bucket::new("aviation", region.clone(), credentials.clone())
|
||||
.expect("Failed to create S3 Bucket")
|
||||
.with_path_style();
|
||||
|
||||
match BUCKET.set(*bucket) {
|
||||
Ok(_) => {}
|
||||
Err(_) => {
|
||||
log::warn!("Bucket client already initialized");
|
||||
}
|
||||
}
|
||||
|
||||
// Run migrations
|
||||
match run_migrations().await {
|
||||
Ok(_) => log::debug!("Successfully ran migrations"),
|
||||
Err(e) => log::error!("Failed to run migrations: {}", e),
|
||||
}
|
||||
|
||||
log::info!("Database initialized");
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub async fn init() {
|
||||
lazy_static::initialize(&POOL);
|
||||
lazy_static::initialize(&REDIS);
|
||||
lazy_static::initialize(&BUCKET);
|
||||
match create_bucket().await {
|
||||
Ok(_) => info!("Bucket initialized"),
|
||||
Err(err) => match err.status {
|
||||
409 => warn!("Bucket already exists"),
|
||||
_ => error!("Failed to initialize bucket; {}", err),
|
||||
},
|
||||
};
|
||||
let mut pool: DbConnection = connection().expect("Failed to get db connection");
|
||||
match pool.run_pending_migrations(MIGRATIONS) {
|
||||
Ok(_) => info!("Database initialized"),
|
||||
Err(err) => error!("Failed to initialize database; {}", err),
|
||||
};
|
||||
pub fn pool() -> &'static Pool<Postgres> {
|
||||
POOL.get().unwrap()
|
||||
}
|
||||
|
||||
pub fn connection() -> ApiResult<DbConnection> {
|
||||
POOL
|
||||
.get()
|
||||
.map_err(|e| ApiError::new(500, format!("Failed getting db connection: {}", e)))
|
||||
fn redis() -> &'static RedisClient {
|
||||
REDIS.get().unwrap()
|
||||
}
|
||||
|
||||
pub fn redis_connection() -> ApiResult<redis::Connection> {
|
||||
let conn = REDIS.get_connection()?;
|
||||
pub fn redis_connection() -> RedisResult<redis::Connection> {
|
||||
let conn = redis().get_connection()?;
|
||||
Ok(conn)
|
||||
}
|
||||
|
||||
pub async fn redis_async_connection() -> ApiResult<RedisConnection> {
|
||||
let conn = REDIS.get_multiplexed_async_connection().await?;
|
||||
pub async fn redis_async_connection() -> RedisResult<RedisConnection> {
|
||||
let conn = redis().get_multiplexed_async_connection().await?;
|
||||
Ok(conn)
|
||||
}
|
||||
|
||||
async fn run_migrations() -> ApiResult<()> {
|
||||
log::debug!("Running migrations");
|
||||
let pool = pool();
|
||||
sqlx::migrate!().run(pool).await?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn create_bucket() -> ApiResult<CreateBucketResponse> {
|
||||
let url = env::var("MINIO_URL").unwrap_or("localhost".to_string());
|
||||
let port = env::var("MINIO_PORT").unwrap_or("9000".to_string());
|
||||
let user = env::var("MINIO_ROOT_USER").expect("MINIO_ROOT_USER is not set");
|
||||
let password = env::var("MINIO_ROOT_PASSWORD").expect("MINIO_ROOT_PASSWORD is not set");
|
||||
let url = std::env::var("MINIO_URL").unwrap_or("localhost".to_string());
|
||||
let port = std::env::var("MINIO_PORT").unwrap_or("9000".to_string());
|
||||
let user = std::env::var("MINIO_ROOT_USER").expect("MINIO_ROOT_USER is not set");
|
||||
let password = std::env::var("MINIO_ROOT_PASSWORD").expect("MINIO_ROOT_PASSWORD is not set");
|
||||
let base_url = format!("http://{}:{}", url, port);
|
||||
|
||||
let region = Region::Custom {
|
||||
@@ -133,29 +150,24 @@ async fn create_bucket() -> ApiResult<CreateBucketResponse> {
|
||||
}
|
||||
|
||||
pub async fn upload_file(path: &str, content: &[u8]) -> ApiResult<ResponseData> {
|
||||
let response = BUCKET.put_object(path, content).await?;
|
||||
let response = BUCKET.get().unwrap().put_object(path, content).await?;
|
||||
Ok(response)
|
||||
}
|
||||
|
||||
pub async fn get_file(path: &str) -> ApiResult<Vec<u8>> {
|
||||
let response = BUCKET.get_object(path).await?;
|
||||
let response = BUCKET.get().unwrap().get_object(path).await?;
|
||||
let bytes = response.bytes();
|
||||
Ok(bytes.to_vec())
|
||||
}
|
||||
|
||||
pub async fn delete_file(path: &str) -> ApiResult<ResponseData> {
|
||||
let response = BUCKET.delete_object(path).await?;
|
||||
let response = BUCKET.get().unwrap().delete_object(path).await?;
|
||||
Ok(response)
|
||||
}
|
||||
|
||||
#[derive(Serialize, Deserialize)]
|
||||
pub struct Response<T> {
|
||||
pub struct Paged<T> {
|
||||
pub data: T,
|
||||
pub meta: Option<Metadata>,
|
||||
}
|
||||
|
||||
#[derive(Serialize, Deserialize)]
|
||||
pub struct Metadata {
|
||||
pub page: i32,
|
||||
pub limit: i32,
|
||||
pub total: i64,
|
||||
|
||||
@@ -1,41 +0,0 @@
|
||||
diesel::table! {
|
||||
use diesel::sql_types::*;
|
||||
use postgis_diesel::sql_types::*;
|
||||
airports (icao) {
|
||||
icao -> Text,
|
||||
category -> Text,
|
||||
name -> Text,
|
||||
elevation_ft -> Float,
|
||||
iso_country -> Text,
|
||||
iso_region -> Text,
|
||||
municipality -> Text,
|
||||
has_metar -> Bool,
|
||||
point -> Geometry,
|
||||
data -> Jsonb
|
||||
}
|
||||
}
|
||||
|
||||
diesel::table! {
|
||||
metars (id) {
|
||||
id -> Integer,
|
||||
icao -> Text,
|
||||
observation_time -> Timestamp,
|
||||
raw_text -> Text,
|
||||
data -> Jsonb,
|
||||
}
|
||||
}
|
||||
|
||||
diesel::table! {
|
||||
users (email) {
|
||||
email -> Text,
|
||||
hash -> Text,
|
||||
role -> Text,
|
||||
first_name -> Text,
|
||||
last_name -> Text,
|
||||
updated_at -> Timestamp,
|
||||
created_at -> Timestamp,
|
||||
profile_picture -> Nullable<Text>,
|
||||
favorites -> Array<Text>,
|
||||
verified -> Bool,
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user