import argparse
import os
from enum import Enum
from typing import cast, Optional, Union

import torch
import torch.distributed as dist
from torch.distributed._shard._utils import narrow_tensor_by_index
from torch.distributed.checkpoint import FileSystemReader, FileSystemWriter
from torch.distributed.checkpoint._nested_dict import flatten_state_dict
from torch.distributed.checkpoint.default_planner import (
    _EmptyStateDictLoadPlanner,
    DefaultLoadPlanner,
)
from torch.distributed.checkpoint.metadata import (
    Metadata,
    STATE_DICT_TYPE,
    STORAGE_TYPES,
    TensorProperties,
    TensorStorageMetadata,
)
from torch.distributed.checkpoint.planner import LoadItemType, LoadPlan, LoadPlanner
from torch.distributed.checkpoint.planner_helpers import _create_chunk_list
from torch.distributed.checkpoint.state_dict_loader import _load_state_dict
from torch.distributed.checkpoint.state_dict_saver import _save_state_dict
from torch.distributed.checkpoint.storage import StorageReader
from torch.futures import Future


__all__ = [
    "dcp_to_torch_save",
    "torch_save_to_dcp",
    "BroadcastingTorchSaveReader",
    "DynamicMetaLoadPlanner",
]


class BroadcastingTorchSaveReader(StorageReader):
    """
    StorageReader for reading a Torch Save file. This reader will read the entire checkpoint
    on the coordinator rank, and then broadcast and shard each tensor to all ranks.

    .. note:: Intended to be used with DynamicMetaLoadPlanner

    .. warning::
        Current implementation only supports loading Tensors.

    >>> # xdoctest: +SKIP("undefined vars")
    >>> sd = {"model": model}
    >>> dcp.load(
    >>>    sd,
    >>>    storage_reader=BroadcastingTorchSaveReader(),
    >>>    planner=DynamicMetaLoadPlanner(),
    >>>    checkpoint_id="path_to_model.pt"
    >>> )
    """

    def __init__(
        self,
        checkpoint_id: Optional[Union[str, os.PathLike]] = None,
        coordinator_rank: int = 0,
    ) -> None:
        self.checkpoint_id = checkpoint_id
        self.coordinator_rank = coordinator_rank

    def read_metadata(self) -> Metadata:
        """Extends the default StorageReader to support building the metadata file."""
        # Metadata is built in the planner's set_up_planner, since we are not actually
        # reading a metadata file from disk.
        return Metadata(state_dict_metadata={})

    def read_data(self, plan: LoadPlan, planner: LoadPlanner) -> Future[None]:
        """
        Reads the torch save data on the coordinator rank and broadcasts it afterwards.
        This incurs a communication cost, but avoids having to load the entire checkpoint
        on each rank, hopefully preventing OOM issues.
        """
        planner = cast(DefaultLoadPlanner, planner)

        # The checkpoint is only materialized on the coordinator rank; every other rank
        # receives each tensor via broadcast below.
        if self.is_coordinator:
            assert self.checkpoint_id is not None
            torch_state_dict = torch.load(
                self.checkpoint_id, map_location="cpu", weights_only=False
            )
            if planner.flatten_state_dict:
                torch_state_dict, _ = flatten_state_dict(torch_state_dict)
        else:
            torch_state_dict = None

        for req in plan.items:
            if req.type == LoadItemType.BYTE_IO:
                raise RuntimeError(
                    f"Non-tensor value identified at {req.storage_index.fqn}. "
                    f"At this time {type(self).__name__} only supports loading Tensors."
                )

            # Broadcast each tensor from the coordinator rank to all other ranks.
            if self.is_coordinator:
                pg_device = dist.distributed_c10d._get_pg_default_device()
                tensor = torch_state_dict[req.storage_index.fqn].to(pg_device)
            else:
                tensor = torch.empty_like(planner.state_dict[req.storage_index.fqn])

            dist.broadcast(tensor, src=self.coordinator_rank, async_op=False)

            # Narrow the broadcast tensor down to the slice this rank actually requested.
            tensor = narrow_tensor_by_index(tensor, req.storage_offsets, req.lengths)
            target_tensor = planner.resolve_tensor(req).detach()
            assert target_tensor.size() == tensor.size(), (
                f"req {req.storage_index} mismatch sizes, "
                f"{target_tensor.size()} vs {tensor.size()}"
            )
            target_tensor.copy_(tensor)
            planner.commit_tensor(req, target_tensor)

        fut: Future = Future()
        fut.set_result(None)
        return fut

    def set_up_storage_reader(self, metadata: Metadata, is_coordinator: bool) -> None:
        """Implementation of the StorageReader method"""
        self.is_coordinator = is_coordinator
        if self.is_coordinator:
            assert dist.get_rank() == self.coordinator_rank

        assert self.checkpoint_id is not None

    def prepare_local_plan(self, plan: LoadPlan) -> LoadPlan:
        """Implementation of the StorageReader method"""
        return plan

    def prepare_global_plan(self, global_plan: list[LoadPlan]) -> list[LoadPlan]:
        """Implementation of the StorageReader method"""
        return global_plan

    def reset(self, checkpoint_id: Union[str, os.PathLike, None] = None) -> None:
        """Implementation of the StorageReader method"""
        self.checkpoint_id = checkpoint_id

    @classmethod
    def validate_checkpoint_id(cls, checkpoint_id: Union[str, os.PathLike]) -> bool:
        """Implementation of the StorageReader method"""
        return os.path.isfile(checkpoint_id)


class DynamicMetaLoadPlanner(DefaultLoadPlanner):
    """
    Extension of DefaultLoadPlanner, which creates a new Metadata object based on the passed-in
    state dict, avoiding the need to read metadata from disk. This is useful when reading formats
    which don't have a metadata file, like Torch Save files.

    .. note:: Intended to be used with BroadcastingTorchSaveReader

    .. warning::
        Current implementation only supports loading Tensors.

    >>> # xdoctest: +SKIP("undefined vars")
    >>> sd = {"model": model}
    >>> dcp.load(
    >>>    sd,
    >>>    storage_reader=BroadcastingTorchSaveReader(),
    >>>    planner=DynamicMetaLoadPlanner(),
    >>>    checkpoint_id="path_to_model.pt"
    >>> )
    """

    def set_up_planner(
        self,
        state_dict: STATE_DICT_TYPE,
        metadata: Optional[Metadata] = None,
        is_coordinator: bool = False,
    ) -> None:
        """Sets up the planner, extending default behavior by creating the Metadata object from the state dict."""
        super().set_up_planner(state_dict, metadata, is_coordinator)

        state_dict_metadata: dict[str, STORAGE_TYPES] = {}
        for key, tensor in self.state_dict.items():
            if not torch.is_tensor(tensor):
                raise RuntimeError(
                    f"Non-tensor value identified at {key}. "
                    f"At this time {type(self).__name__} only supports loading Tensors."
                )

            state_dict_metadata[key] = TensorStorageMetadata(
                TensorProperties(dtype=tensor.dtype),
                tensor.size(),
                _create_chunk_list(tensor),
            )
        self.metadata = Metadata(state_dict_metadata=state_dict_metadata)


def dcp_to_torch_save(
    dcp_checkpoint_dir: Union[str, os.PathLike],
    torch_save_path: Union[str, os.PathLike],
):
    """
    Given a directory containing a DCP checkpoint, this function will convert it into a
    Torch save file.

    Args:
        dcp_checkpoint_dir: Directory containing the DCP checkpoint.
        torch_save_path: Filename to store the converted Torch save file.

    .. warning::
        To avoid OOM, it's recommended to only run this function on a single rank.
    """
    sd: STATE_DICT_TYPE = {}
    _load_state_dict(
        sd,
        storage_reader=FileSystemReader(dcp_checkpoint_dir),
        planner=_EmptyStateDictLoadPlanner(),
        no_dist=True,
    )
    torch.save(sd, torch_save_path)


def torch_save_to_dcp(
    torch_save_path: Union[str, os.PathLike],
    dcp_checkpoint_dir: Union[str, os.PathLike],
):
    """
    Given the location of a torch save file, converts it into a DCP checkpoint.

    Args:
        torch_save_path: Filename of the Torch save file.
        dcp_checkpoint_dir: Directory to store the DCP checkpoint.

    .. warning::
        To avoid OOM, it's recommended to only run this function on a single rank.
    """
    state_dict = torch.load(torch_save_path, weights_only=False)
    # No stateful handling is needed here, since anything produced by torch.load is
    # expected to be a plain (possibly nested) dict of tensors and other values.
    _save_state_dict(
        state_dict, storage_writer=FileSystemWriter(dcp_checkpoint_dir), no_dist=True
    )


if __name__ == "__main__":

    class FormatMode(Enum):
        TORCH_TO_DCP = "torch_to_dcp"
        DCP_TO_TORCH = "dcp_to_torch"

    # Parse command-line arguments
    parser = argparse.ArgumentParser()
    parser.add_argument(
        "mode",
        type=str,
        help="Conversion mode",
        choices=[m.value for m in FormatMode],
        default=FormatMode.TORCH_TO_DCP,
    )
    parser.add_argument("src", type=str, help="Path to the source model")
    parser.add_argument("dst", type=str, help="Path to the destination model")
    args = parser.parse_args()

    print(
        f"Converting checkpoint from {args.src} to {args.dst} using method: '{args.mode}'"
    )
    checkpoint_missing_warning = (
        f"No checkpoint found at {args.src}. Skipping conversion."
    )

    if args.mode == FormatMode.TORCH_TO_DCP.value:
        if os.path.isfile(args.src):
            torch_save_to_dcp(args.src, args.dst)
        else:
            print(checkpoint_missing_warning)
    elif args.mode == FormatMode.DCP_TO_TORCH.value:
        if os.path.isdir(args.src):
            dcp_to_torch_save(args.src, args.dst)
        else:
            print(checkpoint_missing_warning)
    else:
        raise ValueError(f"Unknown conversion mode: {args.mode}")
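# Usage sketch: besides the CLI entry point above, the two conversion helpers can be
# called directly from Python. The file and directory names below are hypothetical
# placeholders, not paths defined by this module.
#
#   from torch.distributed.checkpoint.format_utils import (
#       dcp_to_torch_save,
#       torch_save_to_dcp,
#   )
#
#   # torch.save checkpoint file -> DCP checkpoint directory
#   torch_save_to_dcp("model.pt", "dcp_checkpoint/")
#
#   # DCP checkpoint directory -> torch.save checkpoint file
#   dcp_to_torch_save("dcp_checkpoint/", "model.pt")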