import contextlib
import functools
import logging
import operator
import warnings
from collections.abc import Sequence
from typing import cast, Optional

import torch
import torch.distributed as dist
import torch.distributed.tensor._api as dtensor
import torch.distributed.tensor._random as random
from torch.distributed.device_mesh import DeviceMesh
from torch.distributed.tensor._dtensor_spec import DTensorSpec, TensorMeta
from torch.distributed.tensor._op_schema import (
    _is_inplace_op,
    _is_out_variant_op,
    OpInfo,
    OpSchema,
    OutputSpecType,
)
from torch.distributed.tensor._random import is_rng_supported_mesh
from torch.distributed.tensor._redistribute import redistribute_local_tensor
from torch.distributed.tensor._sharding_prop import ShardingPropagator
from torch.distributed.tensor._tp_conv import (
    convolution_backward_handler,
    convolution_handler,
)
from torch.distributed.tensor._utils import try_find_mesh_from_args
from torch.distributed.tensor.placement_types import Partial, Placement, Replicate

try:
    from torch.utils import _cxx_pytree as pytree
except ImportError:
    from torch.utils import _pytree as pytree  # type: ignore[no-redef]


aten = torch.ops.aten
logger = logging.getLogger(__name__)


def decompose_handler(
    op_call: torch._ops.OpOverload,
    args: tuple[object, ...],
    kwargs: dict[str, object],
) -> object:
    """
    Decompose an op into core ATen ops; this handler is mostly here
    for inference-mode usage where the ops are not core ATen ops.
    zDecomposition failed)Z	decomposeNotImplementedRuntimeError)r   r   r   r r   Q/var/www/auris/lib/python3.10/site-packages/torch/distributed/tensor/_dispatch.pydecompose_handler+   s   	r!   c                 C   s,   t tj|d }t tj|d }|j|jkS )Nr      )r   torchTensorshape)r   r   r   lhsrhsr   r   r    is_same_size_handler;   s   r(   c                 C   s   t jj| ||}tttt |j	|j
}tttdf |}| |i |j ttt j |d d }|j}|j}g }|D ]}	t|	trJ||	 q=|td q=ttj|d }
t|t|t|
 |
 |
jdd}t j|
|dd}| }|
| d S )	N.r   maxr"   r%   stridedtype)mesh
placementstensor_metaF)local_tensorspecrequires_grad)dtensorDTensorZ_op_dispatcherunwrap_to_op_infopytreetree_unflattenr   listobject
local_argsargs_tree_spectuplelocal_kwargsr.   device_mesh
isinstancer   appendr   r#   r$   r   r   sizer+   r,   Zfull_tensorZcopy_)r   r   r   op_infolocal_tensor_argsZgrad_dtensorZgrad_placementsr-   Zfound_inf_placementsZ	placementZtarget_tensorr1   Zfound_inf_dtensorZ	found_infr   r   r    found_inf_reduce_handlerE   s<   
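

# Behaviour sketch for the Partial("max") trick above: the grad scaler's
# found_inf flag is wrapped as a DTensor whose non-replicated dims become
# Partial("max"), so full_tensor() max-reduces it across the mesh.  With two
# ranks where only rank 1 overflowed,
#     rank 0: found_inf = tensor(0.)      rank 1: found_inf = tensor(1.)
# both ranks hold tensor(1.) after the copy_ back, i.e. an overflow detected on
# any rank is observed by every rank.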


class OpDispatcher:
    """
    Op dispatching class instance to handle args/kwargs pre-processing (un-wrapping), sharding
    propagation, redistribution of local args, local compute, and post-processing (re-wrapping).
    It also handles any op-specific logic if necessary.

    NOTE: Given the runtime overhead of Tensor subclass (__torch_dispatch__), the OpDispatcher
    is designed to minimize the CPU overhead by using tricks such as proper unflattening, a faster
    pytree if needed, and leveraging various caching mechanisms implemented in the sharding
    propagation and redistribute modules. The CPU overhead is critical to eager-mode performance,
    so one needs to carefully measure it when making significant changes to the
    OpDispatcher and ShardingPropagator.
    """
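
    # Rough life cycle of one DTensor op (illustrative sketch):
    #   1. DTensor.__torch_dispatch__ forwards the ATen call to dispatch().
    #   2. unwrap_to_op_info() strips DTensor args down to local tensors + DTensorSpecs.
    #   3. ShardingPropagator.propagate() decides the output sharding (cached per schema).
    #   4. redistribute_local_args() reshards inputs when the chosen strategy requires it.
    #   5. The plain ATen op runs on the local shards.
    #   6. wrap() re-wraps the local results into DTensors using the propagated specs.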

    def __init__(self) -> None:
        self.sharding_propagator = ShardingPropagator()
        self._random_ops = {
            aten.native_dropout.default,
            aten.normal_.default,
            aten.rand_like.default,
            aten.randn_like.default,
            aten.randint_like.default,
            aten.randint_like.low_dtype,
            aten.randint_like.low_dtype_out,
            aten.uniform_.default,
            aten.bernoulli.default,
            aten.bernoulli_.float,
        }
        self._custom_op_handlers = {
            aten.linear.default: decompose_handler,
            aten.matmul.default: decompose_handler,
            aten.is_same_size.default: is_same_size_handler,
            aten.convolution.default: convolution_handler,
            aten.convolution_backward.default: convolution_backward_handler,
            aten._amp_foreach_non_finite_check_and_unscale_.default: found_inf_reduce_handler,
        }

        # controls whether plain torch.Tensor args mixed with DTensor args are
        # implicitly treated as replicated instead of raising an error (see
        # _try_replicate_spec_for_scalar_tensor)
        self._allow_implicit_replication = False

    def dispatch(
        self,
        op_call: torch._ops.OpOverload,
        args: tuple[object, ...],
        kwargs: dict[str, object],
    ) -> object:
        """
        Main dispatching logic
        zDispatching op_call: %szoutput_sharding for %s: %sNz"output sharding should not be None.r   r1   r   c                 S   sP   | j d ur!| j j}| j j}t|dkrtjd|dS tjg |dS t|  d)Nr   r   )r,   z has no tensor metadata.)r/   r%   r,   lenr#   Zzerostensorr   )r1   r%   r,   r   r   r    default_tensor   s   
z-OpDispatcher.dispatch.<locals>.default_tensorc                    s    g | ]}|d ur |nd qS Nr   ).0srR   r   r    
<listcomp>   s    z)OpDispatcher.dispatch.<locals>.<listcomp>zreturn type z in DTensor op is not supportedc                 S   s   g | ]}d qS rS   r   )rT   _r   r   r    rW     s    c                 S   s   | d uS rS   r   )xr   r   r    <lambda>  s    z'OpDispatcher.dispatch.<locals>.<lambda>Tr"   z,out variant should have at least one out arg)BrL   r5   loggerdebugZschemarF   	propagateoutput_shardingcompute_meshZget_coordinateZneeds_redistributeZredistribute_schemaredistribute_local_argsr;   r6   r7   r   r8   r9   r:   r<   rJ   randomZ_rng_trackerr   ZOffsetBasedRNGTrackerr3   r4   r#   r$   is_metaZ_distribute_region_spec
contextlibnullcontextr=   Zoutput_specopZ_schemareturnsr   r?   r   strtypeNotImplementedErrorrG   equalrH   rangedistZget_world_sizeZall_gather_objectfilter	functoolsreduceoperatorand_r   r	   	argumentsZis_outnamer@   rP   wrap)rN   r   r   r   rB   r^   r-   rC   Z	first_argZfirst_local_argZrng_contextZlocal_resultsr1   Zret_listZret_typeZobj_listZoutput_specsZout_dtsZspec_idxargumentZout_dtr   rV   r    dispatch   s   

	









zOpDispatcher.dispatchrB   suggested_input_schemac           	      C   s   | j d urtt|j}n|j}g }t| jD ]1\}}|| }t|trDt	t
j| j| }||kr>t|||}|| q|| q|| qt|| _d S rS   )r;   r<   r6   Ztree_leavesargs_schema	enumerateZflat_args_schemar?   r   r   r#   r$   r:   r   r@   )	rB   rx   Zflatten_args_schema_to_reshardZnew_local_argsiZarg_specZreshard_arg_specr0   Zresharded_local_tensorr   r   r    r`   ,  s$   


z$OpDispatcher.redistribute_local_argsc                 C   s  | j j|d }|d ur|jrt|\}}|}n|d }}g }i }	g }
i }d }|D ]C}t|tjrF|
	|j
 |	|j |d u rE|j}q*t|tjrc|pRt||}|	| ||| |
	| q*|	| |
	| q*| D ]8\}}t|tjr|j
||< |j|	|< qrt|tjr|pt||}| ||||	|< |||< qr||	|< |||< qr|d usJ d| dt|t||rt||nt||	|d|t|
||}|S )Nz*found no DeviceMesh from dtensor args for !)Zschema_info)rF   Zop_to_schema_infogetZneeds_pytreer6   Ztree_flattenr?   r3   r4   r@   Z_local_tensorrc   r>   r#   r$   r   %_try_replicate_spec_for_scalar_tensoritemsr
   r   r7   r<   )rN   r   r   r   Zruntime_schema_infoZ	tree_argsZ	args_specZ	args_listry   Zkwargs_schemar:   r=   r_   argkvrB   r   r   r    r5   J  s   







zOpDispatcher.unwrap_to_op_inforesr1   c                 C   s   t | tjr+|d ur t |tsJ d| dtj| || jdS | jdks)J d| S t | tt	fre|d ur=t |tt	fsEJ d| dg }t
| |D ]\}}|t|| qLt | t	rct	|S |S | S )NzBoutput spec does not match with output! Expected DTensorSpec, got .)r2   r   zoutput tensor should be scalar!zAoutput spec does not match with output! Expected list/tuple, got )r?   r#   r$   r   r3   r4   r2   ndimr8   r<   zipr@   rE   ru   )r   r1   Zres_listerU   r   r   r    ru     s"   

zOpDispatcher.wrap
tensor_argr_   c                 C   sn   |  dkr|jdkrtd |  dks| jr0t|t f|j t|j|	 |j
dd}|S t| d)Nr"   zFound a non-scalar tensor with numel=1 and ndim!=0, we are implicitly creating a replicated DTensor for it. However, please consider changing it to a scalar tensor or explicitly create a DTensor under distributed enviroment.r*   )r/   zw: got mixed torch.Tensor and DTensor, need to convert all torch.Tensor to DTensor before calling distributed operators!)Znumelr   warningswarnrM   r   r   r   r%   r+   r,   r   )rN   r   r   r_   Zreplication_specr   r   r    r~     s$   z2OpDispatcher._try_replicate_spec_for_scalar_tensor)r   N)__name__
__module____qualname____doc__rO   r#   _ops
OpOverloadr<   r9   dictrh   rw   staticmethodr
   r   r`   r5   r   ru   r$   r   r   r~   r   r   r   r    rE   n   sR    



 


UrE   )Crd   ro   loggingrq   r   collections.abcr   typingr   r   r#   Ztorch.distributeddistributedrm   Ztorch.distributed.tensor._apirQ   Z_apir3   Z torch.distributed.tensor._random_randomra   Ztorch.distributed.device_meshr   Z&torch.distributed.tensor._dtensor_specr   r   Z#torch.distributed.tensor._op_schemar   r	   r
   r   r   r   Z&torch.distributed.tensor._redistributer   Z'torch.distributed.tensor._sharding_propr   Z!torch.distributed.tensor._tp_convr   r   Ztorch.distributed.tensor._utilsr   Z(torch.distributed.tensor.placement_typesr   r   r   Ztorch.utilsr   r6   ImportErrorr   opsrG   	getLoggerr   r[   r   r   r<   r9   r   rh   r!   boolr(   rD   rE   r   r   r   r    <module>   sn   










)