From ed61d120703751abfaefc0e9b284947a92e4b722 Mon Sep 17 00:00:00 2001 From: Neur0toxine Date: Sun, 26 May 2024 12:22:07 +0300 Subject: [PATCH] code quality changes, better error handling and logging --- src/main.rs | 18 +++------ src/server.rs | 28 +++++++------- src/task.rs | 95 +++++++++++++++++++++------------------------- src/thread_pool.rs | 2 + src/transcoder.rs | 79 +++++++++++++++++++++++++++++--------- 5 files changed, 126 insertions(+), 96 deletions(-) diff --git a/src/main.rs b/src/main.rs index e9ecebd..2c412c0 100644 --- a/src/main.rs +++ b/src/main.rs @@ -18,18 +18,12 @@ async fn main() { .init(); let addr = env::var("LISTEN").unwrap_or_else(|_| "0.0.0.0:8090".to_string()); - let pool = ThreadPool::new(match env::var("NUM_WORKERS") { - Ok(val) => match val.parse::() { - Ok(val) => { - if val > 0 { - Some(val); - } - None - } - Err(_) => None, - }, - Err(_) => None, - }); + let pool = ThreadPool::new( + env::var("NUM_WORKERS") + .ok() + .and_then(|val| val.parse::().ok()) + .filter(|&val| val > 0) + ); let temp_dir = env::var("TEMP_DIR").unwrap_or_else(|_| { env::temp_dir() .to_str() diff --git a/src/server.rs b/src/server.rs index b7ecd83..730f80a 100644 --- a/src/server.rs +++ b/src/server.rs @@ -11,7 +11,7 @@ use tower_http::trace::TraceLayer; use uuid::Uuid; use crate::dto::{ConvertRequest, ConvertResponse}; -use crate::task::Task; +use crate::task::{Task, TaskParams}; use crate::thread_pool::ThreadPool; const CONTENT_LENGTH_LIMIT: usize = 30 * 1024 * 1024; @@ -66,19 +66,19 @@ async fn enqueue_file( None => return error_response("Invalid output path"), }; - let task = Task::new( - task_id, - req.format, - req.codec, - req.codec_opts, - req.bit_rate, - req.max_bit_rate, - req.sample_rate, - req.channel_layout, - req.upload_url, - input_path.to_string(), - output_path.to_string(), - ); + let params = TaskParams { + format: req.format, + codec: req.codec, + codec_opts: req.codec_opts, + bit_rate: req.bit_rate, + max_bit_rate: req.max_bit_rate, + sample_rate: req.sample_rate, + channel_layout: req.channel_layout, + upload_url: req.upload_url, + input_path: input_path.to_string(), + output_path: output_path.to_string(), + }; + let task = Task::new(task_id, params); // Enqueue the task to the thread pool server.thread_pool.enqueue(task); diff --git a/src/task.rs b/src/task.rs index e7a3442..d7044bf 100644 --- a/src/task.rs +++ b/src/task.rs @@ -5,45 +5,12 @@ use tracing::{debug, error}; pub struct Task { id: uuid::Uuid, - codec: String, - format: String, - codec_opts: Option, - bit_rate: usize, - max_bit_rate: usize, - sample_rate: i32, - channel_layout: String, - input_path: String, - output_path: String, - upload_url: String, + params: TaskParams, } impl Task { - pub fn new( - id: uuid::Uuid, - format: String, - codec: String, - codec_opts: Option, - bit_rate: usize, - max_bit_rate: usize, - sample_rate: i32, - channel_layout: String, - upload_url: String, - input_path: String, - output_path: String, - ) -> Self { - Task { - id, - format, - codec, - codec_opts, - bit_rate, - max_bit_rate, - sample_rate, - channel_layout, - input_path, - output_path, - upload_url, - } + pub fn new(id: uuid::Uuid, params: TaskParams) -> Self { + Task { id, params } } pub fn execute(self) { @@ -51,7 +18,7 @@ impl Task { "performing transcoding for task with id: {}", self.id.to_string() ); - let mut ictx = match format::input(&self.input_path) { + let mut ictx = match format::input(&self.params.input_path) { Ok(val) => val, Err(err) => { error!("couldn't initialize input context: {:?}", err); @@ -59,14 +26,14 @@ impl Task { } }; - let octx = if let Some(codec_opts) = self.codec_opts { + let octx = if let Some(codec_opts) = self.params.codec_opts { format::output_as_with( - &self.output_path, - &self.format, + &self.params.output_path, + &self.params.format, params_to_avdictionary(&codec_opts), ) } else { - format::output_as(&self.output_path, &self.format) + format::output_as(&self.params.output_path, &self.params.format) }; let mut octx = match octx { @@ -81,11 +48,11 @@ impl Task { &mut ictx, &mut octx, TranscoderParams { - codec: self.codec, - bit_rate: self.bit_rate, - max_bit_rate: self.max_bit_rate, - sample_rate: self.sample_rate, - channel_layout: match self.channel_layout.as_str() { + codec: self.params.codec, + bit_rate: self.params.bit_rate, + max_bit_rate: self.params.max_bit_rate, + sample_rate: self.params.sample_rate, + channel_layout: match self.params.channel_layout.as_str() { "stereo" => ChannelLayout::STEREO, "mono" => ChannelLayout::MONO, "stereo_downmix" => ChannelLayout::STEREO_DOWNMIX, @@ -118,8 +85,11 @@ impl Task { return; } - transcoder.receive_and_process_decoded_frames(&mut octx) - .unwrap_or_else(|err| error!("failure during processing decoded frames: {:?}", err)); + transcoder + .receive_and_process_decoded_frames(&mut octx) + .unwrap_or_else(|err| { + error!("failure during processing decoded frames: {:?}", err) + }); } } @@ -138,7 +108,8 @@ impl Task { return; } - transcoder.get_and_process_filtered_frames(&mut octx) + transcoder + .get_and_process_filtered_frames(&mut octx) .unwrap_or_else(|err| error!("failure during processing filtered frames: {:?}", err)); if let Err(err) = transcoder.send_eof_to_encoder() { @@ -146,19 +117,39 @@ impl Task { return; } - transcoder.receive_and_process_encoded_packets(&mut octx) + transcoder + .receive_and_process_encoded_packets(&mut octx) .unwrap_or_else(|err| error!("failure during transcoding: {:?}", err)); if let Err(err) = octx.write_trailer() { error!("couldn't finish transcoding: {:?}", err); + return; } + + debug!( + "finished transcoding for task with id: {}", + self.id.to_string() + ); } } +pub struct TaskParams { + pub format: String, + pub codec: String, + pub codec_opts: Option, + pub bit_rate: usize, + pub max_bit_rate: usize, + pub sample_rate: i32, + pub channel_layout: String, + pub input_path: String, + pub output_path: String, + pub upload_url: String, +} + fn params_to_avdictionary(input: &str) -> Dictionary { let mut dict: Dictionary = Dictionary::new(); - for pair in input.split(";") { - let mut parts = pair.split(":"); + for pair in input.split(';') { + let mut parts = pair.split(':'); if let (Some(key), Some(value)) = (parts.next(), parts.next()) { dict.set(key, value); diff --git a/src/thread_pool.rs b/src/thread_pool.rs index 14de4dd..85bf017 100644 --- a/src/thread_pool.rs +++ b/src/thread_pool.rs @@ -2,6 +2,7 @@ use std::sync::mpsc::{self, Receiver, Sender}; use std::sync::{Arc, Mutex}; use std::thread; use std::time::Duration; +use ffmpeg_next::log::Level; use tracing::{debug, error}; @@ -42,6 +43,7 @@ impl Worker { let thread = thread::spawn(move || { ffmpeg_next::init() .unwrap_or_else(|err| tracing::error!("couldn't init FFmpeg: {:?}", err)); + ffmpeg_next::util::log::set_level(Level::Quiet); loop { let task = { diff --git a/src/transcoder.rs b/src/transcoder.rs index 9106674..6ec5528 100644 --- a/src/transcoder.rs +++ b/src/transcoder.rs @@ -1,7 +1,6 @@ extern crate ffmpeg_next as ffmpeg; use std::error::Error; -use std::sync::Arc; use ffmpeg::{codec, filter, format, frame, media}; use ffmpeg_next::error::EAGAIN; @@ -35,15 +34,19 @@ impl Transcoder { .ok_or("could not find best audio stream")?; let context = codec::context::Context::from_parameters(input.parameters())?; - let mut decoder = context.decoder().audio().map_err(|err| { - format!("couldn't find decoder for input file: {}", err) - })?; + let mut decoder = context + .decoder() + .audio() + .map_err(|err| format!("couldn't find decoder for input file: {}", err))?; - let codec = ffmpeg::encoder::find_by_name(&*params.codec) + let codec = ffmpeg::encoder::find_by_name(¶ms.codec) .ok_or_else(|| format!("couldn't find codec with name: {}", params.codec))? .audio()?; - let global = octx.format().flags().contains(format::flag::Flags::GLOBAL_HEADER); + let global = octx + .format() + .flags() + .contains(format::flag::Flags::GLOBAL_HEADER); decoder.set_parameters(input.parameters())?; @@ -70,13 +73,26 @@ impl Transcoder { encoder.set_format( codec .formats() - .ok_or_else(|| format!("failed to get supported formats for codec: {}", codec.name()))? + .ok_or_else(|| { + format!( + "failed to get supported formats for codec: {}", + codec.name() + ) + })? .next() .ok_or("no supported formats found for codec")?, ); - encoder.set_bit_rate(if params.bit_rate > 0 { params.bit_rate } else { decoder.bit_rate() }); - encoder.set_max_bit_rate(if params.max_bit_rate > 0 { params.max_bit_rate } else { decoder.max_bit_rate() }); + encoder.set_bit_rate(if params.bit_rate > 0 { + params.bit_rate + } else { + decoder.bit_rate() + }); + encoder.set_max_bit_rate(if params.max_bit_rate > 0 { + params.max_bit_rate + } else { + decoder.max_bit_rate() + }); encoder.set_time_base((1, sample_rate)); output.set_time_base((1, sample_rate)); @@ -121,11 +137,26 @@ impl Transcoder { } fn add_frame_to_filter(&mut self, frame: &ffmpeg::Frame) -> Result<(), ffmpeg::Error> { - self.filter.get("in").unwrap().source().add(frame) + if let Some(mut ctx) = self.filter.get("in") { + let mut source = ctx.source(); + source.add(frame) + } else { + Err(ffmpeg::Error::Other { + errno: 0, + }) + } } + pub(crate) fn flush_filter(&mut self) -> Result<(), ffmpeg::Error> { - self.filter.get("in").unwrap().source().flush() + if let Some(mut ctx) = self.filter.get("in") { + let mut source = ctx.source(); + source.flush() + } else { + Err(ffmpeg::Error::Other { + errno: 0, + }) + } } pub(crate) fn get_and_process_filtered_frames( @@ -134,7 +165,10 @@ impl Transcoder { ) -> Result<(), Box> { let mut filtered = frame::Audio::empty(); loop { - let mut ctx = self.filter.get("out").ok_or("cannot get context from filter")?; + let mut ctx = self + .filter + .get("out") + .ok_or("cannot get context from filter")?; if let Err(err) = ctx.sink().frame(&mut filtered) { if err != ffmpeg::Error::Eof { @@ -171,8 +205,12 @@ impl Transcoder { if let Err(mut err) = self.get_and_process_filtered_frames(octx) { let expected = ffmpeg::Error::Other { errno: EAGAIN }; - if err.downcast_mut::().ok_or(ffmpeg::Error::Bug) == Err(expected) { - continue + if err + .downcast_mut::() + .ok_or(ffmpeg::Error::Bug) + == Err(expected) + { + continue; } } } @@ -206,11 +244,16 @@ fn filter_graph( filter.output("in", 0)?.input("out", 0)?.parse(spec)?; filter.validate()?; - println!("{}", filter.dump()); - if let Some(codec) = encoder.codec() { - if !codec.capabilities().contains(codec::capabilities::Capabilities::VARIABLE_FRAME_SIZE) { - filter.get("out").unwrap().sink().set_frame_size(encoder.frame_size()); + if !codec + .capabilities() + .contains(codec::capabilities::Capabilities::VARIABLE_FRAME_SIZE) + { + filter + .get("out") + .unwrap() + .sink() + .set_frame_size(encoder.frame_size()); } }