Compare commits

...

2 Commits

Author SHA1 Message Date
0d92a39309 simplify the code (2) 2024-05-25 23:58:12 +03:00
d1331462a6 simplify the code (1) 2024-05-25 23:17:54 +03:00
3 changed files with 125 additions and 154 deletions

View File

@ -40,10 +40,7 @@ impl Server {
.layer(TraceLayer::new_for_http()); .layer(TraceLayer::new_for_http());
tracing::info!("listening on {addr}"); tracing::info!("listening on {addr}");
let listener = match TcpListener::bind(addr).await { let listener = TcpListener::bind(addr).await?;
Ok(listen) => listen,
Err(err) => return Err(err),
};
axum::serve(listener, app).await axum::serve(listener, app).await
} }
} }
@ -53,26 +50,21 @@ async fn enqueue_file(
TypedMultipart(req): TypedMultipart<ConvertRequest>, TypedMultipart(req): TypedMultipart<ConvertRequest>,
) -> (StatusCode, Json<ConvertResponse>) { ) -> (StatusCode, Json<ConvertResponse>) {
let task_id = Uuid::new_v4(); let task_id = Uuid::new_v4();
let input = Path::new(&server.work_dir).join(format!("{}.in.atranscoder", task_id.to_string())); let input = Path::new(&server.work_dir).join(format!("{}.in.atranscoder", task_id));
let output = let output = Path::new(&server.work_dir).join(format!("{}.out.atranscoder", task_id));
Path::new(&server.work_dir).join(format!("{}.out.atranscoder", task_id.to_string()));
let file = req.file; let file = req.file;
match file.contents.persist(input.clone()) { match file.contents.persist(input.clone()) {
Ok(_) => { Ok(_) => {
let input_path = input.to_str(); let input_path = match input.to_str() {
let output_path = output.to_str(); Some(path) => path,
None => return error_response("Invalid input path"),
if input_path.is_none() || output_path.is_none() { };
return ( let output_path = match output.to_str() {
StatusCode::INTERNAL_SERVER_ERROR, Some(path) => path,
Json::from(ConvertResponse { None => return error_response("Invalid output path"),
id: None, };
error: Some(String::from("Input or output paths are not correct")),
}),
);
}
let task = Task::new( let task = Task::new(
task_id, task_id,
@ -84,8 +76,8 @@ async fn enqueue_file(
req.sample_rate, req.sample_rate,
req.channel_layout, req.channel_layout,
req.upload_url, req.upload_url,
input_path.unwrap().to_string(), input_path.to_string(),
output_path.unwrap().to_string(), output_path.to_string(),
); );
// Enqueue the task to the thread pool // Enqueue the task to the thread pool
@ -99,12 +91,16 @@ async fn enqueue_file(
}), }),
) )
} }
Err(_) => ( Err(_) => error_response("Cannot save the file"),
StatusCode::INTERNAL_SERVER_ERROR,
Json::from(ConvertResponse {
id: Some(task_id.to_string()),
error: Some(String::from("Cannot save the file")),
}),
),
} }
} }
fn error_response(msg: &str) -> (StatusCode, Json<ConvertResponse>) {
(
StatusCode::INTERNAL_SERVER_ERROR,
Json::from(ConvertResponse {
id: None,
error: Some(msg.to_string()),
}),
)
}

View File

@ -1,8 +1,6 @@
use crate::transcoder::{Transcoder, TranscoderParams}; use crate::transcoder::{Transcoder, TranscoderParams};
use ffmpeg_next::channel_layout::ChannelLayout; use ffmpeg_next::channel_layout::ChannelLayout;
use ffmpeg_next::format::context; use ffmpeg_next::{format, Dictionary};
use ffmpeg_next::format::context::Input;
use ffmpeg_next::{format, Dictionary, Error};
use tracing::{debug, error}; use tracing::{debug, error};
pub struct Task { pub struct Task {
@ -61,16 +59,15 @@ impl Task {
} }
}; };
let octx: Result<context::Output, ffmpeg_next::Error>; let octx = if let Some(codec_opts) = self.codec_opts {
if self.codec_opts.is_some() { format::output_as_with(
octx = format::output_as_with(
&self.output_path, &self.output_path,
&self.format, &self.format,
params_to_avdictionary(&self.codec_opts.unwrap_or_default()), params_to_avdictionary(&codec_opts),
); )
} else { } else {
octx = format::output_as(&self.output_path, &self.format); format::output_as(&self.output_path, &self.format)
} };
let mut octx = match octx { let mut octx = match octx {
Ok(val) => val, Ok(val) => val,
@ -96,6 +93,7 @@ impl Task {
}, },
}, },
); );
let mut transcoder = match transcoder { let mut transcoder = match transcoder {
Ok(val) => val, Ok(val) => val,
Err(err) => { Err(err) => {
@ -103,29 +101,57 @@ impl Task {
return; return;
} }
}; };
octx.set_metadata(ictx.metadata().to_owned()); octx.set_metadata(ictx.metadata().to_owned());
octx.write_header()
.unwrap_or_else(|err| error!("couldn't start transcoding: {:?}", err)); if let Err(err) = octx.write_header() {
error!("couldn't start transcoding: {:?}", err);
return;
}
for (stream, mut packet) in ictx.packets() { for (stream, mut packet) in ictx.packets() {
if stream.index() == transcoder.stream { if stream.index() == transcoder.stream {
packet.rescale_ts(stream.time_base(), transcoder.in_time_base); packet.rescale_ts(stream.time_base(), transcoder.in_time_base);
transcoder.send_packet_to_decoder(&packet);
transcoder.receive_and_process_decoded_frames(&mut octx); if let Err(err) = transcoder.send_packet_to_decoder(&packet) {
error!("error sending packet to decoder: {:?}", err);
return;
}
transcoder.receive_and_process_decoded_frames(&mut octx)
.unwrap_or_else(|err| error!("failure during processing decoded frames: {:?}", err));
} }
} }
transcoder.send_eof_to_decoder(); if let Err(err) = transcoder.send_eof_to_decoder() {
transcoder.receive_and_process_decoded_frames(&mut octx); error!("error sending EOF to decoder: {:?}", err);
return;
}
transcoder.flush_filter(); if let Err(err) = transcoder.receive_and_process_decoded_frames(&mut octx) {
transcoder.get_and_process_filtered_frames(&mut octx); error!("error receiving and processing decoded frames: {:?}", err);
return;
}
transcoder.send_eof_to_encoder(); if let Err(err) = transcoder.flush_filter() {
transcoder.receive_and_process_encoded_packets(&mut octx); error!("couldn't flush filter: {:?}", err);
return;
}
octx.write_trailer() transcoder.get_and_process_filtered_frames(&mut octx)
.unwrap_or_else(|err| error!("couldn't finish transcoding: {:?}", err)); .unwrap_or_else(|err| error!("failure during processing filtered frames: {:?}", err));
if let Err(err) = transcoder.send_eof_to_encoder() {
error!("couldn't send EOF to encoder: {:?}", err);
return;
}
transcoder.receive_and_process_encoded_packets(&mut octx)
.unwrap_or_else(|err| error!("failure during transcoding: {:?}", err));
if let Err(err) = octx.write_trailer() {
error!("couldn't finish transcoding: {:?}", err);
}
} }
} }

View File

@ -4,11 +4,9 @@ use std::error::Error;
use std::sync::Arc; use std::sync::Arc;
use ffmpeg::{codec, filter, format, frame, media}; use ffmpeg::{codec, filter, format, frame, media};
use ffmpeg_next::codec::Audio; use ffmpeg_next::error::EAGAIN;
use ffmpeg_next::Codec;
pub struct Transcoder { pub struct Transcoder {
params: Arc<TranscoderParams>,
pub(crate) stream: usize, pub(crate) stream: usize,
filter: filter::Graph, filter: filter::Graph,
decoder: codec::decoder::Audio, decoder: codec::decoder::Audio,
@ -34,27 +32,18 @@ impl Transcoder {
let input = ictx let input = ictx
.streams() .streams()
.best(media::Type::Audio) .best(media::Type::Audio)
.expect("could not find best audio stream"); .ok_or("could not find best audio stream")?;
let context = codec::context::Context::from_parameters(input.parameters())?; let context = codec::context::Context::from_parameters(input.parameters())?;
let mut decoder = match context.decoder().audio() { let mut decoder = context.decoder().audio().map_err(|err| {
Ok(val) => val, format!("couldn't find decoder for input file: {}", err)
Err(err) => { })?;
return Err(
format!("couldn't find decoder for input file: {}", err.to_string()).into(), let codec = ffmpeg::encoder::find_by_name(&*params.codec)
) .ok_or_else(|| format!("couldn't find codec with name: {}", params.codec))?
} .audio()?;
};
let codec = match ffmpeg::encoder::find_by_name(&*params.codec) { let global = octx.format().flags().contains(format::flag::Flags::GLOBAL_HEADER);
None => return Err(format!("couldn't find codec with name: {}", params.codec).into()),
Some(val) => match val.audio() {
Ok(val) => val,
Err(err) => return Err(err.into()),
},
};
let global = octx
.format()
.flags()
.contains(format::flag::Flags::GLOBAL_HEADER);
decoder.set_parameters(input.parameters())?; decoder.set_parameters(input.parameters())?;
@ -74,55 +63,36 @@ impl Transcoder {
encoder.set_rate(sample_rate); encoder.set_rate(sample_rate);
encoder.set_channel_layout(params.channel_layout); encoder.set_channel_layout(params.channel_layout);
#[cfg(not(feature = "ffmpeg_7_0"))] #[cfg(not(feature = "ffmpeg_7_0"))]
{
encoder.set_channels(params.channel_layout.channels()); encoder.set_channels(params.channel_layout.channels());
}
encoder.set_format( encoder.set_format(
codec codec
.formats() .formats()
.expect( .ok_or_else(|| format!("failed to get supported formats for codec: {}", codec.name()))?
format!(
"failed to get supported formats for codec: {}",
codec.name()
)
.as_str(),
)
.next() .next()
.unwrap(), .ok_or("no supported formats found for codec")?,
); );
if params.bit_rate > 0 { encoder.set_bit_rate(if params.bit_rate > 0 { params.bit_rate } else { decoder.bit_rate() });
encoder.set_bit_rate(params.bit_rate); encoder.set_max_bit_rate(if params.max_bit_rate > 0 { params.max_bit_rate } else { decoder.max_bit_rate() });
} else {
encoder.set_bit_rate(decoder.bit_rate());
}
if params.max_bit_rate > 0 {
encoder.set_max_bit_rate(params.bit_rate);
} else {
encoder.set_max_bit_rate(decoder.max_bit_rate());
}
encoder.set_time_base((1, sample_rate)); encoder.set_time_base((1, sample_rate));
output.set_time_base((1, sample_rate)); output.set_time_base((1, sample_rate));
let in_time_base = decoder.time_base();
let encoder = encoder.open_as(codec)?; let encoder = encoder.open_as(codec)?;
output.set_parameters(&encoder); output.set_parameters(&encoder);
let filter = filter("anull", &decoder, &encoder)?; let filter = filter_graph("anull", &decoder, &encoder)?;
let in_time_base = decoder.time_base();
let out_time_base = output.time_base();
Ok(Transcoder { Ok(Transcoder {
stream: input.index(), stream: input.index(),
params: Arc::new(params),
filter, filter,
decoder, decoder,
encoder, encoder,
in_time_base, in_time_base,
out_time_base, out_time_base: output.time_base(),
}) })
} }
@ -130,8 +100,8 @@ impl Transcoder {
self.encoder.send_frame(frame) self.encoder.send_frame(frame)
} }
pub(crate) fn send_eof_to_encoder(&mut self) { pub(crate) fn send_eof_to_encoder(&mut self) -> Result<(), ffmpeg::Error> {
self.encoder.send_eof().unwrap(); self.encoder.send_eof()
} }
pub(crate) fn receive_and_process_encoded_packets( pub(crate) fn receive_and_process_encoded_packets(
@ -143,20 +113,19 @@ impl Transcoder {
encoded.set_stream(0); encoded.set_stream(0);
encoded.rescale_ts(self.in_time_base, self.out_time_base); encoded.rescale_ts(self.in_time_base, self.out_time_base);
match encoded.write_interleaved(octx) { if let Err(err) = encoded.write_interleaved(octx) {
Err(err) => return Err(err.into()), return Err(err.into());
Ok(_) => (),
} }
} }
Ok(()) Ok(())
} }
fn add_frame_to_filter(&mut self, frame: &ffmpeg::Frame) { fn add_frame_to_filter(&mut self, frame: &ffmpeg::Frame) -> Result<(), ffmpeg::Error> {
self.filter.get("in").unwrap().source().add(frame).unwrap(); self.filter.get("in").unwrap().source().add(frame)
} }
pub(crate) fn flush_filter(&mut self) { pub(crate) fn flush_filter(&mut self) -> Result<(), ffmpeg::Error> {
self.filter.get("in").unwrap().source().flush().unwrap(); self.filter.get("in").unwrap().source().flush()
} }
pub(crate) fn get_and_process_filtered_frames( pub(crate) fn get_and_process_filtered_frames(
@ -165,23 +134,17 @@ impl Transcoder {
) -> Result<(), Box<dyn Error>> { ) -> Result<(), Box<dyn Error>> {
let mut filtered = frame::Audio::empty(); let mut filtered = frame::Audio::empty();
loop { loop {
let mut ctx: ffmpeg::filter::Context = match self.filter.get("out") { let mut ctx = self.filter.get("out").ok_or("cannot get context from filter")?;
None => return Err(Box::from("cannot get context from filter")),
Some(val) => val,
};
if !ctx.sink().frame(&mut filtered).is_ok() { if let Err(err) = ctx.sink().frame(&mut filtered) {
return Err(Box::from("frame is suddenly invalid, stopping...")); if err != ffmpeg::Error::Eof {
return Err(err.into());
}
return Ok(());
} }
match self.send_frame_to_encoder(&filtered) { self.send_frame_to_encoder(&filtered)?;
Err(err) => return Err(err.into()), self.receive_and_process_encoded_packets(octx)?;
Ok(_) => (),
};
match self.receive_and_process_encoded_packets(octx) {
Err(err) => return Err(err.into()),
Ok(_) => (),
}
} }
} }
@ -189,17 +152,11 @@ impl Transcoder {
&mut self, &mut self,
packet: &ffmpeg::Packet, packet: &ffmpeg::Packet,
) -> Result<(), Box<dyn Error>> { ) -> Result<(), Box<dyn Error>> {
match self.decoder.send_packet(packet) { self.decoder.send_packet(packet).map_err(|err| err.into())
Err(err) => return Err(err.into()),
Ok(_) => Ok(()),
}
} }
pub(crate) fn send_eof_to_decoder(&mut self) -> Result<(), Box<dyn Error>> { pub(crate) fn send_eof_to_decoder(&mut self) -> Result<(), Box<dyn Error>> {
match self.decoder.send_eof() { self.decoder.send_eof().map_err(|err| err.into())
Err(err) => return Err(err.into()),
Ok(_) => Ok(()),
}
} }
pub(crate) fn receive_and_process_decoded_frames( pub(crate) fn receive_and_process_decoded_frames(
@ -210,18 +167,20 @@ impl Transcoder {
while self.decoder.receive_frame(&mut decoded).is_ok() { while self.decoder.receive_frame(&mut decoded).is_ok() {
let timestamp = decoded.timestamp(); let timestamp = decoded.timestamp();
decoded.set_pts(timestamp); decoded.set_pts(timestamp);
self.add_frame_to_filter(&decoded); self.add_frame_to_filter(&decoded)?;
match self.get_and_process_filtered_frames(octx) { if let Err(mut err) = self.get_and_process_filtered_frames(octx) {
Err(err) => return Err(err.into()), let expected = ffmpeg::Error::Other { errno: EAGAIN };
Ok(_) => (), if err.downcast_mut::<ffmpeg::error::Error>().ok_or(ffmpeg::Error::Bug) == Err(expected) {
continue
}
} }
} }
Ok(()) Ok(())
} }
} }
fn filter( fn filter_graph(
spec: &str, spec: &str,
decoder: &codec::decoder::Audio, decoder: &codec::decoder::Audio,
encoder: &codec::encoder::Audio, encoder: &codec::encoder::Audio,
@ -239,13 +198,10 @@ fn filter(
filter.add(&filter::find("abuffer").unwrap(), "in", &args)?; filter.add(&filter::find("abuffer").unwrap(), "in", &args)?;
filter.add(&filter::find("abuffersink").unwrap(), "out", "")?; filter.add(&filter::find("abuffersink").unwrap(), "out", "")?;
{
let mut out = filter.get("out").unwrap(); let mut out = filter.get("out").unwrap();
out.set_sample_format(encoder.format()); out.set_sample_format(encoder.format());
out.set_channel_layout(encoder.channel_layout()); out.set_channel_layout(encoder.channel_layout());
out.set_sample_rate(encoder.rate()); out.set_sample_rate(encoder.rate());
}
filter.output("in", 0)?.input("out", 0)?.parse(spec)?; filter.output("in", 0)?.input("out", 0)?.parse(spec)?;
filter.validate()?; filter.validate()?;
@ -253,15 +209,8 @@ fn filter(
println!("{}", filter.dump()); println!("{}", filter.dump());
if let Some(codec) = encoder.codec() { if let Some(codec) = encoder.codec() {
if !codec if !codec.capabilities().contains(codec::capabilities::Capabilities::VARIABLE_FRAME_SIZE) {
.capabilities() filter.get("out").unwrap().sink().set_frame_size(encoder.frame_size());
.contains(codec::capabilities::Capabilities::VARIABLE_FRAME_SIZE)
{
filter
.get("out")
.unwrap()
.sink()
.set_frame_size(encoder.frame_size());
} }
} }