o
    Zhx1                     @   s(  d dl Z d dlmZ d dlZd dlm  mZ d dlm	Z	 d dl
mZ ddlmZmZ ddlmZ e eZdejd	ejd
efddZdejd	ejded
dfddZdejd
ejfddZeeeef   Zdejd
dfddZdejd
efddZ dejdedefddZ!dejfddZ"dd Z#dS )    N)cast)is_symbolic)
OrderedSet   )configir)Vxcomm_buffer_typereturnc                 C   sP   t | }t|tjrdS | }t|tjrdS t|tjr&t| s&dS dS )ze
    Check if an input can be realized as a comm buffer of the specified
    `comm_buffer_type`.
    TF)		_get_data
isinstancer   ZLoopsget_output_specCommBufferLayoutFlexibleLayoutr   	get_numel)r	   r
   datalayout r   L/var/www/auris/lib/python3.10/site-packages/torch/_inductor/comm_lowering.pycan_realize_as_comm_buffer8   s   r   
group_namec                 C   s   |    t| }t|tjsJ | }t|tjrdS t|tjs*td| dt	|
 r8td| dtj|||d|_dS )z
    Realize an input as a comm buffer of the specified `comm_buffer_type`.

    Specifically, this realizes the underlying buffer if it's still unrealized
    and changes the layout of the buffer to `ir.CommBufferLayout`.
    NzOA buffer can only be realized as a comm buffer if it has `FlexibleLayout` (got ).zGA buffer with symbolic shape cannot be converted to a comm buffer (got )r   r
   r   )realizer   r   r   Bufferr   r   r   AssertionErrorr   r   r   )r	   r
   r   bufferr   r   r   r   realize_as_comm_bufferN   s.   	r   c                 C   sJ   t | jtjr| j jS t | jtjrttj| jjS td| j d)Nz\Expect the data attr of a `TensorBox` to be either an `ir.BaseView` or `ir.StorageBox` (got r   )	r   r   r   ZBaseViewZunwrap_viewZ
StorageBoxr   r   r   r	   r   r   r   r   r   s   r   c                 C   s   t ttj|  f dS )z
    If a non-blocking collective is lowered as a blocking collective, the wait
    node in the original graph becomes useless and we can skip the lowering it.
    N)_bufs_to_skip_waitaddidr   graphget_namer   r   r   r   mark_as_skip_wait   s   r$   c                 C   s   t tj|  ftv S N)r!   r   r"   r#   r   r   r   r   r   should_skip_wait   s   r&   inp	reduce_opc                 C   sP   ddl m} |  |  j }tjjo'||o't| t	j
jo'|dv o'|tjjkS )Nr   )is_symm_mem_enabled_for_group)sum)Z#torch.distributed._symmetric_memoryr)   r   Z	get_dtypeitemsizer   Z_collectiveZauto_selectr   r   CommBufferTypeSYMM_MEMZ#one_shot_all_reduce_threshold_bytes)r'   r(   r   r)   Zinp_sizer   r   r   $_should_lower_as_one_shot_all_reduce   s   
r.   c              	   C   s6   t | tjj| ttjjtjt	j
jjj| ||S r%   )r   r   r,   r-   pytreetree_map	TensorBoxcreateZFallbackKerneltorchopsZsymm_memZone_shot_all_reducedefaultr'   r(   r   r   r   r   _one_shot_all_reduce   s   
r7   c               	      s  zt jjj W n ty   td Y d S w ddlm m	m
mm  fdd} t jj| jdtjdtdtd	tjffd
d}| jdtjdtdtd	tjffdd}| jfdd}| jfdd}| jfdd}| jfdd}| jfdd}| jfdd}| jfdd}	| jfdd}
| jfdd}| jfd d!}| t jjjd"d# }| jfd$d%}d S )&NzRInductor support for distributed collectives depends on building torch.distributedr   )add_layout_constraintcloneconstrain_to_fx_stridescopy_register_loweringc                    s    |  | S r%   r   )fn)r8   r:   r<   r   r   register_comm_lowering   s   
z7register_comm_lowerings.<locals>.register_comm_loweringr'   r(   r   r   c                    sf   t | ||rt| ||S | } tjr |   tjj| 	  t
j| } t
j jj| || | S r%   )r.   r7   r   Z reorder_for_compute_comm_overlapr   r   r"   Zno_fuse_buffer_namesr    r#   r   ExternKernelrequire_contiguous_CollectiveKernelcreate_inplaceall_reduce_r5   r6   c10dr9   r   r   _all_reduce   s   z,register_comm_lowerings.<locals>._all_reducec                    sP   t | ||r| t| ||}t| | S tj| } tj jj	| || | S r%   )
r.   r7   r$   r   r?   r@   rA   rB   rC   r5   )r'   r(   r   ret)rE   r;   r   r   _all_reduce_   s   
z-register_comm_lowerings.<locals>._all_reduce_c                    s,   fdd| D } t j jj| || | S )Nc                    s   g | ]} |qS r   r   ).0r'   )r9   r   r   
<listcomp>   s    zJregister_comm_lowerings.<locals>._all_reduce_coalesced.<locals>.<listcomp>r   rA   rB   all_reduce_coalesced_r5   inputsr(   r   rD   r   r   _all_reduce_coalesced   s   z6register_comm_lowerings.<locals>._all_reduce_coalescedc                       t j jj| || | S r%   rK   rM   rE   r   r   _all_reduce_coalesced_   s   z7register_comm_lowerings.<locals>._all_reduce_coalesced_c                    s   t jt j jj| ||S r%   )r   r1   r2   rA   create_out_of_placeall_gather_into_tensorr5   )r'   
group_sizer   rQ   r   r   _all_gather_into_tensor   s   z8register_comm_lowerings.<locals>._all_gather_into_tensorc              	      s"   t tjjtj jj| ||S r%   )	r/   r0   r   r1   r2   rA   rS    all_gather_into_tensor_coalescedr5   )rN   rU   r   rQ   r   r   !_all_gather_into_tensor_coalesced  s   zBregister_comm_lowerings.<locals>._all_gather_into_tensor_coalescedc                   s   t jj jj| |||d |S )N)out)r   rA   rB   all_gather_into_tensor_outr5   )r'   rU   r   rY   rQ   r   r   _all_gather_into_tensor_out  s   z<register_comm_lowerings.<locals>._all_gather_into_tensor_outc              	          t jt j jj| |||S r%   )r   r1   r2   rA   rS   reduce_scatter_tensorr5   )r'   r(   rU   r   rQ   r   r   _reduce_scatter_tensor"     z7register_comm_lowerings.<locals>._reduce_scatter_tensorc              
      s$   t tjjtj jj| |||S r%   )	r/   r0   r   r1   r2   rA   rS   reduce_scatter_tensor_coalescedr5   )rN   r(   rU   r   rQ   r   r    _reduce_scatter_tensor_coalesced.  s   zAregister_comm_lowerings.<locals>._reduce_scatter_tensor_coalescedc              	      r\   r%   )r   r1   r2   rA   rS   all_to_all_singler5   )r'   Zoutput_split_sizesZinput_split_sizesr   rQ   r   r   _all_to_all_single;  r_   z3register_comm_lowerings.<locals>._all_to_all_singlec                    s"   | } t j jj| || | S r%   r   rA   rB   
broadcast_r5   r'   srcr   rD   r   r   
_broadcastG  s
   z+register_comm_lowerings.<locals>._broadcastc                    rP   r%   rd   rf   rQ   r   r   _broadcast_O  s   z,register_comm_lowerings.<locals>._broadcast_c              	   S   s$   t jt jtjjjj	| |||S r%   )
r   r1   r2   rA   rS   r3   r4   _dtensorshard_dim_alltoallr5   )r'   Z
gather_dimZ	shard_dimr   r   r   r   _shard_dim_alltoallV  s   
z4register_comm_lowerings.<locals>._shard_dim_alltoallc                    s"   t | r| S tj jj|  | S r%   )r&   r   Z_WaitKernelZcreate_waitwait_tensorr5   )r'   rQ   r   r   _wait_tensorb  s   z-register_comm_lowerings.<locals>._wait_tensor)r3   r4   Z_c10d_functionalZ
all_reduceAttributeErrorloginfoZloweringr8   r9   r:   r;   r<   r   r1   strrC   Zall_reduce_coalescedrL   rT   rW   rZ   r]   r`   rb   	broadcastre   rj   rk   rm   )r>   rF   rH   rO   rR   rV   rX   r[   r^   ra   rc   rh   ri   rl   rn   r   )r8   rE   r9   r:   r;   r<   r   register_comm_lowerings   s^   &
	


rt   )$loggingtypingr   r3   Ztorch.utils._pytreeutilsZ_pytreer/   Ztorch._inductor.utilsr   Ztorch.utils._ordered_setr    r   r   Zvirtualizedr   	getLogger__name__rp   r1   r,   boolr   rr   r   ZIRNoder   tupleintr   r$   r&   r.   r7   rt   r   r   r   r   <module>   sL   
*

$
