Scheduler to update metars every hour

This commit is contained in:
2023-10-02 18:34:34 -04:00
parent d046b29dce
commit f4251a4039
5 changed files with 104 additions and 46 deletions

View File

@@ -23,6 +23,6 @@ r2d2 = "0.8.10"
reqwest = "0.11.20" reqwest = "0.11.20"
serde = {version = "1.0.188", features = ["derive"]} serde = {version = "1.0.188", features = ["derive"]}
serde_json = "1.0.105" serde_json = "1.0.105"
tokio = { version = "1.32.0", features = ["macros", "rt"] } tokio = { version = "1.32.0", features = ["macros", "rt", "time"] }
uuid = { version = "1.4.1", features = ["serde", "v4"] } uuid = { version = "1.4.1", features = ["serde", "v4"] }
log = "0.4.20" log = "0.4.20"

View File

@@ -44,13 +44,13 @@ pub struct QueryAirport {
} }
impl QueryAirport { impl QueryAirport {
pub fn get_all(bounds: &Option<Polygon<Point>>, category: &Option<String>, filter: &Option<String>, limit: i32, page: i32) -> Result<Vec<Self>, ServiceError> { pub fn get_all(bounds: &Option<Polygon<Point>>, category: &Option<String>, filter: &Option<String>, order_by: bool, limit: i32, page: i32) -> Result<Vec<Self>, ServiceError> {
let mut conn = db::connection()?; let mut conn = db::connection()?;
let mut query = airports::table let mut query = airports::table
.limit(limit as i64) .limit(limit as i64)
.into_boxed(); .into_boxed();
query = query.filter(airports::id.gt(std::cmp::max(1, page - 1) * limit)); query = query.filter(airports::id.gt(std::cmp::max(0, page - 1) * limit));
if let Some(bounds) = bounds { if let Some(bounds) = bounds {
query = query.filter(st_contains(bounds, airports::point)); query = query.filter(st_contains(bounds, airports::point));
@@ -64,7 +64,10 @@ impl QueryAirport {
.or(airports::full_name.ilike(format!("%{}%", filter))) .or(airports::full_name.ilike(format!("%{}%", filter)))
) )
} }
let airports: Vec<QueryAirport> = query.order((airports::id.asc(), airports::category.asc())).load::<QueryAirport>(&mut conn)?; if order_by {
query = query.order(airports::category.asc());
}
let airports: Vec<QueryAirport> = query.load::<QueryAirport>(&mut conn)?;
Ok(airports) Ok(airports)
} }

View File

@@ -96,7 +96,7 @@ async fn get_all(req: HttpRequest) -> HttpResponse {
}; };
let pages = ((total as f64) / (if limit <= 0 { 1 } else { limit} as f64)).ceil() as i64; let pages = ((total as f64) / (if limit <= 0 { 1 } else { limit} as f64)).ceil() as i64;
match web::block(move || QueryAirport::get_all(&polygon, &category, &filter, limit, page)).await.unwrap() { match web::block(move || QueryAirport::get_all(&polygon, &category, &filter, true, limit, page)).await.unwrap() {
Ok(a) => HttpResponse::Ok().json(AirportsResponse { Ok(a) => HttpResponse::Ok().json(AirportsResponse {
data: a, data: a,
meta: Metadata { page, limit, pages, total } meta: Metadata { page, limit, pages, total }

View File

@@ -17,50 +17,52 @@ mod error_handler;
mod metars; mod metars;
mod users; mod users;
mod schema; mod schema;
mod scheduler;
#[actix_rt::main] #[actix_rt::main]
async fn main() -> std::io::Result<()> { async fn main() -> std::io::Result<()> {
dotenv().ok(); dotenv().ok();
if std::env::var_os("RUST_LOG").is_none() { if std::env::var_os("RUST_LOG").is_none() {
std::env::set_var("RUST_LOG", "info,actix=info,diesel_migrations=warn,reqwest=warn,hyper=warn"); std::env::set_var("RUST_LOG", "info,actix=info,diesel_migrations=warn,reqwest=warn,hyper=warn");
} }
env_logger::init_from_env(Env::default().default_filter_or("info")); env_logger::init_from_env(Env::default().default_filter_or("info"));
db::init(); db::init();
scheduler::update_airports();
let mut listenfd = ListenFd::from_env(); let mut listenfd = ListenFd::from_env();
let mut server = HttpServer::new(|| { let mut server = HttpServer::new(|| {
let cors = Cors::default() let cors = Cors::default()
.allow_any_origin() .allow_any_origin()
.allow_any_method() .allow_any_method()
.allow_any_header(); .allow_any_header();
App::new() App::new()
.configure(airports::init_routes) .configure(airports::init_routes)
.configure(metars::init_routes) .configure(metars::init_routes)
.configure(users::init_routes) .configure(users::init_routes)
.wrap(cors) .wrap(cors)
.wrap(Logger::default()) .wrap(Logger::default())
}); });
server = match listenfd.take_tcp_listener(0)? { server = match listenfd.take_tcp_listener(0)? {
Some(listener) => server.listen(listener)?, Some(listener) => server.listen(listener)?,
None => { None => {
let host = match std::env::var("SERVICE_HOST") { let host = match std::env::var("SERVICE_HOST") {
Ok(h) => h, Ok(h) => h,
Err(_) => { Err(_) => {
warn!("Defaulting to SERVICE_HOST localhost"); warn!("Defaulting to SERVICE_HOST localhost");
"localhost".to_string() "localhost".to_string()
}
};
let port = match std::env::var("SERVICE_PORT") {
Ok(p) => p,
Err(_) => {
warn!("Defaulting to SERVICE_PORT 5000");
"5000".to_string()
}
};
debug!("Binding server to {}:{}", host, port);
server.bind(format!("{}:{}", host, port))?
} }
}; };
server.run().await let port = match std::env::var("SERVICE_PORT") {
} Ok(p) => p,
Err(_) => {
warn!("Defaulting to SERVICE_PORT 5000");
"5000".to_string()
}
};
debug!("Binding server to {}:{}", host, port);
server.bind(format!("{}:{}", host, port))?
}
};
server.run().await
}

53
service/src/scheduler.rs Normal file
View File

@@ -0,0 +1,53 @@
use tokio::time::{sleep, Duration};
use log::{warn, debug, trace};
use crate::airports::QueryAirport;
use crate::metars::Metar;
pub fn update_airports() {
tokio::spawn(async {
loop {
debug!("METAR update start");
let total = match QueryAirport::get_count(&None, &None, &None) {
Ok(t) => t,
Err(err) => {
warn!("{}", err);
break
}
};
let limit = 50;
let pages = ((total as f32) / (if limit <= 0 { 1 } else { limit} as f32)).ceil() as i32;
let mut airports: Vec<QueryAirport> = vec![];
for page in 1..(pages + 1) {
match QueryAirport::get_all(&None, &None, &None, false, limit, page) {
Ok(mut a) => {
airports.append(&mut a)
},
Err(err) => {
warn!("{}", err);
break
}
}
}
debug!("Updating {} airport METARS", airports.len());
let airport_icaos: Vec<String> = airports.iter().map(|a| a.icao.to_string()).collect();
let mut peekable = airport_icaos.into_iter().peekable();
while peekable.peek().is_some() {
let chunk: Vec<String> = peekable.by_ref().take(limit as usize).collect();
let icao_string = chunk.join(",");
trace!("Updating METARS for: {}", icao_string);
match Metar::get_all(icao_string).await {
Ok(_) => {
sleep(Duration::from_millis(100)).await;
},
Err(err) => {
warn!("{}", err);
}
}
}
debug!("METAR update complete");
sleep(Duration::from_secs(60 * 60)).await;
}
});
}