diff --git a/Cargo.toml b/Cargo.toml index 9314875..b06bce0 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -14,4 +14,6 @@ tracing = "0.1.37" tracing-subscriber = { version = "0.3.16", features = ["env-filter"] } tower-http = { version = "0.5.2", features = ["trace"] } num_cpus = "1.16.0" -ffmpeg-next = "7.0.1" \ No newline at end of file +ffmpeg-next = "7.0.1" +ureq = { version = "*", features = ["json", "charset"] } +mime_guess = "2.0.4" diff --git a/README.md b/README.md index 1baedca..12b22a7 100644 --- a/README.md +++ b/README.md @@ -24,9 +24,9 @@ curl --location 'http://localhost:8090/enqueue' \ - [ ] Implement acceptable error handling. - [ ] Restart threads in case of panic. - [ ] Remove old conversion results and input files every Nth hours. -- [ ] Remove input file after transcoding it. -- [ ] Implement file upload to `uploadUrl` (if `Content-Type: application/json` then conversion was not successful and body contains an error info). -- [ ] Remove transcoding result after uploading it to the `uploadUrl`. +- [x] Remove input file after transcoding it. +- [x] Implement file upload to `uploadUrl` (if `Content-Type: application/json` then conversion was not successful and body contains an error info). +- [x] Remove transcoding result after uploading it to the `uploadUrl`. - [ ] (Optional) Make `uploadUrl` optional and allow the client to download the file on-demand. - [ ] Docker image for `amd64` and `aarch64`. - [ ] Statically linked binary for Docker image & result docker image based on `scratch` (reduce image size). diff --git a/src/main.rs b/src/main.rs index 2c412c0..fcbe4cb 100644 --- a/src/main.rs +++ b/src/main.rs @@ -22,7 +22,7 @@ async fn main() { env::var("NUM_WORKERS") .ok() .and_then(|val| val.parse::().ok()) - .filter(|&val| val > 0) + .filter(|&val| val > 0), ); let temp_dir = env::var("TEMP_DIR").unwrap_or_else(|_| { env::temp_dir() diff --git a/src/task.rs b/src/task.rs index 162e5b5..809b2e5 100644 --- a/src/task.rs +++ b/src/task.rs @@ -1,9 +1,16 @@ -use std::error::Error; +use crate::dto::ConvertResponse; use crate::transcoder::{Transcoder, TranscoderParams}; use ffmpeg_next::channel_layout::ChannelLayout; use ffmpeg_next::{format, Dictionary}; +use mime_guess::from_path; +use std::error::Error; +use std::fs::File; +use std::io::Read; +use std::path::Path; use tracing::{debug, error}; +use ureq::Response; +#[derive(Clone)] pub struct Task { id: uuid::Uuid, params: TaskParams, @@ -15,6 +22,40 @@ impl Task { } pub fn execute(self) -> Result<(), Box> { + if let Err(err) = self.clone().transcode() { + std::fs::remove_file(Path::new(&self.params.input_path)).ok(); + std::fs::remove_file(Path::new(&self.params.output_path)).ok(); + send_error( + self.id, + format!("Couldn't transcode: {}", err).as_str(), + &self.params.upload_url, + ) + .ok(); + return Err(err); + } + + std::fs::remove_file(Path::new(&self.params.input_path)).ok(); + + if let Err(err) = upload_file(&self.params.output_path, &self.params.upload_url) { + error!( + "couldn't upload result for job id={}, file path {}: {}", + &self.id.to_string(), + &self.params.output_path, + err + ); + } else { + debug!( + "job id={} result was uploaded to {}", + &self.id.to_string(), + &self.params.upload_url + ); + } + + std::fs::remove_file(Path::new(&self.params.output_path)).ok(); + Ok(()) + } + + pub fn transcode(self) -> Result<(), Box> { debug!( "performing transcoding for task with id: {}", self.id.to_string() @@ -27,7 +68,7 @@ impl Task { } }; - let octx = if let Some(codec_opts) = self.params.codec_opts { + let octx = if let Some(codec_opts) = &self.params.codec_opts { format::output_as_with( &self.params.output_path, &self.params.format, @@ -136,6 +177,7 @@ impl Task { } } +#[derive(Clone)] pub struct TaskParams { pub format: String, pub codec: String, @@ -149,6 +191,57 @@ pub struct TaskParams { pub upload_url: String, } +fn send_error( + id: uuid::Uuid, + error: &str, + url: &str, +) -> Result> { + let response = ureq::post(url) + .set("Content-Type", "application/json") + .send_json(ConvertResponse { + id: Some(id.to_string()), + error: Some(error.to_string()), + })?; + + if response.status() == 200 { + Ok(response) + } else { + Err(format!("Failed to send an error. Status: {}", response.status()).into()) + } +} + +fn upload_file>( + file_path: P, + url: &str, +) -> Result> { + let path = file_path.as_ref(); + let file_name = path + .file_name() + .ok_or("Invalid file path")? + .to_str() + .ok_or("Invalid file name")?; + + let mut file = File::open(path)?; + let mut buffer = Vec::new(); + file.read_to_end(&mut buffer)?; + + let mime_type = from_path(path).first_or_octet_stream(); + + let response = ureq::post(url) + .set("Content-Type", mime_type.as_ref()) + .set( + "Content-Disposition", + &format!("attachment; filename=\"{}\"", file_name), + ) + .send_bytes(&buffer)?; + + if response.status() == 200 { + Ok(response) + } else { + Err(format!("Failed to upload file. Status: {}", response.status()).into()) + } +} + fn params_to_avdictionary(input: &str) -> Dictionary { let mut dict: Dictionary = Dictionary::new(); for pair in input.split(';') { diff --git a/src/thread_pool.rs b/src/thread_pool.rs index 9279ae1..8a0ed42 100644 --- a/src/thread_pool.rs +++ b/src/thread_pool.rs @@ -1,8 +1,8 @@ +use ffmpeg_next::log::Level; use std::sync::mpsc::{self, Receiver, Sender}; use std::sync::{Arc, Mutex}; use std::thread; use std::time::Duration; -use ffmpeg_next::log::Level; use tracing::{debug, error}; diff --git a/src/transcoder.rs b/src/transcoder.rs index 6ec5528..4f3e037 100644 --- a/src/transcoder.rs +++ b/src/transcoder.rs @@ -141,21 +141,16 @@ impl Transcoder { let mut source = ctx.source(); source.add(frame) } else { - Err(ffmpeg::Error::Other { - errno: 0, - }) + Err(ffmpeg::Error::Other { errno: 0 }) } } - pub(crate) fn flush_filter(&mut self) -> Result<(), ffmpeg::Error> { if let Some(mut ctx) = self.filter.get("in") { let mut source = ctx.source(); source.flush() } else { - Err(ffmpeg::Error::Other { - errno: 0, - }) + Err(ffmpeg::Error::Other { errno: 0 }) } }