enqueue file by url (helps with bigger files)

This commit is contained in:
Pavel 2024-05-29 18:32:17 +03:00
parent 9392714f7f
commit 03057706f8
8 changed files with 223 additions and 49 deletions

View File

@ -1,6 +1,6 @@
[package] [package]
name = "atranscoder-rpc" name = "atranscoder-rpc"
version = "0.1.0" version = "0.1.1"
edition = "2021" edition = "2021"
[dependencies] [dependencies]

View File

@ -15,19 +15,42 @@ curl --location 'http://localhost:8090/enqueue' \
--form 'file=@"/home/user/Music/test.mp3"' \ --form 'file=@"/home/user/Music/test.mp3"' \
--form 'format="mp4"' \ --form 'format="mp4"' \
--form 'codec="libfdk_aac"' \ --form 'codec="libfdk_aac"' \
--form 'codecOpts="profile=aac_he"' \ --form 'codec_opts="profile=aac_he"' \
--form 'bitRate="160000"' \ --form 'bit_rate="64000"' \
--form 'maxBitRate="160000"' \ --form 'max_bit_rate="64000"' \
--form 'sampleRate="44100"' \ --form 'sample_rate="44100"' \
--form 'channelLayout="stereo"' \ --form 'channel_layout="stereo"' \
--form 'callbackUrl="http://127.0.0.1:8909/callback"' --form 'callback_url="http://127.0.0.1:8909/callback"'
``` ```
3. Your `callbackUrl` will receive JSON response with job ID and error in case of failure. Error will be null if transcoding was successful. 3. Your `callback_url` will receive JSON response with job ID and error in case of failure. Error will be null if transcoding was successful.
4. You can download transcoded file like this (replace `job_id` with the ID you've received): 4. You can download transcoded file like this (replace `job_id` with the ID you've received):
```bash ```bash
curl -L http://localhost:8090/get/job_id -o file.mp4 curl -L http://localhost:8090/get/job_id -o file.mp4
``` ```
You can also enqueue a remote file like this:
```bash
curl --location 'http://localhost:8090/enqueue_url' \
--header 'Content-Type: application/json' \
--data '{
"format": "mp4",
"codec": "libfdk_aac",
"codec_opts": "profile=aac_he",
"bit_rate": 64000,
"max_bit_rate": 64000,
"sample_rate": 44100,
"channel_layout": "stereo",
"url": "https://upload.wikimedia.org/wikipedia/commons/c/c8/Example.ogg",
"callback_url": "http://127.0.0.1:8909/callback"
}'
```
Mandatory fields:
- `format`
- `codec`
- `sample_rate`
- `url` (for `/enqueue_url`)
You can change configuration using this environment variables: You can change configuration using this environment variables:
- `LISTEN` - change this environment variable to change TCP listen address. Default is `0.0.0.0:8090`. - `LISTEN` - change this environment variable to change TCP listen address. Default is `0.0.0.0:8090`.
- `NUM_WORKERS` - can be used to change how many threads will be used to transcode incoming files. Default is equal to logical CPUs. - `NUM_WORKERS` - can be used to change how many threads will be used to transcode incoming files. Default is equal to logical CPUs.
@ -44,7 +67,8 @@ You can change configuration using this environment variables:
- [x] Do not upload files directly, add download route with streaming instead. - [x] Do not upload files directly, add download route with streaming instead.
- [x] Conversion from OGG Opus mono to HE-AAC v1 Stereo outputs high-pitched crackling audio. - [x] Conversion from OGG Opus mono to HE-AAC v1 Stereo outputs high-pitched crackling audio.
- [x] Conversion from OGG Opus mono to AAC sometimes crashes the app with SIGSEGV (this can be seen more often with very short audio). - [x] Conversion from OGG Opus mono to AAC sometimes crashes the app with SIGSEGV (this can be seen more often with very short audio).
- [ ] If FFmpeg fails, `send_error` won't be called - fix that. - [x] ~~If FFmpeg fails, `send_error` won't be called - fix that.~~ It actually works, I just didn't notice before.
- [x] Ability to enqueue a remote file.
- [ ] Default errors are returned in plain text. Change it to the JSON. - [ ] Default errors are returned in plain text. Change it to the JSON.
- [ ] Docker image for `amd64` and `arm64` (currently only `amd64` is supported because `arm64` cross-compilation with QEMU is sloooooooooooowwwww...). - [ ] Docker image for `amd64` and `arm64` (currently only `amd64` is supported because `arm64` cross-compilation with QEMU is sloooooooooooowwwww...).
- [ ] Tests! - [ ] Tests!

View File

@ -9,17 +9,29 @@ pub struct ConvertResponse {
} }
#[derive(TryFromMultipart)] #[derive(TryFromMultipart)]
#[try_from_multipart(rename_all = "camelCase")]
pub struct ConvertRequest { pub struct ConvertRequest {
pub format: String, pub format: String,
pub codec: String, pub codec: String,
pub codec_opts: Option<String>, pub codec_opts: Option<String>,
pub bit_rate: usize, pub bit_rate: Option<usize>,
pub max_bit_rate: usize, pub max_bit_rate: Option<usize>,
pub sample_rate: i32, pub sample_rate: i32,
pub channel_layout: String, pub channel_layout: Option<String>,
pub callback_url: String, pub callback_url: Option<String>,
#[form_data(limit = "1GiB")] #[form_data(limit = "1GiB")]
pub file: FieldData<NamedTempFile>, pub file: FieldData<NamedTempFile>,
} }
#[derive(Serialize, Deserialize)]
pub struct ConvertURLRequest {
pub format: String,
pub codec: String,
pub codec_opts: Option<String>,
pub bit_rate: Option<usize>,
pub max_bit_rate: Option<usize>,
pub sample_rate: i32,
pub channel_layout: Option<String>,
pub url: String,
pub callback_url: Option<String>,
}

11
src/filepath.rs Normal file
View File

@ -0,0 +1,11 @@
use std::path::{Path, PathBuf};
pub const EXT: &str = "atranscoder";
pub fn in_file_path(work_dir: &str, task_id: String) -> PathBuf {
Path::new(work_dir).join(format!("{}.in.atranscoder", task_id))
}
pub fn out_file_path(work_dir: &str, task_id: String) -> PathBuf {
Path::new(work_dir).join(format!("{}.out.atranscoder", task_id))
}

View File

@ -6,6 +6,7 @@ use crate::server::Server;
use crate::thread_pool::ThreadPool; use crate::thread_pool::ThreadPool;
mod dto; mod dto;
mod filepath;
mod server; mod server;
mod task; mod task;
mod thread_pool; mod thread_pool;

View File

@ -15,14 +15,15 @@ use tower_http::trace::TraceLayer;
use tracing::{debug, error}; use tracing::{debug, error};
use uuid::Uuid; use uuid::Uuid;
use crate::dto::{ConvertRequest, ConvertResponse}; use crate::dto::{ConvertRequest, ConvertResponse, ConvertURLRequest};
use crate::task::{Task, TaskParams}; use crate::task::{Task, TaskParams};
use crate::thread_pool::ThreadPool; use crate::thread_pool::ThreadPool;
use crate::filepath;
use crate::filepath::{in_file_path, out_file_path};
use axum::body::Body; use axum::body::Body;
use axum::body::Bytes; use axum::body::Bytes;
use futures_util::StreamExt; use futures_util::StreamExt;
use std::path::Path as StdPath;
use std::sync::Arc; use std::sync::Arc;
use tokio::fs::File; use tokio::fs::File;
use tokio::io::AsyncReadExt; use tokio::io::AsyncReadExt;
@ -69,6 +70,7 @@ impl Server {
"/enqueue", "/enqueue",
post(enqueue_file).layer(DefaultBodyLimit::max(this.max_body_size)), post(enqueue_file).layer(DefaultBodyLimit::max(this.max_body_size)),
) )
.route("/enqueue_url", post(enqueue_url))
.route("/get/:identifier", get(download_file)) .route("/get/:identifier", get(download_file))
.with_state(this) .with_state(this)
.layer(TraceLayer::new_for_http()); .layer(TraceLayer::new_for_http());
@ -79,13 +81,58 @@ impl Server {
} }
} }
async fn enqueue_url(
State(server): State<Arc<Server>>,
Json(req): Json<ConvertURLRequest>,
) -> (StatusCode, Json<ConvertResponse>) {
let task_id = Uuid::new_v4();
let input = in_file_path(&server.work_dir, task_id.to_string());
let output = out_file_path(&server.work_dir, task_id.to_string());
let input_path = match input.to_str() {
Some(path) => path,
None => return error_response("Invalid input path"),
};
let output_path = match output.to_str() {
Some(path) => path,
None => return error_response("Invalid output path"),
};
let params = TaskParams {
format: req.format,
codec: req.codec,
codec_opts: req.codec_opts,
bit_rate: req.bit_rate,
max_bit_rate: req.max_bit_rate,
sample_rate: req.sample_rate,
channel_layout: req.channel_layout,
callback_url: req.callback_url,
input_path: input_path.to_string(),
output_path: output_path.to_string(),
url: Some(req.url),
max_body_size: server.max_body_size,
};
let task = Task::new(task_id, params);
// Enqueue the task to the thread pool
server.thread_pool.enqueue(task);
(
StatusCode::CREATED,
Json::from(ConvertResponse {
id: Some(task_id.to_string()),
error: None,
}),
)
}
async fn enqueue_file( async fn enqueue_file(
State(server): State<Arc<Server>>, State(server): State<Arc<Server>>,
TypedMultipart(req): TypedMultipart<ConvertRequest>, TypedMultipart(req): TypedMultipart<ConvertRequest>,
) -> (StatusCode, Json<ConvertResponse>) { ) -> (StatusCode, Json<ConvertResponse>) {
let task_id = Uuid::new_v4(); let task_id = Uuid::new_v4();
let input = StdPath::new(&server.work_dir).join(format!("{}.in.atranscoder", task_id)); let input = in_file_path(&server.work_dir, task_id.to_string());
let output = StdPath::new(&server.work_dir).join(format!("{}.out.atranscoder", task_id)); let output = out_file_path(&server.work_dir, task_id.to_string());
let file = req.file; let file = req.file;
@ -111,6 +158,8 @@ async fn enqueue_file(
callback_url: req.callback_url, callback_url: req.callback_url,
input_path: input_path.to_string(), input_path: input_path.to_string(),
output_path: output_path.to_string(), output_path: output_path.to_string(),
url: None,
max_body_size: server.max_body_size,
}; };
let task = Task::new(task_id, params); let task = Task::new(task_id, params);
@ -133,8 +182,7 @@ async fn download_file(
State(server): State<Arc<Server>>, State(server): State<Arc<Server>>,
Path(identifier): Path<String>, Path(identifier): Path<String>,
) -> Result<impl IntoResponse, StatusCode> { ) -> Result<impl IntoResponse, StatusCode> {
let file_name = format!("{}.out.atranscoder", identifier); let file_path = out_file_path(&server.work_dir, identifier);
let file_path = StdPath::new(&server.work_dir).join(file_name);
if !file_path.exists() { if !file_path.exists() {
return Err(StatusCode::NOT_FOUND); return Err(StatusCode::NOT_FOUND);
@ -201,7 +249,7 @@ async fn cleanup_directory(dir_path: &str, ttl: u64) -> Result<(), Box<dyn std::
.and_then(OsStr::to_str) .and_then(OsStr::to_str)
.map(|ext| ext.to_lowercase()) .map(|ext| ext.to_lowercase())
{ {
if extension.eq("atranscoder") { if extension.eq(filepath::EXT) {
// Get the metadata of the file // Get the metadata of the file
let metadata = fs::metadata(&file_path).await?; let metadata = fs::metadata(&file_path).await?;

View File

@ -3,8 +3,11 @@ use crate::transcoder::{Transcoder, TranscoderParams};
use ffmpeg_next::channel_layout::ChannelLayout; use ffmpeg_next::channel_layout::ChannelLayout;
use ffmpeg_next::{format, Dictionary}; use ffmpeg_next::{format, Dictionary};
use std::error::Error; use std::error::Error;
use std::fs::File;
use std::io::{self, Write};
use std::path::Path; use std::path::Path;
use tracing::{debug, error}; use tracing::{debug, error};
use ureq::Error as UreqError;
use ureq::Response; use ureq::Response;
#[derive(Clone)] #[derive(Clone)]
@ -19,13 +22,31 @@ impl Task {
} }
pub fn execute(self) -> Result<(), Box<dyn Error>> { pub fn execute(self) -> Result<(), Box<dyn Error>> {
if let Some(download_url) = &self.params.url {
if let Err(err) = download_file(
download_url,
&self.params.input_path,
self.params.max_body_size,
) {
std::fs::remove_file(Path::new(&self.params.input_path)).ok();
if let Err(send_err) = send_error(
self.id,
&format!("Couldn't download the file: {}", err),
self.params.callback_url,
) {
eprintln!("Failed to send error callback: {}", send_err);
}
return Err(err);
}
}
if let Err(err) = self.clone().transcode() { if let Err(err) = self.clone().transcode() {
std::fs::remove_file(Path::new(&self.params.input_path)).ok(); std::fs::remove_file(Path::new(&self.params.input_path)).ok();
std::fs::remove_file(Path::new(&self.params.output_path)).ok(); std::fs::remove_file(Path::new(&self.params.output_path)).ok();
send_error( send_error(
self.id, self.id,
format!("Couldn't transcode: {}", err).as_str(), format!("Couldn't transcode: {}", err).as_str(),
&self.params.callback_url, self.params.callback_url,
) )
.ok(); .ok();
return Err(err); return Err(err);
@ -33,18 +54,18 @@ impl Task {
std::fs::remove_file(Path::new(&self.params.input_path)).ok(); std::fs::remove_file(Path::new(&self.params.input_path)).ok();
if let Err(err) = send_ok(self.id, &self.params.callback_url) { if let Err(err) = send_ok(self.id, self.params.clone().callback_url) {
error!( error!(
"couldn't send result callback for job id={}, url {}: {}", "couldn't send result callback for job id={}, url {}: {}",
&self.id.to_string(), &self.id.to_string(),
&self.params.callback_url, &self.params.callback_url.unwrap_or_default(),
err err
); );
} else { } else {
debug!( debug!(
"job id={} result was sent to callback {}", "job id={} result was sent to callback {}",
&self.id.to_string(), &self.id.to_string(),
&self.params.callback_url &self.params.callback_url.unwrap_or_default()
); );
} }
@ -91,7 +112,7 @@ impl Task {
bit_rate: self.params.bit_rate, bit_rate: self.params.bit_rate,
max_bit_rate: self.params.max_bit_rate, max_bit_rate: self.params.max_bit_rate,
sample_rate: self.params.sample_rate, sample_rate: self.params.sample_rate,
channel_layout: match self.params.channel_layout.as_str() { channel_layout: match self.params.channel_layout.unwrap_or_default().as_str() {
"stereo" => ChannelLayout::STEREO, "stereo" => ChannelLayout::STEREO,
"mono" => ChannelLayout::MONO, "mono" => ChannelLayout::MONO,
"stereo_downmix" => ChannelLayout::STEREO_DOWNMIX, "stereo_downmix" => ChannelLayout::STEREO_DOWNMIX,
@ -179,21 +200,67 @@ pub struct TaskParams {
pub format: String, pub format: String,
pub codec: String, pub codec: String,
pub codec_opts: Option<String>, pub codec_opts: Option<String>,
pub bit_rate: usize, pub bit_rate: Option<usize>,
pub max_bit_rate: usize, pub max_bit_rate: Option<usize>,
pub sample_rate: i32, pub sample_rate: i32,
pub channel_layout: String, pub channel_layout: Option<String>,
pub url: Option<String>,
pub input_path: String, pub input_path: String,
pub output_path: String, pub output_path: String,
pub callback_url: String, pub callback_url: Option<String>,
pub max_body_size: usize,
}
fn download_file(url: &str, output_path: &str, max_size: usize) -> Result<(), Box<dyn Error>> {
let response = ureq::get(url).call();
match response {
Ok(response) => {
if response.status() != 200 {
return Err(format!("Failed to download file: HTTP {}", response.status()).into());
}
let mut reader = response.into_reader();
let mut file = File::create(output_path)?;
let mut buffer = vec![0; 8 * 1024]; // Read in 8KB chunks
let mut total_size = 0;
loop {
let bytes_read = reader.read(&mut buffer)?;
if bytes_read == 0 {
break;
}
total_size += bytes_read;
if total_size > max_size {
return Err("Response body exceeds the limit".into());
}
file.write_all(&buffer[..bytes_read])?;
}
}
Err(UreqError::Status(code, _response)) => {
return Err(format!("Failed to download file: HTTP {}", code).into());
}
Err(e) => {
return Err(format!("Failed to make request: {}", e).into());
}
}
Ok(())
} }
fn send_error( fn send_error(
id: uuid::Uuid, id: uuid::Uuid,
error: &str, error: &str,
url: &str, maybe_url: Option<String>,
) -> Result<Response, Box<dyn std::error::Error>> { ) -> Result<(), Box<dyn std::error::Error>> {
let response = ureq::post(url) let url = maybe_url.unwrap_or_default();
if url.is_empty() {
return Ok(());
}
let response = ureq::post(url.as_str())
.set("Content-Type", "application/json") .set("Content-Type", "application/json")
.send_json(ConvertResponse { .send_json(ConvertResponse {
id: Some(id.to_string()), id: Some(id.to_string()),
@ -201,7 +268,7 @@ fn send_error(
})?; })?;
if response.status() == 200 { if response.status() == 200 {
Ok(response) Ok(())
} else { } else {
Err(format!( Err(format!(
"failed to send callback to {}. Status: {}", "failed to send callback to {}. Status: {}",
@ -212,8 +279,13 @@ fn send_error(
} }
} }
fn send_ok(id: uuid::Uuid, url: &str) -> Result<Response, Box<dyn std::error::Error>> { fn send_ok(id: uuid::Uuid, maybe_url: Option<String>) -> Result<(), Box<dyn Error>> {
let response = ureq::post(url) let url = maybe_url.unwrap_or_default();
if url.is_empty() {
return Ok(());
}
let response = ureq::post(url.as_str())
.set("Content-Type", "application/json") .set("Content-Type", "application/json")
.send_json(ConvertResponse { .send_json(ConvertResponse {
id: Some(id.to_string()), id: Some(id.to_string()),
@ -221,7 +293,7 @@ fn send_ok(id: uuid::Uuid, url: &str) -> Result<Response, Box<dyn std::error::Er
})?; })?;
if response.status() == 200 { if response.status() == 200 {
Ok(response) Ok(())
} else { } else {
Err(format!( Err(format!(
"failed to send callback to {}. Status: {}", "failed to send callback to {}. Status: {}",

View File

@ -19,8 +19,8 @@ pub struct Transcoder {
pub struct TranscoderParams { pub struct TranscoderParams {
pub codec: String, pub codec: String,
pub codec_opts: Option<String>, pub codec_opts: Option<String>,
pub bit_rate: usize, pub bit_rate: Option<usize>,
pub max_bit_rate: usize, pub max_bit_rate: Option<usize>,
pub sample_rate: i32, pub sample_rate: i32,
pub channel_layout: ffmpeg::channel_layout::ChannelLayout, pub channel_layout: ffmpeg::channel_layout::ChannelLayout,
} }
@ -86,16 +86,22 @@ impl Transcoder {
.ok_or("no supported formats found for codec")?, .ok_or("no supported formats found for codec")?,
); );
encoder.set_bit_rate(if params.bit_rate > 0 { if let Some(bit_rate) = params.bit_rate {
params.bit_rate encoder.set_bit_rate(if bit_rate > 0 {
bit_rate
} else { } else {
decoder.bit_rate() decoder.bit_rate()
}); });
encoder.set_max_bit_rate(if params.max_bit_rate > 0 { }
params.max_bit_rate
if let Some(max_bit_rate) = params.max_bit_rate {
encoder.set_max_bit_rate(if max_bit_rate > 0 {
max_bit_rate
} else { } else {
decoder.max_bit_rate() decoder.max_bit_rate()
}); });
}
encoder.set_time_base((1, sample_rate)); encoder.set_time_base((1, sample_rate));
output.set_time_base((1, sample_rate)); output.set_time_base((1, sample_rate));