o
    ZhJ4                     @   s   d dl Z d dlmZmZ d dlmZ d dlmZmZm	Z	 d dl
Z
d dlmZ g dZG dd dZG dd	 d	eZG d
d deZG dd dZdS )    N)ABCabstractmethod)TracebackType)Any
NamedTupleOptional)JoinHookJoinableJoinc                   @   s,   e Zd ZdZd	ddZdeddfddZdS )
r   a  
    This defines a join hook, which provides two entry points in the join context manager.

    Entry points : a main hook, which is called repeatedly while there exists a non-joined
    process, and a post-hook, which is called once all processes have joined.

    To implement a join hook for the generic join context manager, define a
    class that inherits from :class:`JoinHook` and override ``main_hook()`` and
    ``post_hook()`` as appropriate.
    returnNc                 C      dS )zCall this hook while there exists a non-joined process to shadow collective communications in a training iteration.

        Training iteration i.e., in one forward pass, backward pass, and optimizer step.
        N selfr   r   P/var/www/auris/lib/python3.10/site-packages/torch/distributed/algorithms/join.py	main_hook       zJoinHook.main_hookis_last_joinerc                 C   r   )aK  
        Call hook after all processes have joined.

        It is passed an additional ``bool`` argument ``is_last_joiner``, which indicates if the rank is one of the last to join.

        Arguments:
            is_last_joiner (bool): ``True`` if the rank is one of the last to
                join; ``False`` otherwise.
        Nr   )r   r   r   r   r   	post_hook    r   zJoinHook.post_hookr   N)__name__
__module____qualname____doc__r   boolr   r   r   r   r   r      s    
r   c                       sf   e Zd ZdZed fddZedefddZeede	j
fdd	Zeedefd
dZ  ZS )r	   a_  
    This defines an abstract base class for joinable classes.

    A joinable class
    (inheriting from :class:`Joinable`) should implement :meth:`join_hook`,
    which returns a :class:`JoinHook` instance, in addition to
    :meth:`join_device` and :meth:`join_process_group` that return device and
    process group information, respectively.
    r   Nc                    s   t    t | _d S N)super__init___JoinConfigconstruct_disabled_join_config_join_configr   	__class__r   r   r   7   s   
zJoinable.__init__c                 K   r   )a  
        Return a :class:`JoinHook` instance for the given :class:`Joinable`.

        Arguments:
            kwargs (dict): a :class:`dict` containing any keyword arguments
                to modify the behavior of the join hook at run time; all
                :class:`Joinable` instances sharing the same join context
                manager are forwarded the same value for ``kwargs``.
        Nr   )r   kwargsr   r   r   	join_hook<   s   zJoinable.join_hookc                 C   r   )zeReturn the device from which to perform collective communications needed by the join context manager.Nr   r   r   r   r   join_deviceI      zJoinable.join_devicec                 C   r   )zfReturns the process group for the collective communications needed by the join context manager itself.Nr   r   r   r   r   join_process_groupO   r&   zJoinable.join_process_groupr   )r   r   r   r   r   r   r   r$   propertytorchdevicer%   r   r'   __classcell__r   r   r!   r   r	   ,   s    
r	   c                   @   s6   e Zd ZU dZeed< eed< eed< edd ZdS )r   zdThis includes all fields needed from a :class:`Joinable` instance for the join context manager side.enablethrow_on_early_terminationis_first_joinablec                   C   s   t ddddS )zReturn a :class:`_JoinConfig` instance indicating that join-related logic should be disabled.

        e.g. if the caller is not in a join context manager.
        Fr,   r-   r.   )r   r   r   r   r   r   ]   s   z*_JoinConfig.construct_disabled_join_configN)r   r   r   r   r   __annotations__staticmethodr   r   r   r   r   r   V   s   
 r   c                   @   s   e Zd ZdZ		ddee dedefddZdddZdddZ	dd Z
deee  dee dee fddZdd Zdd ZedefddZd
S )r
   a
  
    This class defines the generic join context manager, which allows custom hooks to be called after a process joins.

    These hooks should shadow the
    collective communications of non-joined processes to prevent hanging and
    erroring and to ensure algorithmic correctness. Refer to :class:`JoinHook`
    for details about the hook definition.

    .. warning::
        The context manager requires each participating :class:`Joinable` to
        call the method :meth:`notify_join_context()` before its own per-
        iteration collective communications to ensure correctness.

    .. warning::
        The context manager requires that all ``process_group`` attributes in
        the :class:`JoinHook` objects are the same. If there are multiple
        :class:`JoinHook` objects, then the ``device`` of the first is used.
        The process group and device information is used for checking for non-
        joined processes and for notifying processes to throw an exception if
        ``throw_on_early_termination`` is enabled, both of which using an all-
        reduce.

    Arguments:
        joinables (List[Joinable]): a list of the participating
            :class:`Joinable` s; their hooks are iterated over in the given
            order.

        enable (bool): a flag enabling uneven input detection; setting to
            ``False`` disables the context manager's functionality and should
            only be set when the user knows the inputs will not be uneven
            (default: ``True``).

        throw_on_early_termination (bool): a flag controlling whether to throw an
            exception upon detecting uneven inputs (default: ``False``).

    Example::

        >>> import os
        >>> import torch
        >>> import torch.distributed as dist
        >>> import torch.multiprocessing as mp
        >>> # xdoctest: +SKIP
        >>> import torch.nn.parallel.DistributedDataParallel as DDP
        >>> import torch.distributed.optim.ZeroRedundancyOptimizer as ZeRO
        >>> from torch.distributed.algorithms.join import Join
        >>>
        >>> # On each spawned worker
        >>> def worker(rank):
        >>>     dist.init_process_group("nccl", rank=rank, world_size=2)
        >>>     model = DDP(torch.nn.Linear(1, 1).to(rank), device_ids=[rank])
        >>>     optim = ZeRO(model.parameters(), torch.optim.Adam, lr=0.01)
        >>>     # Rank 1 gets one more input than rank 0
        >>>     inputs = [torch.tensor([1.]).to(rank) for _ in range(10 + rank)]
        >>>     with Join([model, optim]):
        >>>         for input in inputs:
        >>>             loss = model(input).sum()
        >>>             loss.backward()
        >>>             optim.step()
        >>>     # All ranks reach here without hanging/erroring
    TF	joinablesr,   r-   c                    sP   t |dkr
td|| _ fdd| jD | _|| _|| _|   |   d S )Nr   z7The join context manager requires at least one joinablec                    s   g | ]
}|j d i  qS )r   )r$   ).0joinabler#   r   r   
<listcomp>   s    z!Join.__init__.<locals>.<listcomp>)len
ValueError
_joinables_join_hooks_enable_throw_on_early_termination_set_joinable_configs_extract_dist_info)r   r2   r,   r-   r#   r   r5   r   r      s   
zJoin.__init__r   Nc                 C   s>   t | jdks	J d}| jD ]}t| j| j|d|_d}qdS )zESet the :class:`_JoinConfig` of each participating :class:`Joinable`.r   Tr/   FN)r7   r9   r   r;   r<   r    )r   r.   r4   r   r   r   r=      s   
zJoin._set_joinable_configsc                 C   sb   d}d}| j D ]}|du r|j}n	||jkrtd|du r!|j}q|| _t| j| _|| _dS )a  
        Extract the process group and device information from the joinables.

        If there are multiple joinables, then the context manager uses the
        first specified device.

        Preconditions:
            ``self._joinables`` is not ``None`` and is non-empty.

        Raises:
            ValueError
                If there are multiple conflicting ``process_group`` attributes
                among the ``Joinable`` objects.
        Nz7Using join context manager with multiple process groups)	r9   r'   r8   r%   _process_groupdistZget_rank_rank_device)r   process_groupr*   r4   r   r   r   r>      s   


zJoin._extract_dist_infoc                 C   s   d S r   r   r   r   r   r   	__enter__   r   zJoin.__enter__typevalue	tracebackc           
   	   C   s   | j r|rdS d}d}d}d}td |sN||kr*td| d| j d	| d
 |  }|dkr5d}n| jr<|   | jD ]}	|		  q?d}|d7 }|r| jD ]}	|	
| qQdS )z
        Repeatedly runs the main hooks until all processes join; then, runs the post-hooks.

        Raises:
            RuntimeError
                If ``throw_on_early_termination=True``.
        NFTr   i  oncez+Detected uneven input skew of greater than z. This means that rank z has at least zz fewer inputs than other currently-active ranks. This level of skew could lead to performance degradation during training.   )r;   warningssimplefilterwarnrA   _get_num_nonjoined_procsr<   _notify_procs_to_terminater:   r   r   )
r   rE   rF   rG   Zall_procs_joinedr   iZWARN_THRESHOLDnum_nonjoined_procsr$   r   r   r   __exit__   s>   

	


zJoin.__exit__c                 C   s(   t jd| jd}tj|| jd | S )zaReturn the number of non-joined processes by shadowing an all-reduce in the non-joined processes.rI   r*   group)r)   zerosrB   r@   
all_reducer?   item)r   rP   r   r   r   rM     s   zJoin._get_num_nonjoined_procsc                 C   s2   t jd| jd}tj|| jd td| j d)zSchedule an all-reduce to notify non-joined processes to terminate.

        Also raise a ``RuntimeError`` indicating that the current process has exhausted its inputs.
        rI   rR   rS   zRank z exhausted all inputs.)r)   onesrB   r@   rV   r?   RuntimeErrorrA   )r   rX   r   r   r   rN     s   zJoin._notify_procs_to_terminater4   c                 C   s   t | dsJ dt|  d| j}|jr|jsdS | j}| j}tjd|d}t	j
||dd}|jrJtjd|d}t	j
||d	 | }|rJtd
|S )aH  
        Notifies the join context manager that the calling process has not yet joined.

        Then, if ``throw_on_early_termination=True``, checks if uneven inputs have been detected
        (i.e. if one process has already joined) and throws an exception if so.

        This method should be called from a :class:`Joinable` object before
        its per-iteration collective communications. For example, this should
        be called at the beginning of the forward pass in
        :class:`DistributedDataParallel`.

        Only the first :class:`Joinable` object passed into the context
        manager performs the collective communications in this method, and
        for the others, this method is vacuous.

        Arguments:
            joinable (Joinable): the :class:`Joinable` object calling this
                method.

        Returns:
            An async work handle for the all-reduce meant to notify the context
            manager that the process has not yet joined if ``joinable`` is the
            first one passed into the context manager; ``None`` otherwise.
        r    zCheck that the z/ constructor calls the ``Joinable`` constructorNrI   rR   T)rT   Zasync_oprS   zLDetected at least one rank that exhausted inputs. Throwing across all ranks.)hasattrrE   r    r.   r,   r%   r'   r)   rX   r@   rV   r-   rU   rW   rY   )r4   Zjoin_configr*   rC   rX   ZworkrU   Zshould_throwr   r   r   notify_join_context'  s&   zJoin.notify_join_context)TFr   )r   r   r   r   listr	   r   r   r=   r>   rD   r   rE   BaseExceptionr   rQ   rM   rN   r1   r[   r   r   r   r   r
   h   s2    @




4	r
   )rJ   abcr   r   typesr   typingr   r   r   r)   Ztorch.distributeddistributedr@   __all__r   r	   r   r
   r   r   r   r   <module>   s   *