o
    ZhZM                     @   s   d dl mZmZmZmZ ddlmZ ddlmZm	Z	m
Z
 ddlmZmZmZ ddlmZmZmZ ddlmZ e r=d dlZG d	d
 d
eddZdZdd edD dd edD  Zdd ZG dd deZdgZdS )    )ClassVarListOptionalUnion   )BatchFeature)
ImageInputis_valid_imagemake_flat_list_of_images)ProcessingKwargsProcessorMixinUnpack)
AddedTokenPreTokenizedInput	TextInput)is_torch_availableNc                   @   s&   e Zd ZddidddddidZd	S )
ColPaliProcessorKwargspaddinglongestZchannels_firstT)Zdata_formatZdo_convert_rgbZreturn_tensorspt)text_kwargsimages_kwargsZcommon_kwargsN)__name__
__module____qualname__	_defaults r   r   ]/var/www/auris/lib/python3.10/site-packages/transformers/models/colpali/processing_colpali.pyr   $   s    
r   F)totalz<image>c                 C      g | ]	}d |ddqS )z<locz0>4>r   .0ir   r   r   
<listcomp>2       r$   i   c                 C   r   )z<segz0>3r    r   r!   r   r   r   r$   2   r%      c                 C   s   || |  | |  dS )aZ  
    Builds a string from the input prompt and image tokens.
    For example, for the call:
    build_string_from_input(
        prompt="Prefix str"
        bos_token="<s>",
        image_seq_len=3,
        image_token="<im>",
    )
    The output will be:
    "<im><im><im><s>Initial str"
    Args:
        prompt (`List[Union[str, ImageInput]]`): The input prompt.
        bos_token (`str`): The beginning of sentence token.
        image_seq_len (`int`): The length of the image sequence.
        image_token (`str`): The image token.
        num_images (`int`): Number of images in the prompt.
    
r   prompt	bos_tokenZimage_seq_lenimage_tokenZ
num_imagesr   r   r   build_string_from_input5   s   r,   c                       s\  e Zd ZU dZddgZdgZdZdZdZe	e
 ed< d	Ze	e
 ed
< 			d, fdd	Z				d-dedeeeee ee f dee defddZdd Zdd Zedd Zede
fddZ	d.dedee defddZdeeee f dee defddZ	 		!d/d"ed#ed# f d$ed#ed# f d%ed&ed' d(ed)e
f dd#fd*d+Z   Z!S )0ColPaliProcessora  
    Constructs a ColPali processor which wraps a PaliGemmaProcessor and special methods to process images and queries, as
    well as to compute the late-interaction retrieval score.

    [`ColPaliProcessor`] offers all the functionalities of [`PaliGemmaProcessor`]. See the [`~PaliGemmaProcessor.__call__`]
    for more information.

    Args:
        image_processor ([`SiglipImageProcessor`], *optional*):
            The image processor is a required input.
        tokenizer ([`LlamaTokenizerFast`], *optional*):
            The tokenizer is a required input.
        chat_template (`str`, *optional*): A Jinja template which will be used to convert lists of messages
            in a chat into a tokenizable string.
    image_processor	tokenizerchat_template)ZSiglipImageProcessorZSiglipImageProcessorFast)ZGemmaTokenizerZGemmaTokenizerFastzDescribe the image.visual_prompt_prefixz
Question: query_prefixNc                    s   |d u rt d|d u rt dt|dst d|j| _t|ds=ttddd}d	|gi}|| |t| _t| _n|j| _|j| _|	t
 d|_d|_t j|||d
 d S )Nz)You need to specify an `image_processor`.z"You need to specify a `tokenizer`.image_seq_lengthz;Image processor is missing an `image_seq_length` attribute.r+   FT)
normalizedZspecialZadditional_special_tokens)r0   )
ValueErrorhasattrr3   r   IMAGE_TOKENZadd_special_tokensZconvert_tokens_to_idsZimage_token_idr+   Z
add_tokensEXTRA_TOKENSZadd_bos_tokenZadd_eos_tokensuper__init__)selfr.   r/   r0   kwargsr+   Ztokens_to_add	__class__r   r   r:   d   s&   




zColPaliProcessor.__init__imagestextr<   returnc                    sn   j tfd jji|}|d dd}|durdnd}|du r)|du r)td|dur5|dur5td|durt|rA|g}n$t|trMt|d	 rMnt|trat|d	 trat|d	 d	 setd
 j	gt
| }	dd |D } fddt|	|D }
t|} j|fi |d d }|d dddur|d d   j7  <  j|
fddi|d }i |d|i}|r|d |d d	kd}|d|i t|dS |dur5t|tr|g}nt|trt|d	 tstd|du r jd }g }|D ]} jj j | }||7 }|d7 }|| q |d dd|d d<  j|fddi|d }|S dS )a	  
        Main method to prepare for the model either (1) one or several texts, either (2) one or several image(s). This method is custom
        wrapper around the PaliGemmaProcessor's [`~PaliGemmaProcessor.__call__`] method adapted for the ColPali model. It cannot process
        both text and images at the same time.

        When preparing the text(s), this method forwards the `text` and `kwargs` arguments to LlamaTokenizerFast's
        [`~LlamaTokenizerFast.__call__`].
        When preparing the image(s), this method forwards the `images` and `kwargs` arguments to SiglipImageProcessor's
        [`~SiglipImageProcessor.__call__`].
        Please refer to the docstring of the above two methods for more information.

        Args:
            images (`PIL.Image.Image`, `np.ndarray`, `torch.Tensor`, `List[PIL.Image.Image]`, `List[np.ndarray]`, `List[torch.Tensor]`):
                The image or batch of images to be prepared. Each image can be a PIL image, NumPy array or PyTorch
                tensor. In case of a NumPy array/PyTorch tensor, each image should be of shape (C, H, W), where C is a
                number of channels, H and W are image height and width.
            text (`str`, `List[str]`, `List[List[str]]`):
                The sequence or batch of sequences to be encoded. Each sequence can be a string or a list of strings
                (pretokenized string). If the sequences are provided as list of strings (pretokenized), you must set
                `is_split_into_words=True` (to lift the ambiguity with a batch of sequences).
            return_tensors (`str` or [`~utils.TensorType`], *optional*):
                If set, will return tensors of a particular framework. Acceptable values are:

                - `'tf'`: Return TensorFlow `tf.constant` objects.
                - `'pt'`: Return PyTorch `torch.Tensor` objects.
                - `'np'`: Return NumPy `np.ndarray` objects.
                - `'jax'`: Return JAX `jnp.ndarray` objects.

        Returns:
            [`BatchFeature`]: A [`BatchFeature`] with the following fields:

            - **input_ids** -- List of token ids to be fed to a model.
            - **attention_mask** -- List of indices specifying which tokens should be attended to by the model (when
              `return_attention_mask=True` or if *"attention_mask"* is in `self.model_input_names` and if `text` is not
              `None`).
            - **pixel_values** -- Pixel values to be fed to a model. Returned when `images` is not `None`.
        Ztokenizer_init_kwargsr   suffixNTFz&Either text or images must be providedz5Only one of text or images can be processed at a timer   zAimages must be an image, list of images or list of list of imagesc                 S   s   g | ]}| d qS )RGB)convert)r"   imager   r   r   r$      s    z-ColPaliProcessor.__call__.<locals>.<listcomp>c              
      s:   g | ]\}}t | jj jtt|trt|nd dqS )   r(   )r,   r/   r*   r3   r7   
isinstancelistlen)r"   r)   Z
image_listr;   r   r   r$      s    r   pixel_values
max_lengthreturn_token_type_idsZ	input_idsZtoken_type_idsilabels)dataz*Text must be a string or a list of strings
   r'   2   )Z_merge_kwargsr   r/   Zinit_kwargspopr5   r	   rG   rH   r1   rI   zipr
   r.   getr3   Zmasked_fillupdater   strquery_augmentation_tokenr*   r2   append)r;   r?   r@   ZaudioZvideosr<   Zoutput_kwargsrB   rM   Z	texts_docZinput_stringsrK   ZinputsZreturn_datarN   Ztexts_queryqueryZbatch_queryr   rJ   r   __call__   s   -(





zColPaliProcessor.__call__c                 O      | j j|i |S )z
        This method forwards all its arguments to GemmaTokenizerFast's [`~PreTrainedTokenizer.batch_decode`]. Please
        refer to the docstring of this method for more information.
        )r/   batch_decoder;   argsr<   r   r   r   r\        zColPaliProcessor.batch_decodec                 O   r[   )z
        This method forwards all its arguments to GemmaTokenizerFast's [`~PreTrainedTokenizer.decode`]. Please refer to
        the docstring of this method for more information.
        )r/   decoder]   r   r   r   r`   
  r_   zColPaliProcessor.decodec                 C   s"   | j j}| jj}tt|| S N)r/   model_input_namesr.   rH   dictfromkeys)r;   Ztokenizer_input_namesZimage_processor_input_namesr   r   r   rb     s   z"ColPaliProcessor.model_input_namesc                 C   s   | j jS )z
        Return the query augmentation token.

        Query augmentation buffers are used as reasoning buffers during inference.
        )r/   Z	pad_tokenrJ   r   r   r   rW     s   z)ColPaliProcessor.query_augmentation_tokenc                 K      | j dd|i|S )a  
        Prepare for the model one or several image(s). This method is a wrapper around the `__call__` method of the ColPaliProcessor's
        [`ColPaliProcessor.__call__`].

        This method forwards the `images` and `kwargs` arguments to SiglipImageProcessor's [`~SiglipImageProcessor.__call__`].

        Args:
            images (`PIL.Image.Image`, `np.ndarray`, `torch.Tensor`, `List[PIL.Image.Image]`, `List[np.ndarray]`, `List[torch.Tensor]`):
                The image or batch of images to be prepared. Each image can be a PIL image, NumPy array or PyTorch
                tensor. In case of a NumPy array/PyTorch tensor, each image should be of shape (C, H, W), where C is a
                number of channels, H and W are image height and width.
            return_tensors (`str` or [`~utils.TensorType`], *optional*):
                If set, will return tensors of a particular framework. Acceptable values are:

                - `'tf'`: Return TensorFlow `tf.constant` objects.
                - `'pt'`: Return PyTorch `torch.Tensor` objects.
                - `'np'`: Return NumPy `np.ndarray` objects.
                - `'jax'`: Return JAX `jnp.ndarray` objects.

        Returns:
            [`BatchFeature`]: A [`BatchFeature`] with the following fields:

            - **input_ids** -- List of token ids to be fed to a model.
            - **attention_mask** -- List of indices specifying which tokens should be attended to by the model (when
              `return_attention_mask=True` or if *"attention_mask"* is in `self.model_input_names` and if `text` is not
              `None`).
            - **pixel_values** -- Pixel values to be fed to a model. Returned when `images` is not `None`.
        r?   Nr   rZ   )r;   r?   r<   r   r   r   process_images   s   !zColPaliProcessor.process_imagesc                 K   re   )a  
        Prepare for the model one or several texts. This method is a wrapper around the `__call__` method of the ColPaliProcessor's
        [`ColPaliProcessor.__call__`].

        This method forwards the `text` and `kwargs` arguments to LlamaTokenizerFast's [`~LlamaTokenizerFast.__call__`].

        Args:
            text (`str`, `List[str]`, `List[List[str]]`):
                The sequence or batch of sequences to be encoded. Each sequence can be a string or a list of strings
                (pretokenized string). If the sequences are provided as list of strings (pretokenized), you must set
                `is_split_into_words=True` (to lift the ambiguity with a batch of sequences).
            return_tensors (`str` or [`~utils.TensorType`], *optional*):
                If set, will return tensors of a particular framework. Acceptable values are:

                - `'tf'`: Return TensorFlow `tf.constant` objects.
                - `'pt'`: Return PyTorch `torch.Tensor` objects.
                - `'np'`: Return NumPy `np.ndarray` objects.
                - `'jax'`: Return JAX `jnp.ndarray` objects.

        Returns:
            [`BatchFeature`]: A [`BatchFeature`] with the following fields:

            - **input_ids** -- List of token ids to be fed to a model.
            - **attention_mask** -- List of indices specifying which tokens should be attended to by the model (when
              `return_attention_mask=True` or if *"attention_mask"* is in `self.model_input_names` and if `text` is not
              `None`).
        r@   Nr   rf   )r;   r@   r<   r   r   r   process_queriesC  s    z ColPaliProcessor.process_queriesr&   cpuquery_embeddingsztorch.Tensorpassage_embeddings
batch_sizeoutput_dtypeztorch.dtypeoutput_deviceztorch.devicec              	   C   s@  t |dkr
tdt |dkrtd|d j|d jkr"td|d j|d jkr0td|du r9|d j}g }tdt ||D ]U}g }tjjjj	||||  ddd}	tdt ||D ]'}
tjjjj	||
|
|  ddd}|
td	|	|jd
dd jdd q`|
tj|dd|| qCtj|ddS )aZ  
        Compute the late-interaction/MaxSim score (ColBERT-like) for the given multi-vector
        query embeddings (`qs`) and passage embeddings (`ps`). For ColPali, a passage is the
        image of a document page.

        Because the embedding tensors are multi-vector and can thus have different shapes, they
        should be fed as:
        (1) a list of tensors, where the i-th tensor is of shape (sequence_length_i, embedding_dim)
        (2) a single tensor of shape (n_passages, max_sequence_length, embedding_dim) -> usually
            obtained by padding the list of tensors.

        Args:
            query_embeddings (`Union[torch.Tensor, List[torch.Tensor]`): Query embeddings.
            passage_embeddings (`Union[torch.Tensor, List[torch.Tensor]`): Passage embeddings.
            batch_size (`int`, *optional*, defaults to 128): Batch size for computing scores.
            output_dtype (`torch.dtype`, *optional*, defaults to `torch.float32`): The dtype of the output tensor.
                If `None`, the dtype of the input embeddings is used.
            output_device (`torch.device` or `str`, *optional*, defaults to "cpu"): The device of the output tensor.

        Returns:
            `torch.Tensor`: A tensor of shape `(n_queries, n_passages)` containing the scores. The score
            tensor is saved on the "cpu" device.
        r   zNo queries providedzNo passages providedz/Queries and passages must be on the same devicez-Queries and passages must have the same dtypeNT)Zbatch_firstZpadding_valuezbnd,csd->bcnsr   )dim   rF   )rI   r5   ZdeviceZdtyperangetorchnnutilsZrnnZpad_sequencerX   Zeinsummaxsumcatto)r;   rj   rk   rl   rm   rn   Zscoresr#   Zbatch_scoresZbatch_queriesjZbatch_passagesr   r   r   score_retrievale  s2    


 "z ColPaliProcessor.score_retrieval)NNN)NNNNra   )r&   Nri   )"r   r   r   __doc__
attributesZvalid_kwargsZimage_processor_classZtokenizer_classr1   r   rV   __annotations__r2   r:   r   r   r   r   r   r   r   r   rZ   r\   r`   propertyrb   rW   rg   rh   intr   rz   __classcell__r   r   r=   r   r-   K   s|   
 "



#
&
r-   )typingr   r   r   r   Zfeature_extraction_utilsr   Zimage_utilsr   r	   r
   Zprocessing_utilsr   r   r   Ztokenization_utils_baser   r   r   rt   r   rr   r   r7   rq   r8   r,   r-   __all__r   r   r   r   <module>   s    $  
]