from collections.abc import Iterable
from contextlib import contextmanager
from datetime import timedelta
from typing import Callable, Optional

import torch


DistStoreError = torch._C._DistStoreError

_NUM_MEMBERS = "/num_members"
_LAST_MEMBER_CHECKIN = "/last_member"
_TRACE = "/TRACE"
_TRACING_GATE = "/TRACING_GATE"
_MAX_TRACE_MISSING_RANKS = 16

__all__ = ["store_timeout", "get_all", "synchronize", "barrier"]


@contextmanager
def store_timeout(store, timeout: float):
    """
    This sets the timeout and then restores the old timeout when the context
    manager exits.

    Args:
        store: the store to set the timeout on
        timeout: the timeout to set
    """
    old_timeout = store.timeout
    store.set_timeout(timedelta(seconds=timeout))
    yield
    store.set_timeout(old_timeout)


def get_all(store, rank: int, prefix: str, world_size: int):
    """
    Given a store and a prefix, the method goes through the array of keys
    of the following format: ``{prefix}{idx}``, where idx is in a range
    from 0 to size, and tries to retrieve the data.

    The Rank0 process waits at the end to make sure all other processes
    finished the procedure before exiting.

    Usage

    ::

     values = get_all(store, rank, "torchelastic/data", 3)
     value1 = values[0]  # retrieves the data for key torchelastic/data0
     value2 = values[1]  # retrieves the data for key torchelastic/data1
     value3 = values[2]  # retrieves the data for key torchelastic/data2

    """
    data_arr = store.multi_get([f"{prefix}{idx}" for idx in range(world_size)])

    barrier_key = _barrier_nonblocking(
        store=store,
        world_size=world_size,
        key_prefix=f"{prefix}/finished",
    )
    if rank == 0:
        # Rank0 hosts the store daemon, so it must exit last; wait until every
        # other rank has checked in on the "finished" barrier.
        store.wait([barrier_key])

    return data_arr


def synchronize(
    store,
    data: bytes,
    rank: int,
    world_size: int,
    key_prefix: str,
    timeout: float = 300,
) -> list[bytes]:
    """
    Synchronizes ``world_size`` agents between each other using the underlying c10d store.
    The ``data`` will be available on each of the agents.

    Note: The data on the path is not deleted, as a result there can be stale data if
        you use the same key_prefix twice.

    Time complexity: O(N) per worker, O(N^2) globally.
    """
    with store_timeout(store, timeout):
        store.set(f"{key_prefix}{rank}", data)
        agent_data = get_all(store, rank, key_prefix, world_size)
        return agent_data


def _try_detecting_missing_ranks(
    store,
    world_size: int,
    key_prefix: str,
    rank: int,
    rank_decoder: Callable[[int], str],
    trace_timeout: float,
) -> Optional[Iterable[str]]:
    # Every rank (including this one) publishes a trace key; rank 0 then scans
    # for the keys that never showed up.
    store.set(f"{key_prefix}{rank}{_TRACE}", "<val_ignored>")

    def _find_missing_ranks() -> Iterable[str]:
        missing_rank_info = set()
        ranks_missing = 0
        for i in range(world_size):
            # cap the number of reported ranks to keep the error message readable
            if ranks_missing >= _MAX_TRACE_MISSING_RANKS:
                break
            try:
                if i == 0:
                    # wait the full trace timeout on the first key so that
                    # stragglers have a chance to check in
                    store.wait(
                        [f"{key_prefix}{i}{_TRACE}"],
                        timedelta(seconds=trace_timeout),
                    )
                else:
                    # the remaining keys should already be present, so use a
                    # minimal timeout
                    store.wait(
                        [f"{key_prefix}{i}{_TRACE}"],
                        timedelta(milliseconds=1),
                    )
            except DistStoreError:
                ranks_missing += 1
                missing_rank_info.add(rank_decoder(i))
        return missing_rank_info

    def _checkin() -> Optional[Iterable[str]]:
        try:
            store.wait([f"{key_prefix}{_TRACING_GATE}"])
            return [f"[<check rank 0 ({rank_decoder(0)}) for missing rank info>]"]
        except DistStoreError:
            # in case rank0 is the source of the timeout, the original
            # exception will be raised by the caller
            return None

    if rank == 0:
        missing_rank_info = _find_missing_ranks()
        # open the tracing gate so the non-zero ranks can finish their check-in
        store.set(f"{key_prefix}{_TRACING_GATE}", "<val_ignored>")
        return missing_rank_info
    else:
        return _checkin()


def _barrier_nonblocking(store, world_size: int, key_prefix: str) -> str:
    """
    Does all the non-blocking operations for a barrier and returns the final key
    that can be waited on.
    """
    num_members_key = key_prefix + _NUM_MEMBERS
    last_member_key = key_prefix + _LAST_MEMBER_CHECKIN

    idx = store.add(num_members_key, 1)
    if idx == world_size:
        store.set(last_member_key, "<val_ignored>")

    return last_member_key


def barrier(
    store,
    world_size: int,
    key_prefix: str,
    barrier_timeout: float = 300,
    rank: Optional[int] = None,
    rank_tracing_decoder: Optional[Callable[[int], str]] = None,
    trace_timeout: float = 10,
) -> None:
    """
    A global lock between agents. This will pause all workers until at least
    ``world_size`` workers respond.

    This uses a fast incrementing index to assign waiting ranks and a success
    flag set by the last worker.

    Time complexity: O(1) per worker, O(N) globally.

    Optionally, passing rank will enable tracing of missing ranks on timeouts.
    `rank_tracing_decoder` lambda arg can be used to convert rank data
    into more meaningful information at an app level (e.g. hostname).

    Note: Since the data is not removed from the store, the barrier can be used
        once per unique ``key_prefix``.
    """
    if rank is None:
        assert rank_tracing_decoder is None, "Tracing requires rank information"

    with store_timeout(store, barrier_timeout):
        last_member_key = _barrier_nonblocking(
            store=store, world_size=world_size, key_prefix=key_prefix
        )
        try:
            store.wait([last_member_key])
        except DistStoreError as e:
            if rank is None:
                raise e
            missing_ranks = _try_detecting_missing_ranks(
                store,
                world_size,
                key_prefix,
                rank,
                rank_tracing_decoder or (lambda x: str(x)),
                trace_timeout,
            )
            if missing_ranks is not None:
                raise DistStoreError(
                    "Timed out waiting on barrier on "
                    "rank {}, for key prefix: {} (world_size={}, "
                    "missing_ranks={}, timeout={})".format(
                        rank,
                        key_prefix,
                        world_size,
                        f"[{', '.join(missing_ranks)}]",
                        barrier_timeout,
                    )
                ) from e
            else:
                raise e