o
    Zh                      @   s   d dl Z d dlZd dlZd dlZd dlZd dlZd dlZd dlZd dlZd dl	m
Z
mZmZ zd dlZW n	 ey=   Y nw eeZdd Zd
de
e fddZG dd	 d	ZdS )    N)OptionalTextIOUnionc                  C   s   t jddt jt jd} | D ]<}|\}}}}}zt  |||}|d |d |W   S  tyI } z|  td|  W Y d}~qd}~ww t	d)a  
    Find a free port and binds a temporary socket to it so that the port can be "reserved" until used.

    .. note:: the returned socket must be closed before using the port,
              otherwise a ``address already in use`` error will happen.
              The socket should be held and closed as close to the
              consumer of the port as possible since otherwise, there
              is a greater chance of race-condition where a different
              process may see the port as being free and take it.

    Returns: a socket binded to the reserved free port

    Usage::

    sock = find_free_port()
    port = sock.getsockname()[1]
    sock.close()
    use_port(port)
    	localhostN)hostportfamilytype)r   r   r   z Socket creation attempt failed: zFailed to create a socket)
socketgetaddrinfo	AF_UNSPECSOCK_STREAMbindlistenOSErrorcloseprintRuntimeError)addrsaddrr   r	   proto_se r   _/var/www/auris/lib/python3.10/site-packages/torch/distributed/elastic/rendezvous/etcd_server.pyfind_free_port   s    


r   data_dirc                 C   sP   | r|   d u rtd |   |   |r&td| tj|dd d S d S )Nzstopping etcd serverzdeleting etcd data dir: %sTignore_errors)pollloggerinfo	terminatewaitshutilrmtree)
subprocessr   r   r   r   	stop_etcdC   s   
r(   c                
   @   s   e Zd ZdZddee fddZdejfddZ	de
fd	d
ZdefddZdefddZ			dde
de
dee
edf ddfddZ	d dede
dee
edf ddfddZdd Zd!de
ddfddZd"ddZdS )#
EtcdServera  
    .. note:: tested on etcd server v3.4.3.

    Starts and stops a local standalone etcd server on a random free
    port. Useful for single node, multi-worker launches or testing,
    where a sidecar etcd server is more convenient than having to
    separately setup an etcd server.

    This class registers a termination handler to shutdown the etcd
    subprocess on exit. This termination handler is NOT a substitute for
    calling the ``stop()`` method.

    The following fallback mechanism is used to find the etcd binary:

    1. Uses env var TORCHELASTIC_ETCD_BINARY_PATH
    2. Uses ``<this file root>/bin/etcd`` if one exists
    3. Uses ``etcd`` from ``PATH``

    Usage
    ::

     server = EtcdServer("/usr/bin/etcd", 2379, "/tmp/default.etcd")
     server.start()
     client = server.get_client()
     # use client
     server.stop()

    Args:
        etcd_binary_path: path of etcd server binary (see above for fallback path)
    Nr   c                 C   sp   d| _ d| _tjt}tj|d}tjd|| _	tj
| j	s%d| _	|r)|ntjdd| _d | _d | _d S )Nr   zbin/etcdZTORCHELASTIC_ETCD_BINARY_PATHetcdZtorchelastic_etcd_data)prefix)_port_hostospathdirname__file__joinenvironget_etcd_binary_pathisfiletempfilemkdtemp_base_data_dirZ	_etcd_cmd
_etcd_proc)selfr   rootZdefault_etcd_binr   r   r   __init__n   s   
zEtcdServer.__init__returnc                 C   s   | j std| j S )Nz>No etcd server process started. Call etcd_server.start() first)r;   r   r<   r   r   r   _get_etcd_server_process   s
   z#EtcdServer._get_etcd_server_processc                 C      | j S )z)Return the port the server is running on.)r-   r@   r   r   r   get_port      zEtcdServer.get_portc                 C   rB   )z)Return the host the server is running on.)r.   r@   r   r   r   get_host   rD   zEtcdServer.get_hostc                 C   s   | j  d| j S )z,Return the etcd server endpoint (host:port).:)r.   r-   r@   r   r   r   get_endpoint   s   zEtcdServer.get_endpoint<      timeoutnum_retriesstderrc              
   C   s   d}	 zt j| jt|}t j|dd | |||W S  tyL } z$|d7 }t| j	 t
dt| ||krBtj| jdd  W Y d}~nd}~ww q)a  
        Start the server, and waits for it to be ready. When this function returns the sever is ready to take requests.

        Args:
            timeout: time (in seconds) to wait for the server to be ready
                before giving up.
            num_retries: number of retries to start the server. Each retry
                will wait for max ``timeout`` before considering it as failed.
            stderr: the standard error file handle. Valid values are
                `subprocess.PIPE`, `subprocess.DEVNULL`, an existing file
                descriptor (a positive integer), an existing file object, and
                `None`.

        Raises:
            TimeoutError: if the server is not ready within the specified timeout
        r   T)exist_ok   z4Failed to start etcd server, got error: %s, retryingr   N)r/   r0   r3   r:   strmakedirs_start	Exceptionr(   r;   r!   warningr%   r&   atexitregister)r<   rJ   rK   rL   Zcurr_retriesr   r   r   r   r   start   s&   
zEtcdServer.startc                 C   s   t  }t  }| d | _| d }td| jdd|dd| j d| j dd| j d| j d	d| j d| g
}t	d
| |
  |
  tj|d|d| _| | d S )NrN    z--enable-v2z
--data-dirz--listen-client-urlszhttp://rF   z--advertise-client-urlsz--listen-peer-urlszStarting etcd server: [%s]T)	close_fdsrL   )r   getsocknamer-   shlexsplitr3   r6   r.   r!   r"   r   r'   Popenr;   _wait_for_ready)r<   r   rJ   rL   sockZ	sock_peerZ	peer_portZetcd_cmdr   r   r   rQ      s0   zEtcdServer._startc                 C   s   t j| j| jdddS )zNReturn an etcd client object that can be used to make requests to this server./v2
   r   r   version_prefixread_timeout)r+   Clientr.   r-   r@   r   r   r   
get_client   s   zEtcdServer.get_clientc                 C   s   t j| j | jddd}t | }t |k rK|   d ur,|  j}td| z
t	
d|j W d S  tyD   td Y nw t |k std)Nr_      ra   z*Etcd server process exited with the code: zetcd server ready. version: %srN   z.Timed out waiting for etcd server to be ready!)r+   rd   r.   r-   timerA   r    
returncoder   r!   r"   versionrR   sleepTimeoutError)r<   rJ   clientZmax_timeexitcoder   r   r   r]      s$   
zEtcdServer._wait_for_readyc                 C   s   t d t| j| j dS )zGStop the server and cleans up auto generated resources (e.g. data dir).zEtcdServer stop method calledN)r!   r"   r(   r;   r:   r@   r   r   r   stop   s   
zEtcdServer.stopN)rH   rI   N)rH   N)rH   )r?   N)__name__
__module____qualname____doc__r   rO   r>   r'   r\   rA   intrC   rE   rG   r   r   rV   rQ   re   r]   rn   r   r   r   r   r)   N   s@    
(
 r)   ro   )rT   loggingr/   rZ   r%   r
   r'   r8   rg   typingr   r   r   r+   ModuleNotFoundError	getLoggerrp   r!   r   rO   r(   r)   r   r   r   r   <module>   s&   
%