| from typing import List, Union |
|
|
| import torch |
| import numpy as np |
|
|
| from transformers.utils import is_flash_attn_2_available |
| from transformers.models.qwen2 import Qwen2Model |
| from transformers.models.qwen2.tokenization_qwen2_fast import Qwen2TokenizerFast |
| from transformers.models.qwen2.configuration_qwen2 import Qwen2Config |
|
|
|
|
# Instruction prefixes, keyed first by retrieval task and then by the role of
# the text being embedded ("query" or "passage"). `encode` looks up
# INSTRUCTION_CONFIG[task][prompt_name] and puts it in front of every sentence.
INSTRUCTION_CONFIG = {
    "nl2code": dict(
        query="Find the most relevant code snippet given the following query:\n",
        passage="Candidate code snippet:\n",
    ),
    "qa": dict(
        query="Find the most relevant answer given the following question:\n",
        passage="Candidate answer:\n",
    ),
    "code2code": dict(
        query="Find an equivalent code snippet given the following code snippet:\n",
        passage="Candidate code snippet:\n",
    ),
    "code2nl": dict(
        query="Find the most relevant comment given the following code snippet:\n",
        passage="Candidate comment:\n",
    ),
    "code2completion": dict(
        query="Find the most relevant completion given the following start of code snippet:\n",
        passage="Candidate completion:\n",
    ),
}
|
|
|
|
def batch(iterable, n=1):
    """Yield consecutive slices of ``iterable`` holding at most ``n`` items.

    ``iterable`` must support ``len()`` and slicing. Chunks are produced
    lazily, in order; the last one carries the remainder when the length is
    not a multiple of ``n``.
    """
    total = len(iterable)
    for start in range(0, total, n):
        stop = start + n
        # Clamp the final chunk to the end of the input.
        if stop > total:
            stop = total
        yield iterable[start:stop]
|
|
|
|
def last_token_pooling(model_output, attention_mask):
    """Pool each sequence down to the hidden state of its last attended token.

    Args:
        model_output: Model output whose first element (``model_output[0]``)
            holds the per-token hidden states, indexed as (batch, position, ...).
        attention_mask: Mask indexed as (batch, position); its per-row sum is
            used as the number of attended tokens in that sequence.

    Returns:
        A float32 tensor with one pooled vector per sequence. The result is
        cast to float32 on both code paths so the output dtype does not
        depend on how the batch happened to be padded.
    """
    token_embeddings = model_output[0]

    # When every row attends to the final position, the last column already
    # holds each sequence's last token (left-padded or unpadded batch).
    left_padding = attention_mask[:, -1].sum() == attention_mask.shape[0]
    if left_padding:
        pooled = token_embeddings[:, -1]
    else:
        # Right padding: the last attended token sits at index (length - 1).
        # Index tensors must be integral and live with the data being indexed.
        last_positions = (attention_mask.sum(dim=1) - 1).to(
            device=token_embeddings.device, dtype=torch.long
        )
        rows = torch.arange(
            token_embeddings.shape[0], device=token_embeddings.device
        )
        pooled = token_embeddings[rows, last_positions]

    return pooled.float()
|
|
|
|
class JinaEmbeddingsC1Model(Qwen2Model):
    """Qwen2 backbone that produces last-token sentence embeddings.

    ``forward`` runs the backbone and pools its output with
    ``last_token_pooling``. ``encode`` adds instruction prefixing,
    tokenization, batching, truncation of the embedding dimension and
    L2 normalization on top of that. The tokenizer used by ``encode`` is
    attached by ``from_pretrained``.
    """

    # Hub-resolution options that are also handed to the tokenizer load so the
    # tokenizer is resolved with the same revision / cache / credentials as
    # the model weights.
    _TOKENIZER_HUB_KWARGS = (
        "cache_dir",
        "force_download",
        "local_files_only",
        "token",
        "revision",
    )

    def __init__(self, config: Qwen2Config):
        super().__init__(config)
        # Instruction prefixes, looked up as instructions[task][prompt_name].
        self.instructions = INSTRUCTION_CONFIG

    def forward(
        self,
        input_ids: torch.LongTensor,
        attention_mask: torch.Tensor,
        **kwargs
    ) -> torch.Tensor:
        """
        Forward pass through the model.

        Args:
            input_ids: Token ids passed to the backbone.
            attention_mask: Attention mask passed to the backbone and reused
                to locate each sequence's last attended token.
            **kwargs: Forwarded unchanged to the backbone's ``forward``.

        Returns:
            The pooled embeddings returned by ``last_token_pooling``
            (one vector per input sequence).
        """
        batch_model_output = super().forward(
            input_ids=input_ids,
            attention_mask=attention_mask,
            **kwargs
        )
        batch_sentence_embeddings = last_token_pooling(
            batch_model_output, attention_mask
        )
        return batch_sentence_embeddings

    def encode(
        self,
        sentences: List[str],
        batch_size: int = 32,
        max_length: int = 32768,
        task: str = "nl2code",
        prompt_name: str = "query",
        return_numpy: bool = False,
        truncate_dim: int = 896,
    ) -> Union[np.ndarray, List[torch.Tensor]]:
        """
        Encodes a list of texts into embeddings.

        Puts the model in eval mode and runs under ``torch.inference_mode``.

        Args:
            sentences: list of text strings to encode
            batch_size: Number of texts to process at once
            max_length: Maximum token length for text processing
            task: Type of retrieval task; must be listed in
                ``config.task_names`` and have an entry in the instruction
                table ('nl2code', 'qa', 'code2code', 'code2nl' or
                'code2completion')
            prompt_name: Type of text being encoded ('query' or 'passage');
                must be listed in ``config.prompt_names``
            return_numpy: Whether to return numpy arrays instead of torch tensors
            truncate_dim: Dimension to truncate embeddings to (64, 128, 256,
                512, or 896); must be listed in ``config.matryoshka_dims``

        Returns:
            With ``return_numpy=False``: a list with one CPU tensor per input
            text. With ``return_numpy=True``: a single array with one row per
            input text (shape ``(0, truncate_dim)`` for empty input).

        Raises:
            AssertionError: if ``task``, ``prompt_name`` or ``truncate_dim``
                is not among the values allowed by the model config.
        """
        # NOTE(review): asserts are stripped under ``python -O``; they are kept
        # so the exception type seen by existing callers does not change.
        assert task in self.config.task_names, \
            f"Invalid task: {task}. Must be one of {self.config.task_names}."
        assert prompt_name in self.config.prompt_names, \
            f"Invalid prompt name: {prompt_name}. Must be one of {self.config.prompt_names}."
        assert truncate_dim in self.config.matryoshka_dims, \
            f"Invalid embedding dimension: {truncate_dim}. Must be one of {self.config.matryoshka_dims}."

        instruction = self.instructions[task][prompt_name]
        sentences = [f'{instruction}{sentence}' for sentence in sentences]
        embeddings = []

        self.eval()

        with torch.inference_mode():
            for batch_of_sentences in batch(sentences, n=batch_size):
                batch_encoded_input = self.tokenizer(
                    batch_of_sentences,
                    padding=True,
                    truncation=True,
                    return_tensors="pt",
                    max_length=max_length
                ).to(self.device)

                # NOTE(review): ``max_length`` is also forwarded into the
                # backbone via **kwargs although truncation already happened
                # in the tokenizer; kept as-is, confirm it is needed.
                batch_sentence_embeddings = self(
                    **batch_encoded_input,
                    output_attentions=False,
                    return_dict=True,
                    max_length=max_length
                )

                # Keep the leading ``truncate_dim`` components, then
                # re-normalize so the truncated vectors are unit length.
                batch_sentence_embeddings = batch_sentence_embeddings[:, :truncate_dim]
                batch_sentence_embeddings = torch.nn.functional.normalize(
                    batch_sentence_embeddings, p=2, dim=-1
                ).to("cpu")

                embeddings.append(batch_sentence_embeddings)

        if return_numpy:
            if not embeddings:
                # np.concatenate rejects an empty list; return an empty matrix.
                return np.empty((0, truncate_dim), dtype=np.float32)
            # NumPy has no bfloat16, so upcast batches that arrive in that dtype.
            return np.concatenate(
                [
                    (b.float() if b.dtype == torch.bfloat16 else b).numpy()
                    for b in embeddings
                ],
                axis=0,
            )
        return [t for b in embeddings for t in torch.unbind(b, dim=0)]

    @classmethod
    def from_pretrained(
        cls,
        pretrained_model_name_or_path,
        *args,
        **kwargs,
    ):
        """
        Loads a pretrained model.

        Defaults ``torch_dtype`` to ``"auto"`` and ``attn_implementation`` to
        FlashAttention-2 when available (otherwise SDPA) unless the caller
        supplies them, then attaches a ``Qwen2TokenizerFast`` loaded from the
        same location as ``model.tokenizer``.

        Args:
            pretrained_model_name_or_path: Model id or path, used for both the
                weights and the tokenizer.
            *args: Forwarded to the parent ``from_pretrained``.
            **kwargs: Forwarded to the parent ``from_pretrained``; the hub
                options in ``_TOKENIZER_HUB_KWARGS`` are passed to the
                tokenizer load as well.

        Returns:
            The loaded model with its ``tokenizer`` attribute set.
        """
        kwargs.setdefault("torch_dtype", "auto")

        if "attn_implementation" not in kwargs:
            kwargs["attn_implementation"] = "flash_attention_2" if is_flash_attn_2_available() else "sdpa"

        model = super().from_pretrained(
            pretrained_model_name_or_path, *args, **kwargs
        )

        # Resolve the tokenizer with the same hub options as the weights;
        # otherwise a pinned revision, private-repo token or offline cache
        # would apply to the model only.
        tokenizer_kwargs = {
            key: kwargs[key]
            for key in cls._TOKENIZER_HUB_KWARGS
            if key in kwargs
        }
        model.tokenizer = Qwen2TokenizerFast.from_pretrained(
            pretrained_model_name_or_path,
            trust_remote_code=True,
            **tokenizer_kwargs
        )

        return model
|
|
|
|