729 changes: 729 additions & 0 deletions monarch_remotemount/README.md


10 changes: 10 additions & 0 deletions monarch_remotemount/examples/run_apptainer.sh
@@ -0,0 +1,10 @@
#!/bin/bash

set -ex

# Create a fresh 2048 MiB writable overlay image.
rm -f /tmp/overlay.img
apptainer overlay create --size 2048 /tmp/overlay.img

# Start from a clean working directory.
rm -rf /tmp/apptainer-work
mkdir /tmp/apptainer-work

# Install packages into the writable overlay, then verify in a second,
# separate container run that the install persisted.
apptainer exec --containall --network none --workdir /tmp/apptainer-work --overlay /tmp/overlay.img /tmp/myapp/img.sif uv pip install requests numpy pandas
apptainer exec --containall --network none --workdir /tmp/apptainer-work --overlay /tmp/overlay.img /tmp/myapp/img.sif python -c "import pandas; print(pandas.__version__)"
7 changes: 7 additions & 0 deletions monarch_remotemount/examples/run_cached_pip.sh
@@ -0,0 +1,7 @@
#!/bin/bash

set -ex

# Create a fresh virtualenv and install entirely from a local flat directory
# of wheels -- no network access (--no-index).
python -m venv /tmp/myvenv
source /tmp/myvenv/bin/activate
pip install --no-index --find-links /tmp/flat_wheels torch transformers sentencepiece
6 changes: 6 additions & 0 deletions monarch_remotemount/examples/run_disk_large_file.sh
@@ -0,0 +1,6 @@
#!/bin/bash

# Read one large file twice: the first pass measures a cold read, the second
# is typically served from the page cache.
( time cat /scratch/cpuhrsch/myfiledir/myfile.img > /dev/null ) 2> /tmp/total_time
cat /tmp/total_time
( time cat /scratch/cpuhrsch/myfiledir/myfile.img > /dev/null ) 2> /tmp/total_time
cat /tmp/total_time
8 changes: 8 additions & 0 deletions monarch_remotemount/examples/run_disk_time.sh
@@ -0,0 +1,8 @@
#!/bin/bash

# List every file in the venv, then shuffle the list with a fixed seed so
# reads happen in a random but reproducible order.
find /scratch/cpuhrsch/venv -type f > /tmp/all_files
python -c "import random; random.seed(123); import sys; lines = sys.stdin.read().split('\n')[:-1]; random.shuffle(lines); print('\n'.join(lines))" < /tmp/all_files > /tmp/all_files_shuf

# Time reading every file back-to-back in the shuffled order.
( time xargs -d '\n' cat < /tmp/all_files_shuf > /tmp/bigfile ) 2> /tmp/total_time

cat /tmp/total_time
7 changes: 7 additions & 0 deletions monarch_remotemount/examples/run_hf_example.sh
@@ -0,0 +1,7 @@
#!/bin/bash

set -ex

# Run the Hugging Face example with the model cache kept under the venv directory.
source /scratch/cpuhrsch/venv/bin/activate
cd /scratch/cpuhrsch/venv
HF_HOME=/scratch/cpuhrsch/venv/hf_cache python hf_example.py
7 changes: 7 additions & 0 deletions monarch_remotemount/examples/run_import.sh
@@ -0,0 +1,7 @@
#!/bin/bash

set -ex

# CUDA smoke test: import torch and run a tiny computation on the GPU.
source /scratch/cpuhrsch/venv_torch/bin/activate
cd /scratch/cpuhrsch/venv
python -c "import torch; print(torch.randn(123).cuda().mean())"
5 changes: 5 additions & 0 deletions monarch_remotemount/examples/run_import_torch.sh
@@ -0,0 +1,5 @@
#!/bin/bash

source /scratch/cpuhrsch/venv/bin/activate

# Time `import torch` twice: the first run is cold, the second typically
# hits warm filesystem caches.
( time python -c "import torch" ) 2>&1
( time python -c "import torch" ) 2>&1
43 changes: 43 additions & 0 deletions monarch_remotemount/hf_example.py
@@ -0,0 +1,43 @@
import torch
from transformers import pipeline

# 1. Select a sub-10B model
# Microsoft Phi-3 Mini is ~3.8 billion parameters
model_id = "microsoft/Phi-3-mini-4k-instruct"

print(f"Downloading and loading {model_id}...")

# 2. Initialize the pipeline
# device_map="auto" will automatically use your GPU if available, otherwise CPU
# dtype=torch.float16 halves memory usage (float16 generally requires a GPU)
pipe = pipeline(
    "text-generation",
    model=model_id,
    model_kwargs={
        "dtype": torch.float16 if torch.cuda.is_available() else torch.float32,
        "low_cpu_mem_usage": True,
    },
    device_map="auto",
)

# 3. Define your prompt
messages = [
    {"role": "user", "content": "Explain the concept of recursion to a 5-year-old."},
]

# 4. Run the generation
print("\nGenerating response...")
outputs = pipe(
    messages,
    max_new_tokens=256,
    do_sample=True,
    temperature=0.7,
)

# 5. Print the result
generated_text = outputs[0]["generated_text"][-1]["content"]
print("-" * 50)
print(f"Prompt: {messages[0]['content']}")
print("-" * 50)
print(f"Response:\n{generated_text}")
print("-" * 50)
1 change: 1 addition & 0 deletions monarch_remotemount/remotemount/__init__.py
@@ -0,0 +1 @@
from .remotemount import remotemount
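
The diff does not include a build configuration for the C extension that follows. A minimal setuptools sketch, assuming _fast_pack.c is compiled in place as remotemount._fast_pack (the file path and module name come from this diff; everything else is an assumption):

# Hypothetical setup.py (not part of this diff): builds _fast_pack.c into
# the `remotemount._fast_pack` extension module.
from setuptools import Extension, setup

setup(
    name="remotemount",
    packages=["remotemount"],
    ext_modules=[
        Extension(
            "remotemount._fast_pack",
            sources=["remotemount/_fast_pack.c"],
            libraries=["pthread"],
        )
    ],
)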
169 changes: 169 additions & 0 deletions monarch_remotemount/remotemount/_fast_pack.c
@@ -0,0 +1,169 @@
/*
 * Fast file packing using mmap + parallel reads.
 * Accepts the file list from Python to ensure consistent ordering with metadata.
 */
#define PY_SSIZE_T_CLEAN
#include <Python.h>
#include <fcntl.h>
#include <sys/mman.h>
#include <pthread.h>
#include <string.h>
#include <stdlib.h>
#include <unistd.h>
#include <errno.h>

#define NUM_THREADS 16

/* Fallback in case <sys/mman.h> does not define it (Linux-specific flag). */
#ifndef MAP_POPULATE
#define MAP_POPULATE 0x008000
#endif

typedef struct {
    const char *path;
    size_t offset;
    size_t size;
} FileInfo;

typedef struct {
    FileInfo *files;
    Py_ssize_t start_idx;
    Py_ssize_t end_idx;
    char *buffer;
} ThreadArg;

static void *read_files_thread(void *arg) {
    ThreadArg *ta = (ThreadArg *)arg;
    char *buffer = ta->buffer;

    for (Py_ssize_t i = ta->start_idx; i < ta->end_idx; i++) {
        FileInfo *f = &ta->files[i];
        if (f->size == 0) continue;

        /* Unreadable files are skipped; their region stays zero-filled. */
        int fd = open(f->path, O_RDONLY);
        if (fd < 0) continue;

        size_t total_read = 0;
        while (total_read < f->size) {
            ssize_t bytes_read = read(fd, buffer + f->offset + total_read,
                                      f->size - total_read);
            if (bytes_read <= 0) break;
            total_read += bytes_read;
        }
        close(fd);
    }
    return NULL;
}

/*
 * pack_files_with_offsets(file_list, total_size)
 *
 * Pack files into a buffer given a list of (path, offset, size) tuples.
 * Returns a writable memoryview of the packed buffer (an empty bytes
 * object when there is nothing to pack).
 */
static PyObject *pack_files_with_offsets(PyObject *self, PyObject *args) {
    PyObject *file_list;
    unsigned long long total_size;

    if (!PyArg_ParseTuple(args, "OK", &file_list, &total_size)) {
        return NULL;
    }

    if (!PyList_Check(file_list)) {
        PyErr_SetString(PyExc_TypeError, "First argument must be a list");
        return NULL;
    }

    Py_ssize_t num_files = PyList_Size(file_list);
    if (num_files == 0 || total_size == 0) {
        /* Return empty bytes */
        return PyBytes_FromStringAndSize(NULL, 0);
    }

    /* Parse file list into FileInfo array */
    FileInfo *files = (FileInfo *)malloc(num_files * sizeof(FileInfo));
    if (!files) {
        PyErr_NoMemory();
        return NULL;
    }

    for (Py_ssize_t i = 0; i < num_files; i++) {
        PyObject *item = PyList_GET_ITEM(file_list, i);
        PyObject *path_obj, *offset_obj, *size_obj;

        if (!PyTuple_Check(item) || PyTuple_Size(item) != 3) {
            free(files);
            PyErr_SetString(PyExc_TypeError, "Each item must be a (path, offset, size) tuple");
            return NULL;
        }

        path_obj = PyTuple_GET_ITEM(item, 0);
        offset_obj = PyTuple_GET_ITEM(item, 1);
        size_obj = PyTuple_GET_ITEM(item, 2);

        if (!PyUnicode_Check(path_obj)) {
            free(files);
            PyErr_SetString(PyExc_TypeError, "Path must be a string");
            return NULL;
        }

        files[i].path = PyUnicode_AsUTF8(path_obj);
        files[i].offset = PyLong_AsUnsignedLongLong(offset_obj);
        files[i].size = PyLong_AsUnsignedLongLong(size_obj);

        if (PyErr_Occurred()) {
            free(files);
            return NULL;
        }
    }

    /* Allocate the destination buffer with MAP_POPULATE so its pages are
     * pre-faulted before the reader threads start writing into them. */
    char *buffer = (char *)mmap(NULL, (size_t)total_size,
                                PROT_READ | PROT_WRITE,
                                MAP_PRIVATE | MAP_ANONYMOUS | MAP_POPULATE,
                                -1, 0);
    if (buffer == MAP_FAILED) {
        free(files);
        PyErr_SetFromErrno(PyExc_OSError);
        return NULL;
    }

    /* Parallel file reading: split the file list evenly across threads. */
    pthread_t threads[NUM_THREADS];
    ThreadArg thread_args[NUM_THREADS];
    Py_ssize_t files_per_thread = (num_files + NUM_THREADS - 1) / NUM_THREADS;

    for (int t = 0; t < NUM_THREADS; t++) {
        thread_args[t].files = files;
        thread_args[t].start_idx = t * files_per_thread;
        thread_args[t].end_idx = (t + 1) * files_per_thread;
        if (thread_args[t].end_idx > num_files) {
            thread_args[t].end_idx = num_files;
        }
        thread_args[t].buffer = buffer;
        pthread_create(&threads[t], NULL, read_files_thread, &thread_args[t]);
    }

    for (int t = 0; t < NUM_THREADS; t++) {
        pthread_join(threads[t], NULL);
    }

    free(files);

    /* Create a memoryview over the mmap buffer - it must be writable for
     * RDMA. Note that the memoryview does not own the mapping, which is
     * never munmap'd here. */
    PyObject *result = PyMemoryView_FromMemory(buffer, (Py_ssize_t)total_size, PyBUF_WRITE);
    return result;
}

static PyMethodDef FastPackMethods[] = {
    {"pack_files_with_offsets", pack_files_with_offsets, METH_VARARGS,
     "Pack files into a buffer given a list of (path, offset, size) tuples."},
    {NULL, NULL, 0, NULL}
};

static struct PyModuleDef fastpackmodule = {
    PyModuleDef_HEAD_INIT,
    "_fast_pack",
    "Fast file packing using mmap + parallel reads",
    -1,
    FastPackMethods
};

PyMODINIT_FUNC PyInit__fast_pack(void) {
    return PyModule_Create(&fastpackmodule);
}
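
For reference, a minimal usage sketch for the extension above (not part of the diff; it assumes the module is importable as remotemount._fast_pack, and the build_layout helper is hypothetical):

import os

from remotemount import _fast_pack

def build_layout(paths):
    # Assign each file a contiguous (path, offset, size) slot in the buffer.
    layout, offset = [], 0
    for path in paths:
        size = os.path.getsize(path)
        layout.append((path, offset, size))
        offset += size
    return layout, offset

paths = ["/etc/hostname", "/etc/os-release"]
layout, total_size = build_layout(paths)

# One mmap'd buffer holding every file back-to-back, as a writable memoryview.
buf = _fast_pack.pack_files_with_offsets(layout, total_size)
assert len(buf) == total_size

# Each file's bytes sit at the offset recorded in the layout.
path, offset, size = layout[0]
with open(path, "rb") as f:
    assert bytes(buf[offset:offset + size]) == f.read()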