wip: transcoding

This commit is contained in:
Pavel 2024-05-25 21:29:53 +03:00
parent 973e69640c
commit b78f1efecf
5 changed files with 155 additions and 51 deletions

View File

@ -17,6 +17,7 @@ pub struct ConvertResponse {
#[try_from_multipart(rename_all = "camelCase")] #[try_from_multipart(rename_all = "camelCase")]
pub struct ConvertRequest { pub struct ConvertRequest {
pub codec: String, pub codec: String,
pub codec_opts: Option<String>,
pub bit_rate: usize, pub bit_rate: usize,
pub max_bit_rate: usize, pub max_bit_rate: usize,
pub sample_rate: i32, pub sample_rate: i32,

View File

@ -1,10 +1,10 @@
use std::path::Path; use std::path::Path;
use std::sync::Arc; use std::sync::Arc;
use axum::{Json, Router};
use axum::extract::{DefaultBodyLimit, State}; use axum::extract::{DefaultBodyLimit, State};
use axum::http::StatusCode; use axum::http::StatusCode;
use axum::routing::post; use axum::routing::post;
use axum::{Json, Router};
use axum_typed_multipart::TypedMultipart; use axum_typed_multipart::TypedMultipart;
use tokio::net::TcpListener; use tokio::net::TcpListener;
use tower_http::trace::TraceLayer; use tower_http::trace::TraceLayer;
@ -34,8 +34,7 @@ impl Server {
let app = Router::new() let app = Router::new()
.route( .route(
"/enqueue", "/enqueue",
post(enqueue_file) post(enqueue_file).layer(DefaultBodyLimit::max(CONTENT_LENGTH_LIMIT)),
.layer(DefaultBodyLimit::max(CONTENT_LENGTH_LIMIT)),
) )
.with_state(this) .with_state(this)
.layer(TraceLayer::new_for_http()); .layer(TraceLayer::new_for_http());
@ -54,8 +53,7 @@ async fn enqueue_file(
TypedMultipart(req): TypedMultipart<ConvertRequest>, TypedMultipart(req): TypedMultipart<ConvertRequest>,
) -> (StatusCode, Json<ConvertResponse>) { ) -> (StatusCode, Json<ConvertResponse>) {
let task_id = Uuid::new_v4(); let task_id = Uuid::new_v4();
let input = let input = Path::new(&server.work_dir).join(format!("{}.in.atranscoder", task_id.to_string()));
Path::new(&server.work_dir).join(format!("{}.in.atranscoder", task_id.to_string()));
let output = let output =
Path::new(&server.work_dir).join(format!("{}.out.atranscoder", task_id.to_string())); Path::new(&server.work_dir).join(format!("{}.out.atranscoder", task_id.to_string()));
@ -79,6 +77,7 @@ async fn enqueue_file(
let task = Task::new( let task = Task::new(
task_id, task_id,
req.codec, req.codec,
req.codec_opts,
req.bit_rate, req.bit_rate,
req.max_bit_rate, req.max_bit_rate,
req.sample_rate, req.sample_rate,
@ -107,4 +106,4 @@ async fn enqueue_file(
}), }),
), ),
} }
} }

View File

@ -1,11 +1,14 @@
use tracing::{debug, error};
use ffmpeg_next::{Error, format};
use ffmpeg_next::channel_layout::ChannelLayout;
use crate::transcoder::{Transcoder, TranscoderParams}; use crate::transcoder::{Transcoder, TranscoderParams};
use ffmpeg_next::channel_layout::ChannelLayout;
use ffmpeg_next::format::context;
use ffmpeg_next::format::context::Input;
use ffmpeg_next::{format, Dictionary, Error};
use tracing::{debug, error};
pub struct Task { pub struct Task {
id: uuid::Uuid, id: uuid::Uuid,
codec: String, codec: String,
codec_opts: Option<String>,
bit_rate: usize, bit_rate: usize,
max_bit_rate: usize, max_bit_rate: usize,
sample_rate: i32, sample_rate: i32,
@ -19,6 +22,7 @@ impl Task {
pub fn new( pub fn new(
id: uuid::Uuid, id: uuid::Uuid,
codec: String, codec: String,
codec_opts: Option<String>,
bit_rate: usize, bit_rate: usize,
max_bit_rate: usize, max_bit_rate: usize,
sample_rate: i32, sample_rate: i32,
@ -30,6 +34,7 @@ impl Task {
Task { Task {
id, id,
codec, codec,
codec_opts,
bit_rate, bit_rate,
max_bit_rate, max_bit_rate,
sample_rate, sample_rate,
@ -41,26 +46,58 @@ impl Task {
} }
pub fn execute(self) { pub fn execute(self) {
debug!("performing transcoding for task with id: {}", self.id.to_string()); debug!(
let mut ictx = format::input(&self.input_path).unwrap(); "performing transcoding for task with id: {}",
let mut octx = format::output_as(&self.output_path, &self.codec).unwrap(); self.id.to_string()
let transcoder = Transcoder::new(&mut ictx, &mut octx, TranscoderParams { );
codec: self.codec, let mut ictx = match format::input(&self.input_path) {
bit_rate: self.bit_rate, Ok(val) => val,
max_bit_rate: self.max_bit_rate, Err(err) => {
sample_rate: self.sample_rate, error!("couldn't initialize input context: {:?}", err);
channel_layout: match self.channel_layout.as_str() { return;
"stereo" => ChannelLayout::STEREO,
"mono" => ChannelLayout::MONO,
"stereo_downmix" => ChannelLayout::STEREO_DOWNMIX,
_ => ChannelLayout::STEREO,
} }
}); };
let octx: Result<context::Output, ffmpeg_next::Error>;
if self.codec_opts.is_some() {
octx = format::output_as_with(
&self.output_path,
&self.codec,
params_to_avdictionary(&self.codec_opts.unwrap_or_default()),
);
} else {
octx = format::output_as(&self.output_path, &self.codec);
}
let mut octx = match octx {
Ok(val) => val,
Err(err) => {
error!("couldn't initialize output context: {:?}", err);
return;
}
};
let transcoder = Transcoder::new(
&mut ictx,
&mut octx,
TranscoderParams {
codec: self.codec,
bit_rate: self.bit_rate,
max_bit_rate: self.max_bit_rate,
sample_rate: self.sample_rate,
channel_layout: match self.channel_layout.as_str() {
"stereo" => ChannelLayout::STEREO,
"mono" => ChannelLayout::MONO,
"stereo_downmix" => ChannelLayout::STEREO_DOWNMIX,
_ => ChannelLayout::STEREO,
},
},
);
let mut transcoder = match transcoder { let mut transcoder = match transcoder {
Ok(val) => val, Ok(val) => val,
Err(err) => { Err(err) => {
error!("couldn't initialize FFmpeg transcoder: {:?}", err); error!("couldn't initialize FFmpeg transcoder: {:?}", err);
return return;
} }
}; };
octx.set_metadata(ictx.metadata().to_owned()); octx.set_metadata(ictx.metadata().to_owned());
@ -88,3 +125,15 @@ impl Task {
.unwrap_or_else(|err| error!("couldn't finish transcoding: {:?}", err)); .unwrap_or_else(|err| error!("couldn't finish transcoding: {:?}", err));
} }
} }
fn params_to_avdictionary(input: &str) -> Dictionary {
let mut dict: Dictionary = Dictionary::new();
for pair in input.split(";") {
let mut parts = pair.split(":");
if let (Some(key), Some(value)) = (parts.next(), parts.next()) {
dict.set(key, value);
}
}
dict
}

View File

@ -1,9 +1,9 @@
use std::sync::{Arc, Mutex};
use std::sync::mpsc::{self, Receiver, Sender}; use std::sync::mpsc::{self, Receiver, Sender};
use std::sync::{Arc, Mutex};
use std::thread; use std::thread;
use std::time::Duration; use std::time::Duration;
use tracing::{error, debug}; use tracing::{debug, error};
use crate::task::Task; use crate::task::Task;

View File

@ -4,6 +4,8 @@ use std::error::Error;
use std::sync::Arc; use std::sync::Arc;
use ffmpeg::{codec, filter, format, frame, media}; use ffmpeg::{codec, filter, format, frame, media};
use ffmpeg_next::codec::Audio;
use ffmpeg_next::Codec;
pub struct Transcoder { pub struct Transcoder {
params: Arc<TranscoderParams>, params: Arc<TranscoderParams>,
@ -28,16 +30,27 @@ impl Transcoder {
ictx: &mut format::context::Input, ictx: &mut format::context::Input,
octx: &mut format::context::Output, octx: &mut format::context::Output,
params: TranscoderParams, params: TranscoderParams,
) -> Result<Transcoder, ffmpeg::Error> { ) -> Result<Transcoder, Box<dyn Error>> {
let input = ictx let input = ictx
.streams() .streams()
.best(media::Type::Audio) .best(media::Type::Audio)
.expect("could not find best audio stream"); .expect("could not find best audio stream");
let context = codec::context::Context::from_parameters(input.parameters())?; let context = codec::context::Context::from_parameters(input.parameters())?;
let mut decoder = context.decoder().audio()?; let mut decoder = match context.decoder().audio() {
let codec = ffmpeg::encoder::find_by_name(&*params.codec) Ok(val) => val,
.expect(format!("failed to find encoder with name: {}", params.codec).as_str()) Err(err) => {
.audio()?; return Err(
format!("couldn't find decoder for input file: {}", err.to_string()).into(),
)
}
};
let codec = match ffmpeg::encoder::find_by_name(&*params.codec) {
None => return Err(format!("couldn't find codec with name: {}", params.codec).into()),
Some(val) => match val.audio() {
Ok(val) => val,
Err(err) => return Err(err.into()),
},
};
let global = octx let global = octx
.format() .format()
.flags() .flags()
@ -68,7 +81,13 @@ impl Transcoder {
encoder.set_format( encoder.set_format(
codec codec
.formats() .formats()
.expect(format!("failed to get supported formats for codec: {}", codec.name()).as_str()) .expect(
format!(
"failed to get supported formats for codec: {}",
codec.name()
)
.as_str(),
)
.next() .next()
.unwrap(), .unwrap(),
); );
@ -115,13 +134,21 @@ impl Transcoder {
self.encoder.send_eof().unwrap(); self.encoder.send_eof().unwrap();
} }
pub(crate) fn receive_and_process_encoded_packets(&mut self, octx: &mut format::context::Output) { pub(crate) fn receive_and_process_encoded_packets(
&mut self,
octx: &mut format::context::Output,
) -> Result<(), Box<dyn Error>> {
let mut encoded = ffmpeg::Packet::empty(); let mut encoded = ffmpeg::Packet::empty();
while self.encoder.receive_packet(&mut encoded).is_ok() { while self.encoder.receive_packet(&mut encoded).is_ok() {
encoded.set_stream(0); encoded.set_stream(0);
encoded.rescale_ts(self.in_time_base, self.out_time_base); encoded.rescale_ts(self.in_time_base, self.out_time_base);
encoded.write_interleaved(octx).unwrap();
match encoded.write_interleaved(octx) {
Err(err) => return Err(err.into()),
Ok(_) => (),
}
} }
Ok(())
} }
fn add_frame_to_filter(&mut self, frame: &ffmpeg::Frame) { fn add_frame_to_filter(&mut self, frame: &ffmpeg::Frame) {
@ -132,37 +159,65 @@ impl Transcoder {
self.filter.get("in").unwrap().source().flush().unwrap(); self.filter.get("in").unwrap().source().flush().unwrap();
} }
pub(crate) fn get_and_process_filtered_frames(&mut self, octx: &mut format::context::Output) { pub(crate) fn get_and_process_filtered_frames(
&mut self,
octx: &mut format::context::Output,
) -> Result<(), Box<dyn Error>> {
let mut filtered = frame::Audio::empty(); let mut filtered = frame::Audio::empty();
while self loop {
.filter let mut ctx: ffmpeg::filter::Context = match self.filter.get("out") {
.get("out") None => return Err(Box::from("cannot get context from filter")),
.unwrap() Some(val) => val,
.sink() };
.frame(&mut filtered)
.is_ok() if !ctx.sink().frame(&mut filtered).is_ok() {
{ return Err(Box::from("frame is suddenly invalid, stopping..."));
self.send_frame_to_encoder(&filtered); }
self.receive_and_process_encoded_packets(octx);
match self.send_frame_to_encoder(&filtered) {
Err(err) => return Err(err.into()),
Ok(_) => (),
};
match self.receive_and_process_encoded_packets(octx) {
Err(err) => return Err(err.into()),
Ok(_) => (),
}
} }
} }
pub(crate) fn send_packet_to_decoder(&mut self, packet: &ffmpeg::Packet) { pub(crate) fn send_packet_to_decoder(
self.decoder.send_packet(packet).unwrap(); &mut self,
packet: &ffmpeg::Packet,
) -> Result<(), Box<dyn Error>> {
match self.decoder.send_packet(packet) {
Err(err) => return Err(err.into()),
Ok(_) => Ok(()),
}
} }
pub(crate) fn send_eof_to_decoder(&mut self) { pub(crate) fn send_eof_to_decoder(&mut self) -> Result<(), Box<dyn Error>> {
self.decoder.send_eof().unwrap(); match self.decoder.send_eof() {
Err(err) => return Err(err.into()),
Ok(_) => Ok(()),
}
} }
pub(crate) fn receive_and_process_decoded_frames(&mut self, octx: &mut format::context::Output) { pub(crate) fn receive_and_process_decoded_frames(
&mut self,
octx: &mut format::context::Output,
) -> Result<(), Box<dyn Error>> {
let mut decoded = frame::Audio::empty(); let mut decoded = frame::Audio::empty();
while self.decoder.receive_frame(&mut decoded).is_ok() { while self.decoder.receive_frame(&mut decoded).is_ok() {
let timestamp = decoded.timestamp(); let timestamp = decoded.timestamp();
decoded.set_pts(timestamp); decoded.set_pts(timestamp);
self.add_frame_to_filter(&decoded); self.add_frame_to_filter(&decoded);
self.get_and_process_filtered_frames(octx);
match self.get_and_process_filtered_frames(octx) {
Err(err) => return Err(err.into()),
Ok(_) => (),
}
} }
Ok(())
} }
} }