
    [ThJ4                         S SK r S SKJrJr  S SKJr  S SKJrJrJ	r	  S SK
r
S SKJr  / SQr " S S5      r " S S	\5      r " S
 S\5      r " S S5      rg)    N)ABCabstractmethod)TracebackType)Any
NamedTupleOptional)JoinHookJoinableJoinc                   4    \ rS rSrSrS	S jrS\SS4S jrSrg)
r	      a  
This defines a join hook, which provides two entry points in the join context manager.

Entry points : a main hook, which is called repeatedly while there exists a non-joined
process, and a post-hook, which is called once all processes have joined.

To implement a join hook for the generic join context manager, define a
class that inherits from :class:`JoinHook` and override ``main_hook()`` and
``post_hook()`` as appropriate.
returnNc                     g)zCall this hook while there exists a non-joined process to shadow collective communications in a training iteration.

Training iteration i.e., in one forward pass, backward pass, and optimizer step.
N selfs    Y/var/www/auris/envauris/lib/python3.13/site-packages/torch/distributed/algorithms/join.py	main_hookJoinHook.main_hook           is_last_joinerc                     g)a  
Call hook after all processes have joined.

It is passed an additional ``bool`` argument ``is_last_joiner``, which indicates if the rank is one of the last to join.

Arguments:
    is_last_joiner (bool): ``True`` if the rank is one of the last to
        join; ``False`` otherwise.
Nr   )r   r   s     r   	post_hookJoinHook.post_hook    r   r   r   r   N)	__name__
__module____qualname____firstlineno____doc__r   boolr   __static_attributes__r   r   r   r	   r	      s    		 	 	r   r	   c                      ^  \ rS rSrSr\S	U 4S jj5       r\S\4S j5       r\	\S\
R                  4S j5       5       r\	\S\4S j5       5       rSrU =r$ )
r
   ,   aC  
This defines an abstract base class for joinable classes.

A joinable class
(inheriting from :class:`Joinable`) should implement :meth:`join_hook`,
which returns a :class:`JoinHook` instance, in addition to
:meth:`join_device` and :meth:`join_process_group` that return device and
process group information, respectively.
r   c                 T   > [         TU ]  5         [        R                  5       U l        g N)super__init___JoinConfigconstruct_disabled_join_config_join_config)r   	__class__s    r   r)   Joinable.__init__7   s    'FFHr   c                     g)aV  
Return a :class:`JoinHook` instance for the given :class:`Joinable`.

Arguments:
    kwargs (dict): a :class:`dict` containing any keyword arguments
        to modify the behavior of the join hook at run time; all
        :class:`Joinable` instances sharing the same join context
        manager are forwarded the same value for ``kwargs``.
Nr   )r   kwargss     r   	join_hookJoinable.join_hook<   s     	r   c                     g)zeReturn the device from which to perform collective communications needed by the join context manager.Nr   r   s    r   join_deviceJoinable.join_deviceI        	r   c                     g)zfReturns the process group for the collective communications needed by the join context manager itself.Nr   r   s    r   join_process_groupJoinable.join_process_groupO   r6   r   )r,   r   )r   r   r   r    r!   r   r)   r	   r1   propertytorchdevicer4   r   r8   r#   __classcell__)r-   s   @r   r
   r
   ,   s     I I 
X 
 
 U\\    C   r   r
   c                   H    \ rS rSr% Sr\\S'   \\S'   \\S'   \S 5       rSr	g)	r*   V   zdThis includes all fields needed from a :class:`Joinable` instance for the join context manager side.enablethrow_on_early_terminationis_first_joinablec                      [        SSSS9$ )zReturn a :class:`_JoinConfig` instance indicating that join-related logic should be disabled.

e.g. if the caller is not in a join context manager.
Fr@   rA   rB   )r*   r   r   r   r+   *_JoinConfig.construct_disabled_join_config]   s     Ue
 	
r   r   N)
r   r   r   r    r!   r"   __annotations__staticmethodr+   r#   r   r   r   r*   r*   V   s(    oL $$
 
r   r*   c                       \ rS rSrSr  SS\\   S\S\4S jjrSS jr	SS	 jr
S
 rS\\\      S\\   S\\   4S jrS rS r\S\4S j5       rSrg)r   h   a
  
This class defines the generic join context manager, which allows custom hooks to be called after a process joins.

These hooks should shadow the
collective communications of non-joined processes to prevent hanging and
erroring and to ensure algorithmic correctness. Refer to :class:`JoinHook`
for details about the hook definition.

.. warning::
    The context manager requires each participating :class:`Joinable` to
    call the method :meth:`notify_join_context()` before its own per-
    iteration collective communications to ensure correctness.

.. warning::
    The context manager requires that all ``process_group`` attributes in
    the :class:`JoinHook` objects are the same. If there are multiple
    :class:`JoinHook` objects, then the ``device`` of the first is used.
    The process group and device information is used for checking for non-
    joined processes and for notifying processes to throw an exception if
    ``throw_on_early_termination`` is enabled, both of which using an all-
    reduce.

Arguments:
    joinables (List[Joinable]): a list of the participating
        :class:`Joinable` s; their hooks are iterated over in the given
        order.

    enable (bool): a flag enabling uneven input detection; setting to
        ``False`` disables the context manager's functionality and should
        only be set when the user knows the inputs will not be uneven
        (default: ``True``).

    throw_on_early_termination (bool): a flag controlling whether to throw an
        exception upon detecting uneven inputs (default: ``False``).

Example::

    >>> import os
    >>> import torch
    >>> import torch.distributed as dist
    >>> import torch.multiprocessing as mp
    >>> # xdoctest: +SKIP
    >>> import torch.nn.parallel.DistributedDataParallel as DDP
    >>> import torch.distributed.optim.ZeroRedundancyOptimizer as ZeRO
    >>> from torch.distributed.algorithms.join import Join
    >>>
    >>> # On each spawned worker
    >>> def worker(rank):
    >>>     dist.init_process_group("nccl", rank=rank, world_size=2)
    >>>     model = DDP(torch.nn.Linear(1, 1).to(rank), device_ids=[rank])
    >>>     optim = ZeRO(model.parameters(), torch.optim.Adam, lr=0.01)
    >>>     # Rank 1 gets one more input than rank 0
    >>>     inputs = [torch.tensor([1.]).to(rank) for _ in range(10 + rank)]
    >>>     with Join([model, optim]):
    >>>         for input in inputs:
    >>>             loss = model(input).sum()
    >>>             loss.backward()
    >>>             optim.step()
    >>>     # All ranks reach here without hanging/erroring
	joinablesr@   rA   c                    [        U5      S:X  a  [        S5      eXl        U R                   Vs/ s H  oUR                  " S0 UD6PM     snU l        X l        X0l        U R                  5         U R                  5         g s  snf )Nr   z7The join context manager requires at least one joinabler   )	len
ValueError
_joinablesr1   _join_hooks_enable_throw_on_early_termination_set_joinable_configs_extract_dist_info)r   rJ   r@   rA   r0   joinables         r   r)   Join.__init__   sw     y>QVWW#9=
9HX((
 +E(""$!
s   A?Nc                     [        U R                  5      S:  d   eSnU R                   H)  n[        U R                  U R                  US9Ul        SnM+     g)zESet the :class:`_JoinConfig` of each participating :class:`Joinable`.r   TrD   FN)rL   rN   r*   rP   rQ   r,   )r   rB   rT   s      r   rR   Join._set_joinable_configs   sU    4??#a''' H$/||+/+K+K"3%H!
 !& (r   c                 
   SnSnU R                    H>  nUc  UR                  nOXR                  :w  a  [        S5      eUb  M2  UR                  nM@     Xl        [
        R                  " U R                  5      U l        X l        g)as  
Extract the process group and device information from the joinables.

If there are multiple joinables, then the context manager uses the
first specified device.

Preconditions:
    ``self._joinables`` is not ``None`` and is non-empty.

Raises:
    ValueError
        If there are multiple conflicting ``process_group`` attributes
        among the ``Joinable`` objects.
Nz7Using join context manager with multiple process groups)	rN   r8   rM   r4   _process_groupdistget_rank_rank_device)r   process_groupr<   rT   s       r   rS   Join._extract_dist_info   s~     H$ ( ; ;"="== M  ~!-- ( ,]]4#6#67
r   c                     g r'   r   r   s    r   	__enter__Join.__enter__   s    r   typevalue	tracebackc           	          U R                   (       a  U(       a  gSnSnSnSn[        R                  " S5        U(       d  Xg:  a*  [        R                  " SU SU R                   S	U S
35        U R                  5       nUS:X  a  SnOKU R                  (       a  U R                  5         U R                   H  n	U	R                  5         M     SnUS-  nU(       d  M  U R                   H  n	U	R                  U5        M     g)z
Repeatedly runs the main hooks until all processes join; then, runs the post-hooks.

Raises:
    RuntimeError
        If ``throw_on_early_termination=True``.
NFTr   i  oncez+Detected uneven input skew of greater than z. This means that rank z has at least zz fewer inputs than other currently-active ranks. This level of skew could lead to performance degradation during training.   )rP   warningssimplefilterwarnr\   _get_num_nonjoined_procsrQ   _notify_procs_to_terminaterO   r   r   )
r   rc   rd   re   all_procs_joinedr   iWARN_THRESHOLDnum_nonjoined_procsr1   s
             r   __exit__Join.__exit__   s     ||t f%"!A%&&=zzl.0@ A33 #'"?"?"A"a'#' 33335 "&!1!1I'') "2 "'Q/ #"4 ))I/ *r   c                     [         R                  " SU R                  S9n[        R                  " XR
                  S9  UR                  5       $ )zaReturn the number of non-joined processes by shadowing an all-reduce in the non-joined processes.rh   r<   group)r;   zerosr]   rZ   
all_reducerY   item)r   rq   s     r   rl   Join._get_num_nonjoined_procs  s9    #kk!DLLA+3F3FG"''))r   c                     [         R                  " SU R                  S9n[        R                  " XR
                  S9  [        SU R                   S35      e)zSchedule an all-reduce to notify non-joined processes to terminate.

Also raise a ``RuntimeError`` indicating that the current process has exhausted its inputs.
rh   ru   rv   zRank z exhausted all inputs.)r;   onesr]   rZ   ry   rY   RuntimeErrorr\   )r   r}   s     r   rm   Join._notify_procs_to_terminate  sC    
 zz!DLL1$7$78U4::,.DEFFr   rT   c                    [        U S5      (       d   S[        U 5       S35       eU R                  nUR                  (       a  UR                  (       d  gU R
                  nU R                  n[        R                  " SUS9n[        R                  " XCSS9nUR                  (       aK  [        R                  " SUS9n[        R                  " XcS	9  UR                  5       nU(       a  [        S
5      eU$ )a  
Notifies the join context manager that the calling process has not yet joined.

Then, if ``throw_on_early_termination=True``, checks if uneven inputs have been detected
(i.e. if one process has already joined) and throws an exception if so.

This method should be called from a :class:`Joinable` object before
its per-iteration collective communications. For example, this should
be called at the beginning of the forward pass in
:class:`DistributedDataParallel`.

Only the first :class:`Joinable` object passed into the context
manager performs the collective communications in this method, and
for the others, this method is vacuous.

Arguments:
    joinable (Joinable): the :class:`Joinable` object calling this
        method.

Returns:
    An async work handle for the all-reduce meant to notify the context
    manager that the process has not yet joined if ``joinable`` is the
    first one passed into the context manager; ``None`` otherwise.
r,   zCheck that the z/ constructor calls the ``Joinable`` constructorNrh   ru   T)rw   async_oprv   zLDetected at least one rank that exhausted inputs. Throwing across all ranks.)hasattrrc   r,   rB   r@   r4   r8   r;   r}   rZ   ry   rA   rx   rz   r~   )rT   join_configr<   r^   r}   workrx   should_throws           r   notify_join_contextJoin.notify_join_context'  s    4 x00 	
d8n- .' '	
0
 ++,,K4F4F%% 33 zz!F+t4H11KK&1EOOE7 ::<L"1  r   )r]   rP   rO   rN   rY   r\   rQ   )TFr   )r   r   r   r    r!   listr
   r"   r)   rR   rS   ra   r   rc   BaseExceptionr   rr   rl   rm   rG   r   r#   r   r   r   r   r   h   s    ;@ +0	">" " %)	"$
&< 20tM*+20 &20 M*	20h*G 4h 4 4r   r   )ri   abcr   r   typesr   typingr   r   r   r;   torch.distributeddistributedrZ   __all__r	   r
   r*   r   r   r   r   <module>r      sP     #  , ,    + <'s 'T
* 
$t tr   