Fixing loading in docker environment, updated markers
This commit is contained in:
@@ -1,9 +1,6 @@
|
||||
use crate::error::ApiResult;
|
||||
use redis::{Client as RedisClient, aio::MultiplexedConnection as RedisConnection, RedisResult};
|
||||
use s3::{
|
||||
Bucket, Region, creds::Credentials, BucketConfiguration, request::ResponseData,
|
||||
bucket_ops::CreateBucketResponse,
|
||||
};
|
||||
use s3::{Bucket, Region, creds::Credentials, BucketConfiguration, request::ResponseData};
|
||||
use serde::{Deserialize, Serialize};
|
||||
use std::sync::OnceLock;
|
||||
use std::time::Duration;
|
||||
@@ -15,30 +12,33 @@ static REDIS: OnceLock<RedisClient> = OnceLock::new();
|
||||
static BUCKET: OnceLock<Bucket> = OnceLock::new();
|
||||
|
||||
pub async fn initialize() -> ApiResult<()> {
|
||||
let db_user = std::env::var("POSTGRES_USER").unwrap_or("aviation".to_string());
|
||||
let db_password = std::env::var("POSTGRES_PASSWORD").expect("POSTGRES_PASSWORD must be set");
|
||||
let db_host: String = std::env::var("POSTGRES_HOST").expect("POSTGRES_HOST must be set");
|
||||
let db_port = std::env::var("POSTGRES_PORT").unwrap_or("5432".to_string());
|
||||
let db_name = std::env::var("POSTGRES_NAME").unwrap_or("aviation".to_string());
|
||||
|
||||
let db_url = format!(
|
||||
"postgres://{}:{}@{}:{}/{}",
|
||||
&db_user, &db_password, &db_host, &db_port, &db_name
|
||||
);
|
||||
|
||||
log::info!(
|
||||
"Connecting to database at postgres://{}:*****@{}:{}/{}...",
|
||||
&db_user,
|
||||
&db_host,
|
||||
&db_port,
|
||||
&db_name
|
||||
);
|
||||
// Setup Postgres pool connection
|
||||
let pool = PgPoolOptions::new()
|
||||
.max_connections(5)
|
||||
.acquire_timeout(Duration::from_secs(30))
|
||||
.connect(&db_url)
|
||||
.await?;
|
||||
let pool = {
|
||||
let user = std::env::var("POSTGRES_USER").unwrap_or("aviation".to_string());
|
||||
let password = std::env::var("POSTGRES_PASSWORD").expect("POSTGRES_PASSWORD must be set");
|
||||
let host: String = std::env::var("POSTGRES_HOST").expect("POSTGRES_HOST must be set");
|
||||
let port = std::env::var("POSTGRES_PORT").unwrap_or("5432".to_string());
|
||||
let name = std::env::var("POSTGRES_NAME").unwrap_or("aviation".to_string());
|
||||
|
||||
let db_url = format!(
|
||||
"postgres://{}:{}@{}:{}/{}",
|
||||
&user, &password, &host, &port, &name
|
||||
);
|
||||
|
||||
log::info!(
|
||||
"Connecting to database at postgres://{}:*****@{}:{}/{}...",
|
||||
&user,
|
||||
&host,
|
||||
&port,
|
||||
&name
|
||||
);
|
||||
|
||||
PgPoolOptions::new()
|
||||
.max_connections(5)
|
||||
.acquire_timeout(Duration::from_secs(30))
|
||||
.connect(&db_url)
|
||||
.await?
|
||||
};
|
||||
match POOL.set(pool) {
|
||||
Ok(_) => log::info!("Database connection established"),
|
||||
Err(_) => log::warn!("Database pool already initialized"),
|
||||
@@ -49,6 +49,7 @@ pub async fn initialize() -> ApiResult<()> {
|
||||
let host = std::env::var("REDIS_HOST").unwrap_or("localhost".to_string());
|
||||
let port = std::env::var("REDIS_PORT").unwrap_or("6379".to_string());
|
||||
let url = format!("redis://{}:{}", host, port);
|
||||
log::info!("Connecting to redis at {}", &url);
|
||||
RedisClient::open(url).expect("Failed to create redis client")
|
||||
};
|
||||
match REDIS.set(redis) {
|
||||
@@ -56,33 +57,57 @@ pub async fn initialize() -> ApiResult<()> {
|
||||
Err(_) => log::warn!("Redis client already initialized"),
|
||||
}
|
||||
|
||||
let schema = std::env::var("MINIO_SCHEMA").unwrap_or("http".to_string());
|
||||
let url = std::env::var("MINIO_HOST").unwrap_or("localhost".to_string());
|
||||
let port = std::env::var("MINIO_PORT").unwrap_or("9000".to_string());
|
||||
let user = std::env::var("MINIO_ROOT_USER").expect("MINIO_ROOT_USER is not set");
|
||||
let password = std::env::var("MINIO_ROOT_PASSWORD").expect("MINIO_ROOT_PASSWORD is not set");
|
||||
let base_url = format!("{}://{}:{}", schema, url, port);
|
||||
// Setup Bucket connection
|
||||
let bucket = {
|
||||
let protocol = std::env::var("MINIO_PROTOCOL").unwrap_or("http".to_string());
|
||||
let host = std::env::var("MINIO_HOST").unwrap_or("localhost".to_string());
|
||||
let port = std::env::var("MINIO_PORT").unwrap_or("9000".to_string());
|
||||
let user = std::env::var("MINIO_ROOT_USER").expect("MINIO_ROOT_USER is not set");
|
||||
let password = std::env::var("MINIO_ROOT_PASSWORD").expect("MINIO_ROOT_PASSWORD is not set");
|
||||
let bucket_name = std::env::var("MINIO_BUCKET").unwrap_or("aviation".to_string());
|
||||
let url = format!("{}://{}:{}", protocol, host, port);
|
||||
|
||||
let region = Region::Custom {
|
||||
region: "".to_string(),
|
||||
endpoint: base_url,
|
||||
let region = Region::Custom {
|
||||
region: "".to_string(),
|
||||
endpoint: url.to_string(),
|
||||
};
|
||||
|
||||
let credentials = Credentials {
|
||||
access_key: Some(user),
|
||||
secret_key: Some(password),
|
||||
security_token: None,
|
||||
session_token: None,
|
||||
expiration: None,
|
||||
};
|
||||
|
||||
let bucket = Bucket::new(&bucket_name, region.clone(), credentials.clone())?.with_path_style();
|
||||
log::info!("Checking for object in bucket at {}", ®ion.endpoint());
|
||||
match bucket.head_object("/").await {
|
||||
Ok(_) => bucket,
|
||||
Err(_) => {
|
||||
log::debug!("Creating '{}' bucket", &bucket_name);
|
||||
let response = match Bucket::create_with_path_style(
|
||||
&bucket_name,
|
||||
region,
|
||||
credentials,
|
||||
BucketConfiguration::default(),
|
||||
)
|
||||
.await
|
||||
{
|
||||
Ok(response) => response,
|
||||
Err(err) => {
|
||||
log::error!("Failed to create bucket '{}': {}", &bucket_name, err);
|
||||
return Err(err.into());
|
||||
}
|
||||
};
|
||||
response.bucket
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
let credentials = Credentials {
|
||||
access_key: Some(user),
|
||||
secret_key: Some(password),
|
||||
security_token: None,
|
||||
session_token: None,
|
||||
expiration: None,
|
||||
};
|
||||
|
||||
let bucket = Bucket::new("aviation", region.clone(), credentials.clone())
|
||||
.expect("Failed to create S3 Bucket")
|
||||
.with_path_style();
|
||||
|
||||
match BUCKET.set(*bucket) {
|
||||
Ok(_) => log::info!("Bucket initialized"),
|
||||
Err(_) => log::warn!("Bucket client already initialized"),
|
||||
Ok(_) => log::info!("Bucket connection initialized"),
|
||||
Err(_) => log::warn!("Bucket connection already initialized"),
|
||||
}
|
||||
|
||||
// Run migrations
|
||||
@@ -115,42 +140,12 @@ pub async fn redis_async_connection() -> RedisResult<RedisConnection> {
|
||||
}
|
||||
|
||||
async fn run_migrations() -> ApiResult<()> {
|
||||
log::debug!("Running migrations");
|
||||
log::debug!("Running database migrations");
|
||||
let pool = pool();
|
||||
sqlx::migrate!().run(pool).await?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn create_bucket() -> ApiResult<CreateBucketResponse> {
|
||||
let url = std::env::var("MINIO_URL").unwrap_or("localhost".to_string());
|
||||
let port = std::env::var("MINIO_PORT").unwrap_or("9000".to_string());
|
||||
let user = std::env::var("MINIO_ROOT_USER").expect("MINIO_ROOT_USER is not set");
|
||||
let password = std::env::var("MINIO_ROOT_PASSWORD").expect("MINIO_ROOT_PASSWORD is not set");
|
||||
let base_url = format!("http://{}:{}", url, port);
|
||||
|
||||
let region = Region::Custom {
|
||||
region: "".to_string(),
|
||||
endpoint: base_url,
|
||||
};
|
||||
|
||||
let credentials = Credentials {
|
||||
access_key: Some(user),
|
||||
secret_key: Some(password),
|
||||
security_token: None,
|
||||
session_token: None,
|
||||
expiration: None,
|
||||
};
|
||||
let bucket_name = "aviation";
|
||||
let response = Bucket::create_with_path_style(
|
||||
bucket_name,
|
||||
region,
|
||||
credentials,
|
||||
BucketConfiguration::default(),
|
||||
)
|
||||
.await?;
|
||||
Ok(response)
|
||||
}
|
||||
|
||||
pub async fn upload_file(path: &str, content: &[u8]) -> ApiResult<ResponseData> {
|
||||
let response = BUCKET.get().unwrap().put_object(path, content).await?;
|
||||
Ok(response)
|
||||
|
||||
@@ -935,8 +935,6 @@ impl Metar {
|
||||
// Check for missing metars
|
||||
let missing_icao_list = Self::get_missing_metar_icaos(&metars, icao_list).await;
|
||||
if !missing_icao_list.is_empty() {
|
||||
log::trace!("Retrieving missing METAR data for {:?}", missing_icao_list);
|
||||
|
||||
let mut updated_missing_icao_list: Vec<&str> = Vec::new();
|
||||
for icao in &missing_icao_list {
|
||||
let result: RedisResult<Option<bool>> = conn.get(icao).await;
|
||||
@@ -952,28 +950,34 @@ impl Metar {
|
||||
Err(err) => return Err(err.into()),
|
||||
}
|
||||
}
|
||||
let mut missing_icao_list = Self::get_remote_metars(&updated_missing_icao_list)
|
||||
.await
|
||||
.unwrap_or_else(|err| {
|
||||
log::warn!("Unable to get remote METAR data; {}", err);
|
||||
vec![]
|
||||
});
|
||||
if !updated_missing_icao_list.is_empty() {
|
||||
log::trace!(
|
||||
"Retrieving missing METAR data for {:?}",
|
||||
updated_missing_icao_list
|
||||
);
|
||||
let mut missing_icao_list = Self::get_remote_metars(&updated_missing_icao_list)
|
||||
.await
|
||||
.unwrap_or_else(|err| {
|
||||
log::warn!("Unable to get remote METAR data; {}", err);
|
||||
vec![]
|
||||
});
|
||||
|
||||
if missing_icao_list.len() > 0 {
|
||||
// Insert missing METARs
|
||||
for missing_metar in &missing_icao_list {
|
||||
let _: RedisResult<()> = conn.set(&missing_metar.station_id, true).await;
|
||||
missing_metar.insert().await?;
|
||||
if missing_icao_list.len() > 0 {
|
||||
// Insert missing METARs
|
||||
for missing_metar in &missing_icao_list {
|
||||
let _: RedisResult<()> = conn.set(&missing_metar.station_id, true).await;
|
||||
missing_metar.insert().await?;
|
||||
}
|
||||
metars.append(&mut missing_icao_list)
|
||||
}
|
||||
metars.append(&mut missing_icao_list)
|
||||
}
|
||||
|
||||
// Invalidate the still missing icaos
|
||||
let still_missing_icao_list =
|
||||
Self::get_missing_metar_icaos(&missing_icao_list, icao_list).await;
|
||||
if !still_missing_icao_list.is_empty() {
|
||||
for icao in still_missing_icao_list {
|
||||
let _: RedisResult<()> = conn.set_ex(&icao, false, 3600).await;
|
||||
// Invalidate the still missing icaos
|
||||
let still_missing_icao_list =
|
||||
Self::get_missing_metar_icaos(&missing_icao_list, icao_list).await;
|
||||
if !still_missing_icao_list.is_empty() {
|
||||
for icao in still_missing_icao_list {
|
||||
let _: RedisResult<()> = conn.set_ex(&icao, false, 3600).await;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user