From 03057706f8b22907bdc37dffc2526a3500dffd42 Mon Sep 17 00:00:00 2001 From: Neur0toxine Date: Wed, 29 May 2024 18:32:17 +0300 Subject: [PATCH] enqueue file by url (helps with bigger files) --- Cargo.toml | 2 +- README.md | 40 ++++++++++++++---- src/dto.rs | 22 +++++++--- src/filepath.rs | 11 +++++ src/main.rs | 1 + src/server.rs | 62 +++++++++++++++++++++++---- src/task.rs | 104 +++++++++++++++++++++++++++++++++++++++------- src/transcoder.rs | 30 +++++++------ 8 files changed, 223 insertions(+), 49 deletions(-) create mode 100644 src/filepath.rs diff --git a/Cargo.toml b/Cargo.toml index 3dc3f91..62b8dbd 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "atranscoder-rpc" -version = "0.1.0" +version = "0.1.1" edition = "2021" [dependencies] diff --git a/README.md b/README.md index 0bbdd27..0bc72f0 100644 --- a/README.md +++ b/README.md @@ -15,19 +15,42 @@ curl --location 'http://localhost:8090/enqueue' \ --form 'file=@"/home/user/Music/test.mp3"' \ --form 'format="mp4"' \ --form 'codec="libfdk_aac"' \ ---form 'codecOpts="profile=aac_he"' \ ---form 'bitRate="160000"' \ ---form 'maxBitRate="160000"' \ ---form 'sampleRate="44100"' \ ---form 'channelLayout="stereo"' \ ---form 'callbackUrl="http://127.0.0.1:8909/callback"' +--form 'codec_opts="profile=aac_he"' \ +--form 'bit_rate="64000"' \ +--form 'max_bit_rate="64000"' \ +--form 'sample_rate="44100"' \ +--form 'channel_layout="stereo"' \ +--form 'callback_url="http://127.0.0.1:8909/callback"' ``` -3. Your `callbackUrl` will receive JSON response with job ID and error in case of failure. Error will be null if transcoding was successful. +3. Your `callback_url` will receive JSON response with job ID and error in case of failure. Error will be null if transcoding was successful. 4. You can download transcoded file like this (replace `job_id` with the ID you've received): ```bash curl -L http://localhost:8090/get/job_id -o file.mp4 ``` +You can also enqueue a remote file like this: +```bash +curl --location 'http://localhost:8090/enqueue_url' \ +--header 'Content-Type: application/json' \ +--data '{ + "format": "mp4", + "codec": "libfdk_aac", + "codec_opts": "profile=aac_he", + "bit_rate": 64000, + "max_bit_rate": 64000, + "sample_rate": 44100, + "channel_layout": "stereo", + "url": "https://upload.wikimedia.org/wikipedia/commons/c/c8/Example.ogg", + "callback_url": "http://127.0.0.1:8909/callback" +}' +``` + +Mandatory fields: +- `format` +- `codec` +- `sample_rate` +- `url` (for `/enqueue_url`) + You can change configuration using this environment variables: - `LISTEN` - change this environment variable to change TCP listen address. Default is `0.0.0.0:8090`. - `NUM_WORKERS` - can be used to change how many threads will be used to transcode incoming files. Default is equal to logical CPUs. @@ -44,7 +67,8 @@ You can change configuration using this environment variables: - [x] Do not upload files directly, add download route with streaming instead. - [x] Conversion from OGG Opus mono to HE-AAC v1 Stereo outputs high-pitched crackling audio. - [x] Conversion from OGG Opus mono to AAC sometimes crashes the app with SIGSEGV (this can be seen more often with very short audio). -- [ ] If FFmpeg fails, `send_error` won't be called - fix that. +- [x] ~~If FFmpeg fails, `send_error` won't be called - fix that.~~ It actually works, I just didn't notice before. +- [x] Ability to enqueue a remote file. - [ ] Default errors are returned in plain text. Change it to the JSON. - [ ] Docker image for `amd64` and `arm64` (currently only `amd64` is supported because `arm64` cross-compilation with QEMU is sloooooooooooowwwww...). - [ ] Tests! \ No newline at end of file diff --git a/src/dto.rs b/src/dto.rs index 6b2c318..1603bc1 100644 --- a/src/dto.rs +++ b/src/dto.rs @@ -9,17 +9,29 @@ pub struct ConvertResponse { } #[derive(TryFromMultipart)] -#[try_from_multipart(rename_all = "camelCase")] pub struct ConvertRequest { pub format: String, pub codec: String, pub codec_opts: Option, - pub bit_rate: usize, - pub max_bit_rate: usize, + pub bit_rate: Option, + pub max_bit_rate: Option, pub sample_rate: i32, - pub channel_layout: String, - pub callback_url: String, + pub channel_layout: Option, + pub callback_url: Option, #[form_data(limit = "1GiB")] pub file: FieldData, } + +#[derive(Serialize, Deserialize)] +pub struct ConvertURLRequest { + pub format: String, + pub codec: String, + pub codec_opts: Option, + pub bit_rate: Option, + pub max_bit_rate: Option, + pub sample_rate: i32, + pub channel_layout: Option, + pub url: String, + pub callback_url: Option, +} diff --git a/src/filepath.rs b/src/filepath.rs new file mode 100644 index 0000000..890e4bd --- /dev/null +++ b/src/filepath.rs @@ -0,0 +1,11 @@ +use std::path::{Path, PathBuf}; + +pub const EXT: &str = "atranscoder"; + +pub fn in_file_path(work_dir: &str, task_id: String) -> PathBuf { + Path::new(work_dir).join(format!("{}.in.atranscoder", task_id)) +} + +pub fn out_file_path(work_dir: &str, task_id: String) -> PathBuf { + Path::new(work_dir).join(format!("{}.out.atranscoder", task_id)) +} diff --git a/src/main.rs b/src/main.rs index 798faee..764217f 100644 --- a/src/main.rs +++ b/src/main.rs @@ -6,6 +6,7 @@ use crate::server::Server; use crate::thread_pool::ThreadPool; mod dto; +mod filepath; mod server; mod task; mod thread_pool; diff --git a/src/server.rs b/src/server.rs index 626af36..3ed0184 100644 --- a/src/server.rs +++ b/src/server.rs @@ -15,14 +15,15 @@ use tower_http::trace::TraceLayer; use tracing::{debug, error}; use uuid::Uuid; -use crate::dto::{ConvertRequest, ConvertResponse}; +use crate::dto::{ConvertRequest, ConvertResponse, ConvertURLRequest}; use crate::task::{Task, TaskParams}; use crate::thread_pool::ThreadPool; +use crate::filepath; +use crate::filepath::{in_file_path, out_file_path}; use axum::body::Body; use axum::body::Bytes; use futures_util::StreamExt; -use std::path::Path as StdPath; use std::sync::Arc; use tokio::fs::File; use tokio::io::AsyncReadExt; @@ -69,6 +70,7 @@ impl Server { "/enqueue", post(enqueue_file).layer(DefaultBodyLimit::max(this.max_body_size)), ) + .route("/enqueue_url", post(enqueue_url)) .route("/get/:identifier", get(download_file)) .with_state(this) .layer(TraceLayer::new_for_http()); @@ -79,13 +81,58 @@ impl Server { } } +async fn enqueue_url( + State(server): State>, + Json(req): Json, +) -> (StatusCode, Json) { + let task_id = Uuid::new_v4(); + let input = in_file_path(&server.work_dir, task_id.to_string()); + let output = out_file_path(&server.work_dir, task_id.to_string()); + + let input_path = match input.to_str() { + Some(path) => path, + None => return error_response("Invalid input path"), + }; + let output_path = match output.to_str() { + Some(path) => path, + None => return error_response("Invalid output path"), + }; + + let params = TaskParams { + format: req.format, + codec: req.codec, + codec_opts: req.codec_opts, + bit_rate: req.bit_rate, + max_bit_rate: req.max_bit_rate, + sample_rate: req.sample_rate, + channel_layout: req.channel_layout, + callback_url: req.callback_url, + input_path: input_path.to_string(), + output_path: output_path.to_string(), + url: Some(req.url), + max_body_size: server.max_body_size, + }; + let task = Task::new(task_id, params); + + // Enqueue the task to the thread pool + server.thread_pool.enqueue(task); + + ( + StatusCode::CREATED, + Json::from(ConvertResponse { + id: Some(task_id.to_string()), + error: None, + }), + ) +} + async fn enqueue_file( State(server): State>, TypedMultipart(req): TypedMultipart, ) -> (StatusCode, Json) { let task_id = Uuid::new_v4(); - let input = StdPath::new(&server.work_dir).join(format!("{}.in.atranscoder", task_id)); - let output = StdPath::new(&server.work_dir).join(format!("{}.out.atranscoder", task_id)); + let input = in_file_path(&server.work_dir, task_id.to_string()); + let output = out_file_path(&server.work_dir, task_id.to_string()); let file = req.file; @@ -111,6 +158,8 @@ async fn enqueue_file( callback_url: req.callback_url, input_path: input_path.to_string(), output_path: output_path.to_string(), + url: None, + max_body_size: server.max_body_size, }; let task = Task::new(task_id, params); @@ -133,8 +182,7 @@ async fn download_file( State(server): State>, Path(identifier): Path, ) -> Result { - let file_name = format!("{}.out.atranscoder", identifier); - let file_path = StdPath::new(&server.work_dir).join(file_name); + let file_path = out_file_path(&server.work_dir, identifier); if !file_path.exists() { return Err(StatusCode::NOT_FOUND); @@ -201,7 +249,7 @@ async fn cleanup_directory(dir_path: &str, ttl: u64) -> Result<(), Box Result<(), Box> { + if let Some(download_url) = &self.params.url { + if let Err(err) = download_file( + download_url, + &self.params.input_path, + self.params.max_body_size, + ) { + std::fs::remove_file(Path::new(&self.params.input_path)).ok(); + if let Err(send_err) = send_error( + self.id, + &format!("Couldn't download the file: {}", err), + self.params.callback_url, + ) { + eprintln!("Failed to send error callback: {}", send_err); + } + return Err(err); + } + } + if let Err(err) = self.clone().transcode() { std::fs::remove_file(Path::new(&self.params.input_path)).ok(); std::fs::remove_file(Path::new(&self.params.output_path)).ok(); send_error( self.id, format!("Couldn't transcode: {}", err).as_str(), - &self.params.callback_url, + self.params.callback_url, ) .ok(); return Err(err); @@ -33,18 +54,18 @@ impl Task { std::fs::remove_file(Path::new(&self.params.input_path)).ok(); - if let Err(err) = send_ok(self.id, &self.params.callback_url) { + if let Err(err) = send_ok(self.id, self.params.clone().callback_url) { error!( "couldn't send result callback for job id={}, url {}: {}", &self.id.to_string(), - &self.params.callback_url, + &self.params.callback_url.unwrap_or_default(), err ); } else { debug!( "job id={} result was sent to callback {}", &self.id.to_string(), - &self.params.callback_url + &self.params.callback_url.unwrap_or_default() ); } @@ -91,7 +112,7 @@ impl Task { bit_rate: self.params.bit_rate, max_bit_rate: self.params.max_bit_rate, sample_rate: self.params.sample_rate, - channel_layout: match self.params.channel_layout.as_str() { + channel_layout: match self.params.channel_layout.unwrap_or_default().as_str() { "stereo" => ChannelLayout::STEREO, "mono" => ChannelLayout::MONO, "stereo_downmix" => ChannelLayout::STEREO_DOWNMIX, @@ -179,21 +200,67 @@ pub struct TaskParams { pub format: String, pub codec: String, pub codec_opts: Option, - pub bit_rate: usize, - pub max_bit_rate: usize, + pub bit_rate: Option, + pub max_bit_rate: Option, pub sample_rate: i32, - pub channel_layout: String, + pub channel_layout: Option, + pub url: Option, pub input_path: String, pub output_path: String, - pub callback_url: String, + pub callback_url: Option, + pub max_body_size: usize, +} + +fn download_file(url: &str, output_path: &str, max_size: usize) -> Result<(), Box> { + let response = ureq::get(url).call(); + + match response { + Ok(response) => { + if response.status() != 200 { + return Err(format!("Failed to download file: HTTP {}", response.status()).into()); + } + + let mut reader = response.into_reader(); + let mut file = File::create(output_path)?; + let mut buffer = vec![0; 8 * 1024]; // Read in 8KB chunks + let mut total_size = 0; + + loop { + let bytes_read = reader.read(&mut buffer)?; + if bytes_read == 0 { + break; + } + + total_size += bytes_read; + if total_size > max_size { + return Err("Response body exceeds the limit".into()); + } + + file.write_all(&buffer[..bytes_read])?; + } + } + Err(UreqError::Status(code, _response)) => { + return Err(format!("Failed to download file: HTTP {}", code).into()); + } + Err(e) => { + return Err(format!("Failed to make request: {}", e).into()); + } + } + + Ok(()) } fn send_error( id: uuid::Uuid, error: &str, - url: &str, -) -> Result> { - let response = ureq::post(url) + maybe_url: Option, +) -> Result<(), Box> { + let url = maybe_url.unwrap_or_default(); + if url.is_empty() { + return Ok(()); + } + + let response = ureq::post(url.as_str()) .set("Content-Type", "application/json") .send_json(ConvertResponse { id: Some(id.to_string()), @@ -201,7 +268,7 @@ fn send_error( })?; if response.status() == 200 { - Ok(response) + Ok(()) } else { Err(format!( "failed to send callback to {}. Status: {}", @@ -212,8 +279,13 @@ fn send_error( } } -fn send_ok(id: uuid::Uuid, url: &str) -> Result> { - let response = ureq::post(url) +fn send_ok(id: uuid::Uuid, maybe_url: Option) -> Result<(), Box> { + let url = maybe_url.unwrap_or_default(); + if url.is_empty() { + return Ok(()); + } + + let response = ureq::post(url.as_str()) .set("Content-Type", "application/json") .send_json(ConvertResponse { id: Some(id.to_string()), @@ -221,7 +293,7 @@ fn send_ok(id: uuid::Uuid, url: &str) -> Result, - pub bit_rate: usize, - pub max_bit_rate: usize, + pub bit_rate: Option, + pub max_bit_rate: Option, pub sample_rate: i32, pub channel_layout: ffmpeg::channel_layout::ChannelLayout, } @@ -86,16 +86,22 @@ impl Transcoder { .ok_or("no supported formats found for codec")?, ); - encoder.set_bit_rate(if params.bit_rate > 0 { - params.bit_rate - } else { - decoder.bit_rate() - }); - encoder.set_max_bit_rate(if params.max_bit_rate > 0 { - params.max_bit_rate - } else { - decoder.max_bit_rate() - }); + if let Some(bit_rate) = params.bit_rate { + encoder.set_bit_rate(if bit_rate > 0 { + bit_rate + } else { + decoder.bit_rate() + }); + } + + if let Some(max_bit_rate) = params.max_bit_rate { + encoder.set_max_bit_rate(if max_bit_rate > 0 { + max_bit_rate + } else { + decoder.max_bit_rate() + }); + } + encoder.set_time_base((1, sample_rate)); output.set_time_base((1, sample_rate));