diff --git a/src/task.rs b/src/task.rs index d7044bf..162e5b5 100644 --- a/src/task.rs +++ b/src/task.rs @@ -1,3 +1,4 @@ +use std::error::Error; use crate::transcoder::{Transcoder, TranscoderParams}; use ffmpeg_next::channel_layout::ChannelLayout; use ffmpeg_next::{format, Dictionary}; @@ -13,7 +14,7 @@ impl Task { Task { id, params } } - pub fn execute(self) { + pub fn execute(self) -> Result<(), Box> { debug!( "performing transcoding for task with id: {}", self.id.to_string() @@ -22,7 +23,7 @@ impl Task { Ok(val) => val, Err(err) => { error!("couldn't initialize input context: {:?}", err); - return; + return Err(err.into()); } }; @@ -40,7 +41,7 @@ impl Task { Ok(val) => val, Err(err) => { error!("couldn't initialize output context: {:?}", err); - return; + return Err(err.into()); } }; @@ -65,7 +66,7 @@ impl Task { Ok(val) => val, Err(err) => { error!("couldn't initialize FFmpeg transcoder: {:?}", err); - return; + return Err(err.into()); } }; @@ -73,7 +74,7 @@ impl Task { if let Err(err) = octx.write_header() { error!("couldn't start transcoding: {:?}", err); - return; + return Err(err.into()); } for (stream, mut packet) in ictx.packets() { @@ -82,7 +83,7 @@ impl Task { if let Err(err) = transcoder.send_packet_to_decoder(&packet) { error!("error sending packet to decoder: {:?}", err); - return; + return Err(err.into()); } transcoder @@ -95,17 +96,17 @@ impl Task { if let Err(err) = transcoder.send_eof_to_decoder() { error!("error sending EOF to decoder: {:?}", err); - return; + return Err(err.into()); } if let Err(err) = transcoder.receive_and_process_decoded_frames(&mut octx) { error!("error receiving and processing decoded frames: {:?}", err); - return; + return Err(err.into()); } if let Err(err) = transcoder.flush_filter() { error!("couldn't flush filter: {:?}", err); - return; + return Err(err.into()); } transcoder @@ -114,7 +115,7 @@ impl Task { if let Err(err) = transcoder.send_eof_to_encoder() { error!("couldn't send EOF to encoder: {:?}", err); - return; + return Err(err.into()); } transcoder @@ -123,13 +124,15 @@ impl Task { if let Err(err) = octx.write_trailer() { error!("couldn't finish transcoding: {:?}", err); - return; + return Err(err.into()); } debug!( "finished transcoding for task with id: {}", self.id.to_string() ); + + Ok(()) } } diff --git a/src/thread_pool.rs b/src/thread_pool.rs index 85bf017..9279ae1 100644 --- a/src/thread_pool.rs +++ b/src/thread_pool.rs @@ -54,7 +54,9 @@ impl Worker { match task { Ok(task) => { debug!("worker {} got a task; executing.", id); - task.execute(); + if let Err(err) = task.execute() { + error!("worker {} failed to finish the task: {:?}", id, err); + } } Err(e) => { error!("worker {} failed to receive task: {:?}", id, e);