simplify the code (2)

This commit is contained in:
Pavel 2024-05-25 23:58:12 +03:00
parent d1331462a6
commit 0d92a39309
2 changed files with 68 additions and 36 deletions

View File

@ -1,8 +1,6 @@
use crate::transcoder::{Transcoder, TranscoderParams}; use crate::transcoder::{Transcoder, TranscoderParams};
use ffmpeg_next::channel_layout::ChannelLayout; use ffmpeg_next::channel_layout::ChannelLayout;
use ffmpeg_next::format::context; use ffmpeg_next::{format, Dictionary};
use ffmpeg_next::format::context::Input;
use ffmpeg_next::{format, Dictionary, Error};
use tracing::{debug, error}; use tracing::{debug, error};
pub struct Task { pub struct Task {
@ -61,16 +59,15 @@ impl Task {
} }
}; };
let octx: Result<context::Output, ffmpeg_next::Error>; let octx = if let Some(codec_opts) = self.codec_opts {
if self.codec_opts.is_some() { format::output_as_with(
octx = format::output_as_with(
&self.output_path, &self.output_path,
&self.format, &self.format,
params_to_avdictionary(&self.codec_opts.unwrap_or_default()), params_to_avdictionary(&codec_opts),
); )
} else { } else {
octx = format::output_as(&self.output_path, &self.format); format::output_as(&self.output_path, &self.format)
} };
let mut octx = match octx { let mut octx = match octx {
Ok(val) => val, Ok(val) => val,
@ -96,6 +93,7 @@ impl Task {
}, },
}, },
); );
let mut transcoder = match transcoder { let mut transcoder = match transcoder {
Ok(val) => val, Ok(val) => val,
Err(err) => { Err(err) => {
@ -103,29 +101,57 @@ impl Task {
return; return;
} }
}; };
octx.set_metadata(ictx.metadata().to_owned()); octx.set_metadata(ictx.metadata().to_owned());
octx.write_header()
.unwrap_or_else(|err| error!("couldn't start transcoding: {:?}", err)); if let Err(err) = octx.write_header() {
error!("couldn't start transcoding: {:?}", err);
return;
}
for (stream, mut packet) in ictx.packets() { for (stream, mut packet) in ictx.packets() {
if stream.index() == transcoder.stream { if stream.index() == transcoder.stream {
packet.rescale_ts(stream.time_base(), transcoder.in_time_base); packet.rescale_ts(stream.time_base(), transcoder.in_time_base);
transcoder.send_packet_to_decoder(&packet);
transcoder.receive_and_process_decoded_frames(&mut octx); if let Err(err) = transcoder.send_packet_to_decoder(&packet) {
error!("error sending packet to decoder: {:?}", err);
return;
}
transcoder.receive_and_process_decoded_frames(&mut octx)
.unwrap_or_else(|err| error!("failure during processing decoded frames: {:?}", err));
} }
} }
transcoder.send_eof_to_decoder(); if let Err(err) = transcoder.send_eof_to_decoder() {
transcoder.receive_and_process_decoded_frames(&mut octx); error!("error sending EOF to decoder: {:?}", err);
return;
}
transcoder.flush_filter(); if let Err(err) = transcoder.receive_and_process_decoded_frames(&mut octx) {
transcoder.get_and_process_filtered_frames(&mut octx); error!("error receiving and processing decoded frames: {:?}", err);
return;
}
transcoder.send_eof_to_encoder(); if let Err(err) = transcoder.flush_filter() {
transcoder.receive_and_process_encoded_packets(&mut octx); error!("couldn't flush filter: {:?}", err);
return;
}
octx.write_trailer() transcoder.get_and_process_filtered_frames(&mut octx)
.unwrap_or_else(|err| error!("couldn't finish transcoding: {:?}", err)); .unwrap_or_else(|err| error!("failure during processing filtered frames: {:?}", err));
if let Err(err) = transcoder.send_eof_to_encoder() {
error!("couldn't send EOF to encoder: {:?}", err);
return;
}
transcoder.receive_and_process_encoded_packets(&mut octx)
.unwrap_or_else(|err| error!("failure during transcoding: {:?}", err));
if let Err(err) = octx.write_trailer() {
error!("couldn't finish transcoding: {:?}", err);
}
} }
} }

View File

@ -4,11 +4,9 @@ use std::error::Error;
use std::sync::Arc; use std::sync::Arc;
use ffmpeg::{codec, filter, format, frame, media}; use ffmpeg::{codec, filter, format, frame, media};
use ffmpeg_next::codec::Audio; use ffmpeg_next::error::EAGAIN;
use ffmpeg_next::Codec;
pub struct Transcoder { pub struct Transcoder {
params: Arc<TranscoderParams>,
pub(crate) stream: usize, pub(crate) stream: usize,
filter: filter::Graph, filter: filter::Graph,
decoder: codec::decoder::Audio, decoder: codec::decoder::Audio,
@ -90,7 +88,6 @@ impl Transcoder {
Ok(Transcoder { Ok(Transcoder {
stream: input.index(), stream: input.index(),
params: Arc::new(params),
filter, filter,
decoder, decoder,
encoder, encoder,
@ -103,8 +100,8 @@ impl Transcoder {
self.encoder.send_frame(frame) self.encoder.send_frame(frame)
} }
pub(crate) fn send_eof_to_encoder(&mut self) { pub(crate) fn send_eof_to_encoder(&mut self) -> Result<(), ffmpeg::Error> {
self.encoder.send_eof().unwrap(); self.encoder.send_eof()
} }
pub(crate) fn receive_and_process_encoded_packets( pub(crate) fn receive_and_process_encoded_packets(
@ -123,12 +120,12 @@ impl Transcoder {
Ok(()) Ok(())
} }
fn add_frame_to_filter(&mut self, frame: &ffmpeg::Frame) { fn add_frame_to_filter(&mut self, frame: &ffmpeg::Frame) -> Result<(), ffmpeg::Error> {
self.filter.get("in").unwrap().source().add(frame).unwrap(); self.filter.get("in").unwrap().source().add(frame)
} }
pub(crate) fn flush_filter(&mut self) { pub(crate) fn flush_filter(&mut self) -> Result<(), ffmpeg::Error> {
self.filter.get("in").unwrap().source().flush().unwrap(); self.filter.get("in").unwrap().source().flush()
} }
pub(crate) fn get_and_process_filtered_frames( pub(crate) fn get_and_process_filtered_frames(
@ -139,8 +136,11 @@ impl Transcoder {
loop { loop {
let mut ctx = self.filter.get("out").ok_or("cannot get context from filter")?; let mut ctx = self.filter.get("out").ok_or("cannot get context from filter")?;
if ctx.sink().frame(&mut filtered).is_err() { if let Err(err) = ctx.sink().frame(&mut filtered) {
return Err("frame is suddenly invalid, stopping...".into()); if err != ffmpeg::Error::Eof {
return Err(err.into());
}
return Ok(());
} }
self.send_frame_to_encoder(&filtered)?; self.send_frame_to_encoder(&filtered)?;
@ -167,8 +167,14 @@ impl Transcoder {
while self.decoder.receive_frame(&mut decoded).is_ok() { while self.decoder.receive_frame(&mut decoded).is_ok() {
let timestamp = decoded.timestamp(); let timestamp = decoded.timestamp();
decoded.set_pts(timestamp); decoded.set_pts(timestamp);
self.add_frame_to_filter(&decoded); self.add_frame_to_filter(&decoded)?;
self.get_and_process_filtered_frames(octx)?;
if let Err(mut err) = self.get_and_process_filtered_frames(octx) {
let expected = ffmpeg::Error::Other { errno: EAGAIN };
if err.downcast_mut::<ffmpeg::error::Error>().ok_or(ffmpeg::Error::Bug) == Err(expected) {
continue
}
}
} }
Ok(()) Ok(())
} }