I am currently pursuing my undergrad degree in computer engineering. Being a student, I have lots of data to store: courses, reference materials, etc. I used to use my university's Google Drive for storage, but Google recently capped the storage, which made me wonder where I could store all of my data. I tried Telegram because of its large file size limit, but it was too slow. I used to store small files that I frequently shared on Discord, mainly because anyone with the link could access them. Discord had, and still has, a very reliable storage architecture and network, so it is a good choice for storing my files, with just one limitation: without Nitro, we can only upload files of up to 8 MB.
What if I split the files into small chunks of ~8 MB each and uploaded those? Downloading them would be easy: just a synchronous fetch stream over the list of buffers, served (and cached) through serverless functions. Although this sounds pretty easy, there are a lot of factors to consider: the machine's capacity to handle uploading big files, the rate limits imposed, serverless limits on the free plan, and much more that would come up while developing the application/service. I think this would be fun and a great learning experience, so why not try it out ;)
I was initially thinking of Node.js for the tech stack, mainly because I could use the stream.Transform class to convert the file into chunks, but there could be cases where I would run out of memory. There are ways to handle that, but I thought of using Rust instead. Why? Simply because I had wanted to work on a project in Rust for so long. The last “real” project I built in Rust was Mayuzumi, in 2021; I worked on it for almost 6 months before the project was dropped, and I enjoyed using Rust for it. Now that I had decided on the language, I had to decide which crates to use. The requirements were simple:
should split the files into chunks
should not run out of memory
should upload them to Discord
while uploading, it should not block other processes
The last point is important. Imagine you have a compressed file of about 1 GB: after chunking, you would have roughly 125 files to upload to Discord. If each upload took almost a minute and they ran one after another, the whole process would take around 125 minutes, roughly 2 hours, just to upload a single GB. To tackle this, we would use threads to upload several chunks concurrently. With this initial idea and these requirements in mind, I started looking for libraries. The “should not run out of memory” requirement was a bit tricky, but after some digging, I found the StreamingIterator crate, which would help me avoid holding the whole file in memory since, theoretically, we could adapt it to stream chunks directly from the file system. I gathered a few more libraries: Rayon and Serenity. These are not the “only” libraries we need, but I think they are enough to start with the MVP. At this point, I had only picked the libraries I would use, but hadn't tried any of them.
17 Jan '23 - As I said, I hadn't tried any of the libraries yet, and while implementing it, it turned out that StreamingIterator may end up taking more memory than the built-in BufReader. So, I settled on BufReader to read the data chunk by chunk, each chunk being 7 MB. For the uploading part, I chose tokio, because uploading is an I/O-bound operation, for which tokio is better suited than rayon (rayon is built for CPU-bound data parallelism).
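One detail worth knowing about reading fixed-size chunks: Read::read is allowed to return fewer bytes than the buffer can hold, even before the end of the file, so a single read() call is not guaranteed to produce a full 7 MB chunk. Below is a minimal std-only sketch of an iterator that fills each chunk completely by combining Read::take with read_to_end, keeping only one chunk in memory at a time. The ChunkReader name is hypothetical and not from the project.

```rust
use std::io::{self, Read};

/// Hypothetical helper (not from the project): yields consecutive chunks of
/// exactly `chunk_size` bytes from any reader; only the last one may be shorter.
struct ChunkReader<R: Read> {
    reader: R,
    chunk_size: usize,
}

impl<R: Read> ChunkReader<R> {
    fn new(reader: R, chunk_size: usize) -> Self {
        ChunkReader { reader, chunk_size }
    }
}

impl<R: Read> Iterator for ChunkReader<R> {
    type Item = io::Result<Vec<u8>>;

    fn next(&mut self) -> Option<Self::Item> {
        let mut chunk = Vec::with_capacity(self.chunk_size);
        // take() caps this read at chunk_size bytes; read_to_end keeps calling
        // read() until that cap is reached or the reader hits end of file
        match self
            .reader
            .by_ref()
            .take(self.chunk_size as u64)
            .read_to_end(&mut chunk)
        {
            Ok(0) => None, // nothing left to read
            Ok(_) => Some(Ok(chunk)),
            Err(e) => Some(Err(e)),
        }
    }
}
```

With BufReader::new(File::open(path)?) as the reader and 7 * 1024 * 1024 as the chunk size, each item would be one full 7 MB chunk (except possibly the last), so a chunking loop could iterate over it instead of calling read() directly.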
Storing the Data
Here is the initial code I wrote to test the file chunking logic.
use futures::future::join_all;
use std::{
    fs::File,
    io::{BufReader, Read},
    sync::Arc,
};
use tokio::sync::Semaphore;

async fn chunk_file() {
    // semaphore to allow 24 concurrent uploads
    // to avoid hitting the rate limit
    let semaphore = Arc::new(Semaphore::new(24));
    // open the file; for testing, rq.rar was ~2GB
    let file = File::open("trial/rq.rar").unwrap();
    // use BufReader to add buffering to the file
    let mut reader = BufReader::new(file);
    // the size of each chunk we want to generate
    let buffer_size = 7 * 1024 * 1024; // 7MB buffer
    // a reusable buffer of (7 * 1024 * 1024) zeroes
    let mut buffer = vec![0; buffer_size];
    // chunk index, used to identify the chunks while joining
    let mut i = 0;
    // a vector to store all the spawned tasks
    let mut handles = vec![];
    // iterate until the whole file has been read
    loop {
        // read the file data into our buffer
        // reads at most 7MB at a time (a single read() may return fewer bytes)
        let bytes_read = reader.read(&mut buffer).unwrap();
        // break the loop if no bytes are left to read
        if bytes_read == 0 {
            break;
        }
        // copy only the bytes we actually read into an owned Vec<u8>:
        // the last chunk may be smaller, e.g. 2.3MB instead of 7MB
        let data = buffer[..bytes_read].to_vec();
        // each task needs its own handle to the shared semaphore
        let semaphore_handle = semaphore.clone();
        // spawn a background task to upload the chunk to Discord;
        // tasks run concurrently (and in parallel on a multi-threaded runtime)
        let handle = tokio::spawn(async move {
            // acquire a permit; if all 24 are taken,
            // wait for earlier tasks to finish
            let permit = semaphore_handle.acquire().await.unwrap();
            // after getting a permit, upload to Discord
            // (upload_chunk is defined elsewhere)
            upload_chunk(i, data).await;
            // upload complete: release the permit so
            // the next chunk in the queue can start
            drop(permit);
        });
        // push that handle to our list of tasks
        handles.push(handle);
        // increment the index
        i += 1;
    }
    // wait for all the tasks to complete
    join_all(handles).await;
}
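One caveat with this first version: every chunk is read and spawned immediately, and each task holds its 7 MB Vec while it waits for a permit, so for a 2 GB file nearly the whole file can end up in memory, which works against the “should not run out of memory” requirement. One way around this is to wait for capacity before reading the next chunk. The sketch below shows that idea with std threads and a bounded channel (std::sync::mpsc::sync_channel) instead of tokio, so it runs without extra crates; upload_bounded and the upload callback are hypothetical stand-ins for chunk_file and upload_chunk. In tokio, the equivalent would be acquiring the permit inside the loop (for example with Semaphore::acquire_owned) and moving it into the spawned task.

```rust
use std::io::{self, Read};
use std::sync::mpsc::sync_channel;
use std::sync::Mutex;
use std::thread;

/// Hypothetical sketch: splits `reader` into `chunk_size` chunks and uploads
/// them on `workers` threads. `send` blocks while the channel already holds
/// `workers` chunks, so reading never runs far ahead of uploading: at most about
/// 2 * workers + 1 chunks are in memory (queued, uploading, being read).
/// Returns the number of chunks produced.
fn upload_bounded<R, F>(mut reader: R, chunk_size: usize, workers: usize, upload: F) -> io::Result<usize>
where
    R: Read,
    F: Fn(usize, Vec<u8>) + Sync,
{
    assert!(workers > 0, "need at least one worker");
    let (tx, rx) = sync_channel::<(usize, Vec<u8>)>(workers);
    // the receiver is shared by all workers, so it sits behind a Mutex
    let rx = Mutex::new(rx);
    thread::scope(|s| {
        for _ in 0..workers {
            s.spawn(|| loop {
                // the lock guard is dropped at the end of this statement,
                // so the uploads themselves run in parallel
                let next = rx.lock().unwrap().recv();
                match next {
                    Ok((index, data)) => upload(index, data),
                    Err(_) => break, // sender dropped: no more chunks
                }
            });
        }
        let mut index = 0;
        loop {
            let mut chunk = Vec::with_capacity(chunk_size);
            let n = reader.by_ref().take(chunk_size as u64).read_to_end(&mut chunk)?;
            if n == 0 {
                break;
            }
            // blocks here while `workers` chunks are already waiting
            tx.send((index, chunk)).expect("upload workers stopped early");
            index += 1;
        }
        // closing the channel lets the workers exit; scope() then joins them
        drop(tx);
        Ok(index)
    })
}
```

The design choice is backpressure: the producer is slowed down to the pace of the uploads, so peak memory depends on the number of workers rather than on the file size.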
It takes ~1 second to split a 2 GB file into 293 chunks of 7 MB each. Assuming 24 concurrent tasks don't hit the rate limit and it takes around 30 seconds to upload 7 MB, we can upload 2 GB of data in around 6 minutes (293 chunks / 24 ≈ 12.2 rounds of ~30 seconds each). But it's highly likely that we would hit the rate limit, and the only way to know is to actually try it. We also haven't considered the internet upload speed here, which may also be a crucial factor when estimating the total upload time.
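Both estimates in this post are the same two ceiling divisions: the number of chunks is the file size divided by the chunk size, rounded up, and with N uploads in flight the chunks go out in that count divided by N, rounded up, rounds. A small sketch of that arithmetic, assuming every upload takes the same time and ignoring rate limits and bandwidth; estimate_upload is a made-up name for illustration.

```rust
/// Hypothetical helper: a naive upload-time estimate that assumes every chunk
/// takes `secs_per_chunk` seconds and `concurrency` uploads always run side by
/// side (no rate limits, no bandwidth cap).
/// Returns (number of chunks, number of rounds, estimated seconds).
fn estimate_upload(file_bytes: u64, chunk_bytes: u64, concurrency: u64, secs_per_chunk: u64) -> (u64, u64, u64) {
    // ceiling division: a partial last chunk still needs its own upload
    let chunks = (file_bytes + chunk_bytes - 1) / chunk_bytes;
    // each round uploads up to `concurrency` chunks in parallel
    let rounds = (chunks + concurrency - 1) / concurrency;
    (chunks, rounds, rounds * secs_per_chunk)
}
```

For example, estimate_upload(2 * 1024 * 1024 * 1024, 7 * 1024 * 1024, 24, 30) gives 293 chunks in 13 whole rounds, or 390 seconds (6.5 minutes) when partial rounds are rounded up. With a concurrency of 1 it reproduces the earlier sequential estimate: 1 GB in 8 MB chunks at a minute each is 125 chunks and 125 minutes.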
So, now we move on to the upload part. For this, I first initialized all the webhooks concurrently using the serenity library. Since serenity's Http doesn't implement Clone, I had to wrap it in an Arc.
#[tokio::main]
async fn main() {
    // ...
    // ... initialization code
    // create a vector to store the webhooks
    // (Mutex here is std::sync::Mutex)
    let webhooks = Arc::new(Mutex::new(vec![]));
    // wrap the Http client in an Arc,
    // so we can clone it and share it across tasks
    let http = Arc::new(Http::new(""));
    // a vector to store the task handles
    let mut webhook_handles = vec![];
    // iterate over the webhook URLs
    for webhook in cfg.webhooks {
        // clone the Arcs so the spawned task can own them
        let http = http.clone();
        let webhook_clone = webhooks.clone();
        // spawn a task to initialize the webhook
        let handle = tokio::spawn(async move {
            let webhook = Webhook::from_url(http.as_ref(), &webhook)
                .await
                .expect(&format!("Failed to get webhook from url: {}", webhook));
            // lock the shared vector
            let mut webhook_vec = webhook_clone.lock().unwrap();
            // push the created webhook
            webhook_vec.push(webhook);
        });
        // push the handle to the task vector
        webhook_handles.push(handle);
    }
    // wait for all the tasks to finish
    join_all(webhook_handles).await;
}
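A side effect of pushing into an Arc<Mutex<Vec<_>>> from each task is that the webhooks end up in whatever order the tasks finish, not the order in the config. If the order matters (or just to avoid the mutex), each task can return its result and the caller can collect the results in spawn order; join_all from futures also returns its outputs in the same order as its inputs. The std-only sketch below shows the pattern with scoped threads instead of tokio tasks. parse_webhook_url is a hypothetical stand-in for Webhook::from_url: it only extracts the id and token from a URL shaped like https://discord.com/api/webhooks/{id}/{token} and makes no network request.

```rust
use std::thread;

/// Hypothetical stand-in for Webhook::from_url: pulls the id and token out of a
/// URL shaped like https://discord.com/api/webhooks/{id}/{token}.
fn parse_webhook_url(url: &str) -> Option<(u64, String)> {
    let rest = url.split("/api/webhooks/").nth(1)?;
    let mut parts = rest.trim_end_matches('/').split('/');
    let id = parts.next()?.parse().ok()?;
    let token = parts.next()?.to_string();
    if token.is_empty() || parts.next().is_some() {
        return None;
    }
    Some((id, token))
}

/// Initializes all webhooks concurrently, one thread per URL, and returns the
/// results in the same order as `urls` (no shared Vec or Mutex needed).
fn init_all(urls: &[String]) -> Vec<Option<(u64, String)>> {
    thread::scope(|s| {
        // spawn everything first so the work actually overlaps
        let handles: Vec<_> = urls
            .iter()
            .map(|url| s.spawn(move || parse_webhook_url(url)))
            .collect();
        // joining in spawn order keeps the results in config order
        handles.into_iter().map(|h| h.join().unwrap()).collect()
    })
}
```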
Now that all the webhooks have been initialized, I have to pass them to my chunk_file function so that the chunks can be uploaded. For this, I had two options: pass the Http instance and each Webhook as they are, or create a struct to bundle them. The first one is the easier and most sensible option, but I went with the second: a struct that combines Http and Webhook, which I pass around instead. The WebhookData struct needs to implement Clone, since each concurrent task needs its own copy of it. This is how it looks:
// deriving Clone clones each field: the Arc (a cheap
// reference-count bump) and the Webhook
#[derive(Clone)]
struct WebhookData {
    http: Arc<Http>,
    webhook: Webhook,
}
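Cloning WebhookData is cheap for the http field: cloning an Arc doesn't copy the Http client, it only increments a reference count, so every task shares the same client (and, with serenity, the same rate-limit bookkeeping). Only the Webhook itself is actually copied. Whether Clone is derived or written by hand, it clones field by field. A std-only sketch with hypothetical stand-in types (FakeHttp, FakeWebhook, FakeWebhookData) rather than serenity's real ones makes that visible:

```rust
use std::sync::Arc;

/// Hypothetical stand-in for serenity's Http; note it does not need Clone,
/// because only the Arc around it is cloned.
struct FakeHttp {
    token: String,
}

/// Hypothetical stand-in for serenity's Webhook.
#[derive(Clone)]
struct FakeWebhook {
    url: String,
}

/// Same shape as WebhookData: a shared client plus an owned webhook.
#[derive(Clone)]
struct FakeWebhookData {
    http: Arc<FakeHttp>,
    webhook: FakeWebhook,
}
```

After data.clone(), both values point at the same FakeHttp (Arc::ptr_eq is true and the strong count is 2), while each holds its own copy of the webhook.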
Now that we have our struct, we need to change some of our code to pass it along.
let webhooks: Arc<Mutex<Vec<WebhookData>>> = Arc::new(Mutex::new(vec![]));
let http = Arc::new(Http::new(""));
let mut webhook_handles = vec![];
for webhook in cfg.webhooks {
    let http = http.clone();
    let webhook_clone = webhooks.clone();
    let handle = tokio::spawn(async move {
        let webhook = Webhook::from_url(http.as_ref(), &webhook)
            .await
            .expect(&format!("Failed to get webhook from url: {}", webhook));
        let mut webhook_vec = webhook_clone.lock().unwrap();
        // store the shared Http client together with the webhook
        webhook_vec.push(WebhookData { http, webhook });
    });
    webhook_handles.push(handle);
}
join_all(webhook_handles).await;
I ran the code, and it took 558.476888248 seconds, which is approximately 9 minutes. That's a lot for a 2 GB file, but it uploaded ~295 chunks, all while respecting Discord's rate limits. The average upload speed I observed was ~6.5 MiB/s.
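For comparison, the end-to-end throughput of this run can be worked out from the numbers above. Since the last chunk is usually smaller than 7 MiB, counting every chunk as a full 7 MiB gives an upper bound. A one-line sketch; the function name is hypothetical.

```rust
/// Hypothetical helper: effective throughput in MiB/s for `chunks` chunks of at
/// most `chunk_mib` MiB each, uploaded in `secs` seconds (an upper bound, since
/// the last chunk is usually smaller).
fn effective_mib_per_sec(chunks: u32, chunk_mib: f64, secs: f64) -> f64 {
    f64::from(chunks) * chunk_mib / secs
}
```

effective_mib_per_sec(295, 7.0, 558.476888248) comes out at roughly 3.7 MiB/s, a bit over half of the ~6.5 MiB/s upload speed observed, which suggests that a good share of the run was spent waiting rather than sending (for example, on rate limits).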
Now, let's experiment. All the rate limits are handled by the Http client we declared, but what if we declared two Http clients instead of one? This may or may not work, as serenity handles rate limits based on the response headers of the webhook requests, but trying doesn't hurt, does it?
So, we assign the Http client http_one to all webhooks at odd indices and http_two to those at even indices (counting from 1). Our code now becomes something like this:
let http_one = Arc::new(Http::new(""));
let http_two = Arc::new(Http::new(""));
let mut webhook_handles = vec![];
let mut webhook_index = 1;
for webhook in cfg.webhooks {
    // odd indices use http_one, even indices use http_two
    let http = if webhook_index % 2 != 0 {
        http_one.clone()
    } else {
        http_two.clone()
    };
    let webhook_clone = webhooks.clone();
    let handle = tokio::spawn(async move {
        let webhook = Webhook::from_url(http.as_ref(), &webhook)
            .await
            .expect(&format!("Failed to get webhook from url: {}", webhook));
        let mut webhook_vec = webhook_clone.lock().unwrap();
        webhook_vec.push(WebhookData { http, webhook });
    });
    webhook_handles.push(handle);
    webhook_index += 1;
}
join_all(webhook_handles).await;
But, as expected, this did not help: it still took 556.185072046 seconds (around 9 minutes) to upload the same file, so there was hardly any difference. In hindsight this probably makes sense, since Discord rate-limits each webhook separately and every webhook was still handled by exactly one client. Kudos to the authors of serenity, though; they have implemented rate-limit handling really neatly.
So, the chunked upload feature is sorted, and two main features are left: storing the links in a database, so that the chunks can later be retrieved from Discord, and downloading the chunks from Discord and merging them back into a single file, in the order recorded in the database. I'm calling it a day now; I have spent more than 7 hours coding on this project today.
18 January '23 - Today, I have to figure out which database to choose and implement the code to download all the chunks and merge them into a single file.
Database
So, for the database, I decided to go with Google's Firestore, because I also plan to turn this into a multi-device application, and Firestore would fit all my needs. Fortunately, there is a firestore crate which is actively maintained, with its latest version released 3 days ago. So, I went ahead and created a new Firebase project.
Now that the Firestore database is ready, we create a service account, because we need credentials to connect to it. Next, we have to configure the security rules. I am thinking of adding an auth_token field to all the data sent; not the best possible way to do it, but meh.
rules_version = '2';
service cloud.firestore {
  match /databases/{database}/documents {
    match /{document=**} {
      allow read, write: if request.resource.data["auth_token"] == "your_unique_auth_token";
    }
  }
}
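Two things to keep in mind about these rules: request.resource is only present on write requests, so this rule never allows reads; and Security Rules only apply to the client SDKs, since server libraries authenticated with a service account (like the one used below) bypass them entirely. Now, let's grab our service account file.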
Now that we have Firestore set up, let's add the firestore dependency to our project. After installing it, we need to configure it to use our service account. This is going to be tricky: there is no documentation on how to use it with a service account, since it expects the settings to come from a GKE environment. Using a service account is possible, but the builds of the underlying library, gcloud-sdk, are continuously failing. The documentation mentions that we could set an environment variable, but I believe it may clash with whatever environment the developer has already configured. So, we have to go through the source code to find out. I created an issue about the documentation, so now we wait for updates.
Fortunately, the author replied quickly, and it was clear that there was no option other than setting the environment variable. We use set_var to set the path of the service account file. I have not coded much today, so let's get into it!
// set the environment variable for the process
// (set_var comes from std::env)
set_var(
    "GOOGLE_APPLICATION_CREDENTIALS",
    cfg.service_account_file_location,
);
// create a Firestore instance
let db = FirestoreDb::new("discord-ddrive").await.unwrap();
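The concern above about clashing with a developer's own environment could be softened by only setting GOOGLE_APPLICATION_CREDENTIALS when it isn't already set. A minimal sketch of that decision as a pure function, so it can be tested without touching the real environment; credentials_to_set is a hypothetical name, not part of the firestore crate.

```rust
use std::ffi::OsString;

/// Hypothetical helper: returns the path that should be exported as
/// GOOGLE_APPLICATION_CREDENTIALS, or None if the user already set a
/// non-empty value (in which case it is left alone).
fn credentials_to_set(existing: Option<OsString>, configured: &str) -> Option<String> {
    match existing {
        Some(value) if !value.is_empty() => None,
        _ => Some(configured.to_string()),
    }
}
```

In main, this would be called with std::env::var_os("GOOGLE_APPLICATION_CREDENTIALS") and the configured path, and set_var would only run when it returns Some(path).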
With much effort, I configured it to store data in Firestore. Much of that work is not documented here because of time constraints: I wanted to complete both storing data in the database and retrieving it today, but only ended up getting the storing part done.
Here is what I did today, in brief: I created a separate function called add_to_database, which stores the data in our Firestore database. It appends every chunk's metadata to parts, an array field in the file's document. When retrieving, I use this array to download the chunks and merge them in order. This is more or less the backbone of the whole "after-upload" thingy.
async fn add_to_database(db: FirestoreDb, name: String, part: i32, attachment_url: String) {
    // metadata for one chunk, e.g. id "rq.rar.part3"
    let master_directory_child: MasterDirectoryChildPart = MasterDirectoryChildPart {
        name: name.clone(),
        id: format!("{}.part{}", &name, part),
        part,
        parent: name,
        url: attachment_url,
    };
    let mut transaction = db.begin_transaction().await.unwrap();
    // append the chunk metadata to the `parts` array of the file's document
    let _added = db
        .fluent()
        .update()
        .fields(paths!(MasterDirectoryChild::parts))
        .in_col(MASTER_DIRECTORY_COLLECTION_NAME)
        .document_id(&master_directory_child.name)
        .transforms(|transform_builder| {
            vec![transform_builder
                .field(path!(MasterDirectoryChild::parts))
                .append_missing_elements([&master_directory_child])
                .unwrap()]
        })
        .only_transform()
        .add_to_transaction(&mut transaction)
        .unwrap();
    transaction.commit().await.unwrap();
}
This code adds the chunk's metadata to parts, which is an array field.
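append_missing_elements corresponds to Firestore's arrayUnion transform: an element is appended only if no equal element is already in the array, so retrying a write that already recorded its chunk leaves parts unchanged. A local std-only model of that behaviour, with a hypothetical Part struct mirroring the fields that add_to_database fills in (the real MasterDirectoryChildPart definition isn't shown in this post):

```rust
/// Hypothetical mirror of the chunk metadata stored in `parts`.
#[derive(Clone, Debug, PartialEq)]
struct Part {
    name: String,
    id: String,
    part: i32,
    parent: String,
    url: String,
}

/// Builds the metadata the same way add_to_database does (id = "<name>.part<n>").
fn make_part(name: &str, part: i32, url: &str) -> Part {
    Part {
        name: name.to_string(),
        id: format!("{}.part{}", name, part),
        part,
        parent: name.to_string(),
        url: url.to_string(),
    }
}

/// Local model of arrayUnion (append_missing_elements): each new element is
/// appended only if an equal element isn't already in the array.
fn append_missing_elements(parts: &mut Vec<Part>, new: &[Part]) {
    for p in new {
        if !parts.contains(p) {
            parts.push(p.clone());
        }
    }
}
```

Deduplication only applies to identical elements: if a chunk were re-uploaded, it would get a new attachment URL and be appended as a second entry with the same part number.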
Today was a not-so-productive day IMO; I only coded for a little over 2.5 hours.
Only 4 minutes are left before the day is over. Tomorrow, I will focus on retrieving the data and merging it back into a single file, as it was before.
19 January '23 - Today I have to work on merging the file back into one piece. The initial idea I have in mind is to first download all the parts to disk for redundancy, and once all chunks have been downloaded, open the output file in append mode, iterate over the chunks on disk, append each chunk to the output file, and delete the downloaded chunk file. This avoids filling up the memory.
Retrieving the Data
So, right now, the plan is to use BufWriter with a capacity of 1 GB. This makes sure we don't hit the disk for every 7 MB chunk: data is only flushed to the output file once the 1 GB buffer is full. First, though, we need to download the files from Discord, so I'll install the reqwest crate.
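To see what a BufWriter's capacity buys, here is a small std-only sketch with a hypothetical CountingWriter that records how many write calls actually reach the underlying writer: many small write_all calls are coalesced into a few large writes whenever the buffer fills up, plus one final write when the buffer is flushed.

```rust
use std::io::{self, BufWriter, Write};

/// Hypothetical writer that counts how many write calls reach it.
struct CountingWriter {
    writes: usize,
    data: Vec<u8>,
}

impl Write for CountingWriter {
    fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
        self.writes += 1;
        self.data.extend_from_slice(buf);
        Ok(buf.len())
    }

    fn flush(&mut self) -> io::Result<()> {
        Ok(())
    }
}

/// Writes `chunks` through a BufWriter of the given capacity and returns the
/// inner writer, so the caller can inspect how many real writes happened.
fn write_buffered(chunks: &[&[u8]], capacity: usize) -> io::Result<CountingWriter> {
    let inner = CountingWriter { writes: 0, data: Vec::new() };
    let mut writer = BufWriter::with_capacity(capacity, inner);
    for chunk in chunks {
        writer.write_all(chunk)?;
    }
    // into_inner writes out whatever is still buffered before returning
    writer.into_inner().map_err(|e| e.into_error())
}
```

With ten 7-byte writes and a 32-byte buffer, only a few writes reach CountingWriter; the same coalescing happens with 7 MB chunks and a 1 GB buffer. One trade-off to keep in mind: BufWriter::with_capacity allocates the whole buffer up front, so a 1 GB capacity means 1 GB of memory for the entire merge.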
Now, I create a function retrieve_and_save to download the chunks and store them on disk. It fetches the document from the database, iterates over the parts field, spawns a tokio task to download each chunk using the reqwest crate, and saves it to a file on disk. Since we use tokio to spawn the tasks and a semaphore to allow only 24 concurrent downloads at once, we also have to make sure file operations don't block the tasks, as that could hurt performance and delay the execution of other tasks. To solve this, we use the async file API provided by the tokio crate (tokio::fs, which runs the blocking file operations on a dedicated thread pool under the hood). This is how my function looks right now:
use tokio::io::AsyncWriteExt; // brings write_all and flush into scope

async fn retrieve_and_save(db: FirestoreDb, name: &str) {
    // allow at most 24 concurrent downloads
    let semaphore = Arc::new(Semaphore::new(24));
    // fetch the document describing the stored file
    let master_directory: MasterDirectoryChild = db
        .fluent()
        .select()
        .by_id_in(MASTER_DIRECTORY_COLLECTION_NAME)
        .obj()
        .one(name)
        .await
        .unwrap()
        .unwrap();
    // chunk ids sorted by part number: the order in which
    // merge_files has to join them
    let mut part_ids_children = master_directory.parts.to_vec();
    part_ids_children.sort_by(|a, b| a.part.cmp(&b.part));
    let part_ids = part_ids_children
        .iter()
        .map(|part| part.id.clone())
        .collect::<Vec<String>>();
    // make sure the download directory exists
    tokio::fs::create_dir_all("chunks").await.unwrap();
    let mut handles = vec![];
    for part in master_directory.parts {
        let semaphore = semaphore.clone();
        let handle = tokio::spawn(async move {
            // wait for a free download slot
            let permit = semaphore.acquire().await.unwrap();
            // download the whole chunk (at most ~7MB) into memory
            let response = reqwest::get(&part.url)
                .await
                .unwrap()
                .bytes()
                .await
                .unwrap();
            // write it to chunks/<part id> without blocking the runtime
            let mut file = tokio::fs::File::create(format!("chunks/{}", part.id))
                .await
                .unwrap();
            file.write_all(&response)
                .await
                .expect("Couldn't write to file");
            file.flush().await.unwrap();
            drop(permit);
            drop(response);
        });
        handles.push(handle);
    }
    join_all(handles).await;
}
The notable parts here are tokio::fs::File::create, file.write_all and flush. They do what they look like: first we create the file, then we write the response to it, and finally we flush it to make sure everything has been written to the file before continuing. With tokio's File this matters, because a write can still be in progress in the background when write_all returns.
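The sort in retrieve_and_save compares the numeric part field rather than the id string, and that matters: the ids look like rq.rar.part10, and as strings "rq.rar.part10" sorts before "rq.rar.part2". Also, since the chunks are uploaded concurrently, the order of the entries in parts may not match the chunk order. A small std-only check of both orderings; ordered_ids is a hypothetical helper, and the (part, id) tuples stand in for the real part records.

```rust
/// Hypothetical helper: returns the chunk ids ordered by their numeric part
/// number, the same way retrieve_and_save orders them before merging.
fn ordered_ids(mut parts: Vec<(i32, String)>) -> Vec<String> {
    // compare the integers, not the id strings
    parts.sort_by_key(|(part, _)| *part);
    parts.into_iter().map(|(_, id)| id).collect()
}
```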
Next, we have to merge all of them into one. This one is really easy: use BufWriter to create an in-memory buffer of some capacity; I am thinking of 500 MB. For each chunk, it opens the chunk file and uses the std::io::copy function to copy the data from the reader to the writer. The maximum amount of data held in memory at any given time is roughly the buffer size we set, 500 MB. After merging a chunk, it deletes the chunk file so that we don't hog disk space.
Here is how I implemented it.
use std::fs::File;
use std::io::{BufReader, BufWriter, Write};

fn merge_files(files: Vec<String>, name: &str) {
    let output_file = File::create(name).unwrap();
    // 500MB capacity (500 * 1024 * 1024 bytes)
    let mut writer = BufWriter::with_capacity(500 * 1024 * 1024, output_file);
    for file in files {
        let input_file = File::open(format!("chunks/{}", file)).unwrap();
        let mut reader = BufReader::new(input_file);
        // Copy the contents of the input file to the output file
        std::io::copy(&mut reader, &mut writer).unwrap();
        // Delete the input file
        std::fs::remove_file(format!("chunks/{}", file)).unwrap();
    }
    // flush explicitly: dropping a BufWriter also flushes,
    // but silently ignores any error
    writer.flush().unwrap();
    println!("Successfully Merged files!");
}
And yes, the merged file exists, and I was able to successfully un-archive it.
We did it \o/
Final Steps
Now that our MVP is ready, let's clean up our codebase and turn it into a command-line application!
Creating a CLI application
I don't have any experience creating CLI programs in Rust, so I did a quick Google search and found the awesome Rust CLI Book. I gave it a quick read and started playing around. While playing around some more, I stumbled across structopt, a great library for parsing command-line arguments. I skimmed through the docs and created an initial parser.
This is how it looks:
use std::{
    env::set_var,
    path::Path,
    sync::{Arc, Mutex},
    time::Instant,
};

use firestore::FirestoreDb;
use futures::future::join_all;
use serenity::{http::Http, model::webhook::Webhook};
use structopt::StructOpt;
// DriveConfig, WebhookData, chunk_file and retrieve_and_save
// are defined elsewhere in the project

#[derive(StructOpt, Debug)]
#[structopt(name = "discord_drive")]
struct Opts {
    #[structopt(subcommand)]
    command: Command,
}

#[derive(StructOpt, Debug)]
enum Command {
    #[structopt(name = "store")]
    Store {
        #[structopt(name = "file", parse(from_os_str))]
        file: std::path::PathBuf,
    },
    #[structopt(name = "retrieve")]
    Retrieve {
        #[structopt(name = "word")]
        word: String,
    },
}

#[tokio::main]
async fn main() {
    let opts = Opts::from_args();
    let start = Instant::now();
    let config_path = Path::new("config/config.toml");
    let cfg: DriveConfig = confy::load_path(config_path).unwrap();
    set_var(
        "GOOGLE_APPLICATION_CREDENTIALS",
        cfg.service_account_file_location,
    );
    let db = FirestoreDb::new("discord-ddrive").await.unwrap();
    if cfg.webhooks.is_empty() {
        panic!("No webhooks found in config file")
    }
    let webhooks: Arc<Mutex<Vec<WebhookData>>> = Arc::new(Mutex::new(vec![]));
    let http_one = Arc::new(Http::new(""));
    let http_two = Arc::new(Http::new(""));
    let mut webhook_handles = vec![];
    let mut webhook_index = 1;
    for webhook in cfg.webhooks {
        let http = if webhook_index % 2 != 0 {
            http_one.clone()
        } else {
            http_two.clone()
        };
        let webhook_clone = webhooks.clone();
        let handle = tokio::spawn(async move {
            let webhook = Webhook::from_url(http.as_ref(), &webhook)
                .await
                .expect(&format!("Failed to get webhook from url: {}", webhook));
            let mut webhook_vec = webhook_clone.lock().unwrap();
            webhook_vec.push(WebhookData { http, webhook });
        });
        webhook_handles.push(handle);
        webhook_index += 1;
    }
    join_all(webhook_handles).await;
    println!("Webhooks: {}", webhooks.lock().unwrap().len());
    // copy the initialized webhooks out of the mutex
    let webhooks_clone = webhooks.lock().unwrap().to_vec();
    match opts.command {
        Command::Store { file } => {
            // only single files are supported for now
            if file.is_dir() {
                eprintln!("Error: {} is a directory", file.display());
                std::process::exit(1);
            }
            // store the file
            println!("Storing file: {}", file.display());
            chunk_file(file, webhooks_clone, db).await;
        }
        Command::Retrieve { word } => {
            // `word` is the name of the stored file
            println!("Retrieving file: {}", word);
            retrieve_and_save(db, &word).await;
        }
    }
    let duration = start.elapsed();
    // Duration's Debug output already includes the unit
    println!("Time taken: {:?}", duration);
}
That's all!
Future Scope
This is an MVP; there are many things left to do, and one of the main ones is code organization. Professional Rustaceans might be annoyed by the single-file structure, but that will be sorted out in future iterations :eyes:
There are a lot of features I would like to implement:
storing whole directories
viewing and downloading the data directly from the browser
and many more...
Source Code
The source code for this blog is available on GitHub: https://github.com/officialpiyush/discord-drive
Conclusion
These three days were very interesting: I learned a lot, coded a lot, and explored a lot of new stuff. I enjoyed creating this project. I think I will work more on it, and if I do, I will most probably write a Part 2 blog post.
I spent almost 13 hours coding on this project.