import copy
from typing import Any, cast, Optional

import torch
import torch.distributed as dist
import torch.distributed._shard.sharding_spec as shard_spec
import torch.distributed.distributed_c10d as c10d
from torch.distributed._shard.sharded_tensor import (
    Shard,
    ShardedTensor,
    ShardedTensorMetadata,
    TensorProperties,
)
from torch.distributed._shard.sharding_spec import ShardMetadata
from torch.distributed._shard.sharding_spec.chunk_sharding_spec import (
    ChunkShardingSpec,
)
from torch.distributed.device_mesh import _mesh_resources
from torch.distributed.fsdp._common_utils import _set_fsdp_flattened
from torch.distributed.fsdp._fsdp_extensions import FSDPExtensions
from torch.distributed.fsdp._shard_utils import _create_chunk_sharded_tensor
from torch.distributed.remote_device import _remote_device
from torch.distributed.tensor import DeviceMesh, DTensor, Replicate, Shard as DShard
from torch.distributed.tensor.parallel._data_parallel_utils import (
    _flatten_tensor,
    _unflatten_tensor,
)


__all__ = ["DTensorExtensions"]


def _get_box(tensor: DTensor) -> tuple[torch.Size, torch.Size]:
    device_mesh = tensor.device_mesh
    assert device_mesh.ndim == 1, "Only 1D DeviceMeshes currently handled"

    placement = tensor.placements[0]
    offsets = [0] * len(tensor.size())
    num_chunks = device_mesh.size(mesh_dim=0)

    if tensor.placements[0].is_shard():
        shard_dim = cast(DShard, placement).dim
        chunk_size = tensor.size(shard_dim) // num_chunks
        offsets[shard_dim] = chunk_size

    return (torch.Size(offsets), tensor._local_tensor.size())


def _get_box_for(tensor: DTensor, idx: int) -> tuple[torch.Size, torch.Size]:
    offsets, size = _get_box(tensor)
    return (torch.Size([val * idx for val in offsets]), size)


def _get_local_box(tensor: DTensor) -> tuple[torch.Size, torch.Size]:
    device_mesh = tensor.device_mesh
    coord = device_mesh.get_coordinate()
    assert coord is not None
    return _get_box_for(tensor, coord[0])


def _create_shard_md_from_dt(dt: DTensor, current_rank: int) -> ShardMetadata:
    mesh = dt.device_mesh
    assert mesh.ndim == 1, "Only 1D DeviceMeshes currently handled"

    offsets, sizes = _get_local_box(dt)
    return ShardMetadata(
        shard_offsets=list(offsets),
        shard_sizes=list(sizes),
        placement=f"rank:{current_rank}/{dt._local_tensor.device}",
    )


def _create_sharded_tensor_md_from_dt(
    dt: DTensor, dt_pg: c10d.ProcessGroup
) -> ShardedTensorMetadata:
    shards_md = []
    my_rank = dist.get_rank(dt_pg)
    scapegoat_rank = 0 if my_rank > 0 else 1

    if dt.placements[0].is_shard():
        shard_count = dt_pg.size()
    else:
        shard_count = 1

    for i in range(shard_count):
        offsets, sizes = _get_box_for(dt, i)
        shards_md.append(
            ShardMetadata(
                shard_offsets=list(offsets),
                shard_sizes=list(sizes),
                placement=f"rank:{scapegoat_rank if i > 0 else my_rank}/{dt._local_tensor.device}",
            )
        )

    return ShardedTensorMetadata(
        shards_metadata=shards_md,
        size=dt.size(),
        tensor_properties=TensorProperties(
            dtype=dt.dtype,
            layout=dt.layout,
            requires_grad=dt.requires_grad,
        ),
    )


def _get_dt_pg(dt: DTensor) -> c10d.ProcessGroup:
    mesh = dt.device_mesh
    assert mesh.ndim == 1, "Only 1D DeviceMeshes currently handled"
    return mesh.get_group()


def _rewrite_spec_if_needed(
    spec: shard_spec.ShardingSpec, tensor: torch.Tensor, rank: int
) -> shard_spec.ShardingSpec:
    """
    Rewrite ``spec`` to match the device of ``tensor``.

    FSDP.sharded_optim_state_dict sneakily ships optimizer state to CPU, so if the
    original ShardingSpec produces CUDA metadata, ST construction bombs.
    """
    if not isinstance(spec, ChunkShardingSpec):
        return spec

    rewrite = False
    for p in spec.placements:
        p = cast(_remote_device, p)
        if p.rank() == rank and p.device() != tensor.device:
            rewrite = True
            break

    if rewrite:
        spec = copy.deepcopy(spec)
        for i, placement in enumerate(spec.placements):
            placement = cast(_remote_device, placement)
            if placement.rank() == rank and placement.device() != tensor.device:
                spec.placements[i] = _remote_device(f"rank:{rank}/{tensor.device}")

    return spec
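
# Illustrative example (comment only): if rank 1 holds a CPU tensor while the
# spec places "rank:1/cuda:1", _rewrite_spec_if_needed deep-copies the spec and
# rewrites only that placement to "rank:1/cpu", leaving the other ranks'
# entries untouched; non-ChunkShardingSpec specs pass through unchanged.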


def _chunk_tensor(
    tensor: torch.Tensor,
    rank: int,
    world_size: int,
    num_devices_per_node: int,
    pg: dist.ProcessGroup,
) -> torch.Tensor:
    if type(tensor) is ShardedTensor:
        assert len(tensor.local_shards()) == 1

        inner_param = tensor.local_tensor()
        inner_st = _create_chunk_sharded_tensor(
            inner_param, rank, world_size, num_devices_per_node, pg
        )

        outer_local_shard = tensor.local_shards()[0]
        shards = [Shard(inner_st, copy.deepcopy(outer_local_shard.metadata))]
        st_meta = copy.deepcopy(tensor.metadata())
        st_meta.tensor_properties.requires_grad = False

        st_outer = ShardedTensor._init_from_local_shards_and_global_metadata(
            shards,
            sharded_tensor_metadata=st_meta,
            process_group=tensor._process_group,
            init_rrefs=False,
        )
        return st_outer
    elif type(tensor) is DTensor:
        device_mesh = tensor.device_mesh
        assert device_mesh.ndim == 1, "Only 1D DeviceMeshes currently handled"

        inner_param = tensor._local_tensor
        inner_st = _create_chunk_sharded_tensor(
            inner_param, rank, world_size, torch.accelerator.device_count(), pg
        )

        dt_pg = _get_dt_pg(tensor)
        shards = [
            Shard(inner_st, _create_shard_md_from_dt(tensor, dist.get_rank(dt_pg)))
        ]
        st_meta = _create_sharded_tensor_md_from_dt(tensor, dt_pg)
        st_meta.tensor_properties.requires_grad = False

        st_outer = ShardedTensor._init_from_local_shards_and_global_metadata(
            shards,
            sharded_tensor_metadata=st_meta,
            process_group=dt_pg,
            init_rrefs=False,
        )
        return st_outer
    else:
        return _create_chunk_sharded_tensor(
            tensor, rank, world_size, num_devices_per_node, pg
        )
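
# Sketch of the resulting layout (descriptive comment, not original source):
# for ShardedTensor and DTensor inputs, _chunk_tensor builds a two-level
# ShardedTensor -- an outer ST whose single local shard wraps an inner ST
# holding this rank's FSDP chunk -- so state_dict sees full global coverage
# while only the local chunk is actually materialized.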


def _chunk_dtensor(
    tensor: torch.Tensor,
    rank: int,
    device_mesh: DeviceMesh,
) -> DTensor:
    """
    Shard a tensor to chunks along the first dimension.

    The local rank will get its corresponding chunk as the local tensor to create a DTensor.
    """
    root_mesh = _mesh_resources.get_root_mesh(device_mesh)
    if root_mesh is None:
        raise RuntimeError("No parent device_mesh is found for FSDP device_mesh.")
    if root_mesh.ndim < 2:
        raise RuntimeError(
            f"Found parent device_mesh of ndim={root_mesh.ndim}, "
            "but meshes must be at least 2D."
        )

    # Explicitly detach and clone so we work on a new tensor outside the current graph.
    tensor = tensor.clone().detach()

    if isinstance(tensor, torch.Tensor) and not isinstance(tensor, DTensor):
        # The parameter was not touched by TP: replicate it across the TP
        # dimension and shard it along dim 0 across the FSDP dimension.
        replicate_placements = [Replicate() for _ in range(root_mesh.ndim)]
        shard_placements = [Replicate() for _ in range(root_mesh.ndim)]
        shard_placements[0] = DShard(0)

        return DTensor.from_local(
            tensor, root_mesh, replicate_placements, run_check=False
        ).redistribute(
            device_mesh=root_mesh,
            placements=shard_placements,
        )
    else:
        tp_placements = tensor.placements
        tp_placement = tp_placements[0]

        tensor = tensor.to_local()

        # The parameter is already TP-sharded on the innermost mesh dimension;
        # additionally shard dim 0 across the FSDP dimension and replicate any
        # remaining (e.g. HSDP replica) dimensions.
        replicate_placements = [Replicate() for _ in range(root_mesh.ndim)]
        replicate_placements[-1] = tp_placement
        shard_placements = [Replicate() for i in range(root_mesh.ndim)]
        shard_placements[-2] = DShard(0)
        shard_placements[-1] = tp_placement

        return DTensor.from_local(
            tensor, root_mesh, replicate_placements, run_check=False
        ).redistribute(
            device_mesh=root_mesh,
            placements=shard_placements,
        )
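
# Worked example (comment only): on a 2D ("dp", "tp") mesh with a TP weight
# whose tp_placement is Shard(0), the TP-local tensor is wrapped as
# [Replicate(), Shard(0)] and redistributed to [Shard(0), Shard(0)], i.e. FSDP
# additionally shards dim 0 on the outer mesh dimension. With a 3D HSDP mesh
# the target placements become [Replicate(), Shard(0), tp_placement].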


def _pre_load_state_dict(
    tensor: torch.Tensor,
) -> tuple[torch.Tensor, list[Shard]]:
    shards = cast(ShardedTensor, tensor).local_shards()
    if len(shards) == 1 and type(shards[0].tensor) is ShardedTensor:
        inner_tensor = shards[0].tensor
        shards = inner_tensor.local_shards()
        tensor = inner_tensor

    return (tensor, shards if len(shards) > 0 else [])


def _all_gather_dtensor(
    tensor: DTensor,
    parent_mesh: Optional[DeviceMesh],
) -> torch.Tensor:
    """All gather a DTensor in its FSDP dimension and return the local tensor."""
    assert parent_mesh == tensor.device_mesh

    placements = list(copy.deepcopy(tensor.placements))
    for i in range(0, len(placements) - 1):
        placements[i] = Replicate()
    tensor = tensor.redistribute(
        device_mesh=tensor.device_mesh,
        placements=placements,
    )

    return tensor.to_local()
dejd	e	dejfd
dZ	ddejdedededejdeej dejfddZdejdededejfddZdejdeejee f fddZdedee dejfddZ  ZS )r   z
    DTensorExtension is the TensorFlattener extension needed for 2D FSDP + TP.

    This is the implementation for FSDPExtensions defined in
    https://github.com/pytorch/pytorch/blob/main/torch/distributed/fsdp/_fsdp_extensions.py
    r   Nc                    s*   t    d | _|| _tj| j| _d S N)super__init__compute_streamdevice_handler"   Z_dynamodisablepost_unflatten_transform)selfrp   	__class__r(   r)   rn   E  s   

zDTensorExtensions.__init__r   c                 C      t |S rl   )r   rs   r   r(   r(   r)   pre_flatten_transformO     z'DTensorExtensions.pre_flatten_transformparam_extensionc                 C   s`   | j p| j }| j| t||| j| j d}t| |W  d    S 1 s)w   Y  d S )N)rp   ro   )ro   rp   Zcurrent_streamstreamr   r   )rs   r   rz   r{   resultr(   r(   r)   rr   U  s   $z*DTensorExtensions.post_unflatten_transformrI   rP   rQ   rR   r8   c                 C   s   t |||||S rl   )rW   )rs   r   rI   rP   rQ   rR   r8   r(   r(   r)   chunk_tensorh  s   	zDTensorExtensions.chunk_tensorr   c                 C   s   t |||S rl   )rh   )rs   r   rI   r   r(   r(   r)   chunk_dtensors  s   zDTensorExtensions.chunk_dtensorc                 C   rv   rl   )ri   rw   r(   r(   r)   pre_load_state_dict_transform{  ry   z/DTensorExtensions.pre_load_state_dict_transformrj   c                 C   s
   t ||S rl   )rk   )rs   r   rj   r(   r(   r)   all_gather_dtensor  s   
z$DTensorExtensions.all_gather_dtensor)r   Nrl   )__name__
__module____qualname____doc__rn   r"   re   tupler   r   rx   rr   intrA   ProcessGroupr8   r}   r   r~   r7   r   r   r   r   __classcell__r(   r(   rt   r)   r   =  sh    





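
# Minimal usage sketch (commented out; illustrative only). It assumes
# `_set_fsdp_extensions` from torch.distributed.fsdp._fsdp_extensions as the
# registration hook and `torch.cuda` as a device handle providing the
# current_stream()/stream() methods used by post_unflatten_transform above:
#
#   from torch.distributed.fsdp._fsdp_extensions import _set_fsdp_extensions
#
#   _set_fsdp_extensions(DTensorExtensions(device_handle=torch.cuda))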