a
    h@                     @   sV  U d dl Z d dlmZmZmZ d dlZd dlmZ d dl	m
Z
 d dlmZ d dlmZ d dlmZ d dlmZ dd	lmZmZmZmZmZmZmZ dd
lmZmZmZmZm Z m!Z! ddl"m#Z#m$Z$ dgZ%e&e' e(d< eee)dddZ*e&e e)dddZ+e&e e&e e&e dddZ,ej-edddZ.eedddZ/eeedddZ0e'ee d d!d"Z1e'eee d#d$d%Z2e'ej-e d d&d'Z3e'ed(d)d*Z4d+d, Z5d-d. Z6e'ee&e e&e d/d0dZ7eed1d2d3Z8e'ee&e  d4d5d6Z9eedd7d8Z:ej-e&e dd9d:Z;e'eee&e d;d<d=Z<e=e'ef ed1d>d?Z>eeeed@dAdBZ?dS )C    N)AnyCallablecast)_get_device_module)ShardMetadata)ShardedTensor)DTensor)%compute_local_shape_and_global_offset   )BytesStorageMetadataChunkStorageMetadataMetadataIndexSTATE_DICT_TYPESTORAGE_TYPESTensorPropertiesTensorStorageMetadata)LoadItemTypeReadItemSavePlanTensorWriteData	WriteItemWriteItemType)"_check_shard_metadata_pair_overlap+_shards_get_overlap_region_wrt_saved_tensor create_read_items_for_chunk_list__all__)plan
other_planreturnc           
      C   s  | j |j krdS t| jt|jkr(dS t| j|jD ]\}}|j|jkrP dS |j}|j}|j|jks|j|jks|j|jkr dS |j}|j}|r|r|s|r dS |r6|r6|j	|j	kr dS |j
}|j
}	|r|	r|s|	r dS |r6|	r6|j|	jks|j|	jkr6 dS q6dS )a  
    Compare the two Save plans and return True if they are equal.

    Args:
        plan (SavePlan): First SavePlan to compare.
        other_plan (SavePlan): Second SavePlan to compare.

    Returns:
       True if the two plans are equal, False otherwise.
    FT)usablelenitemsziptypeindexfqnoffsettensor_datasizechunkoffsetssizes)
r   r   Z	plan_itemZother_plan_itemZplan_metadata_indexZother_plan_metadata_indexr'   Zother_tensor_datar)   Zother_chunk r,   Z/var/www/auris/lib/python3.9/site-packages/torch/distributed/checkpoint/planner_helpers.py_compare_save_plans'   sL    




r.   )delta_plansr   c                 C   s   t dd | D S )z
    Check if any delta plan is usable, indicating the plan has changed.

    Args:
        delta_plans (List[SavePlan]): A list of delta plans to check.
    Returns:
        True if any delta plan is usable, False otherwise.
    c                 s   s   | ]}|o|j V  qd S N)r   ).0
delta_planr,   r,   r-   	<genexpr>q       z(_contains_usable_plan.<locals>.<genexpr>)any)r/   r,   r,   r-   _contains_usable_planh   s    	r6   )cached_plansr/   r   c                 C   s<   g }t | |D ](\}}|r,|js,|| q|| q|S )ac  
    Merge a list of delta plans into a single plan.

    Args:
        cached_plans (List[SavePlan]): A list of cached plans.
        delta_plans (List[SavePlan]): A list of delta plans to merge. It can contain empty plans

    Returns:
        A single merged plan. If a delta plan is not usable, use the cached plan. Otherwise, use the delta plan.
    )r"   r   append)r7   r/   Zmerged_plansZcached_planr2   r,   r,   r-   _merge_delta_local_planst   s    
r9   )tensorr   c                 C   s$   t tdgt|   |  dS )Nr   r*   r+   )r   torchSizer    r(   )r:   r,   r,   r-   _create_chunk_from_tensor   s    r>   )shard_mdr   c                 C   s   t t| jt| jdS Nr;   )r   r<   r=   shard_offsetsZshard_sizes)r?   r,   r,   r-   _chunk_for_shard   s    

rB   )sharded_tensorr?   r   c                 C   s>   |   j}t|j|j|j|j|jd}tt	|||   j
dS )N)dtypelayoutrequires_gradmemory_format
pin_memoryr)   
propertiesr(   )metadataZtensor_propertiesr   rD   rE   rF   rG   rH   r   rB   r(   )rC   r?   Zshard_propertiesrJ   r,   r,   r-   _sharded_tensor_metadata   s    
rL   )r%   r:   r   c              	   C   sb   t |j|j|j\}}t|t| }}tt| |tj	t
t||dt| | ddS )Nr;   rI   r$   r#   r'   )r	   shapedevice_mesh
placementsr<   r=   r   r   r   SHARDr   r   r   create_from_tensorto_localr(   )r%   r:   r+   r*   r,   r,   r-   _create_write_items_for_dtensor   s     rT   )r%   rC   r?   r   c                 C   s(   t |j}tt| |tjt||dS )NrM   )r<   r=   rA   r   r   r   rQ   rL   )r%   rC   r?   r*   r,   r,   r-   _create_write_item_for_shard   s    rU   c                 C   sN   t dgt|  }tt| |tjtt	|| dt
|| ddS )Nr   r;   rI   rM   )r<   r=   r    r(   r   r   r   TENSORr   r   r   rR   )r%   r:   r*   r,   r,   r-   _create_write_item_for_tensor   s    rW   r%   bytesc                 C   s   t t| tjdS )N)r$   r#   )r   r   r   BYTE_IOrX   r,   r,   r-   _create_write_item_for_bytesio   s    r[   c              	   C   s.   t tj| t|f|t|ft|fdS N)r#   
dest_indexdest_offsetsstorage_indexstorage_offsetslengths)r   r   rZ   r<   r=   r]   Zdest_offsetr_   Zstorage_offsetlengthr,   r,   r-   _create_read_item_for_byteio   s    


rd   c              	   C   s(   t tj| t||t|t|dS r\   )r   r   rV   r<   r=   r]   r^   r_   r`   ra   r,   r,   r-   _create_read_item_for_tensor   s    rf   )r%   checkpoint_mdlocal_chunksr   c                 C   s   g }t |D ]\}}t |jD ]\}}t||s2qg }g }	g }
t||dD ]*\}}}}|| |	| |
| qJ|tt| |j||	t| |j|||
d qq|S )aW  
    Create a list of ``ReadItem`` based on the checkpoint and local chunks.

    This applies the resharding algorithm and computes the reads needed
    to satisfy ``local_chunks`` with a checkpoint described by ``checkpoint_md``.

    Args:
        fqn (str) : The state_dict FQN to pass to ``ReadItem``.
        checkpoint_md (TensorStorageMetadata): metadata for a given tensor
            from a checkpoint.
        local_chunks (List[ChunkStorageMetadata]): Local chunks that needs to be
            loaded.

    Returns:
        A list of ``ReadItem`` that will satisfy all input chunks.
    )Zsaved_shardZcurrent_shardre   )	enumeratechunksr   r   r8   rf   r   r*   )r%   rg   rh   Z
read_itemsidxshardZstorage_idxZ
storage_mdr`   r^   ra   Z_dimZoffset_for_saved_tensorZoffset_for_current_tensorrc   r,   r,   r-   r      s:    


	)
state_dictr   c                    s   g }|   D ]~\ ttr0|t  qttr\| fdd jD  qtt	j
rz|t  q|t  qt|S )Nc                 3   s   | ]}t  |V  qd S r0   )rU   )r1   r?   r%   objr,   r-   r3   8  s   z5_create_default_metadata_only_plan.<locals>.<genexpr>)r!   
isinstancer   r8   rT   r   extendrK   Zshards_metadatar<   TensorrW   r[   r   )rm   requestsr,   rn   r-   "_create_default_metadata_only_plan2  s    


rt   )r%   objectr   c                    s`   t dr S ttr8 fdd D S ttjrPt gS t gS d S )N__create_write_items__c                    s   g | ]}t  |jqS r,   )rU   rK   r1   rl   r%   ru   r,   r-   
<listcomp>H  s   z'_create_write_items.<locals>.<listcomp>)	hasattrrv   rp   r   local_shardsr<   rr   rW   r[   rx   r,   rx   r-   _create_write_itemsC  s    

r|   c                 C   s8   t | j| j| j\}}t|t| }}t||dS r@   )r	   rN   rO   rP   r<   r=   r   )r:   r+   r*   r,   r,   r-   _create_chunk_from_dtensorR  s    r}   c                 C   s`   t | dr|  }nHt| tr2dd |  D }n*t| tjrJt| g}ntdt	|  |S )N__create_chunk_list__c                 S   s   g | ]}t |jqS r,   )rB   rK   rw   r,   r,   r-   ry   b  s   z&_create_chunk_list.<locals>.<listcomp>zMUnsupported Type, expecting one of [Tensor, DTensor, ShardedTensor] ,but got )
rz   r~   rp   r   r{   r<   rr   r>   
ValueErrorr#   )r:   rh   r,   r,   r-   _create_chunk_list]  s    


r   )r%   mdro   r   c              
   C   s   t |tshzt|}W nD tyZ } z,td|  ddt|  |W Y d }~n
d }~0 0 t| ||S tt| dt| dddgS d S )Nz Invalid checkpoint metadata for z, z(expected BytesStorageMetadata but found r   rb   )rp   r   r   r   r#   r   rd   r   )r%   r   ro   rh   exr,   r,   r-   _create_read_itemsp  s(    

r   c                 C   s>   t ddd}tddd}tjddd}t| ||| dS )	zP
    Initializes meta tensor if the meta tensor is DTensor or torch.Tensor.
    )valuec                 S   sx   t | dd }|tdkrptj j}ttjt|	 }tj
|  |d}tj|| j| j|  |  d}|S | S d S )Ndevicemetar   )rO   rP   rN   stride)getattrr<   r   distdistributed_c10d_get_pg_default_devicer#   r   r   current_device
empty_likerS   r   Z
from_localrO   rP   r(   r   )r   r   device_typeZnew_local_tensorZdtensorr,   r,   r-   dtensor_func  s     z&_init_state_dict.<locals>.dtensor_funcc                 S   s8   t | dd }|tdkr0tdt|  dn| S d S )Nr   r   zFound unsupported type z for meta device loading.)r   r<   r   RuntimeErrorr#   )r   r   r,   r,   r-   sharded_tensor_func  s    z-_init_state_dict.<locals>.sharded_tensor_funcc                 S   sT   t | dd }|tdkrLtj j}ttjt|	 }tj
| |d}|S | S d S )Nr   r   r   )r   r<   r   r   r   r   r#   r   r   r   r   )r   r   r   r:   r,   r,   r-   tensor_func  s    z%_init_state_dict.<locals>.tensor_funcN)r   r   r<   rr   _iterate_state_dict)rm   r   r   r   r,   r,   r-   _init_state_dict  s    	r   )iter_objectr   r   r   c                    s   t | tr | S t | tr$| S t | tjr8| S t | ttttt	j
fsV| du rZ| S t | tr|  D ]\}}t| | |< ql| S t | ttfrƇ fdd| D }t | trt|}|S dS )a$  
    Iterate through the state dict, applying the given functions to each tensor type
    and update the state dict in place.

    Args:
        iter_object (Any): the target state_dict.
        sharded_tensor_func (Callable): the function to apply to ShardedTensor
        dtensor_func (Callable): the function to apply to DTensor
        tensor_func (Callable): the function to apply to Tensor

    # TODO: let state_dict_util._iterate_state_dict() to support in place option
    so we don't need to have two versions of _iterate_state_dict.
    Nc                    s   g | ]}t | qS r,   )r   )r1   vr   r   r   r,   r-   ry     s   z'_iterate_state_dict.<locals>.<listcomp>)rp   r   r   r<   rr   intfloatstrrY   ioBytesIOdictr!   r   listtuple)r   r   r   r   keyr   retr,   r   r-   r     s0    




r   )@r   typingr   r   r   r<   Ztorch.distributedZdistributedr   Ztorch._utilsr   Z!torch.distributed._shard.metadatar   Z'torch.distributed._shard.sharded_tensorr   Ztorch.distributed.tensorr   Ztorch.distributed.tensor._utilsr	   rK   r   r   r   r   r   r   r   Zplannerr   r   r   r   r   r   Z
reshardingr   r   r   r   r   __annotations__boolr.   r6   r9   rr   r>   rB   rL   rT   rU   rW   r[   rd   rf   r   rt   r|   r}   r   r   r   r   r   r,   r,   r,   r-   <module>   s^   
$	 A78