From b78f1efecf8e59a923b82652d007a1baca3ea443 Mon Sep 17 00:00:00 2001 From: Neur0toxine Date: Sat, 25 May 2024 21:29:53 +0300 Subject: [PATCH] wip: transcoding --- src/dto.rs | 1 + src/server.rs | 11 +++-- src/task.rs | 85 ++++++++++++++++++++++++++++-------- src/thread_pool.rs | 4 +- src/transcoder.rs | 105 ++++++++++++++++++++++++++++++++++----------- 5 files changed, 155 insertions(+), 51 deletions(-) diff --git a/src/dto.rs b/src/dto.rs index b0d013e..012617f 100644 --- a/src/dto.rs +++ b/src/dto.rs @@ -17,6 +17,7 @@ pub struct ConvertResponse { #[try_from_multipart(rename_all = "camelCase")] pub struct ConvertRequest { pub codec: String, + pub codec_opts: Option, pub bit_rate: usize, pub max_bit_rate: usize, pub sample_rate: i32, diff --git a/src/server.rs b/src/server.rs index 505cfcd..61eda9f 100644 --- a/src/server.rs +++ b/src/server.rs @@ -1,10 +1,10 @@ use std::path::Path; use std::sync::Arc; -use axum::{Json, Router}; use axum::extract::{DefaultBodyLimit, State}; use axum::http::StatusCode; use axum::routing::post; +use axum::{Json, Router}; use axum_typed_multipart::TypedMultipart; use tokio::net::TcpListener; use tower_http::trace::TraceLayer; @@ -34,8 +34,7 @@ impl Server { let app = Router::new() .route( "/enqueue", - post(enqueue_file) - .layer(DefaultBodyLimit::max(CONTENT_LENGTH_LIMIT)), + post(enqueue_file).layer(DefaultBodyLimit::max(CONTENT_LENGTH_LIMIT)), ) .with_state(this) .layer(TraceLayer::new_for_http()); @@ -54,8 +53,7 @@ async fn enqueue_file( TypedMultipart(req): TypedMultipart, ) -> (StatusCode, Json) { let task_id = Uuid::new_v4(); - let input = - Path::new(&server.work_dir).join(format!("{}.in.atranscoder", task_id.to_string())); + let input = Path::new(&server.work_dir).join(format!("{}.in.atranscoder", task_id.to_string())); let output = Path::new(&server.work_dir).join(format!("{}.out.atranscoder", task_id.to_string())); @@ -79,6 +77,7 @@ async fn enqueue_file( let task = Task::new( task_id, req.codec, + req.codec_opts, req.bit_rate, req.max_bit_rate, req.sample_rate, @@ -107,4 +106,4 @@ async fn enqueue_file( }), ), } -} \ No newline at end of file +} diff --git a/src/task.rs b/src/task.rs index 8e16649..00a5fa8 100644 --- a/src/task.rs +++ b/src/task.rs @@ -1,11 +1,14 @@ -use tracing::{debug, error}; -use ffmpeg_next::{Error, format}; -use ffmpeg_next::channel_layout::ChannelLayout; use crate::transcoder::{Transcoder, TranscoderParams}; +use ffmpeg_next::channel_layout::ChannelLayout; +use ffmpeg_next::format::context; +use ffmpeg_next::format::context::Input; +use ffmpeg_next::{format, Dictionary, Error}; +use tracing::{debug, error}; pub struct Task { id: uuid::Uuid, codec: String, + codec_opts: Option, bit_rate: usize, max_bit_rate: usize, sample_rate: i32, @@ -19,6 +22,7 @@ impl Task { pub fn new( id: uuid::Uuid, codec: String, + codec_opts: Option, bit_rate: usize, max_bit_rate: usize, sample_rate: i32, @@ -30,6 +34,7 @@ impl Task { Task { id, codec, + codec_opts, bit_rate, max_bit_rate, sample_rate, @@ -41,26 +46,58 @@ impl Task { } pub fn execute(self) { - debug!("performing transcoding for task with id: {}", self.id.to_string()); - let mut ictx = format::input(&self.input_path).unwrap(); - let mut octx = format::output_as(&self.output_path, &self.codec).unwrap(); - let transcoder = Transcoder::new(&mut ictx, &mut octx, TranscoderParams { - codec: self.codec, - bit_rate: self.bit_rate, - max_bit_rate: self.max_bit_rate, - sample_rate: self.sample_rate, - channel_layout: match self.channel_layout.as_str() { - "stereo" => ChannelLayout::STEREO, - "mono" => ChannelLayout::MONO, - "stereo_downmix" => ChannelLayout::STEREO_DOWNMIX, - _ => ChannelLayout::STEREO, + debug!( + "performing transcoding for task with id: {}", + self.id.to_string() + ); + let mut ictx = match format::input(&self.input_path) { + Ok(val) => val, + Err(err) => { + error!("couldn't initialize input context: {:?}", err); + return; } - }); + }; + + let octx: Result; + if self.codec_opts.is_some() { + octx = format::output_as_with( + &self.output_path, + &self.codec, + params_to_avdictionary(&self.codec_opts.unwrap_or_default()), + ); + } else { + octx = format::output_as(&self.output_path, &self.codec); + } + + let mut octx = match octx { + Ok(val) => val, + Err(err) => { + error!("couldn't initialize output context: {:?}", err); + return; + } + }; + + let transcoder = Transcoder::new( + &mut ictx, + &mut octx, + TranscoderParams { + codec: self.codec, + bit_rate: self.bit_rate, + max_bit_rate: self.max_bit_rate, + sample_rate: self.sample_rate, + channel_layout: match self.channel_layout.as_str() { + "stereo" => ChannelLayout::STEREO, + "mono" => ChannelLayout::MONO, + "stereo_downmix" => ChannelLayout::STEREO_DOWNMIX, + _ => ChannelLayout::STEREO, + }, + }, + ); let mut transcoder = match transcoder { Ok(val) => val, Err(err) => { error!("couldn't initialize FFmpeg transcoder: {:?}", err); - return + return; } }; octx.set_metadata(ictx.metadata().to_owned()); @@ -88,3 +125,15 @@ impl Task { .unwrap_or_else(|err| error!("couldn't finish transcoding: {:?}", err)); } } + +fn params_to_avdictionary(input: &str) -> Dictionary { + let mut dict: Dictionary = Dictionary::new(); + for pair in input.split(";") { + let mut parts = pair.split(":"); + + if let (Some(key), Some(value)) = (parts.next(), parts.next()) { + dict.set(key, value); + } + } + dict +} diff --git a/src/thread_pool.rs b/src/thread_pool.rs index 762cc13..14de4dd 100644 --- a/src/thread_pool.rs +++ b/src/thread_pool.rs @@ -1,9 +1,9 @@ -use std::sync::{Arc, Mutex}; use std::sync::mpsc::{self, Receiver, Sender}; +use std::sync::{Arc, Mutex}; use std::thread; use std::time::Duration; -use tracing::{error, debug}; +use tracing::{debug, error}; use crate::task::Task; diff --git a/src/transcoder.rs b/src/transcoder.rs index 9351b4d..e28c3b3 100644 --- a/src/transcoder.rs +++ b/src/transcoder.rs @@ -4,6 +4,8 @@ use std::error::Error; use std::sync::Arc; use ffmpeg::{codec, filter, format, frame, media}; +use ffmpeg_next::codec::Audio; +use ffmpeg_next::Codec; pub struct Transcoder { params: Arc, @@ -28,16 +30,27 @@ impl Transcoder { ictx: &mut format::context::Input, octx: &mut format::context::Output, params: TranscoderParams, - ) -> Result { + ) -> Result> { let input = ictx .streams() .best(media::Type::Audio) .expect("could not find best audio stream"); let context = codec::context::Context::from_parameters(input.parameters())?; - let mut decoder = context.decoder().audio()?; - let codec = ffmpeg::encoder::find_by_name(&*params.codec) - .expect(format!("failed to find encoder with name: {}", params.codec).as_str()) - .audio()?; + let mut decoder = match context.decoder().audio() { + Ok(val) => val, + Err(err) => { + return Err( + format!("couldn't find decoder for input file: {}", err.to_string()).into(), + ) + } + }; + let codec = match ffmpeg::encoder::find_by_name(&*params.codec) { + None => return Err(format!("couldn't find codec with name: {}", params.codec).into()), + Some(val) => match val.audio() { + Ok(val) => val, + Err(err) => return Err(err.into()), + }, + }; let global = octx .format() .flags() @@ -68,7 +81,13 @@ impl Transcoder { encoder.set_format( codec .formats() - .expect(format!("failed to get supported formats for codec: {}", codec.name()).as_str()) + .expect( + format!( + "failed to get supported formats for codec: {}", + codec.name() + ) + .as_str(), + ) .next() .unwrap(), ); @@ -115,13 +134,21 @@ impl Transcoder { self.encoder.send_eof().unwrap(); } - pub(crate) fn receive_and_process_encoded_packets(&mut self, octx: &mut format::context::Output) { + pub(crate) fn receive_and_process_encoded_packets( + &mut self, + octx: &mut format::context::Output, + ) -> Result<(), Box> { let mut encoded = ffmpeg::Packet::empty(); while self.encoder.receive_packet(&mut encoded).is_ok() { encoded.set_stream(0); encoded.rescale_ts(self.in_time_base, self.out_time_base); - encoded.write_interleaved(octx).unwrap(); + + match encoded.write_interleaved(octx) { + Err(err) => return Err(err.into()), + Ok(_) => (), + } } + Ok(()) } fn add_frame_to_filter(&mut self, frame: &ffmpeg::Frame) { @@ -132,37 +159,65 @@ impl Transcoder { self.filter.get("in").unwrap().source().flush().unwrap(); } - pub(crate) fn get_and_process_filtered_frames(&mut self, octx: &mut format::context::Output) { + pub(crate) fn get_and_process_filtered_frames( + &mut self, + octx: &mut format::context::Output, + ) -> Result<(), Box> { let mut filtered = frame::Audio::empty(); - while self - .filter - .get("out") - .unwrap() - .sink() - .frame(&mut filtered) - .is_ok() - { - self.send_frame_to_encoder(&filtered); - self.receive_and_process_encoded_packets(octx); + loop { + let mut ctx: ffmpeg::filter::Context = match self.filter.get("out") { + None => return Err(Box::from("cannot get context from filter")), + Some(val) => val, + }; + + if !ctx.sink().frame(&mut filtered).is_ok() { + return Err(Box::from("frame is suddenly invalid, stopping...")); + } + + match self.send_frame_to_encoder(&filtered) { + Err(err) => return Err(err.into()), + Ok(_) => (), + }; + match self.receive_and_process_encoded_packets(octx) { + Err(err) => return Err(err.into()), + Ok(_) => (), + } } } - pub(crate) fn send_packet_to_decoder(&mut self, packet: &ffmpeg::Packet) { - self.decoder.send_packet(packet).unwrap(); + pub(crate) fn send_packet_to_decoder( + &mut self, + packet: &ffmpeg::Packet, + ) -> Result<(), Box> { + match self.decoder.send_packet(packet) { + Err(err) => return Err(err.into()), + Ok(_) => Ok(()), + } } - pub(crate) fn send_eof_to_decoder(&mut self) { - self.decoder.send_eof().unwrap(); + pub(crate) fn send_eof_to_decoder(&mut self) -> Result<(), Box> { + match self.decoder.send_eof() { + Err(err) => return Err(err.into()), + Ok(_) => Ok(()), + } } - pub(crate) fn receive_and_process_decoded_frames(&mut self, octx: &mut format::context::Output) { + pub(crate) fn receive_and_process_decoded_frames( + &mut self, + octx: &mut format::context::Output, + ) -> Result<(), Box> { let mut decoded = frame::Audio::empty(); while self.decoder.receive_frame(&mut decoded).is_ok() { let timestamp = decoded.timestamp(); decoded.set_pts(timestamp); self.add_frame_to_filter(&decoded); - self.get_and_process_filtered_frames(octx); + + match self.get_and_process_filtered_frames(octx) { + Err(err) => return Err(err.into()), + Ok(_) => (), + } } + Ok(()) } }