diff --git a/Cargo.toml b/Cargo.toml index 2a547c6..9314875 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -14,3 +14,4 @@ tracing = "0.1.37" tracing-subscriber = { version = "0.3.16", features = ["env-filter"] } tower-http = { version = "0.5.2", features = ["trace"] } num_cpus = "1.16.0" +ffmpeg-next = "7.0.1" \ No newline at end of file diff --git a/Dockerfile b/Dockerfile new file mode 100644 index 0000000..d9e8cda --- /dev/null +++ b/Dockerfile @@ -0,0 +1,2 @@ +FROM alpine:latest +RUN apk add --no-cache ffmpeg-libavutil ffmpeg-libavformat ffmpeg-libavfilter ffmpeg-libavdevice clang16 \ No newline at end of file diff --git a/src/dto.rs b/src/dto.rs index 6e2cbd4..b0d013e 100644 --- a/src/dto.rs +++ b/src/dto.rs @@ -19,6 +19,7 @@ pub struct ConvertRequest { pub codec: String, pub bit_rate: usize, pub max_bit_rate: usize, + pub sample_rate: i32, pub channel_layout: String, pub upload_url: String, diff --git a/src/main.rs b/src/main.rs index 4156540..e9ecebd 100644 --- a/src/main.rs +++ b/src/main.rs @@ -9,6 +9,7 @@ mod dto; mod server; mod task; mod thread_pool; +mod transcoder; #[tokio::main] async fn main() { diff --git a/src/server.rs b/src/server.rs index d9049ab..505cfcd 100644 --- a/src/server.rs +++ b/src/server.rs @@ -1,7 +1,7 @@ use std::path::Path; use std::sync::Arc; -use axum::{debug_handler, Json, Router}; +use axum::{Json, Router}; use axum::extract::{DefaultBodyLimit, State}; use axum::http::StatusCode; use axum::routing::post; @@ -81,6 +81,7 @@ async fn enqueue_file( req.codec, req.bit_rate, req.max_bit_rate, + req.sample_rate, req.channel_layout, req.upload_url, input_path.unwrap().to_string(), diff --git a/src/task.rs b/src/task.rs index 0003ecd..8e16649 100644 --- a/src/task.rs +++ b/src/task.rs @@ -1,10 +1,14 @@ -use tracing::debug; +use tracing::{debug, error}; +use ffmpeg_next::{Error, format}; +use ffmpeg_next::channel_layout::ChannelLayout; +use crate::transcoder::{Transcoder, TranscoderParams}; pub struct Task { id: uuid::Uuid, codec: String, bit_rate: usize, max_bit_rate: usize, + sample_rate: i32, channel_layout: String, input_path: String, output_path: String, @@ -17,6 +21,7 @@ impl Task { codec: String, bit_rate: usize, max_bit_rate: usize, + sample_rate: i32, channel_layout: String, upload_url: String, input_path: String, @@ -27,6 +32,7 @@ impl Task { codec, bit_rate, max_bit_rate, + sample_rate, channel_layout, input_path, output_path, @@ -34,7 +40,51 @@ impl Task { } } - pub fn execute(&self) { - debug!("Executing task with id: {}", self.id.to_string()); + pub fn execute(self) { + debug!("performing transcoding for task with id: {}", self.id.to_string()); + let mut ictx = format::input(&self.input_path).unwrap(); + let mut octx = format::output_as(&self.output_path, &self.codec).unwrap(); + let transcoder = Transcoder::new(&mut ictx, &mut octx, TranscoderParams { + codec: self.codec, + bit_rate: self.bit_rate, + max_bit_rate: self.max_bit_rate, + sample_rate: self.sample_rate, + channel_layout: match self.channel_layout.as_str() { + "stereo" => ChannelLayout::STEREO, + "mono" => ChannelLayout::MONO, + "stereo_downmix" => ChannelLayout::STEREO_DOWNMIX, + _ => ChannelLayout::STEREO, + } + }); + let mut transcoder = match transcoder { + Ok(val) => val, + Err(err) => { + error!("couldn't initialize FFmpeg transcoder: {:?}", err); + return + } + }; + octx.set_metadata(ictx.metadata().to_owned()); + octx.write_header() + .unwrap_or_else(|err| error!("couldn't start transcoding: {:?}", err)); + + for (stream, mut packet) in ictx.packets() { + if stream.index() == transcoder.stream { + packet.rescale_ts(stream.time_base(), transcoder.in_time_base); + transcoder.send_packet_to_decoder(&packet); + transcoder.receive_and_process_decoded_frames(&mut octx); + } + } + + transcoder.send_eof_to_decoder(); + transcoder.receive_and_process_decoded_frames(&mut octx); + + transcoder.flush_filter(); + transcoder.get_and_process_filtered_frames(&mut octx); + + transcoder.send_eof_to_encoder(); + transcoder.receive_and_process_encoded_packets(&mut octx); + + octx.write_trailer() + .unwrap_or_else(|err| error!("couldn't finish transcoding: {:?}", err)); } } diff --git a/src/thread_pool.rs b/src/thread_pool.rs index a645304..762cc13 100644 --- a/src/thread_pool.rs +++ b/src/thread_pool.rs @@ -39,20 +39,25 @@ struct Worker { impl Worker { fn new(id: usize, receiver: Arc>>) -> Self { - let thread = thread::spawn(move || loop { - let task = { - let lock = receiver.lock().unwrap(); - lock.recv() - }; + let thread = thread::spawn(move || { + ffmpeg_next::init() + .unwrap_or_else(|err| tracing::error!("couldn't init FFmpeg: {:?}", err)); - match task { - Ok(task) => { - debug!("worker {} got a task; executing.", id); - task.execute(); - } - Err(e) => { - error!("worker {} failed to receive task: {:?}", id, e); - thread::sleep(Duration::from_secs(1)); // sleep to avoid busy-looping + loop { + let task = { + let lock = receiver.lock().unwrap(); + lock.recv() + }; + + match task { + Ok(task) => { + debug!("worker {} got a task; executing.", id); + task.execute(); + } + Err(e) => { + error!("worker {} failed to receive task: {:?}", id, e); + thread::sleep(Duration::from_secs(1)); // sleep to avoid busy-looping + } } } }); diff --git a/src/transcoder.rs b/src/transcoder.rs new file mode 100644 index 0000000..9351b4d --- /dev/null +++ b/src/transcoder.rs @@ -0,0 +1,214 @@ +extern crate ffmpeg_next as ffmpeg; + +use std::error::Error; +use std::sync::Arc; + +use ffmpeg::{codec, filter, format, frame, media}; + +pub struct Transcoder { + params: Arc, + pub(crate) stream: usize, + filter: filter::Graph, + decoder: codec::decoder::Audio, + encoder: codec::encoder::Audio, + pub(crate) in_time_base: ffmpeg::Rational, + out_time_base: ffmpeg::Rational, +} + +pub struct TranscoderParams { + pub codec: String, + pub bit_rate: usize, + pub max_bit_rate: usize, + pub sample_rate: i32, + pub channel_layout: ffmpeg::channel_layout::ChannelLayout, +} + +impl Transcoder { + pub fn new( + ictx: &mut format::context::Input, + octx: &mut format::context::Output, + params: TranscoderParams, + ) -> Result { + let input = ictx + .streams() + .best(media::Type::Audio) + .expect("could not find best audio stream"); + let context = codec::context::Context::from_parameters(input.parameters())?; + let mut decoder = context.decoder().audio()?; + let codec = ffmpeg::encoder::find_by_name(&*params.codec) + .expect(format!("failed to find encoder with name: {}", params.codec).as_str()) + .audio()?; + let global = octx + .format() + .flags() + .contains(format::flag::Flags::GLOBAL_HEADER); + + decoder.set_parameters(input.parameters())?; + + let mut output = octx.add_stream(codec)?; + let context = codec::context::Context::from_parameters(output.parameters())?; + let mut encoder = context.encoder().audio()?; + + if global { + encoder.set_flags(codec::flag::Flags::GLOBAL_HEADER); + } + + let sample_rate = if params.sample_rate > 0 { + params.sample_rate + } else { + decoder.rate() as i32 + }; + + encoder.set_rate(sample_rate); + encoder.set_channel_layout(params.channel_layout); + #[cfg(not(feature = "ffmpeg_7_0"))] + { + encoder.set_channels(params.channel_layout.channels()); + } + encoder.set_format( + codec + .formats() + .expect(format!("failed to get supported formats for codec: {}", codec.name()).as_str()) + .next() + .unwrap(), + ); + + if params.bit_rate > 0 { + encoder.set_bit_rate(params.bit_rate); + } else { + encoder.set_bit_rate(decoder.bit_rate()); + } + + if params.max_bit_rate > 0 { + encoder.set_max_bit_rate(params.bit_rate); + } else { + encoder.set_max_bit_rate(decoder.max_bit_rate()); + } + + encoder.set_time_base((1, sample_rate)); + output.set_time_base((1, sample_rate)); + + let encoder = encoder.open_as(codec)?; + output.set_parameters(&encoder); + + let filter = filter("anull", &decoder, &encoder)?; + + let in_time_base = decoder.time_base(); + let out_time_base = output.time_base(); + + Ok(Transcoder { + stream: input.index(), + params: Arc::new(params), + filter, + decoder, + encoder, + in_time_base, + out_time_base, + }) + } + + fn send_frame_to_encoder(&mut self, frame: &ffmpeg::Frame) -> Result<(), ffmpeg::Error> { + self.encoder.send_frame(frame) + } + + pub(crate) fn send_eof_to_encoder(&mut self) { + self.encoder.send_eof().unwrap(); + } + + pub(crate) fn receive_and_process_encoded_packets(&mut self, octx: &mut format::context::Output) { + let mut encoded = ffmpeg::Packet::empty(); + while self.encoder.receive_packet(&mut encoded).is_ok() { + encoded.set_stream(0); + encoded.rescale_ts(self.in_time_base, self.out_time_base); + encoded.write_interleaved(octx).unwrap(); + } + } + + fn add_frame_to_filter(&mut self, frame: &ffmpeg::Frame) { + self.filter.get("in").unwrap().source().add(frame).unwrap(); + } + + pub(crate) fn flush_filter(&mut self) { + self.filter.get("in").unwrap().source().flush().unwrap(); + } + + pub(crate) fn get_and_process_filtered_frames(&mut self, octx: &mut format::context::Output) { + let mut filtered = frame::Audio::empty(); + while self + .filter + .get("out") + .unwrap() + .sink() + .frame(&mut filtered) + .is_ok() + { + self.send_frame_to_encoder(&filtered); + self.receive_and_process_encoded_packets(octx); + } + } + + pub(crate) fn send_packet_to_decoder(&mut self, packet: &ffmpeg::Packet) { + self.decoder.send_packet(packet).unwrap(); + } + + pub(crate) fn send_eof_to_decoder(&mut self) { + self.decoder.send_eof().unwrap(); + } + + pub(crate) fn receive_and_process_decoded_frames(&mut self, octx: &mut format::context::Output) { + let mut decoded = frame::Audio::empty(); + while self.decoder.receive_frame(&mut decoded).is_ok() { + let timestamp = decoded.timestamp(); + decoded.set_pts(timestamp); + self.add_frame_to_filter(&decoded); + self.get_and_process_filtered_frames(octx); + } + } +} + +fn filter( + spec: &str, + decoder: &codec::decoder::Audio, + encoder: &codec::encoder::Audio, +) -> Result { + let mut filter = filter::Graph::new(); + + let args = format!( + "time_base={}:sample_rate={}:sample_fmt={}:channel_layout=0x{:x}", + decoder.time_base(), + decoder.rate(), + decoder.format().name(), + decoder.channel_layout().bits() + ); + + filter.add(&filter::find("abuffer").unwrap(), "in", &args)?; + filter.add(&filter::find("abuffersink").unwrap(), "out", "")?; + + { + let mut out = filter.get("out").unwrap(); + + out.set_sample_format(encoder.format()); + out.set_channel_layout(encoder.channel_layout()); + out.set_sample_rate(encoder.rate()); + } + + filter.output("in", 0)?.input("out", 0)?.parse(spec)?; + filter.validate()?; + + println!("{}", filter.dump()); + + if let Some(codec) = encoder.codec() { + if !codec + .capabilities() + .contains(codec::capabilities::Capabilities::VARIABLE_FRAME_SIZE) + { + filter + .get("out") + .unwrap() + .sink() + .set_frame_size(encoder.frame_size()); + } + } + + Ok(filter) +}