better error handling for tasks

This commit is contained in:
Pavel 2024-05-26 12:32:09 +03:00
parent ed61d12070
commit debe714215
2 changed files with 17 additions and 12 deletions

View File

@ -1,3 +1,4 @@
use std::error::Error;
use crate::transcoder::{Transcoder, TranscoderParams}; use crate::transcoder::{Transcoder, TranscoderParams};
use ffmpeg_next::channel_layout::ChannelLayout; use ffmpeg_next::channel_layout::ChannelLayout;
use ffmpeg_next::{format, Dictionary}; use ffmpeg_next::{format, Dictionary};
@ -13,7 +14,7 @@ impl Task {
Task { id, params } Task { id, params }
} }
pub fn execute(self) { pub fn execute(self) -> Result<(), Box<dyn Error>> {
debug!( debug!(
"performing transcoding for task with id: {}", "performing transcoding for task with id: {}",
self.id.to_string() self.id.to_string()
@ -22,7 +23,7 @@ impl Task {
Ok(val) => val, Ok(val) => val,
Err(err) => { Err(err) => {
error!("couldn't initialize input context: {:?}", err); error!("couldn't initialize input context: {:?}", err);
return; return Err(err.into());
} }
}; };
@ -40,7 +41,7 @@ impl Task {
Ok(val) => val, Ok(val) => val,
Err(err) => { Err(err) => {
error!("couldn't initialize output context: {:?}", err); error!("couldn't initialize output context: {:?}", err);
return; return Err(err.into());
} }
}; };
@ -65,7 +66,7 @@ impl Task {
Ok(val) => val, Ok(val) => val,
Err(err) => { Err(err) => {
error!("couldn't initialize FFmpeg transcoder: {:?}", err); error!("couldn't initialize FFmpeg transcoder: {:?}", err);
return; return Err(err.into());
} }
}; };
@ -73,7 +74,7 @@ impl Task {
if let Err(err) = octx.write_header() { if let Err(err) = octx.write_header() {
error!("couldn't start transcoding: {:?}", err); error!("couldn't start transcoding: {:?}", err);
return; return Err(err.into());
} }
for (stream, mut packet) in ictx.packets() { for (stream, mut packet) in ictx.packets() {
@ -82,7 +83,7 @@ impl Task {
if let Err(err) = transcoder.send_packet_to_decoder(&packet) { if let Err(err) = transcoder.send_packet_to_decoder(&packet) {
error!("error sending packet to decoder: {:?}", err); error!("error sending packet to decoder: {:?}", err);
return; return Err(err.into());
} }
transcoder transcoder
@ -95,17 +96,17 @@ impl Task {
if let Err(err) = transcoder.send_eof_to_decoder() { if let Err(err) = transcoder.send_eof_to_decoder() {
error!("error sending EOF to decoder: {:?}", err); error!("error sending EOF to decoder: {:?}", err);
return; return Err(err.into());
} }
if let Err(err) = transcoder.receive_and_process_decoded_frames(&mut octx) { if let Err(err) = transcoder.receive_and_process_decoded_frames(&mut octx) {
error!("error receiving and processing decoded frames: {:?}", err); error!("error receiving and processing decoded frames: {:?}", err);
return; return Err(err.into());
} }
if let Err(err) = transcoder.flush_filter() { if let Err(err) = transcoder.flush_filter() {
error!("couldn't flush filter: {:?}", err); error!("couldn't flush filter: {:?}", err);
return; return Err(err.into());
} }
transcoder transcoder
@ -114,7 +115,7 @@ impl Task {
if let Err(err) = transcoder.send_eof_to_encoder() { if let Err(err) = transcoder.send_eof_to_encoder() {
error!("couldn't send EOF to encoder: {:?}", err); error!("couldn't send EOF to encoder: {:?}", err);
return; return Err(err.into());
} }
transcoder transcoder
@ -123,13 +124,15 @@ impl Task {
if let Err(err) = octx.write_trailer() { if let Err(err) = octx.write_trailer() {
error!("couldn't finish transcoding: {:?}", err); error!("couldn't finish transcoding: {:?}", err);
return; return Err(err.into());
} }
debug!( debug!(
"finished transcoding for task with id: {}", "finished transcoding for task with id: {}",
self.id.to_string() self.id.to_string()
); );
Ok(())
} }
} }

View File

@ -54,7 +54,9 @@ impl Worker {
match task { match task {
Ok(task) => { Ok(task) => {
debug!("worker {} got a task; executing.", id); debug!("worker {} got a task; executing.", id);
task.execute(); if let Err(err) = task.execute() {
error!("worker {} failed to finish the task: {:?}", id, err);
}
} }
Err(e) => { Err(e) => {
error!("worker {} failed to receive task: {:?}", id, e); error!("worker {} failed to receive task: {:?}", id, e);