o
    wZh                     @   s(  U d dl Z d dlZd dlZd dlZd dlZd dlZd dlZd dlZd dlZd dl	Z	d dl
Z
d dlZd dlZd dlZd dlZd dlmZ d dlmZ d dlmZ d dlmZ d dlmZmZmZ d dlmZ d dlmZmZm Z m!Z!m"Z" d d	l#m$Z$ d d
l%m&Z& d dl'Z'd dl(Z'd dl)Z'd dl*m+Z, d dl-m.Z. d dl/m0Z0 d dl1m2Z2 d dl3m4Z4m5Z5m6Z6m7Z7m8Z8m9Z9m:Z:m;Z;m<Z<m=Z=m>Z>m?Z? d dl@mAZAmBZBmCZC d dlDZDejEejFd eGeHZIG dd deZJi deJdddeJdddeJdddeJdddeJdd d!eJd"d#d$eJd%d&d'eJd(d)d*eJd+d,d-eJd.d/d0eJd1d2d3eJd4d5d6eJd7d8d9eJd:d;d<eJd=d>d?eJd@dAdBeJdCdDdEeJdFdGiZKeG dHdI dIZLdJdK ZMdLdM ZNdNdO ZOdPdQ ZPdRdS ZQdTdU ZRdVdW ZSdXdY ZTdZd[ ZUd\d] ZVd^d_ ZWd`da ZXdbdc ZYddde ZZdfdg Z[dhdi Z\djdk Z]dldm Z^dndo Z_dpe'j`dqeadreadsebfdtduZce7dvdwdxedydzdxd{dxfd|d}Zde;rd~ZeneaefddZeddiZge:rdegd< dddZhdseafddZiedd ZjddeadeadeafddZkdeadelfddZmdanee	jo epd< ddeel dsdfddZqdddZrdZsG dd de<ZtG dd detZudevelewe! f de"de!fddZxdaydsebfddZzdd Z{deeesfddZ|G dd de<Z}G dd de2j~ZG dd de2j~ZedddZG dd de'jjj<ZG dd detZG dd de<ZdS )    N)contextmanager)	dataclass)	timedelta)Enum)partialreducewraps)StringIO)
NamedTupleOptionalUnionAnyCallable)patch)	trace_log)
DeviceType)_SymmetricMemory)FILE_SCHEMAfind_free_portIS_SANDCASTLEretry_on_connect_failuresskip_but_pass_in_sandcastleskip_but_pass_in_sandcastle_ifTEST_WITH_ROCMTEST_WITH_TSANTestCase	run_testsTEST_HPUTEST_XPU)_install_threaded_pg_uninstall_threaded_pgProcessLocalGroup)levelc                   @   s   e Zd ZU eed< eed< dS )TestSkip	exit_codemessageN)__name__
__module____qualname__int__annotations__str r,   r,   Y/var/www/auris/lib/python3.10/site-packages/torch/testing/_internal/common_distributed.pyr#   <   s   
 r#   Zbackend_unavailableH   z5Skipped because distributed backend is not available.small_worldsizeI   z Skipped due to small world size.odd_worldsizeW   zSkipped due to odd world size.no_cudaJ   zCUDA is not available.zmulti-gpu-1K   zNeed at least 1 CUDA devicezmulti-gpu-2M   zNeed at least 2 CUDA deviceszmulti-gpu-3P   zNeed at least 3 CUDA deviceszmulti-gpu-4Q   zNeed at least 4 CUDA deviceszmulti-gpu-5R   zNeed at least 5 CUDA deviceszmulti-gpu-6S   zNeed at least 6 CUDA deviceszmulti-gpu-7T   zNeed at least 7 CUDA deviceszmulti-gpu-8U   zNeed at least 8 CUDA devicesncclL   z#c10d not compiled with NCCL support
skipIfRocmN   zTest skipped for ROCmZno_peer_accessO   z'Test skipped because no GPU peer accessgenericV   zHTest skipped at subprocess level, look at subprocess log for skip reasonimporterrorX   z"Test skipped due to missing importZno_acceleratorY   zaccelerator is not available.c                   @   s   e Zd Zi Zh ded< e ed< ddhed< ddhed< i Zh ded	< h ded
< h ded< h ded< e ed< erCdhed< erLdhed< dS dS )DistTestCases>   uccr=   mpiZallgather_coalescedr   r=   rH   zsendrecv anysourcezcpu barrier>   rH   r=   glooZgpucudaZddpZsubgrouppluginhcclhpuxcclxpuN)r&   r'   r(   Zskip_collectivesetZbackend_featurer   r   r,   r,   r,   r-   rG   [   s"    


rG   c                       t   fdd}|S )zSkips if the world size exceeds the number of GPUs, ensuring that if the
    test is run, each rank has its own GPU via ``torch.cuda.device(rank)``.c                     s   t j sttd j ttj	d }t j
 |k r&ttd|  j tr9t jj
|k r9ttd|  j trLt jj
|k rLttd|  j  | i |S )Nr3   
WORLD_SIZE
multi-gpu-z
multi-xpu-)torchrK   is_availablesysexit
TEST_SKIPSr$   r)   osenvirondevice_countr   rN   r   rP   )argskwargs
world_sizefuncr,   r-   wrapperu   s   
zskip_if_no_gpu.<locals>.wrapperr   ra   rb   r,   r`   r-   skip_if_no_gpuq   s   re   c                    rR   )Nc                     s>   t jd dkrtt jd dk rttd j  | i |S )NBACKENDrI   rS      r/   rZ   r[   r)   rW   rX   rY   r$   r]   r^   r`   r,   r-   rb      s    z(skip_if_small_worldsize.<locals>.wrapperrc   rd   r,   r`   r-   skip_if_small_worldsize      rj   c                    rR   )Nc                     sB   t jd dkrtt jd d dkrttd j  | i |S )Nrf   rI   rS         r1   rh   ri   r`   r,   r-   rb      s   $z&skip_if_odd_worldsize.<locals>.wrapperrc   rd   r,   r`   r-   skip_if_odd_worldsize   rk   rn   c                        fdd}|S )Nc                       t   fdd}|S )Nc                     s>    dkrt j k rttd  j d S | i |S Nr=   rT   )rU   rK   r\   rW   rX   rY   r$   ri   )backendra   nr,   r-   rb      s   zCrequire_n_gpus_for_nccl_backend.<locals>.decorator.<locals>.wrapperrc   rd   rr   rs   r`   r-   	decorator   s   z2require_n_gpus_for_nccl_backend.<locals>.decoratorr,   )rs   rr   ru   r,   rt   r-   require_n_gpus_for_nccl_backend   s   
rv   c                  C   s   dd } | S )Nc                    rR   )Nc                     sF   zddl m}m}  | i |W S  ty"   ttd j Y d S w )Nr   )AutoModelForMaskedLM
BertConfigrD   )Ztransformersrw   rx   ImportErrorrW   rX   rY   r$   )r]   r^   rw   rx   r`   r,   r-   rb      s   z?import_transformers_or_skip.<locals>.decorator.<locals>.wrapperrc   rd   r,   r`   r-   ru      s   z.import_transformers_or_skip.<locals>.decoratorr,   )ru   r,   r,   r-   import_transformers_or_skip   s   rz   c                 C   s   t j ot j | kS N)rU   rK   rV   r\   xr,   r,   r-   at_least_x_gpu   s   r~   c                        fdd}|S )Nc                       t   fdd}|S )Nc                     s   t j rt j kr | i |S tr#t j kr# | i |S tr3t j kr3 | i |S t	t
d  j d S )NrT   )rU   rK   rV   r\   r   rN   r   rP   rW   rX   rY   r$   ri   )ra   r}   r,   r-   rb      s   z4skip_if_lt_x_gpu.<locals>.decorator.<locals>.wrapperrc   rd   r|   r`   r-   ru      s   	z#skip_if_lt_x_gpu.<locals>.decoratorr,   )r}   ru   r,   r|   r-   skip_if_lt_x_gpu   s   r   c                    ro   )Nc                    rp   )Nc                     sV    dkr| i |S t j rt j kr| i |S ttd  j d S rq   )rU   rK   rV   r\   rW   rX   rY   r$   ri   )rr   ra   r}   r,   r-   rb      s
   z9nccl_skip_if_lt_x_gpu.<locals>.decorator.<locals>.wrapperrc   rd   rr   r}   r`   r-   ru      s   z(nccl_skip_if_lt_x_gpu.<locals>.decoratorr,   )rr   r}   ru   r,   r   r-   nccl_skip_if_lt_x_gpu   s   r   c                 C   st   |   }d|v s
J d|v sJ d|v sJ |d }|ddkr#|n|dd }||v s8J d| d| d S )	N	iterationZ	has_errorerrorz
Exception raised from r   zDid not find expected z in ddp logging data error: )Z_get_ddp_logging_datafindsplit)Z	model_DDPZ
err_substrZddp_logging_dataZlogging_erractualr,   r,   r-   verify_ddp_error_logged   s   
r   c                    rR   )aJ  
    Convenience decorator to set/unset TORCH_NCCL_BLOCKING_WAIT flag. Note that use of
    this decorator will override the setting of TORCH_NCCL_ASYNC_ERROR_HANDLING for
    the particular test. After the test, both TORCH_NCCL_BLOCKING_WAIT and
    TORCH_NCCL_ASYNC_ERROR_HANDLING will be restored to their original values.
    c               	      s   zt jd }t jd= W n ty   d }Y nw zzt jd }W n ty*   d }Y nw W dt jd< ndt jd< w z | i |}|W |d urK|t jd< |d urU|t jd< S S |d ur_|t jd< |d urh|t jd< w )NZTORCH_NCCL_ASYNC_ERROR_HANDLINGZTORCH_NCCL_BLOCKING_WAIT1)rZ   r[   KeyError)r]   r^   Z cached_nccl_async_error_handlingZcached_nccl_blocking_waitretr`   r,   r-   rb     sF   z(with_nccl_blocking_wait.<locals>.wrapperrc   rd   r,   r`   r-   with_nccl_blocking_wait   s   "r   c                    r   )zK
    Runs a test for each distributed debug level specified in levels.
    c                    r   )Nc                     sV   t jdd }D ]}|t jd< t   | i |}t  |d ur(|t jd< q	|S )NZTORCH_DISTRIBUTED_DEBUG)rZ   r[   getc10dZset_debug_level_from_envbarrier)r]   r^   Z	old_levelr"   r   )ra   levelsr,   r-   rb   .  s   

z:with_dist_debug_levels.<locals>.decorator.<locals>.wrapperrc   rd   r   r`   r-   ru   -  s   z)with_dist_debug_levels.<locals>.decoratorr,   )r   ru   r,   r   r-   with_dist_debug_levels(  s   r   c                   C      t t  dS )Nz+c10d was not compiled with the Gloo backend)r   r   Zis_gloo_availabler,   r,   r,   r-   requires_glooB     r   c                 C   s@   t  stdS ttjj | k d|  dtjj  d| S )N+c10d was not compiled with the NCCL backendz0Requires NCCL version greater than or equal to: z	, found: z
, reason: )r   is_nccl_availabler   r   rU   rK   r=   version)r   msgr,   r,   r-   requires_nccl_versionI  s   r   c                   C   r   )Nr   )r   r   r   r,   r,   r,   r-   requires_ncclU  r   r   c                   C   r   )Nz*c10d was not compiled with the UCC backend)r   r   Zis_ucc_availabler,   r,   r,   r-   requires_ucc[  r   r   c                   C   r   )Nz*c10d was not compiled with the MPI backend)r   r   Zis_mpi_availabler,   r,   r,   r-   requires_mpia  r   r   c                  C   s$   t j ottjd} t|  dS )Nr   z"multicast support is not available)rU   rK   rV   r   has_multicast_supportr   CUDAr   )r   r,   r,   r-   requires_multicast_supporth  s   
r   c                    s   d _ t  fdd}|S )zSkips a test for ROCmTc                     s&   t s	 | i |S ttd j d S )Nr?   )r   rW   rX   rY   r$   ri   r`   r,   r-   rb   w  s   z*skip_if_rocm_multiprocess.<locals>.wrapper)skip_if_rocm_multiprocessr   rd   r,   r`   r-   r   s  s   r   c                   C   s   t tjdkdS )Nwin32z8This unit test case is not supported on Windows platform)r   rW   platformr,   r,   r,   r-   skip_if_win32  r   r   devicemajorminorreturnc                 C   s6   | j dkr	tdtjjdurdS tj| ||fkS )z
    Returns True if the device's compute capability is (major, minor) or higher.
    Error out if the device is not a CUDA device.
    Returns False if device is a RoCM device.
    rK   z3sm_is_or_later() is only supported for CUDA devicesNF)type
ValueErrorrU   r   ZhiprK   Zget_device_capability)r   r   r   r,   r,   r-   sm_is_or_higher_than  s
   
r   	localhostrm   T   )minutesFc           	      C   sH   t  }|rt|tdd }tjj| ||||S tj| |||||dS )zL
    Creates a TCP store. Retries if the chosen port is already in use.
    rm   )Zmilliseconds)wait_for_workers	use_libuv)r   r)   r   rU   classesZ	dist_c10dZTCPStorer   )	addrr_   	is_mastertimeoutr   Z	jit_classr   portZtimeout_millisecondr,   r,   r-   create_tcp_store  s   
r   i  Z!DISTRIBUTED_TESTS_DEFAULT_TIMEOUTZ300Ztest_ddp_uneven_inputsi     Ztest_join_kwargsc                 C   s.   t jdks	| d u rtjjddS tjj| dS )Nr   z	127.0.0.1)hostnameZ	interface)rW   r   r   ZProcessGroupGloocreate_devicer   r,   r,   r-   r     s   r   c                 C   s   t | dd tS N.r   )TIMEOUT_OVERRIDEr   r   TIMEOUT_DEFAULT)Ztest_idr,   r,   r-   get_timeout  s   r   c               	   c   s`    t  t  } }tjtj}}z| |t_t_tjtjfV  W ||t_t_d S ||t_t_w r{   )r	   rW   stdoutstderr)Znew_outZnew_errZold_outZold_errr,   r,   r-   captured_output  s   "r   rankr_   
num_inputsc              
      s~   ddt dt dt dt fdd}dt fd	d
  fddt|ddt|ddt|ddt|ddt|ddt|ddfD S )z
    Generate a number of basic test cases for sparse reduction.
    These cover tensors with a varying number of sparse dimensions and a varying
    number of dense dimensions. The only reduction operation we support is sum.
    rm   r   r   r_   sparse_dims
dense_dimsc              	   S   s   t t | d d| d f}|gdd t|D  }t|d D ]}t |t d| d f}|| q!t | d gdd t|D  }t |||S )Nrm   c                 S      g | ]}d qS rl   r,   .0_r,   r,   r-   
<listcomp>      z@simple_sparse_reduce_tests.<locals>.generate.<locals>.<listcomp>c                 S   r   r   r,   r   r,   r,   r-   r     r   )	rU   ZreshapeZarangerangecatZzerosappendZonesZsparse_coo_tensor)r   r_   r   r   indicesshaper   valuesr,   r,   r-   generate  s   "z,simple_sparse_reduce_tests.<locals>.generatec                    s    t tj fddtD S )Nc                    s   g | ]} |qS r,   r,   )r   r   fnr_   r,   r-   r         zCsimple_sparse_reduce_tests.<locals>.compute_sum.<locals>.<listcomp>)r   operatoraddr   r   r,   r   r-   compute_sum  s   z/simple_sparse_reduce_tests.<locals>.compute_sumc                    sD   g | ]  fd dt D  fddt D fqS )c                    s"   g | ]}  |  qS r,   r,   r   i)r   r   r   r_   r,   r-   r     s    z9simple_sparse_reduce_tests.<locals>.<listcomp>.<listcomp>c                    s   g | ]	}  qS r,   r,   r   )r   r   r   r_   r,   r-   r     s    )r   )r   r   r   r   r_   r   r-   r     s    z.simple_sparse_reduce_tests.<locals>.<listcomp>)r   rl      )r   N)rm   r   )r)   r   )r   r_   r   r   r,   r   r-   simple_sparse_reduce_tests  s   
	




r   rr   c                    s^   t j }trt j }trt j }t|d | |kr!||    fddt| D }|S )zMultigpu tests are designed to simulate the multi nodes with multi
    GPUs on each node. Nccl backend requires equal #GPUs in each process.
    On a single node, all visible GPUs are evenly
    divided to subsets, each process only uses a subset.
    rm   c                    s*   i | ]}|t |  |d     qS rm   )listr   ZnGPUs_per_processZvisible_devicesr,   r-   
<dictcomp>  s    z(init_multigpu_helper.<locals>.<dictcomp>)rU   rK   r\   r   rN   r   rP   r   )r_   rr   ZnGPUsZrank_to_GPUr,   r   r-   init_multigpu_helper  s   


r   tmp_dirinit_methodc                 C   s   t  atjtjd< ttjtjd ttjtjd tjtjd}t| | d ur8| tjd< d S t	tj|d tjd< d S )NZTEMP_DIRr   Ztest_dirZinit_dirZINIT_METHODZshared_init_file)
tempfileTemporaryDirectoryr   namerZ   r[   mkdirpathjoinr   )r   Zinit_dir_pathr,   r,   r-   initialize_temp_directories  s   
r   c                   C   s   t d ur
t   d S d S r{   )r   cleanupr,   r,   r,   r-   cleanup_temp_dir,  s   r      c                	       s8  e Zd ZdZdZdefddZedefddZede	fdd	Z
d
d Zd1dededdf fddZd2 fddZd2 fddZdefddZd2ddZd2ddZG dd deZede	fdd Zede	d!ed"eddfd#d$Zd!eddfd%d&Zd2d'd(Zd2d)d*Zd2d+d,Zd2d-d.Zedefd/d0Z  ZS )3MultiProcessTestCaser   
   r   c                 C      dS )NFr,   selfr,   r,   r-   _should_stop_test_suiteI     z,MultiProcessTestCase._should_stop_test_suitec                 C   r   )NTr,   r   r,   r,   r-   destroy_pg_upon_exitQ     z)MultiProcessTestCase.destroy_pg_upon_exitc                 C      t S r{   DEFAULT_WORLD_SIZEr   r,   r,   r-   r_   U  r   zMultiProcessTestCase.world_sizec                        t   fdd}t|| S )Nc                    s$   | j | jkr|   d S    d S r{   )r   MAIN_PROCESS_RANK_join_processesr   r   r,   r-   rb   Z  s   
z1MultiProcessTestCase.join_or_run.<locals>.wrapperr   types
MethodTyper   r   rb   r,   r   r-   join_or_runY     z MultiProcessTestCase.join_or_runrunTestmethod_name
methodNameNc              
         |dkr|}t  | zt| |}t| || | W d S  ty@ } z|dkr5td| j d| |W Y d }~d S d }~ww Nr	  zno such test method in z: super__init__getattrsetattrr  AttributeErrorr   	__class__r   r
  r  r   er  r,   r-   r  g     
zMultiProcessTestCase.__init__c                    s8   t    g | _g | _| j| _tjddj| _	i | _
d S )NF)delete)r  setUpskip_return_code_checks	processesr  r   r   NamedTemporaryFiler   	file_namepid_to_piper   r  r,   r-   r  v  s   

zMultiProcessTestCase.setUpc                    s(   t    | jD ]}|  qg | _d S r{   )r  tearDownr  	terminate)r   pr  r,   r-   r     s   



zMultiProcessTestCase.tearDownc                 C      |   dd S r   idr   r   r,   r,   r-   _current_test_name  s   z'MultiProcessTestCase._current_test_namec              
   C   s   g | _ tt| jD ]<}tj \}}|| jjdt	| || 
 | j|fdt| ddid}|  td||j || j|j< | j | q
d S )Nzprocess fake_pgF)targetr   r]   r^   zStarted process %s with pid %s)r  r   r)   r_   rU   multiprocessingPiper  _runr+   r&  r  r  startloggerinfopidr  r   )r   procr   Zparent_connZ
child_connprocessr,   r,   r-   _start_processes  s   
z%MultiProcessTestCase._start_processesc                 C   s   t jdj}| | d S )NZspawn)rU   r)  Zget_contextProcessr2  )r   r0  r,   r,   r-   _spawn_processes  s   z%MultiProcessTestCase._spawn_processesc                   @   s   e Zd ZdZdS )zMultiProcessTestCase.Eventrm   N)r&   r'   r(   GET_TRACEBACKr,   r,   r,   r-   Event  s    r6  r   c                 C   s   t d| 	 tj| |g}| |v r`| jrt d| d S |  }t d|| |tjj	kr`t
jdd#}t| |  |d | |  t d| W d    n1 s[w   Y  ||v rfd S q)	Nz*Starting event listener thread for rank %sTz:Pipe closed for process %s, stopping event listener threadzReceived event %s on process %szr+)moder   zProcess %s sent traceback)r-  r.  r)  
connectionwaitclosedrecvr   r6  r5  r   r  faulthandlerZdump_tracebackflushseeksendread)parent_pipeZsignal_piper   Zready_pipeseventZtmp_filer,   r,   r-   _event_listener  s,   

	z$MultiProcessTestCase._event_listener	test_namer  c                 K   s$   | |}||_ ||_||| d S r{   )r   r  run_testclsr   rD  r  rA  r^   r   r,   r,   r-   r+    s   zMultiProcessTestCase._runc              
   C   s  t jjdd\}}tjtj||| jfdd}|  t	j
dkr*t	j
dkr*t jd dtjd< zizt| |  W nG tjy^ } ztd	| j|t| t	td
 j W Y d }~n&d }~w ty   tdt | jtj |t  t	tj Y nw W |d ur|d  |d usJ |  |  n|d ur|d  |d usJ |  |  w | j rzt!"  W d S  t#t$fy   Y d S w d S )NF)ZduplexT)r(  r]   daemonr   darwinr   TORCH_SHOW_CPP_STACKTRACESz4Process %s skipping test %s for following reason: %srB   z;Caught exception: 
%s exiting process %s with exit code: %s)%rU   r)  r*  	threadingThreadr   rC  r   r,  rW   r   _CZ'_set_print_stack_traces_on_fatal_signalrZ   r[   r  unittestSkipTestr-  r.  r+   rX   rY   r$   	Exceptionr   	traceback
format_excTEST_ERROR_EXIT_CODEr?  r   closer   r   destroy_process_groupAssertionErrorr   )r   rD  rA  Zsignal_recv_pipeZsignal_send_pipeZevent_listener_threadser,   r,   r-   rE    sX   






zMultiProcessTestCase.run_testc                 C   s  g }t | jD ]9\}}|jd u r@| j|j }z|tjj |	||f W q t
y? } ztd|| W Y d }~qd }~ww q|D ]A\}}z$|drd|jrXtd| W qC| }td|| ntd| W qC t
y } ztd|| W Y d }~qCd }~ww d S )NzBEncountered error while trying to get traceback for process %s: %sr   z5Pipe closed for process %s, cannot retrieve tracebackz)Process %s timed out with traceback: 

%sz6Could not retrieve traceback for timed out process: %s)	enumerater  exitcoder  r/  r?  r   r6  r5  r   ConnectionErrorr-  r   pollr:  r.  r;  )r   Zpipesr   r1  piper  r   rQ  r,   r,   r-   _get_timedout_process_traceback  sJ   

z4MultiProcessTestCase._get_timedout_process_tracebackc              	   C   sF  t |  }t }d}z	 t| jD ](\}}|jtjkr;td| d|j d t	j
 }|D ]}|  q0d} nq|r?n2tdd | jD rJn't | }	|	|krk|   td| d	 | jD ]}|  qcntd
 qt | }
|| jv r| |
 n| |
 W | j D ]}|  qd S | j D ]}|  qw )NFTProcess z terminated with exit code z", terminating remaining processes.c                 s   s    | ]}|j d uV  qd S r{   )rY  )r   r"  r,   r,   r-   	<genexpr>9  s    z7MultiProcessTestCase._join_processes.<locals>.<genexpr>zTiming out after z" seconds and killing subprocesses.g?)r   r%  timerX  r  rY  r   rS  printrU   r)  active_childrenr!  allr]  sleepr  _check_no_test_errors_check_return_codesr  r   rT  )r   r   r   
start_timeZsubprocess_errorr   r"  rb  acelapsedelapsed_timer\  r,   r,   r-   r  #  sR   

	



 



z$MultiProcessTestCase._join_processesc                 C   sH   t | jD ]\}}|jdu rtd| d| d| | j|j qdS )zV
        Checks that we didn't have any errors thrown in the child processes.
        Nr^  z timed out after  seconds)rX  r  rY  RuntimeErrorassertNotEqualrS  )r   rj  r   r"  r,   r,   r-   re  S  s   
z*MultiProcessTestCase._check_no_test_errorsc           
   
   C   sF  | j s
td dS | j d }dd t| j D }|r?d}|D ]\}}| j|j  }|d| dtj d	| d
7 }qt	|t| j D ])\}}|j
du rXt	d| d| d| j|j
|j
d| d|j
 d|j
 d qDt D ]}	|j
|	jkrtrtd|  |	j  dS t|	jqr| j|j
dd|j
 d|j d dS )z
        Checks that the return codes of all spawned processes match, and skips
        tests if they returned a return code indicating a skipping condition.
        z<Note: no subprocesses were spawned, test was likely skipped.Nr   c                 S   s$   g | ]\}}|j tjkr||fqS r,   )rY  r   rS  )r   r   r"  r,   r,   r-   r   o  s
    z<MultiProcessTestCase._check_return_codes.<locals>.<listcomp> r^  z exited with error code z and exception:

 terminated or timed out after rk  zExpect process z+ exit code to match Process 0 exit code of z
, but got )r   6Skipping %s on sandcastle for the following reason: %sz Expected zero exit code but got z
 for pid: )r  r-  warningrX  r  r/  r;  r   rS  rl  rY  assertEqualrY   r   r$   r   r.  r%  r%   rN  rO  )
r   rj  Zfirst_processZerrored_processesr   r   r1  error_messager"  skipr,   r,   r-   rf  ^  sR   



z(MultiProcessTestCase._check_return_codesc                 C   s
   | j dkS )Nr   r   r   r,   r,   r-   r        
zMultiProcessTestCase.is_masterr	  r	  r   N) r&   r'   r(   r  rS  boolr   propertyr   r)   r_   r  r+   r  r  r   r&  r2  r4  r   r6  staticmethodrC  classmethodr+  rE  r]  r  re  rf  r   __classcell__r,   r,   r  r-   r   @  s6    	



3
%
0
@r   c                       sB   e Zd Z fddZdd ZdefddZdd	 Zd
d Z  Z	S )DistributedTestBasec                       t    |   d S r{   r  r  r4  r   r  r,   r-   r       
zDistributedTestBase.setUpc                 C   s(   z	t | j W d S  ty   Y d S w r{   )rZ   remover  OSErrorr   r,   r,   r-   r     s
   zDistributedTestBase.tearDownr   c                 C   s(   d|v rdS d|v rdS d|v rdS dS )NrK   r=   rN   rM   rP   rO   rJ   r,   r   r   r,   r,   r-   rr     s   zDistributedTestBase.backendc                 C   sr   t | }t j| j|}t jj| || j| j	|d d| |v s,d| |v r3t j
| j	 t jj S )Nrr   r_   r   storer=   rO   )rU   get_device_moduler\   distributed	FileStorer  init_process_grouprr   r_   r   acceleratorset_device_indexdistributed_c10d_get_default_group)r   r   num_visible_devicesr  r,   r,   r-   	create_pg  s   zDistributedTestBase.create_pgc                    s&   t |   fddt| jD S )Nc                    s   i | ]}||  gqS r,   r,   r   r  r,   r-   r     s    z6DistributedTestBase.rank_to_device.<locals>.<dictcomp>)rU   r  r\   r   r_   r  r,   r  r-   rank_to_device  s   z"DistributedTestBase.rank_to_device)
r&   r'   r(   r  r   r+   rr   r  r  r~  r,   r,   r  r-   r    s    
r  subtest_configtest_fntest_kwargsc           
   	   O   s   t | }dd |D }dd |D }tj| D ]8}tt||}	| jdi |	 tj	  ||i ||	 tj	  W d   n1 sHw   Y  t
  qdS )a\  
    Runs a test function given by ``test_fn`` as a subtest according to the
    configurations specified by ``subtest_config``. This amortizes the
    costly setup overhead (including process spawn and initializing the
    process group) over the subtests.

    Args:
        subtest_config (Dict[str, List[Any]]): A mapping from subtest
            keyword argument name to a list of its possible values.
        test_fn (Callable): A callable that runs the actual test.
        test_args: Positional arguments to pass to ``test_fn``.
        test_kwargs: Keyword arguments to pass to ``test_fn``.
    c                 S      g | ]}|d  qS )r   r,   r   itemr,   r,   r-   r         z run_subtests.<locals>.<listcomp>c                 S   r  r   r,   r  r,   r,   r-   r     r  Nr,   )r   items	itertoolsproductdictzipZsubTestrU   _dynamoresetr   r   )
Zcls_instr  r  Z	test_argsr  Zsubtest_config_itemsZsubtest_config_keysZsubtest_config_valuesr   Zsubtest_kwargsr,   r,   r-   run_subtests  s   

r  c                   C   sD   t durt S ztjg dddjdka W t S  ty!   da Y t S w )a   
    If shell command `fi_info -p efa -t FI_EP_RDM` returns exit code 0 then we assume that the machine has
    Libfabric EFA interfaces and EFA software components installed,
    see https://docs.aws.amazon.com/AWSEC2/latest/UserGuide/efa-start.html.
    N)Zfi_infoz-pZefaz-tZ	FI_EP_RDMF)checkr   )EFA_PROBE_RESULT
subprocessrun
returncodeFileNotFoundErrorr,   r,   r,   r-   has_efa  s   r  c                   C   s   t  rddgS dS )a  
    If the machine has Libfabric EFA interfaces and EFA software components installed it may cause
    'RuntimeError: In operator() at tensorpipe/common/ibv.h:172 "": Operation not supported' if tensorpipe
    uses InfiniBand transport, so we exclude it from tensorpipe transports,
    see https://github.com/pytorch/pytorch/issues/73885 and https://github.com/pytorch/pytorch/issues/65022
    ZshmZuvN)r  r,   r,   r,   r-   tp_transports  s   r  c                    s:   du rt t|dS dd  t fdd}|S )z+
    Wrapper to use with a test method
    N)r   r_   c                    sf   t  t }fdd fdd}g }tD ]}tj|||fd}|  || q|S )Nc                          t jjkS r{   r   r  Z_worldr,   worldr,   r-   world_is_valid!     zaspawn_threads_and_init_comms.<locals>._run_test_method_with_multi_threads.<locals>.world_is_validc              
      s   t jd| |d z3z   W n! ty0 } ztj| t f t	| W Y d }~nd }~ww W  r;t 
  d S d S  rEt 
  w w )Nthreadedrr   r   r_   r  )r   r  BaseExceptionMultiThreadedTestCaseexception_queueputrW   exc_infor!   exception_handlerU  )r   Zworld_pgr  ex)callbackr  r_   r,   r-   worker$  s    


zYspawn_threads_and_init_comms.<locals>._run_test_method_with_multi_threads.<locals>.workerr(  r]   )r   r   	HashStorer   rK  rL  r,  r   )r_   r  global_storer  threadsr   tr,   )r  r  r  r_   r-   #_run_test_method_with_multi_threads  s   zIspawn_threads_and_init_comms.<locals>._run_test_method_with_multi_threadsc              	      sX   t jjd z fdd}t| W t jjd d S t jjd w )NTc                      s   g R i S r{   r,   r,   )r]   ra   r^   r   r,   r-   <lambda>@  r   z?spawn_threads_and_init_comms.<locals>.wrapper.<locals>.<lambda>F)rU   rM  _distributed_c10d_set_thread_isolation_moder  _join_threads)r   r]   r^   r  r  ra   r_   )r]   r^   r   r-   rb   ;  s
   "z-spawn_threads_and_init_comms.<locals>.wrapper)r   spawn_threads_and_init_commsr   )ra   r   r_   rb   r,   r  r-   r    s   
r  c                       s   e Zd ZdZe ZdZdd Zd(de	de	dd	f fd
dZ
dd Zdd Zd) fddZ fddZdd Zedd Zdd Zedd Zedd ZedefddZede	fd d!Zd*d"d#d$d%Zd*d"d#d&d'Z  ZS )+r  a5  
    Test runner that runs all tests with the in-proc process group using
    multiple threads with the threaded process group.

    Each test spawns world_size threads and run the test method in each thread.

    Difference from regular MultiProcess test runner:
    Must explicitly defines SetUp and call self._spawn_threads() to run the tests.
    Cannot use setUp / tearDown (must use perThreadSetup / perThreadShutdown)
        to set up / tear down each thread when running each test.
    No global state possible
        How bad of a limitation is this?
    r   c                    r   )Nc                    s(   | j | jkr| | j  d S    d S r{   )r   MAIN_THREAD_RANKr  r  r   r   r,   r-   rb   \  s   
z2MultiThreadedTestCase.join_or_run.<locals>.wrapperr  r  r,   r   r-   r  [  r  z!MultiThreadedTestCase.join_or_runr	  r
  r  r   Nc              
      r  r  r  r  r  r,   r-   r  e  r  zMultiThreadedTestCase.__init__c                 C      d S r{   r,   r   r,   r,   r-   perThreadSetUpt  r   z$MultiThreadedTestCase.perThreadSetUpc                 C   r  r{   r,   r   r,   r,   r-   perThreadTearDownx  r   z'MultiThreadedTestCase.perThreadTearDownc                    s&   t    | j| _g | _dtjd< dS )z
        setUp only set up things in the main thread, if you want to configure things
        in the spawned threads, use perThreadSetUp
        r   rJ  N)r  r  r  r   r  rZ   r[   r   r  r,   r-   r  {  s   
zMultiThreadedTestCase.setUpc                    s   t    g | _dS )z
        tearDown only set up things in the main thread, if you want to configure things
        in the spawned threads, use perThreadTearDown
        N)r  r   r  r   r  r,   r-   r     s   

zMultiThreadedTestCase.tearDownc                    s   t jjd | j}t  t | j_	 fdd}| s t
dt| jD ]}tj| jj||| jfd}|  | j| q%dS )zk
        class method to spawn threads and run test, use this method in the SetUp of your TestCase
        Tc                      r  r{   r  r,   r  r,   r-   r    r  z<MultiThreadedTestCase._spawn_threads.<locals>.world_is_validzInvalid worldr  N)rU   rM  r  r  r&  r   r   r  r  r  rl  r   r_   rK  rL  r+  r,  r  r   )r   rD  r  r   r  r,   r  r-   _spawn_threads  s   z$MultiThreadedTestCase._spawn_threadsc                 K   sH   | |}||_ t|drt |_tj|j_tj|j_	|
||| d S )N_tls)r   hasattrrK  localr  r   Z
_precision	precisionZ_rel_tolZrel_tolrun_test_with_threaded_pg)rG  rD  r   r_   r^   r   r,   r,   r-   r+    s   



zMultiThreadedTestCase._runc              
   C   s   t jd||| jjd |   z@zt| |  W n! ty9 } z| j|t	
 f t| W Y d}~nd}~ww W t   |   dS W t   |   dS t   |   w )zd
        Run the current test associated with `test_name` using the threaded process group.
        r  r  N)r   r  r  r  r  r  r  r  r  rW   r  r!   r  rU  r  )r   rD  r   r_   r  r,   r,   r-   r    s&   
z/MultiThreadedTestCase.run_test_with_threaded_pgc              	   C   s   t }zLt|D ]!\}}|td| | r(tj|ttd| dd ff qt	
  g }| j sC| j }|| | j r4W t  tjjd nt  tjjd w | ||| d S )Nr   zRank failed to join in under rk  F)r   rX  r   maxis_aliver  r  r  TimeoutErrorr!   r  emptyr   r   r    rU   rM  r  r  rf  )rG  r  r   r   idxthreadfailed_ranksZfailurer,   r,   r-   r    s8   




z#MultiThreadedTestCase._join_threadsc                 C   sH  d}d}|D ]l\}}|d }t |tjr(td||t| |dk r'td j}qt |tr?d| d| d	}	t	|	 t
|	t |tr_dtj| }	t	d
|	| |d| d|	 d7 }qt |trrt|jtkrr|dk rr|j}qt|dkr}t
||dkrt D ]}
||
jkrtrtd||
j  d S t|
jqd S d S )Nrn  r   rm   z3Thread %s skipping test %s for following reason: %sr   rB   zThread rp  z	 seconds
z'Caught exception: 
%s exiting thread %sz exited with exception:
ro  rq  )
isinstancerN  rO  r-  r.  r+   rY   r$   r  r   rl  rP  r   rQ  format_exception
SystemExitr   coder)   lenr   r   r%   )rG  r  r   r   	error_msgZ	skip_coder   r  excr   ru  r,   r,   r-   rf    sR   





z)MultiThreadedTestCase._check_return_codesc                 C   r   r{   r   r   r,   r,   r-   r_     r   z MultiThreadedTestCase.world_sizec                 C   r#  r   r$  r   r,   r,   r-   r&    s   z(MultiThreadedTestCase._current_test_namer   rv  c                C   s    | j |kr| ||| dS dS )z
        The reason why we have this util function instead of
        self.assertEqual is all threads are sharing one CPU RNG
        so the assertion result is only reliable on rank 0
        N)r   rs  r   r}   yr   r   r,   r,   r-   assertEqualOnRank  s   
z'MultiThreadedTestCase.assertEqualOnRankc                C   s   | j |kr| || d S d S r{   )r   rm  r  r,   r,   r-   assertNotEqualOnRank'  s   
z*MultiThreadedTestCase.assertNotEqualOnRankrx  ry  r{   )r&   r'   r(   __doc__queueQueuer  r  r  r+   r  r  r  r  r   r  r}  r+  r  r  rf  r{  r)   r_   r&  r  r  r~  r,   r,   r  r-   r  I  s0    



0	r  c                       L   e Zd Zdeejejf deddf fddZ	dejdejfdd	Z
  ZS )
SaveForwardInputsModuleforward_inputscast_forward_inputsr   Nc                    s(   t    tdd| _|| _|| _d S )Nd   )r  r  nnZLinearlr  r  r   r  r  r  r,   r-   r  -  s   

z SaveForwardInputsModule.__init__r}   c                 C   s,   || j | < | | jr|| jjjS |S r{   )r  r  r  toweightZdtyper   r}   r,   r,   r-   forward7  s   
"zSaveForwardInputsModule.forwardr&   r'   r(   r  r  ModulerU   ZTensorrz  r  r  r~  r,   r,   r  r-   r  ,      
r  c                       r  )
SaveForwardInputsModelr  r  r   Nc                    s,   t    t||| _t||| _|| _d S r{   )r  r  r  c1c2r  r  r  r,   r-   r  =  s   

zSaveForwardInputsModel.__init__r}   c                 C   s   || j | < | | |S r{   )r  r  r  r  r,   r,   r-   r  G  s   
zSaveForwardInputsModel.forwardr  r,   r,   r  r-   r  <  r  r  c                 c   s    |s	t j|  dtjd< dtjd< |r1|r)t jjjj	 }t
jd|| |d nt
jd| |d t j  t jjj  zd V  W t j  t jjj  |rVt
  d S d S t j  t jjj  |rkt
  w w )	Nr   MASTER_ADDRZ6789MASTER_PORTZfaker  r=   r   r_   )rU   r  r  rZ   r[   testing	_internalr  r'  Z	FakeStorer   r  r  r  utilsZcountersclearrU  )r   r_   Zinit_pgr'  r  r,   r,   r-   _dynamo_dist_per_rank_initK  s:   





r   c                       s4   e Zd ZdZe fddZe fddZ  ZS )#DynamoDistributedSingleProcTestCasez
    Test harness for single-process dynamo distributed tests,
    initializes dist process group.

    Prefer this for simple tests, as it's easier to debug.
    c                    sh   t    | jttjddd d| _d| j | _	d| j	v r$d n| jg| _
tjd| jdd	 d S )
Nr   Z12355)r  r  r   zcuda:rK   r=   rm   r  )r  
setUpClassZ_exit_stackenter_contextr   r  rZ   r[   r   r   Z
device_idsr   r  rG  r  r,   r-   r  q  s   
	z.DynamoDistributedSingleProcTestCase.setUpClassc                    s   t   t   d S r{   )r   rU  r  tearDownClassr  r  r,   r-   r    s   z1DynamoDistributedSingleProcTestCase.tearDownClass)r&   r'   r(   r  r}  r  r  r~  r,   r,   r  r-   r  i  s    r  c                	       s\   e Zd ZdZ fddZ fddZedefddZe	d	ed
e
de
ddfddZ  ZS )"DynamoDistributedMultiProcTestCasea   
    Use this for tests that actually run on multiple GPUs.

    Decorate tests with @skip_if_lt_x_gpu(ngpu)

    Note: MultiProcTestCase spawns processes per test and is slow.
    Prefer MultiThreadedTestCase for most tests. Perhaps use this one
    sparingly for integration tests.
    c                    r  r{   r  r   r  r,   r-   r    r  z(DynamoDistributedMultiProcTestCase.setUpc                    s2   t    z	t| j W d S  ty   Y d S w r{   )r  r   rZ   r  r  r  r   r  r,   r-   r     s   
z+DynamoDistributedMultiProcTestCase.tearDownr   c                 C   s
   t j S r{   )rU   rK   r\   r   r,   r,   r-   r_     rw  z-DynamoDistributedMultiProcTestCase.world_sizer   rD  r  Nc                 K   s2   t t  | |}||_||_||| d S r{   )r   
addHandlerloggingNullHandlerr   r  rE  rF  r,   r,   r-   r+    s
   z'DynamoDistributedMultiProcTestCase._run)r&   r'   r(   r  r  r   r{  r)   r_   r}  r+   r+  r~  r,   r,   r  r-   r    s    	$r  c                	       s   e Zd ZU dZeed< dZeed< dZee	 ed< e
ddZe
ed	< eejd
e	fddZedddZe fddZe fddZe	ddededee	 fddZ  ZS )MultiProcContinousTestrl   r_   r   r   N	rdvz_filex   )secondsr   r   c                 C   s   t d)z
        ProcessGroup backend str.
        To be customized by sub test classes, e.g. "nccl".
        Here we raise error.
        z/Please implement backend_str in your test class)NotImplementedErrorr  r,   r,   r-   backend_str  s   z"MultiProcContinousTest.backend_strFc                 C   r   )z
        ProcessGroup init options.
        To be customized by sub test classes, e.g. ProcessGroupNCCLOpTest
        Here we return None.
        Nr,   )rG  Zhigh_priority_streamr,   r,   r-   opts  s   zMultiProcContinousTest.optsc                    s   t    d| j  kr| jk sn td| j d| j | jr*t| j| j}nd}|  }| 	 }t
d| tj|| j| j||| jd tj | _t
d| j d dS )	z
        Class-scope test fixture. Run once for entire test class, before any test starts.
        Set up the process group.
        r   zBRank must be set and in the range of 0 to world_size. World size: z Rank: NzTesting backend=)rr   r_   r   r  Z
pg_optionsr   Rank z setup complete)r  r  r   r_   rl  r  r   r  r  r  ra  r  r   r  r  Zpg)rG  r  r  rr   r  r,   r-   r    s2   
z!MultiProcContinousTest.setUpClassc                    sR   t   t   | jrzt| j W n	 ty   Y nw td| j	 d dS )z
        Class-scope test fixture. Run once for entire test class, after all tests finish.
        Tear down the process group.
        r  z teardown completeN)
r   rU  r  r  r  rZ   r  r  ra  r   r  r  r,   r-   r    s   
z$MultiProcContinousTest.tearDownClassc                 C   s   || _ || _|| _t  dS )ad  
        This is an entry point for each rank to run the tests in `MultiProcContinousTest`.
        In this entry point, we set the class variables for the test class.
        Then we run all tests.

        Note:
        - This helper only works for a subclass of `MultiProcContinousTest`.

        Example:
        - See `test_c10d_ops_nccl.py`.
        N)r   r_   r  r   )rG  r   r_   r  r,   r,   r-   run_rank  s   
zMultiProcContinousTest.run_rank)Fr{   )r&   r'   r(   r_   r)   r*   r   r  r   r+   r   r   r}  abcabstractmethodr  r  r  r  r  r~  r,   r,   r  r-   r
    s.   
 r
  r{   r   ry  )TF)r  r<  r  r  r)  rZ   r  r  rW   r   rK  r`  rQ  r  rN  
contextlibr   dataclassesr   datetimer   enumr   	functoolsr   r   r   ior	   typingr
   r   r   r   r   Zunittest.mockr   Ztorch._logging._internalr   rU   Ztorch._dynamo.test_caseZtorch.cuda.ncclZtorch.distributedr  r   Ztorch._C._autogradr   Ztorch._C._distributed_c10dr   Ztorch.nnr  Z$torch.testing._internal.common_utilsr   r   r   r   r   r   r   r   r   r   r   r   Z5torch.testing._internal.distributed.multi_threaded_pgr   r    r!   r   basicConfigINFO	getLoggerr&   r-  r#   rY   rG   re   rj   rn   rv   rz   r~   r   r   r   r   r   r   r   r   r   r   r   r   r   r   r)   rz  r   r   r   getenvr   r   r   r   r   r+   r   r   r   r*   r   r   r   r   r  r  r   r  r  r  r  r  r  r  r  r  r   r  Z	test_caser  r  r
  r,   r,   r,   r-   <module>   s,  
8






	










.


,
  i'
#
8 d $