o
    Zh?                  	   @   s  U d dl Z d dlmZmZmZ d dlZd dlmZ d dl	m
Z
 d dlmZ d dlmZ d dlmZ d dlmZ dd	lmZmZmZmZmZmZmZ dd
lmZmZmZmZm Z m!Z! ddl"m#Z#m$Z$ dgZ%e&e' e(d< dedede)fddZ*de&e de&e de&e fddZ+dej,defddZ-dedefddZ.dededefddZ/d e'dede fd!d"Z0d e'dedede fd#d$Z1d e'dej,de fd%d&Z2d e'd'efd(d)Z3d*d+ Z4d,d- Z5d e'd.ed/e&e de&e fd0dZ6d1edefd2d3Z7d e'd4ede&e  fd5d6Z8dedefd7d8Z9dej,de&e fd9d:Z:d e'd;ed<ede&e fd=d>Z;d1e<e'ef defd?d@Z=dAedBedCedDefdEdFZ>dS )G    N)AnyCallablecast)_get_device_module)ShardMetadata)ShardedTensor)DTensor)%compute_local_shape_and_global_offset   )BytesStorageMetadataChunkStorageMetadataMetadataIndexSTATE_DICT_TYPESTORAGE_TYPESTensorPropertiesTensorStorageMetadata)LoadItemTypeReadItemSavePlanTensorWriteData	WriteItemWriteItemType)"_check_shard_metadata_pair_overlap+_shards_get_overlap_region_wrt_saved_tensor create_read_items_for_chunk_list__all__plan
other_planreturnc           
      C   s  | j |j krdS t| jt|jkrdS t| j|jD ]j\}}|j|jkr( dS |j}|j}|j|jks@|j|jks@|j|jkrC dS |j}|j}|rM|rQ|sT|rT dS |r|r|j	|j	kra dS |j
}|j
}	|rk|	ro|sr|	rr dS |r|	r|j|	jks|j|	jkr dS qdS )a  
    Compare the two Save plans and return True if they are equal.

    Args:
        plan (SavePlan): First SavePlan to compare.
        other_plan (SavePlan): Second SavePlan to compare.

    Returns:
       True if the two plans are equal, False otherwise.
    FT)usablelenitemsziptypeindexfqnoffsettensor_datasizechunkoffsetssizes)
r   r   Z	plan_itemZother_plan_itemZplan_metadata_indexZother_plan_metadata_indexr'   Zother_tensor_datar)   Zother_chunk r,   [/var/www/auris/lib/python3.10/site-packages/torch/distributed/checkpoint/planner_helpers.py_compare_save_plans'   sD   r.   cached_plansdelta_plansc                 C   s<   g }t | |D ]\}}|r|js|| q|| q|S )ac  
    Merge a list of delta plans into a single plan.

    Args:
        cached_plans (List[SavePlan]): A list of cached plans.
        delta_plans (List[SavePlan]): A list of delta plans to merge. It can contain empty plans

    Returns:
        A single merged plan. If a delta plan is not usable, use the cached plan. Otherwise, use the delta plan.
    )r"   r   append)r/   r0   Zmerged_plansZcached_planZ
delta_planr,   r,   r-   _merge_delta_local_plansh   s   
r2   tensorc                 C   s$   t tdgt|   |  dS )Nr   r*   r+   )r   torchSizer    r(   )r3   r,   r,   r-   _create_chunk_from_tensor   s   r7   shard_mdc                 C   s   t t| jt| jdS Nr4   )r   r5   r6   shard_offsetsZshard_sizes)r8   r,   r,   r-   _chunk_for_shard   s   

r;   sharded_tensorc                 C   s>   |   j}t|j|j|j|j|jd}tt	|||   j
dS )N)dtypelayoutrequires_gradmemory_format
pin_memoryr)   
propertiesr(   )metadataZtensor_propertiesr   r=   r>   r?   r@   rA   r   r;   r(   )r<   r8   Zshard_propertiesrC   r,   r,   r-   _sharded_tensor_metadata   s   
rE   r%   c              	   C   sb   t |j|j|j\}}t|t|}}tt| |tj	t
t||dt| | ddS )Nr4   rB   r$   r#   r'   )r	   shapedevice_mesh
placementsr5   r6   r   r   r   SHARDr   r   r   create_from_tensorto_localr(   )r%   r3   r+   r*   r,   r,   r-   _create_write_items_for_dtensor   s    rM   c                 C   s(   t |j}tt| |tjt||dS )NrF   )r5   r6   r:   r   r   r   rJ   rE   )r%   r<   r8   r*   r,   r,   r-   _create_write_item_for_shard   s   rN   c                 C   sN   t dgt|  }tt| |tjtt	|| dt
|| ddS )Nr   r4   rB   rF   )r5   r6   r    r(   r   r   r   TENSORr   r   r   rK   )r%   r3   r*   r,   r,   r-   _create_write_item_for_tensor   s   rP   bytesc                 C   s   t t| tjdS )N)r$   r#   )r   r   r   BYTE_IO)r%   rQ   r,   r,   r-   _create_write_item_for_bytesio   s   rS   c              	   C   s.   t tj| t|f|t|ft|fdS N)r#   
dest_indexdest_offsetsstorage_indexstorage_offsetslengths)r   r   rR   r5   r6   rU   Zdest_offsetrW   Zstorage_offsetlengthr,   r,   r-   _create_read_item_for_byteio   s   


r\   c              	   C   s(   t tj| t||t|t|dS rT   )r   r   rO   r5   r6   rU   rV   rW   rX   rY   r,   r,   r-   _create_read_item_for_tensor   s   r^   checkpoint_mdlocal_chunksc                 C   s   g }t |D ]L\}}t |jD ]B\}}t||sqg }g }	g }
t||dD ]\}}}}|| |	| |
| q%|tt| |j||	t| |j|||
d qq|S )aW  
    Create a list of ``ReadItem`` based on the checkpoint and local chunks.

    This applies the resharding algorithm and computes the reads needed
    to satisfy ``local_chunks`` with a checkpoint described by ``checkpoint_md``.

    Args:
        fqn (str) : The state_dict FQN to pass to ``ReadItem``.
        checkpoint_md (TensorStorageMetadata): metadata for a given tensor
            from a checkpoint.
        local_chunks (List[ChunkStorageMetadata]): Local chunks that needs to be
            loaded.

    Returns:
        A list of ``ReadItem`` that will satisfy all input chunks.
    )Zsaved_shardZcurrent_shardr]   )	enumeratechunksr   r   r1   r^   r   r*   )r%   r_   r`   Z
read_itemsidxshardZstorage_idxZ
storage_mdrX   rV   rY   Z_dimZoffset_for_saved_tensorZoffset_for_current_tensorr[   r,   r,   r-   r      s<   



state_dictc                    s   g }|   D ]?\ ttr|t  qttr.| fdd jD  qtt	j
r=|t  q|t  qt|S )Nc                 3   s    | ]	}t  |V  qd S )N)rN   ).0r8   r%   objr,   r-   	<genexpr>,  s
    

z5_create_default_metadata_only_plan.<locals>.<genexpr>)r!   
isinstancer   r1   rM   r   extendrD   Zshards_metadatar5   TensorrP   rS   r   )re   requestsr,   rg   r-   "_create_default_metadata_only_plan&  s   


rn   objectc                    s\   t dr S ttr fdd D S ttjr(t gS t gS )N__create_write_items__c                    s   g | ]	}t  |jqS r,   )rN   rD   rf   rd   r%   ro   r,   r-   
<listcomp><      z'_create_write_items.<locals>.<listcomp>)	hasattrrp   rj   r   local_shardsr5   rl   rP   rS   rr   r,   rr   r-   _create_write_items7  s   

rw   c                 C   s8   t | j| j| j\}}t|t|}}t||dS r9   )r	   rG   rH   rI   r5   r6   r   )r3   r+   r*   r,   r,   r-   _create_chunk_from_dtensorF  s   rx   c                 C   sb   t | dr|  }|S t| trdd |  D }|S t| tjr(t| g}|S tdt	|  )N__create_chunk_list__c                 S   s   g | ]}t |jqS r,   )r;   rD   rq   r,   r,   r-   rs   V  s    
z&_create_chunk_list.<locals>.<listcomp>zMUnsupported Type, expecting one of [Tensor, DTensor, ShardedTensor] ,but got )
ru   ry   rj   r   rv   r5   rl   r7   
ValueErrorr#   )r3   r`   r,   r,   r-   _create_chunk_listQ  s    


r{   mdrh   c              
   C   sx   t |ts.zt|}W n ty' } ztd|  ddt|  |d }~ww t| ||S tt| dt| dddgS )Nz Invalid checkpoint metadata for z, z(expected BytesStorageMetadata but found r   rZ   )rj   r   r{   rz   r#   r   r\   r   )r%   r|   rh   r`   exr,   r,   r-   _create_read_itemsd  s,   

r~   c                 C   s>   dt fdd}dtfdd}dtjfdd}t| ||| dS )	zP
    Initializes meta tensor if the meta tensor is DTensor or torch.Tensor.
    valuec                 S   st   t | dd }|tdkr8tj j}ttjt|	 }tj
|  |d}tj|| j| j|  |  d}|S | S )Ndevicemetar   )rH   rI   rG   stride)getattrr5   r   distdistributed_c10d_get_pg_default_devicer#   r   r   current_device
empty_likerL   r   Z
from_localrH   rI   r(   r   )r   r   device_typeZnew_local_tensorZdtensorr,   r,   r-   dtensor_func  s    z&_init_state_dict.<locals>.dtensor_funcc                 S   s2   t | dd }|tdkrtdt|  d| S )Nr   r   zFound unsupported type z for meta device loading.)r   r5   r   RuntimeErrorr#   )r   r   r,   r,   r-   sharded_tensor_func  s   z-_init_state_dict.<locals>.sharded_tensor_funcc                 S   sP   t | dd }|tdkr&tj j}ttjt|	 }tj
| |d}|S | S )Nr   r   r   )r   r5   r   r   r   r   r#   r   r   r   r   )r   r   r   r3   r,   r,   r-   tensor_func  s   z%_init_state_dict.<locals>.tensor_funcN)r   r   r5   rl   _iterate_state_dict)re   r   r   r   r,   r,   r-   _init_state_dict{  s   	r   iter_objectr   r   r   c                    s   t | tr	 | S t | tr| S t | tjr| S t | ttttt	j
fs+| du r-| S t | trF|  D ]\}}t| | |< q6| S t | ttfrc fdd| D }t | trat|}|S dS )a$  
    Iterate through the state dict, applying the given functions to each tensor type
    and update the state dict in place.

    Args:
        iter_object (Any): the target state_dict.
        sharded_tensor_func (Callable): the function to apply to ShardedTensor
        dtensor_func (Callable): the function to apply to DTensor
        tensor_func (Callable): the function to apply to Tensor

    # TODO: let state_dict_util._iterate_state_dict() to support in place option
    so we don't need to have two versions of _iterate_state_dict.
    Nc                    s   g | ]	}t | qS r,   )r   )rf   vr   r   r   r,   r-   rs     rt   z'_iterate_state_dict.<locals>.<listcomp>)rj   r   r   r5   rl   intfloatstrrQ   ioBytesIOdictr!   r   listtuple)r   r   r   r   keyr   retr,   r   r-   r     s0   




r   )?r   typingr   r   r   r5   Ztorch.distributeddistributedr   Ztorch._utilsr   Z!torch.distributed._shard.metadatar   Z'torch.distributed._shard.sharded_tensorr   Ztorch.distributed.tensorr   Ztorch.distributed.tensor._utilsr	   rD   r   r   r   r   r   r   r   Zplannerr   r   r   r   r   r   Z
reshardingr   r   r   r   r   __annotations__boolr.   r2   rl   r7   r;   rE   rM   rN   rP   rS   r\   r^   r   rn   rw   rx   r{   r~   r   r   r   r,   r,   r,   r-   <module>   s   
$ 	A



77