diff --git a/src/task.rs b/src/task.rs index 6106d77..e7a3442 100644 --- a/src/task.rs +++ b/src/task.rs @@ -1,8 +1,6 @@ use crate::transcoder::{Transcoder, TranscoderParams}; use ffmpeg_next::channel_layout::ChannelLayout; -use ffmpeg_next::format::context; -use ffmpeg_next::format::context::Input; -use ffmpeg_next::{format, Dictionary, Error}; +use ffmpeg_next::{format, Dictionary}; use tracing::{debug, error}; pub struct Task { @@ -61,16 +59,15 @@ impl Task { } }; - let octx: Result; - if self.codec_opts.is_some() { - octx = format::output_as_with( + let octx = if let Some(codec_opts) = self.codec_opts { + format::output_as_with( &self.output_path, &self.format, - params_to_avdictionary(&self.codec_opts.unwrap_or_default()), - ); + params_to_avdictionary(&codec_opts), + ) } else { - octx = format::output_as(&self.output_path, &self.format); - } + format::output_as(&self.output_path, &self.format) + }; let mut octx = match octx { Ok(val) => val, @@ -96,6 +93,7 @@ impl Task { }, }, ); + let mut transcoder = match transcoder { Ok(val) => val, Err(err) => { @@ -103,29 +101,57 @@ impl Task { return; } }; + octx.set_metadata(ictx.metadata().to_owned()); - octx.write_header() - .unwrap_or_else(|err| error!("couldn't start transcoding: {:?}", err)); + + if let Err(err) = octx.write_header() { + error!("couldn't start transcoding: {:?}", err); + return; + } for (stream, mut packet) in ictx.packets() { if stream.index() == transcoder.stream { packet.rescale_ts(stream.time_base(), transcoder.in_time_base); - transcoder.send_packet_to_decoder(&packet); - transcoder.receive_and_process_decoded_frames(&mut octx); + + if let Err(err) = transcoder.send_packet_to_decoder(&packet) { + error!("error sending packet to decoder: {:?}", err); + return; + } + + transcoder.receive_and_process_decoded_frames(&mut octx) + .unwrap_or_else(|err| error!("failure during processing decoded frames: {:?}", err)); } } - transcoder.send_eof_to_decoder(); - transcoder.receive_and_process_decoded_frames(&mut octx); + if let Err(err) = transcoder.send_eof_to_decoder() { + error!("error sending EOF to decoder: {:?}", err); + return; + } - transcoder.flush_filter(); - transcoder.get_and_process_filtered_frames(&mut octx); + if let Err(err) = transcoder.receive_and_process_decoded_frames(&mut octx) { + error!("error receiving and processing decoded frames: {:?}", err); + return; + } - transcoder.send_eof_to_encoder(); - transcoder.receive_and_process_encoded_packets(&mut octx); + if let Err(err) = transcoder.flush_filter() { + error!("couldn't flush filter: {:?}", err); + return; + } - octx.write_trailer() - .unwrap_or_else(|err| error!("couldn't finish transcoding: {:?}", err)); + transcoder.get_and_process_filtered_frames(&mut octx) + .unwrap_or_else(|err| error!("failure during processing filtered frames: {:?}", err)); + + if let Err(err) = transcoder.send_eof_to_encoder() { + error!("couldn't send EOF to encoder: {:?}", err); + return; + } + + transcoder.receive_and_process_encoded_packets(&mut octx) + .unwrap_or_else(|err| error!("failure during transcoding: {:?}", err)); + + if let Err(err) = octx.write_trailer() { + error!("couldn't finish transcoding: {:?}", err); + } } } diff --git a/src/transcoder.rs b/src/transcoder.rs index 41b0e0f..9106674 100644 --- a/src/transcoder.rs +++ b/src/transcoder.rs @@ -4,11 +4,9 @@ use std::error::Error; use std::sync::Arc; use ffmpeg::{codec, filter, format, frame, media}; -use ffmpeg_next::codec::Audio; -use ffmpeg_next::Codec; +use ffmpeg_next::error::EAGAIN; pub struct Transcoder { - params: Arc, pub(crate) stream: usize, filter: filter::Graph, decoder: codec::decoder::Audio, @@ -90,7 +88,6 @@ impl Transcoder { Ok(Transcoder { stream: input.index(), - params: Arc::new(params), filter, decoder, encoder, @@ -103,8 +100,8 @@ impl Transcoder { self.encoder.send_frame(frame) } - pub(crate) fn send_eof_to_encoder(&mut self) { - self.encoder.send_eof().unwrap(); + pub(crate) fn send_eof_to_encoder(&mut self) -> Result<(), ffmpeg::Error> { + self.encoder.send_eof() } pub(crate) fn receive_and_process_encoded_packets( @@ -123,12 +120,12 @@ impl Transcoder { Ok(()) } - fn add_frame_to_filter(&mut self, frame: &ffmpeg::Frame) { - self.filter.get("in").unwrap().source().add(frame).unwrap(); + fn add_frame_to_filter(&mut self, frame: &ffmpeg::Frame) -> Result<(), ffmpeg::Error> { + self.filter.get("in").unwrap().source().add(frame) } - pub(crate) fn flush_filter(&mut self) { - self.filter.get("in").unwrap().source().flush().unwrap(); + pub(crate) fn flush_filter(&mut self) -> Result<(), ffmpeg::Error> { + self.filter.get("in").unwrap().source().flush() } pub(crate) fn get_and_process_filtered_frames( @@ -139,8 +136,11 @@ impl Transcoder { loop { let mut ctx = self.filter.get("out").ok_or("cannot get context from filter")?; - if ctx.sink().frame(&mut filtered).is_err() { - return Err("frame is suddenly invalid, stopping...".into()); + if let Err(err) = ctx.sink().frame(&mut filtered) { + if err != ffmpeg::Error::Eof { + return Err(err.into()); + } + return Ok(()); } self.send_frame_to_encoder(&filtered)?; @@ -167,8 +167,14 @@ impl Transcoder { while self.decoder.receive_frame(&mut decoded).is_ok() { let timestamp = decoded.timestamp(); decoded.set_pts(timestamp); - self.add_frame_to_filter(&decoded); - self.get_and_process_filtered_frames(octx)?; + self.add_frame_to_filter(&decoded)?; + + if let Err(mut err) = self.get_and_process_filtered_frames(octx) { + let expected = ffmpeg::Error::Other { errno: EAGAIN }; + if err.downcast_mut::().ok_or(ffmpeg::Error::Bug) == Err(expected) { + continue + } + } } Ok(()) }