Commit 86348a1f by Gianluca Lombardi

Initialize repository

parents
# Byte-compiled / optimized / DLL files
__pycache__/
*.py[cod]
*$py.class
# C extensions
*.so
# Distribution / packaging
.Python
build/
develop-eggs/
dist/
downloads/
eggs/
.eggs/
lib/
lib64/
parts/
sdist/
var/
wheels/
share/python-wheels/
*.egg-info/
.installed.cfg
*.egg
MANIFEST
# PyInstaller
# Usually these files are written by a python script from a template
# before PyInstaller builds the exe, so as to inject date/other infos into it.
*.manifest
*.spec
# Installer logs
pip-log.txt
pip-delete-this-directory.txt
# Unit test / coverage reports
htmlcov/
.tox/
.nox/
.coverage
.coverage.*
.cache
nosetests.xml
coverage.xml
*.cover
*.py,cover
.hypothesis/
.pytest_cache/
cover/
# Translations
*.mo
*.pot
# Django stuff:
*.log
local_settings.py
db.sqlite3
db.sqlite3-journal
# Flask stuff:
instance/
.webassets-cache
# Scrapy stuff:
.scrapy
# Sphinx documentation
docs/_build/
# PyBuilder
.pybuilder/
target/
# Jupyter Notebook
.ipynb_checkpoints
# IPython
profile_default/
ipython_config.py
# pyenv
# For a library or package, you might want to ignore these files since the code is
# intended to run in multiple environments; otherwise, check them in:
# .python-version
# pipenv
# According to pypa/pipenv#598, it is recommended to include Pipfile.lock in version control.
# However, in case of collaboration, if having platform-specific dependencies or dependencies
# having no cross-platform support, pipenv may install dependencies that don't work, or not
# install all needed dependencies.
#Pipfile.lock
# poetry
# Similar to Pipfile.lock, it is generally recommended to include poetry.lock in version control.
# This is especially recommended for binary packages to ensure reproducibility, and is more
# commonly ignored for libraries.
# https://python-poetry.org/docs/basic-usage/#commit-your-poetrylock-file-to-version-control
#poetry.lock
# pdm
# Similar to Pipfile.lock, it is generally recommended to include pdm.lock in version control.
#pdm.lock
# pdm stores project-wide configurations in .pdm.toml, but it is recommended to not include it
# in version control.
# https://pdm.fming.dev/latest/usage/project/#working-with-version-control
.pdm.toml
.pdm-python
.pdm-build/
# PEP 582; used by e.g. github.com/David-OConnor/pyflow and github.com/pdm-project/pdm
__pypackages__/
# Celery stuff
celerybeat-schedule
celerybeat.pid
# SageMath parsed files
*.sage.py
# Environments
.env
.venv
env/
venv/
ENV/
env.bak/
venv.bak/
# Spyder project settings
.spyderproject
.spyproject
# Rope project settings
.ropeproject
# mkdocs documentation
/site
# mypy
.mypy_cache/
.dmypy.json
dmypy.json
# Pyre type checker
.pyre/
# pytype static type analyzer
.pytype/
# Cython debug symbols
cython_debug/
# PyCharm
# JetBrains specific template is maintained in a separate JetBrains.gitignore that can
# be found at https://github.com/github/gitignore/blob/main/Global/JetBrains.gitignore
# and can be added to the global gitignore or merged into this file. For a more nuclear
# option (not recommended) you can uncomment the following to ignore the entire idea folder.
#.idea/
debug/
logs/
datasets/
config/
wandb/
slurm/
"""Handle data processing and laoding."""
from typing import Any, Optional, Tuple
from pathlib import Path
import json
import logging
import gzip
import pickle as pkl
import numpy as np
from sklearn.utils import compute_class_weight
from datasets import Dataset, DatasetDict, load_from_disk, disable_progress_bars
_SEED = 42
_DATA_DIR = (Path(__file__).parent / "datasets").resolve()
_DATASETS_NAMES = [d.name for d in _DATA_DIR.iterdir() if d.is_dir()]
def load_data(
dataset_name: str,
predefined_split_info: Optional[dict[str, Any]] = None,
use_validation: bool = True,
test_size: float = 0.15,
) -> Tuple[Dataset, Dataset, Dataset]:
disable_progress_bars()
assert (
dataset_name in _DATASETS_NAMES
), f"Dataset {dataset_name} not found. Must be one of {_DATASETS_NAMES}"
dataset_path = _DATA_DIR / dataset_name / "hf_dataset"
try:
if predefined_split_info:
logging.info(f"Loading dataset {dataset_name} from predefined splits")
train_data, test_data, val_data = _load_data_from_predefined_splits(
dataset_name, predefined_split_info, use_validation
)
else:
logging.info(f"Loading dataset {dataset_name} from HuggingFace dataset")
train_data, test_data, val_data = _load_data_from_hf_splits(
dataset_name, use_validation
)
except FileNotFoundError:
logging.warning(
f"Dataset {dataset_name} not found in {dataset_path}. Processing raw data."
)
train_data, test_data, val_data = preprocess_data(dataset_name, test_size, use_validation)
logging.info(f"Saving processed dataset to {dataset_path}")
if val_data is not None:
dataset_dict = DatasetDict(
{"train": train_data, "test": test_data, "validation": val_data}
)
else:
dataset_dict = DatasetDict({"train": train_data, "test": test_data})
dataset_dict.save_to_disk(dataset_path)
return train_data, test_data, val_data
def _load_data_from_hf_splits(dataset_name, use_validation):
dataset_path = _DATA_DIR / dataset_name / "hf_dataset"
dataset_dict = load_from_disk(dataset_path)
train_data = dataset_dict["train"]
test_data = dataset_dict["test"]
if use_validation and "validation" not in dataset_dict:
raise FileNotFoundError
val_data = dataset_dict["validation"] if use_validation else None
return train_data, test_data, val_data
def _load_data_from_predefined_splits(dataset_name, predefined_split_info, use_validation):
"""Load dataset from pre-split txt files containing proteins IDs."""
dataset_dir = _DATA_DIR / dataset_name
assert dataset_dir.exists(), f"Dataset {dataset_name} not found in {dataset_dir}"
sequences = _unpickle(dataset_dir / "sequences.pkl")
labels = _unpickle(dataset_dir / "labels.pkl")
data = {"id": [], "sequence": [], "labels": []}
for key in labels.keys():
data["id"].append(key)
data["sequence"].append("".join(sequences[key]))
data["labels"].append(labels[key])
if len(sequences[key]) != len(labels[key]):
logging.warning(f"Sequence and label lengths do not match for {key}")
data = Dataset.from_dict(data)
splits_file = dataset_dir / predefined_split_info["splits_file"] # relative path to JSON file
if not splits_file.exists():
raise FileNotFoundError(f"Splits dictionary not found in {splits_file}")
data_splits = {}
with open(splits_file, "r") as f:
splits_data = json.load(f)
for split_name in predefined_split_info["splits"]:
if "val" in split_name and not use_validation:
data_splits["val"] = None
continue
split_ids = splits_data[split_name]
data_splits[split_name] = data.filter(lambda x: x["id"] in split_ids)
return data_splits.values() # data_splits["train"], data_splits["test"], data_splits["val"]
def _unpickle(file: Path) -> Any:
try:
with open(file, "rb") as f:
return pkl.load(f)
except pkl.UnpicklingError:
with gzip.open(file, "rb") as f:
return pkl.load(f)
def preprocess_data(
dataset_name: str, test_size=0.15, use_validation=True
) -> Tuple[Dataset, Dataset]:
dataset_dir = _DATA_DIR / dataset_name
assert dataset_dir.exists(), f"Dataset {dataset_name} not found in {dataset_dir}"
sequences = _unpickle(dataset_dir / "sequences.pkl")
labels = _unpickle(dataset_dir / "labels.pkl")
data = {"id": [], "sequence": [], "labels": []}
for key in sequences.keys():
data["id"].append(key)
data["sequence"].append("".join(sequences[key]))
data["labels"].append(labels[key])
if len(sequences[key]) != len(labels[key]):
logging.warning(f"Sequence and label lengths do not match for {key}")
data = Dataset.from_dict(data) # pd.DataFrame(data)
# split data into train, val with fixed seed
train_test_split = data.train_test_split(test_size=test_size, seed=_SEED)
train_data = train_test_split["train"]
test_data = train_test_split["test"]
val_data = None
if use_validation:
train_val_split = train_data.train_test_split(test_size=0.15, seed=_SEED)
train_data = train_val_split["train"]
val_data = train_val_split["test"]
return train_data, test_data, val_data
def post_process_data(data: Dataset, max_length: int = None, pos_range=None) -> Dataset:
# cut sequences to max_length and labels accordingly
if max_length is not None:
data = data.map(
lambda x: {"sequence": x["sequence"][:max_length], "labels": x["labels"][:max_length]},
batched=False,
)
if pos_range is not None:
# remove samples with too few positive labels or too many positive labels
pos_frac = np.array([sum(labels) / len(labels) for labels in data["labels"]])
filter_mask = (pos_frac >= pos_range[0]) & (pos_frac <= pos_range[1])
data = data.select(np.where(filter_mask)[0])
return data
def prune(data: Dataset, prune_frac: float = 0.2, random_seed: int = _SEED) -> Dataset:
# prune a fraction of the data
rng = np.random.default_rng(random_seed)
indices = rng.choice(len(data), int(len(data) * (1 - prune_frac)), replace=False)
return data.select(indices)
def downsample_negatives(
X: np.ndarray, y: np.ndarray, downsample_ratio: float, random_seed: int = _SEED
) -> Tuple[np.ndarray, np.ndarray]:
"""
Downsample the vectors in X that have label 0, according to the specified downsample_ratio.
Args:
- X (np.ndarray): The input array of vectors (shape: [n_samples, n_features]).
- y (np.ndarray): The corresponding label array (shape: [n_samples, ]).
- downsample_ratio (float): The fraction of label 0 vectors to retain (0.0 < downsample_ratio <= 1.0).
- random_seed (int, optional): A random seed to ensure reproducibility. Defaults to None.
Returns:
- X_downsampled (np.ndarray): The downsampled array of vectors.
- y_downsampled (np.ndarray): The corresponding downsampled labels.
"""
if downsample_ratio <= 0.0 or downsample_ratio > 1.0:
raise ValueError("downsample_ratio must be between 0.0 and 1.0")
# Set random seed for reproducibility
rng = np.random.default_rng(random_seed)
# Get the indices of vectors with label 0 and label 1
indices_negatives = np.where(y == 0)[0]
indices_positives = np.where(y == 1)[0]
num_to_keep = int(len(indices_negatives) * downsample_ratio)
indices_negatives_to_keep = rng.choice(indices_negatives, num_to_keep, replace=False)
indices_to_keep = np.concatenate([indices_negatives_to_keep, indices_positives])
np.random.shuffle(indices_to_keep)
# Return the downsampled X and y
return X[indices_to_keep], y[indices_to_keep]
def get_class_weights(dataset: Dataset, label_column: str = "labels"):
labels = np.concatenate(dataset[label_column])
class_weights = compute_class_weight(
class_weight="balanced", classes=np.unique(labels), y=labels
)
return class_weights
File added
This diff is collapsed. Click to expand it.
"""Hyperparameter search for HuggingFace models using Optuna backend."""
from typing import Tuple, Callable, Any
from pathlib import Path
import os
import sys
import yaml
import logging
import importlib.util
from transformers import AutoTokenizer, AutoModelForTokenClassification, EarlyStoppingCallback
import peft
sys.path.append(Path(__file__).parent)
import utils
import hf_utils
class HPOModelHelper(hf_utils.HFModelHelper):
"""
Provide access to hyperparameters initialization
and direct conversion to peft model.
"""
def __init__(
self,
model_name: str,
model_path: str = None,
device: str = "cpu",
dropout: float = 0.2,
lora_r: int = 16,
lora_target_modules: list = ["q_proj", "v_proj", "query", "value", "q", "v"],
):
model_id = utils.HF_PRETRAINED_MODELS.get(model_name.upper(), model_name)
if model_path is None:
model_path = model_id
self.model_path = model_path
self.tokenizer = AutoTokenizer.from_pretrained(
model_id, use_fast=("rostlab" not in model_id.lower())
)
# load base pre-trained model
model = AutoModelForTokenClassification.from_pretrained(
model_path, num_labels=2, classifier_dropout=dropout
)
if model.dropout.p != dropout:
logging.warning(
f"Classifier droput was not initialized to {dropout}. Setting it manually."
)
model.dropout.p = dropout
# load peft adapter if present
is_peft = False
try:
model = peft.PeftModel.from_pretrained(model, model_id=model_path)
is_peft = True
except ValueError:
logging.warning(
f"PEFT adapter not found at {model_path}. Loading base model and initializing lora"
" layers."
)
if not is_peft:
lora_config = peft.LoraConfig(
task_type=peft.TaskType.TOKEN_CLS,
r=lora_r,
base_model_name_or_path=self.model_path,
target_modules=lora_target_modules,
lora_dropout=dropout,
lora_alpha=2 * lora_r,
)
lora_model = peft.get_peft_model(model, lora_config)
lora_model.print_trainable_parameters()
self.model = lora_model
else:
self.model = model
self.device = device
def convert_to_peft(self):
raise NotImplementedError(
"Conversion to PEFT model is already done during initialization."
)
def set_wandb_env(logging_dir: str):
if importlib.util.find_spec("wandb") is not None:
project = logging_dir.split("hflogs/")[-1].replace("/", "-")
os.environ["WANDB_PROJECT"] = project
os.environ["WANDB_WATCH"] = "parameters"
logging.info("wandb is available and environment variables are set.")
else:
logging.info("wandb is not available.")
def _prepare_hpo(model_name: str, hpo_config: dict) -> Tuple[Callable[..., Any]]:
hyperparams = hpo_config["hyperparameters"]
objective = hpo_config.get("objective", "roc_auc")
logging.info(f"Objective metric: {objective}")
def optuna_hp_space(trial):
learning_rate = trial.suggest_float(
"learning_rate", **hyperparams["learning_rate"], log=True
)
weight_decay = trial.suggest_categorical("weight_decay", hyperparams["weight_decay"])
warmup_ratio = trial.suggest_categorical("warmup_ratio", hyperparams["warmup_ratio"])
params = {
"learning_rate": learning_rate,
"weight_decay": weight_decay,
"warmup_ratio": warmup_ratio,
}
logging.info(f"Trial hyperparameters: {params}")
return params
def compute_objective(metrics):
return metrics["eval_" + objective]
def model_init(trial):
"""Initialize the model."""
# In the first initialization, trial is None and default values are used to avoid errors
# logging.info("Initializing model during trial")
dropout = (
trial.suggest_categorical("dropout", hyperparams["dropout"])
if "dropout" in hyperparams and trial is not None
else 0.2
)
lora_r = (
trial.suggest_categorical("lora_r", hyperparams["lora_r"])
if "lora_r" in hyperparams and trial is not None
else 16
)
lora_target_modules = (
trial.suggest_categorical("lora_target_modules", hyperparams["lora_target_modules"])
if "lora_target_modules" in hyperparams and trial is not None
else ["q_proj", "v_proj", "query", "value", "q", "v"]
)
# since optuna `suggest_categorical requires choices to be hashable, in the config file
# the choices are defined as strings separated by "__"
if isinstance(lora_target_modules, str):
lora_target_modules = lora_target_modules.split("__")
params = {"dropout": dropout, "lora_r": lora_r, "lora_target_modules": lora_target_modules}
if trial is not None:
logging.info(f"Model hyperparameters: {params}")
model_helper = HPOModelHelper(
model_name, **params
) # may be necessary to add trial arguments
return model_helper.get_model()
return model_init, optuna_hp_space, compute_objective
def run_hpo(
model_name: str,
config_file: str,
logging_dir: str = "./log",
verbosity: int = 2,
):
hf_utils.set_seed()
hf_utils.set_verbosity(verbosity)
device = utils.set_torch_device()
# process config file and copy to logging directory
Path(logging_dir).mkdir(parents=True, exist_ok=True)
config = utils.process_config_file(config_file, logging_dir)
max_length = config["dataset"].get("max_length", 1024)
# initlialize base model helper for loading data and computing class weights
model_helper = hf_utils.HFModelHelper(model_name, device=device)
model_helper.convert_to_peft()
train_data, val_data, _ = hf_utils.load_data(config, model_helper)
print(train_data[0])
class_weights = hf_utils.get_class_weights(train_data, device=device)
data_collator = model_helper.get_collator()
# get default training arguments
set_wandb_env(logging_dir)
train_config = config["trainer"]
patience = train_config.pop("early_stopping_patience", None)
train_args = hf_utils.get_training_arguments(
train_config, model_helper, max_length, logging_dir
)
del model_helper # remove model helper to free up memory
# get hpo util functions
hpo_config = config["hpo"]
model_init, optuna_hp_space, compute_objective = _prepare_hpo(model_name, hpo_config)
# initialize trainer
trainer = hf_utils.BalancedTrainer(
class_weights=class_weights,
model=None,
model_init=model_init,
args=train_args,
data_collator=data_collator,
train_dataset=train_data,
eval_dataset=val_data,
callbacks=[EarlyStoppingCallback(early_stopping_patience=patience)] if patience else None,
compute_metrics=hf_utils.compute_metrics,
)
# run hyperparameter search
logging.info("Starting hyperparameter search...")
best_trial = trainer.hyperparameter_search(
direction="maximize",
hp_space=optuna_hp_space,
compute_objective=compute_objective,
n_trials=hpo_config.get("n_trials", 20),
backend="optuna",
)
# Save the best trial hyperparameters to a YAML file
result = dict()
result["run_id"] = best_trial.run_id
result["roc_auc"] = best_trial.objective
result["hyperparameters"] = best_trial.hyperparameters
output_file = Path(logging_dir) / "best_trial.yaml"
with open(output_file, "w") as file:
yaml.dump(result, file, default_flow_style=False)
logging.info(f"Best trial hyperparameters saved to {output_file}")
if __name__ == "__main__":
args = utils.parse_cli(__doc__)
utils.setup_logging(args.verbosity)
logging.info(f"Starting hyperparameter search for model: {args.model_name}")
run_hpo(args.model_name, args.config_file, args.logging_dir, args.verbosity)
This source diff could not be displayed because it is too large. You can view the blob instead.
# Installing PyTorch from a specific index URL, depending on CUDA version
# In this case, CUDA 12.1
--index-url https://download.pytorch.org/whl/cu121
torch==2.3.0
torchvision==0.18
torchaudio==2.3.0
# Return to PyPI for the rest of the packages
--index-url https://pypi.org/simple
h5py
numpy==1.23.5
scipy=1.15.0
pandas==2.2.2
scikit-learn==1.5.0
transformers==4.45.0
accelerate==1.0.0
huggingface-hub==0.27.1
peft==0.13.1
datasets==3.0.1
optuna==4.0.0
wandb==0.18.5
tqdm==4.67.1
"""Utils functions for setup, logging and training."""
from pathlib import Path
import logging
import yaml
import shutil
from sklearn import metrics
from scipy.special import softmax
def setup_logging(verbosity: int) -> None:
if verbosity == 2:
level = logging.DEBUG
elif verbosity == 1:
level = logging.INFO
elif verbosity == 0:
level = logging.WARNING
logging.basicConfig(
level=level,
format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
handlers=[logging.StreamHandler()]
)
def parse_cli(description):
import argparse
parser = argparse.ArgumentParser(
description=description, formatter_class=argparse.ArgumentDefaultsHelpFormatter
)
parser.add_argument(
"-m",
"--model-name",
type=str,
required=True,
help=(
"The name of the model to train. Must be a valid Hugging Face model, a local"
" checkpoint path, or 'sklearn'."
),
)
parser.add_argument(
"-c",
"--config-file",
type=str,
required=True,
help="The path to the configuration file.",
)
parser.add_argument(
"-o",
"--logging-dir",
type=str,
default="./log",
help="The directory where logs and model checkpoints will be saved.",
)
parser.add_argument(
"--pred-file",
type=str,
default=None,
help="The path to the file where the predictions will be saved.",
)
parser.add_argument(
"-v",
"--verbosity",
type=int,
default=2,
help="The verbosity level of the logger.",
)
args = parser.parse_args()
# Check for invalid arguments and set defaults
if args.verbosity > 2:
logging.warning("Verbosity level must be between 0 and 2. Setting to 2.")
args.verbosity = 2
elif args.verbosity < 0:
logging.warning("Verbosity level must be between 0 and 2. Setting to 0.")
args.verbosity = 0
return args
def process_config_file(config_file, logging_dir, sanity_check_fn=None):
# load config file
with open(config_file, "r") as file:
config = yaml.safe_load(file)
# run sanity check function
if sanity_check_fn:
sanity_check_fn(config)
# remove *.yml files from logging directory
for file in Path(logging_dir).glob("*.yml"):
file.unlink()
shutil.copy2(config_file, logging_dir)
return config
# Pretrained models hub IDs
HF_PRETRAINED_MODELS = {
"ESM2_35M": "facebook/esm2_t12_35M_UR50D",
"ESM2_650M": "facebook/esm2_t33_650M_UR50D",
"PROTT5": "Rostlab/prot_t5_xl_uniref50",
"ANKH_BASE": "ElnaggarLab/ankh-base",
"ANKH_LARGE": "ElnaggarLab/ankh-large",
"SAPROT_35M": "westlake-repl/SaProt_35M_AF2",
"SAPROT_650M": "westlake-repl/SaProt_650M_AF2",
}
_DEAFAULT_EVAL_METRICS = {
"f1": metrics.f1_score,
"mc": metrics.matthews_corrcoef,
"balanced_accuracy": metrics.balanced_accuracy_score,
"average_precision": metrics.average_precision_score,
"precision": metrics.precision_score,
"recall": metrics.recall_score,
"roc_auc": metrics.roc_auc_score,
"accuracy": metrics.accuracy_score,
}
def compute_metrics(labels, preds):
eval_results = {}
for metric_name, metric_func in _DEAFAULT_EVAL_METRICS.items():
try:
eval_results[metric_name] = metric_func(labels, softmax(preds, axis=-1)[:, 1])
except ValueError:
try:
eval_results[metric_name] = metric_func(labels, preds.argmax(axis=-1))
except Exception as e:
raise e
return eval_results
def set_torch_device():
import torch
if torch.cuda.is_available():
device = "cuda"
elif torch.backends.mps.is_available():
device = "mps"
else:
device = "cpu"
logging.info(f"Using default device: {device}")
return device
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment