
Commit

test
ZicsX committed Sep 6, 2023
1 parent df97f86 commit c6077c6
Showing 3 changed files with 235 additions and 9 deletions.
1 change: 1 addition & 0 deletions .gitignore
@@ -5,3 +5,4 @@ __pycache__
 *dist
 *egg-info*
 test.py
+env
17 changes: 8 additions & 9 deletions requirements-dev.txt
@@ -1,10 +1,9 @@
-lm_dataformat
+lm-dataformat==0.0.18
 tqdm
-gdown
-concurrent_iterator
-pytablewriter
-gitpython
-fasttext
-best-download
-gsutil
-virtualenv
+gdown==3.12.2
+concurrent-iterator==0.2.6
+pytablewriter==0.58.0
+GitPython==3.1.11
+fasttext==0.9.2
+best-download==0.0.1
+gsutil==4.57
226 changes: 226 additions & 0 deletions test.ipynb
@@ -0,0 +1,226 @@
{
"cells": [
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"data_sources = [\n",
" \"ArXiv\",\n",
" \"BookCorpus2\",\n",
" \"Books3\",\n",
" \"DM Mathematics\",\n",
" \"Enron Emails\",\n",
" \"EuroParl\",\n",
" \"FreeLaw\",\n",
" \"Github\",\n",
" \"Gutenberg (PG-19)\",\n",
" \"HackerNews\",\n",
" \"NIH ExPorter\",\n",
" \"OpenSubtitles\",\n",
" \"OpenWebText2\",\n",
" \"PhilPapers\",\n",
" \"Pile-CC\",\n",
" \"PubMed Abstracts\",\n",
" \"PubMed Central\",\n",
" \"StackExchange\",\n",
" \"UPSTO Backgrounds\",\n",
" \"Ubuntu IRC\",\n",
" \"Wikipedia (en)\",\n",
" \"YoutubeSubtitles\"\n",
"]\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"import gc\n",
"from datasets import load_dataset, concatenate_datasets\n",
"import pyarrow.parquet as pq\n",
"import pyarrow as pa\n",
"import os\n",
"from tqdm import tqdm\n",
"\n",
"data_sources = [\n",
" \"NIH ExPorter\",\n",
" \"PhilPapers\",\n",
" \"Enron Emails\"\n",
" ]\n",
"\n",
"os.makedirs(\"parquet_files\", exist_ok=True)\n",
"\n",
"for subset_of_interest in tqdm(data_sources, desc=\"Data Sources\"):\n",
" print(f\"Processing {subset_of_interest}...\")\n",
" \n",
" folder_name = subset_of_interest.replace(\" \", \"_\")\n",
" \n",
" dataset = load_dataset(\"ArmelR/the-pile-splitted\", subset_of_interest, num_proc=8)\n",
" \n",
" concatenated_dataset = concatenate_datasets([dataset['train'], dataset['test']])\n",
" \n",
" os.makedirs(f\"parquet_files/{folder_name}\", exist_ok=True)\n",
"\n",
" total_rows = len(concatenated_dataset)\n",
" total_size_bytes = concatenated_dataset.data.nbytes\n",
" size_per_file = 1_000_000_000\n",
" rows_per_file = int((total_rows / total_size_bytes) * size_per_file)\n",
"\n",
" start_idx = 0\n",
" file_idx = 0\n",
" pbar = tqdm(total=total_rows, desc=f\"Saving {subset_of_interest}\")\n",
" while start_idx < total_rows:\n",
" end_idx = min(start_idx + rows_per_file, total_rows)\n",
" \n",
" subset_data = concatenated_dataset.select(range(start_idx, end_idx))\n",
" \n",
" subset_table = pa.Table.from_pandas(subset_data.data.to_pandas())\n",
" \n",
" pq.write_table(subset_table, f\"parquet_files/{folder_name}/dataset_{file_idx}.parquet\")\n",
" \n",
" pbar.update(end_idx - start_idx)\n",
" \n",
" start_idx = end_idx\n",
" file_idx += 1\n",
" \n",
" pbar.close()\n",
" print(f\"Exported {subset_of_interest} to {file_idx} Parquet files.\")\n",
" \n",
" del dataset\n",
" del concatenated_dataset\n",
" del subset_data\n",
" del subset_table\n",
" \n",
" gc.collect()\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"import gc\n",
"from datasets import load_dataset, concatenate_datasets\n",
"import pyarrow.parquet as pq\n",
"import pyarrow as pa\n",
"import os\n",
"import subprocess\n",
"from threading import Thread\n",
"from tqdm import tqdm\n",
"\n",
"def upload_to_s3(folder_name):\n",
" s3_bucket = \"your-s3-bucket-name\"\n",
" subprocess.run([\"aws\", \"s3\", \"sync\", f\"parquet_pile/{folder_name}\", f\"s3:https://{s3_bucket}/{folder_name}\"])\n",
" subprocess.run([\"rm\", \"-r\", f\"parquet_pile/{folder_name}\"])\n",
"\n",
"data_sources = [\n",
" \"ArXiv\",\n",
" \"BookCorpus2\",\n",
" \"Books3\",\n",
" \"DM Mathematics\",\n",
" \"Enron Emails\",\n",
" \"EuroParl\",\n",
" \"FreeLaw\",\n",
" \"Github\",\n",
" \"Gutenberg (PG-19)\",\n",
" \"HackerNews\",\n",
" \"NIH ExPorter\",\n",
" \"OpenSubtitles\",\n",
" \"OpenWebText2\",\n",
" \"PhilPapers\",\n",
" \"Pile-CC\",\n",
" \"PubMed Abstracts\",\n",
" \"PubMed Central\",\n",
" \"StackExchange\",\n",
" \"UPSTO Backgrounds\",\n",
" \"Ubuntu IRC\",\n",
" \"Wikipedia (en)\",\n",
" \"YoutubeSubtitles\"\n",
"]\n",
"\n",
"\n",
"# Create a parent directory to store all Parquet files\n",
"os.makedirs(\"parquet_pile\", exist_ok=True)\n",
"\n",
"for subset in tqdm(data_sources, desc=\"Data Sources\"):\n",
" print(f\"Processing {subset}...\")\n",
" \n",
" folder_name = subset.replace(\" \", \"_\")\n",
" \n",
" dataset = load_dataset(\"ArmelR/the-pile-splitted\", subset, num_proc=8)\n",
" \n",
" concatenated_dataset = concatenate_datasets([dataset['train'], dataset['test']])\n",
" \n",
" os.makedirs(f\"parquet_pile/{folder_name}\", exist_ok=True)\n",
"\n",
" total_rows = len(concatenated_dataset)\n",
" total_size_bytes = concatenated_dataset.data.nbytes\n",
" size_per_file = 1_000_000_000\n",
" rows_per_file = int((total_rows / total_size_bytes) * size_per_file)\n",
"\n",
" start_idx = 0\n",
" file_idx = 0\n",
" pbar = tqdm(total=total_rows, desc=f\"Saving {subset}\")\n",
" while start_idx < total_rows:\n",
" end_idx = min(start_idx + rows_per_file, total_rows)\n",
" subset_data = concatenated_dataset.select(range(start_idx, end_idx))\n",
" subset_table = pa.Table.from_pandas(subset_data.data.to_pandas())\n",
" pq.write_table(subset_table, f\"parquet_pile/{folder_name}/dataset_{file_idx}.parquet\")\n",
" pbar.update(end_idx - start_idx)\n",
" start_idx = end_idx\n",
" file_idx += 1\n",
" \n",
" pbar.close()\n",
"\n",
" # Start a new thread to upload this dataset to S3\n",
" Thread(target=upload_to_s3, args=(folder_name,)).start()\n",
" \n",
" print(f\"Exported {subset} to {file_idx} Parquet files.\")\n",
" \n",
" del dataset\n",
" del concatenated_dataset\n",
" del subset_data\n",
" del subset_table\n",
" gc.collect()\n",
"\n",
"# # Load parquet in Stream\n",
"# dataset = load_dataset(\n",
"# \"parquet\", data_files=[\"s3:https://<bucket name>/<data folder>/data-parquet\"],\n",
"# storage_options=fs.storage_options, streaming=True)\n",
"\n",
"# i = 0\n",
"# for e in dataset['train']:\n",
"# if i == 5:\n",
"# break\n",
"# i+=1\n",
"# print(e)\n"
]
}
],
"metadata": {
"kernelspec": {
"display_name": "env",
"language": "python",
"name": "python3"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.8.16"
},
"orig_nbformat": 4
},
"nbformat": 4,
"nbformat_minor": 2
}
