code quality changes, better error handling and logging

This commit is contained in:
Pavel 2024-05-26 12:22:07 +03:00
parent 0d92a39309
commit ed61d12070
5 changed files with 126 additions and 96 deletions

View File

@ -18,18 +18,12 @@ async fn main() {
.init(); .init();
let addr = env::var("LISTEN").unwrap_or_else(|_| "0.0.0.0:8090".to_string()); let addr = env::var("LISTEN").unwrap_or_else(|_| "0.0.0.0:8090".to_string());
let pool = ThreadPool::new(match env::var("NUM_WORKERS") { let pool = ThreadPool::new(
Ok(val) => match val.parse::<usize>() { env::var("NUM_WORKERS")
Ok(val) => { .ok()
if val > 0 { .and_then(|val| val.parse::<usize>().ok())
Some(val); .filter(|&val| val > 0)
} );
None
}
Err(_) => None,
},
Err(_) => None,
});
let temp_dir = env::var("TEMP_DIR").unwrap_or_else(|_| { let temp_dir = env::var("TEMP_DIR").unwrap_or_else(|_| {
env::temp_dir() env::temp_dir()
.to_str() .to_str()

View File

@ -11,7 +11,7 @@ use tower_http::trace::TraceLayer;
use uuid::Uuid; use uuid::Uuid;
use crate::dto::{ConvertRequest, ConvertResponse}; use crate::dto::{ConvertRequest, ConvertResponse};
use crate::task::Task; use crate::task::{Task, TaskParams};
use crate::thread_pool::ThreadPool; use crate::thread_pool::ThreadPool;
const CONTENT_LENGTH_LIMIT: usize = 30 * 1024 * 1024; const CONTENT_LENGTH_LIMIT: usize = 30 * 1024 * 1024;
@ -66,19 +66,19 @@ async fn enqueue_file(
None => return error_response("Invalid output path"), None => return error_response("Invalid output path"),
}; };
let task = Task::new( let params = TaskParams {
task_id, format: req.format,
req.format, codec: req.codec,
req.codec, codec_opts: req.codec_opts,
req.codec_opts, bit_rate: req.bit_rate,
req.bit_rate, max_bit_rate: req.max_bit_rate,
req.max_bit_rate, sample_rate: req.sample_rate,
req.sample_rate, channel_layout: req.channel_layout,
req.channel_layout, upload_url: req.upload_url,
req.upload_url, input_path: input_path.to_string(),
input_path.to_string(), output_path: output_path.to_string(),
output_path.to_string(), };
); let task = Task::new(task_id, params);
// Enqueue the task to the thread pool // Enqueue the task to the thread pool
server.thread_pool.enqueue(task); server.thread_pool.enqueue(task);

View File

@ -5,45 +5,12 @@ use tracing::{debug, error};
pub struct Task { pub struct Task {
id: uuid::Uuid, id: uuid::Uuid,
codec: String, params: TaskParams,
format: String,
codec_opts: Option<String>,
bit_rate: usize,
max_bit_rate: usize,
sample_rate: i32,
channel_layout: String,
input_path: String,
output_path: String,
upload_url: String,
} }
impl Task { impl Task {
pub fn new( pub fn new(id: uuid::Uuid, params: TaskParams) -> Self {
id: uuid::Uuid, Task { id, params }
format: String,
codec: String,
codec_opts: Option<String>,
bit_rate: usize,
max_bit_rate: usize,
sample_rate: i32,
channel_layout: String,
upload_url: String,
input_path: String,
output_path: String,
) -> Self {
Task {
id,
format,
codec,
codec_opts,
bit_rate,
max_bit_rate,
sample_rate,
channel_layout,
input_path,
output_path,
upload_url,
}
} }
pub fn execute(self) { pub fn execute(self) {
@ -51,7 +18,7 @@ impl Task {
"performing transcoding for task with id: {}", "performing transcoding for task with id: {}",
self.id.to_string() self.id.to_string()
); );
let mut ictx = match format::input(&self.input_path) { let mut ictx = match format::input(&self.params.input_path) {
Ok(val) => val, Ok(val) => val,
Err(err) => { Err(err) => {
error!("couldn't initialize input context: {:?}", err); error!("couldn't initialize input context: {:?}", err);
@ -59,14 +26,14 @@ impl Task {
} }
}; };
let octx = if let Some(codec_opts) = self.codec_opts { let octx = if let Some(codec_opts) = self.params.codec_opts {
format::output_as_with( format::output_as_with(
&self.output_path, &self.params.output_path,
&self.format, &self.params.format,
params_to_avdictionary(&codec_opts), params_to_avdictionary(&codec_opts),
) )
} else { } else {
format::output_as(&self.output_path, &self.format) format::output_as(&self.params.output_path, &self.params.format)
}; };
let mut octx = match octx { let mut octx = match octx {
@ -81,11 +48,11 @@ impl Task {
&mut ictx, &mut ictx,
&mut octx, &mut octx,
TranscoderParams { TranscoderParams {
codec: self.codec, codec: self.params.codec,
bit_rate: self.bit_rate, bit_rate: self.params.bit_rate,
max_bit_rate: self.max_bit_rate, max_bit_rate: self.params.max_bit_rate,
sample_rate: self.sample_rate, sample_rate: self.params.sample_rate,
channel_layout: match self.channel_layout.as_str() { channel_layout: match self.params.channel_layout.as_str() {
"stereo" => ChannelLayout::STEREO, "stereo" => ChannelLayout::STEREO,
"mono" => ChannelLayout::MONO, "mono" => ChannelLayout::MONO,
"stereo_downmix" => ChannelLayout::STEREO_DOWNMIX, "stereo_downmix" => ChannelLayout::STEREO_DOWNMIX,
@ -118,8 +85,11 @@ impl Task {
return; return;
} }
transcoder.receive_and_process_decoded_frames(&mut octx) transcoder
.unwrap_or_else(|err| error!("failure during processing decoded frames: {:?}", err)); .receive_and_process_decoded_frames(&mut octx)
.unwrap_or_else(|err| {
error!("failure during processing decoded frames: {:?}", err)
});
} }
} }
@ -138,7 +108,8 @@ impl Task {
return; return;
} }
transcoder.get_and_process_filtered_frames(&mut octx) transcoder
.get_and_process_filtered_frames(&mut octx)
.unwrap_or_else(|err| error!("failure during processing filtered frames: {:?}", err)); .unwrap_or_else(|err| error!("failure during processing filtered frames: {:?}", err));
if let Err(err) = transcoder.send_eof_to_encoder() { if let Err(err) = transcoder.send_eof_to_encoder() {
@ -146,19 +117,39 @@ impl Task {
return; return;
} }
transcoder.receive_and_process_encoded_packets(&mut octx) transcoder
.receive_and_process_encoded_packets(&mut octx)
.unwrap_or_else(|err| error!("failure during transcoding: {:?}", err)); .unwrap_or_else(|err| error!("failure during transcoding: {:?}", err));
if let Err(err) = octx.write_trailer() { if let Err(err) = octx.write_trailer() {
error!("couldn't finish transcoding: {:?}", err); error!("couldn't finish transcoding: {:?}", err);
return;
} }
debug!(
"finished transcoding for task with id: {}",
self.id.to_string()
);
} }
} }
pub struct TaskParams {
pub format: String,
pub codec: String,
pub codec_opts: Option<String>,
pub bit_rate: usize,
pub max_bit_rate: usize,
pub sample_rate: i32,
pub channel_layout: String,
pub input_path: String,
pub output_path: String,
pub upload_url: String,
}
fn params_to_avdictionary(input: &str) -> Dictionary { fn params_to_avdictionary(input: &str) -> Dictionary {
let mut dict: Dictionary = Dictionary::new(); let mut dict: Dictionary = Dictionary::new();
for pair in input.split(";") { for pair in input.split(';') {
let mut parts = pair.split(":"); let mut parts = pair.split(':');
if let (Some(key), Some(value)) = (parts.next(), parts.next()) { if let (Some(key), Some(value)) = (parts.next(), parts.next()) {
dict.set(key, value); dict.set(key, value);

View File

@ -2,6 +2,7 @@ use std::sync::mpsc::{self, Receiver, Sender};
use std::sync::{Arc, Mutex}; use std::sync::{Arc, Mutex};
use std::thread; use std::thread;
use std::time::Duration; use std::time::Duration;
use ffmpeg_next::log::Level;
use tracing::{debug, error}; use tracing::{debug, error};
@ -42,6 +43,7 @@ impl Worker {
let thread = thread::spawn(move || { let thread = thread::spawn(move || {
ffmpeg_next::init() ffmpeg_next::init()
.unwrap_or_else(|err| tracing::error!("couldn't init FFmpeg: {:?}", err)); .unwrap_or_else(|err| tracing::error!("couldn't init FFmpeg: {:?}", err));
ffmpeg_next::util::log::set_level(Level::Quiet);
loop { loop {
let task = { let task = {

View File

@ -1,7 +1,6 @@
extern crate ffmpeg_next as ffmpeg; extern crate ffmpeg_next as ffmpeg;
use std::error::Error; use std::error::Error;
use std::sync::Arc;
use ffmpeg::{codec, filter, format, frame, media}; use ffmpeg::{codec, filter, format, frame, media};
use ffmpeg_next::error::EAGAIN; use ffmpeg_next::error::EAGAIN;
@ -35,15 +34,19 @@ impl Transcoder {
.ok_or("could not find best audio stream")?; .ok_or("could not find best audio stream")?;
let context = codec::context::Context::from_parameters(input.parameters())?; let context = codec::context::Context::from_parameters(input.parameters())?;
let mut decoder = context.decoder().audio().map_err(|err| { let mut decoder = context
format!("couldn't find decoder for input file: {}", err) .decoder()
})?; .audio()
.map_err(|err| format!("couldn't find decoder for input file: {}", err))?;
let codec = ffmpeg::encoder::find_by_name(&*params.codec) let codec = ffmpeg::encoder::find_by_name(&params.codec)
.ok_or_else(|| format!("couldn't find codec with name: {}", params.codec))? .ok_or_else(|| format!("couldn't find codec with name: {}", params.codec))?
.audio()?; .audio()?;
let global = octx.format().flags().contains(format::flag::Flags::GLOBAL_HEADER); let global = octx
.format()
.flags()
.contains(format::flag::Flags::GLOBAL_HEADER);
decoder.set_parameters(input.parameters())?; decoder.set_parameters(input.parameters())?;
@ -70,13 +73,26 @@ impl Transcoder {
encoder.set_format( encoder.set_format(
codec codec
.formats() .formats()
.ok_or_else(|| format!("failed to get supported formats for codec: {}", codec.name()))? .ok_or_else(|| {
format!(
"failed to get supported formats for codec: {}",
codec.name()
)
})?
.next() .next()
.ok_or("no supported formats found for codec")?, .ok_or("no supported formats found for codec")?,
); );
encoder.set_bit_rate(if params.bit_rate > 0 { params.bit_rate } else { decoder.bit_rate() }); encoder.set_bit_rate(if params.bit_rate > 0 {
encoder.set_max_bit_rate(if params.max_bit_rate > 0 { params.max_bit_rate } else { decoder.max_bit_rate() }); params.bit_rate
} else {
decoder.bit_rate()
});
encoder.set_max_bit_rate(if params.max_bit_rate > 0 {
params.max_bit_rate
} else {
decoder.max_bit_rate()
});
encoder.set_time_base((1, sample_rate)); encoder.set_time_base((1, sample_rate));
output.set_time_base((1, sample_rate)); output.set_time_base((1, sample_rate));
@ -121,11 +137,26 @@ impl Transcoder {
} }
fn add_frame_to_filter(&mut self, frame: &ffmpeg::Frame) -> Result<(), ffmpeg::Error> { fn add_frame_to_filter(&mut self, frame: &ffmpeg::Frame) -> Result<(), ffmpeg::Error> {
self.filter.get("in").unwrap().source().add(frame) if let Some(mut ctx) = self.filter.get("in") {
let mut source = ctx.source();
source.add(frame)
} else {
Err(ffmpeg::Error::Other {
errno: 0,
})
}
} }
pub(crate) fn flush_filter(&mut self) -> Result<(), ffmpeg::Error> { pub(crate) fn flush_filter(&mut self) -> Result<(), ffmpeg::Error> {
self.filter.get("in").unwrap().source().flush() if let Some(mut ctx) = self.filter.get("in") {
let mut source = ctx.source();
source.flush()
} else {
Err(ffmpeg::Error::Other {
errno: 0,
})
}
} }
pub(crate) fn get_and_process_filtered_frames( pub(crate) fn get_and_process_filtered_frames(
@ -134,7 +165,10 @@ impl Transcoder {
) -> Result<(), Box<dyn Error>> { ) -> Result<(), Box<dyn Error>> {
let mut filtered = frame::Audio::empty(); let mut filtered = frame::Audio::empty();
loop { loop {
let mut ctx = self.filter.get("out").ok_or("cannot get context from filter")?; let mut ctx = self
.filter
.get("out")
.ok_or("cannot get context from filter")?;
if let Err(err) = ctx.sink().frame(&mut filtered) { if let Err(err) = ctx.sink().frame(&mut filtered) {
if err != ffmpeg::Error::Eof { if err != ffmpeg::Error::Eof {
@ -171,8 +205,12 @@ impl Transcoder {
if let Err(mut err) = self.get_and_process_filtered_frames(octx) { if let Err(mut err) = self.get_and_process_filtered_frames(octx) {
let expected = ffmpeg::Error::Other { errno: EAGAIN }; let expected = ffmpeg::Error::Other { errno: EAGAIN };
if err.downcast_mut::<ffmpeg::error::Error>().ok_or(ffmpeg::Error::Bug) == Err(expected) { if err
continue .downcast_mut::<ffmpeg::error::Error>()
.ok_or(ffmpeg::Error::Bug)
== Err(expected)
{
continue;
} }
} }
} }
@ -206,11 +244,16 @@ fn filter_graph(
filter.output("in", 0)?.input("out", 0)?.parse(spec)?; filter.output("in", 0)?.input("out", 0)?.parse(spec)?;
filter.validate()?; filter.validate()?;
println!("{}", filter.dump());
if let Some(codec) = encoder.codec() { if let Some(codec) = encoder.codec() {
if !codec.capabilities().contains(codec::capabilities::Capabilities::VARIABLE_FRAME_SIZE) { if !codec
filter.get("out").unwrap().sink().set_frame_size(encoder.frame_size()); .capabilities()
.contains(codec::capabilities::Capabilities::VARIABLE_FRAME_SIZE)
{
filter
.get("out")
.unwrap()
.sink()
.set_frame_size(encoder.frame_size());
} }
} }