
    [Th^              	       @   % S SK r S SKrS SKrS SKrS SKrS SKrS SKJrJrJ	r	J
r
  S SKrS SKJrJrJrJrJrJrJrJrJrJrJrJrJrJrJrJrJrJr  S SKJ r   SSK!J"r"J#r#  SSK$J%r%J&r&  SSK'J(r(J)r)J*r*J+r+  / S	Qr,\RZ                  " \.5      r/S
r0\)q1\Rd                  S 5       r3S r4 " S S5      r5\6" 5       q7\6\   \8S'   \Rr                  " 5       r:0 r;\<\=\>4   \8S'   \ R~                  " \55      r@\ R~                  \8S'   S rAS7S jrBS rC\R                  " 5       rE\Rd                  S 5       rF\4S\&4S\G4S jj5       rH\4S 5       rI\4\%4S j5       rJ\4S
\%4S j5       rKS rL\4S7S j5       rMS rNS8S\O4S jjrP\&S
4S\GS\O4S  jjrQ\
" S!5      rR\\R   rS\	(       a   " S" S#\\R   \\R   5      rTO  " S$ S#\\\R   5      rTS) rX\R                  " \5       H^  u  rZr[\ZR                  S*5      (       a  \ZS+:w  a  M$   \]" \[S,S5      r^\^c   S-5       e\^R                  S.S/5      r^\X" \Z\^5      r`\a" \T\Z\`5        M`     \4SS\&4S0 j5       rbSS\&4S1\G4S2 jjrc\4SS\&4S\G4S3 jj5       rd\4SS\&4S4 j5       reS5 rfS6 rgg! \U a.     " S% S&\R                  \SR                  5      rW " S' S#\\S\WS(9rT Nf = f)9    N)AnyGenericTYPE_CHECKINGTypeVar)_cleanup_python_rpc_handler)_delete_all_user_and_unforked_owner_rrefs_destroy_rref_context_get_current_rpc_agent_invoke_remote_builtin_invoke_remote_python_udf_invoke_remote_torchscript_invoke_rpc_builtin_invoke_rpc_python_udf_invoke_rpc_torchscript_is_current_rpc_agent_set_reset_current_rpc_agent_set_and_start_rpc_agentget_rpc_timeoutPyRRefRemoteProfilerManagerTensorPipeAgent
WorkerInfo)Future   )_group_membership_management_update_group_membership)DEFAULT_SHUTDOWN_TIMEOUTUNSET_RPC_TIMEOUT)_build_rpc_profiling_key_internal_rpc_pickler	PythonUDFRPCExecMode)	shutdownget_worker_inforemoterpc_sync	rpc_asyncRRefAllGatherStatesmethod_factory
new_methodTc              #   <   #    U q  Sv   [        q g! [        q f = f7f)zP
rpc_pickler: (.internal._InternalRPCPickler) Overrides the default RPC pickler
N)_default_picklerr    )rpc_picklers    Q/var/www/auris/envauris/lib/python3.13/site-packages/torch/distributed/rpc/api.py_use_rpc_picklerr0   I   s"      #100s    c                 F   ^  [         R                  " T 5      U 4S j5       nU$ )Nc                  H   > [        5       (       d  [        S5      eT" U 0 UD6$ )NzHRPC has not been initialized. Call torch.distributed.rpc.init_rpc first.)r   RuntimeError)argskwargsfuncs     r/   wrapper%_require_initialized.<locals>.wrapperW   s0    (**8  T$V$$    )	functoolswraps)r6   r7   s   ` r/   _require_initializedr<   V   s%    __T% % Nr9   c                       \ rS rSrS rSrg)r)   c   c                 F    0 U l         [        R                  " 5       U l        g N)gathered_objects	threadingEventproceed_signal)selfs    r/   __init__AllGatherStates.__init__d   s     !# (oo/r9   )rA   rD   N)__name__
__module____qualname____firstlineno__rF   __static_attributes__ r9   r/   r)   r)   c   s    0r9   r)   _ALL_WORKER_NAMES_all_gather_sequence_id!_all_gather_sequence_id_to_statesc                     U R                  5       nU Vs1 s H  o"R                  iM     snq[        5       (       d  [	        U 5        g g s  snf r@   )get_worker_infosnamerN   r   r   )agentworker_infosworker_infos      r/   _init_rpc_statesrW   ~   sG    ))+L=IJ\k))\J %&& ' ' Ks   A	c                 d   [            U(       d  [        nX;   d
   U S35       e[        U    nXR                  ;  d   U SU  S35       eX$R                  U'   U[	        UR                  R                  5       5      :X  a  UR                  R	                  5         S S S 5        g ! , (       d  f       g = f)Nz is not expected by leader.z reported intent sequence id z twice. )_all_gather_dict_lockrN   rP   rA   setkeysrD   )sequence_idworker_nameobjworker_namesstatess        r/   _gather_to_leaderra      s    	,L. -:;. 3;?"9"99 	
m8XN	
9 03,3v66;;=>>!!%%' 
		s   BB!!
B/c                     [            [        U    nS S S 5        WR                  R                  5       (       a   SU  S35       eXl        UR                  R                  5         g ! , (       d  f       NY= f)NzTermination signal sequence id z got set twice.)rY   rP   rD   is_setrA   rZ   )r\   objects_mapr`   s      r/   _broadcast_to_followersre      sg    	2;? 
 $$++-- 
)+oF- *
 
	s   
A$$
A2c               #   2  #    / [         l         Sv    [        R                  R	                  [         R                  5        [         ?g! [         ?f = f!  [        R                  R	                  [         R                  5        [         ?f ! [         ?f = f= f7f)aD  
A context manager that collects all futures returned by ``rpc_async`` and
waits them on the context manager's exit; relieving the user of needing
to explicitly call wait.


Example::
    >>> # xdoctest: +SKIP("distributed")
    >>> # On worker 0:
    >>> import torch
    >>> import torch.distributed.rpc as rpc
    >>> rpc.init_rpc("worker0", rank=0, world_size=2)
    >>> with rpc._wait_all():
    >>>    fut_1 = rpc.rpc_async(dst, torch.add, (torch.ones(2, 2), 1))
    >>>    fut_2 = rpc.rpc_async(dst, torch.add, (torch.ones(2, 2), 1))
    >>> #fut_1 and fut_2 are waited on
N)_thread_local_varfuture_listtorchfutureswait_allrM   r9   r/   	_wait_allrl      so     & %'!.	.MM""#4#@#@A!-!-	.MM""#4#@#@A!-!-sE   BA -A BABB-B	B	BBBtimeoutc           	         U(       d  [         c   S5       e[         n[        U5      n[        5       R                  5       R                  n[
           SR                  [        U5      5      n[        R                  US5      nUS-   [        U'   U[        U5      -   nSSS5        X4:H  nU[        :X  a  [        5       n	Sn
OU[        :X  a  Un	Sn
OU=pU(       a  [        WX@U5        O[        U[        WX@U4U	S9  [
           [         U   nSSS5        WR"                  R%                  U
S9  U(       a  0 nX1-
   H"  n['        U[(        X{R*                  4U	S9nXU'   M$     / nUR-                  5        H  u  p UR%                  5         M     U(       a/  [/        SU Vs/ s H  nUS   PM
     sn S	U	S
 SUS   S    35      e[
           [         R3                  U5      nSSS5        UR*                  $ ! , (       d  f       GN[= f! , (       d  f       N= f! [.         a  nUR1                  UU45         SnAM  SnAff = fs  snf ! , (       d  f       UR*                  $ = f)ap  
This is similar to torch.distributed.all_gather(), but is using RPC. It
picks the worker with the smallest name (alphabetic order) as the leader.
Then all followers send their data ``obj`` to the leader. After the leader
has received all, it will broadcast the results back to all followers. This
function blocks until all workers have received the gathered results.
Nz=`_ALL_WORKER_NAMES` is not initialized for `def _all_gather`. r   r   r4   rm   rm   z
Followers z  timed out in _all_gather after z.2fz! seconds. The first exception is )rN   minr
   r$   rS   rY   joinsortedrO   getstrr   r   r   ra   r&   rP   rD   waitr'   re   rA   itemsr3   appendpop)r^   r_   rm   leader_name	self_nameconcat_namessequence_numr\   	is_leaderrpc_timeoutsignal_timeoutr`   #worker_name_to_response_future_dictfollower_namefuterrorsexes                     r/   _all_gatherr      sa     , 	
K	
, )l#K&(88:??I	wwvl34.22<C0<q0@-"S%66	 
 (I##%'	,	, (/. +y|Dy|<		
 
2;? 
 ~6
 .0+)M9M'!#:#:;#	C BE> : "E"K"K"MM3
 #N F3FqadF34 5$S))J6RS9UV<.Z  
266{C 
"""K 
	B 
	0   3}b1223
 4
 
	"""sC   AG.=
H 4HH<
I.
G= 
H
H9H44H9
Ic                      [        S[        U 5      5        g! [         a   n[        R	                  SU5         SnAgSnAff = f)a  
Synchronizes local and remote RPC processes.

This will block until all local and remote RPC processes specified under worker_names
reach this method to wait for all outstanding work to complete.

Args:
    worker_names (List[str]): The set of workers to synchronize.

Nz(Failed to complete barrier, got error %s)r   rZ   r3   loggererror)r_   r   s     r/   _barrierr     s;    ED#l+, E?DDEs    
A=Ac                 n     [        SU S9  g! [         a  n[        R                  SU5        UeSnAff = f)aO  
Block until all local and remote RPC processes reach this method and wait
for all outstanding work to complete. Every RPC process must call this
method before exit to perform a graceful shutdown. This should be used to
terminate the RPC framework, and there is no guarantee that the RPC
framework will work after this method returns.
Nrq   z=Failed to respond to 'Shutdown Proceed' in time, got error %s)r   r3   r   r   )rm   r   s     r/   _wait_all_workersr   /  s9    D'* KR	
 	s   
 
4/4c           
      R   U (       a   [        5       n[        U[        5      (       a  UR                  (       a&  [	        U5        [        5         UR                  SUS9  OUR                  5       nUR                  n[        UR                  US5         UR                  5       nU H2  nUR                  U:w  d  M  [        UR                  [        U/ 0 S4S9  M4     UR                  SUS9  SSS5        [        5         g[        5         g! , (       d  f       N$= f! [        5         f = f)ap  
Perform a shutdown of the RPC agent, and then destroy the RPC agent. This
stops the local agent from accepting outstanding requests, and shuts
down the RPC framework by terminating all RPC threads. If ``graceful=True``,
this will block until all local and remote RPC processes reach this method
and wait for all outstanding work to complete. Otherwise, if
``graceful=False``, this is a local shutdown, and it does not wait for other
RPC processes to reach this method.

.. warning::
    For :class:`~torch.futures.Future` objects returned by
    :meth:`~torch.distributed.rpc.rpc_async`, ``future.wait()`` should not
    be called after ``shutdown()``.

Args:
    graceful (bool): Whether to do a graceful shutdown or not. If True,
                     this will 1) wait until there is no pending system
                     messages for ``UserRRefs`` and delete them; 2) block
                     until all local and remote RPC processes have reached
                     this method and wait for all outstanding work to
                     complete.

Example::
    Make sure that ``MASTER_ADDR`` and ``MASTER_PORT`` are set properly
    on both workers. Refer to :meth:`~torch.distributed.init_process_group`
    API for more details. For example,

    export MASTER_ADDR=localhost
    export MASTER_PORT=5678

    Then run the following code in two different processes:

    >>> # xdoctest: +SKIP
    >>> # On worker 0:
    >>> import torch
    >>> import torch.distributed.rpc as rpc
    >>> rpc.init_rpc("worker0", rank=0, world_size=2)
    >>> # do some work
    >>> result = rpc.rpc_sync("worker1", torch.add, args=(torch.ones(1), 1))
    >>> # ready to shutdown
    >>> rpc.shutdown()

    >>> # On worker 1:
    >>> import torch.distributed.rpc as rpc
    >>> rpc.init_rpc("worker1", rank=1, world_size=2)
    >>> # wait for worker 0 to finish work, and then shutdown.
    >>> rpc.shutdown()
T)r#   rm   F)r4   N)r
   
isinstancer   is_static_groupr   r   rs   r$   rS   r   storerR   r&   r   _finalize_shutdown)gracefulrm   rT   my_worker_infomy_nameall_worker_infosworkers          r/   r#   r#   A  s    d 	!*,Ee_559N9N!'*9;

D'
: "'!6!6!8(--1%++wN','='='?$"2!;;'1$ & 8&4b"e%D #3 JJgJ> O   ON  s*   B	D $D	:1D	+D 	
DD D&c                       [        [        5        [        5       R                  5         [	        5         [        5         g ! [        5       R                  5         [	        5         [        5         f = fr@   )r	   _ignore_rref_leakr
   r#   r   r   rM   r9   r/   r   r     sN    #/0 ))+ 	$% " 	 ))+ 	$% "s	   > .A,c                 j    U b  [        5       R                  U 5      $ [        5       R                  5       $ )a  
Get :class:`~torch.distributed.rpc.WorkerInfo` of a given worker name.
Use this :class:`~torch.distributed.rpc.WorkerInfo` to avoid passing an
expensive string on every invocation.

Args:
    worker_name (str): the string name of a worker. If ``None``, return the
                       the id of the current worker. (default ``None``)

Returns:
    :class:`~torch.distributed.rpc.WorkerInfo` instance for the given
    ``worker_name`` or :class:`~torch.distributed.rpc.WorkerInfo` of the
    current worker if ``worker_name`` is ``None``.
)r
   r$   )r]   s    r/   r$   r$     s0      %'77DD%'7799r9   c                     [        U [        5      (       a  U $ [        U [        [        45      (       a  [	        U 5      $ [        SU  35      e)Nz Cannot get WorkerInfo from name )r   r   rv   intr$   
ValueError)tos    r/   _to_worker_infor     sC    "j!!		Bc
	#	#r"";B4@AAr9   blockingc                     [        U R                  5       5      nU(       a  U$ [        [            " 5       nUR                  U5        U$ r@   )typelocal_valuer   
set_result)rrefr   	rref_typefutures       r/   _rref_typeof_on_ownerr     s>    T%%'(I
 )$r9   c                 r    [        U R                  5       [        U 4US9nU(       a  UR                  5       $ U$ )Nrp   )r'   ownerr   rw   )r   rm   r   r   s       r/   _rref_typeof_on_userr     s1     DJJL"7tgw
WCxxz
r9   Tc                       \ rS rSrSrg)r(   i  rM   NrH   rI   rJ   rK   rL   rM   r9   r/   r(   r(     s    r9   r(   c                       \ rS rSrSrg)r(   i  rM   Nr   rM   r9   r/   r(   r(         r9   c                       \ rS rSrSrg)RRefMetai  rM   Nr   rM   r9   r/   r   r     r   r9   r   c                       \ rS rSrSrg)r(   i  rM   Nr   rM   r9   r/   r(   r(     r   r9   )	metaclassc                 B   ^  U 4S jnUR                   (       a  Xl         U$ )Nc                 D   > [        [        [        U 5      T5      " U0 UD6$ r@   )getattrsuperr(   )rE   r4   r5   method_names      r/   methodmethod_factory.<locals>.method  s!    uT4(+6GGGr9   )__doc__)r   	docstringr   s   `  r/   r*   r*     s    H ~~"Mr9   ___str__r   z4RRef user-facing methods should all have docstrings.ztorch.distributed.rpc.PyRRefztorch.distributed.rpc.RRefc                    [         R                  R                  S5        [         R                  R                  R                  U5      n[        U 5      n[        5       n[        XuU[        R                  U5      nU n	U(       a  UOSnU(       a  UO0 n[        US5      n
U
(       a7  UR                  n[        U[         R                  R                  5      (       a  UnUb  [        XeU/UQ70 UD6nO[        U[         R                  R                  5      (       a:  [!        UR"                  [         R$                  R'                  U5      UU
/UQ70 UD6nO.[(        R+                  [-        XU5      5      u  p[/        XmXU
5      nU(       aZ  [         R0                  R3                  5       (       d   eU	c   eU	R5                  UR7                  5       5      nUR9                  U5        SSS5        U$ ! , (       d  f       W$ = f)a  
Make a remote call to run ``func`` on worker ``to`` and return an
:class:`~torch.distributed.rpc.RRef` to the result value immediately.
Worker ``to`` will be the owner of the returned
:class:`~torch.distributed.rpc.RRef`, and the worker calling ``remote`` is
a user. The owner manages the global reference count of its
:class:`~torch.distributed.rpc.RRef`, and the owner
:class:`~torch.distributed.rpc.RRef` is only destructed when globally there
are no living references to it.

Args:
    to (str or WorkerInfo or int): name/rank/``WorkerInfo`` of the destination worker.
    func (Callable): a callable function, such as Python callables, builtin
                     operators (e.g. :meth:`~torch.add`) and annotated
                     TorchScript functions.
    args (tuple): the argument tuple for the ``func`` invocation.
    kwargs (dict): is a dictionary of keyword arguments for the ``func``
                   invocation.

    timeout (float, optional): timeout in seconds for this remote call. If the
                               creation of this
                               :class:`~torch.distributed.rpc.RRef` on worker
                               ``to`` is not successfully processed on this
                               worker within this timeout, then the next time
                               there is an attempt to use the RRef (such as
                               ``to_here()``), a timeout will be raised
                               indicating this failure. A value of 0 indicates
                               an infinite timeout, i.e. a timeout error will
                               never be raised. If not provided, the default
                               value set during initialization or with
                               ``_set_rpc_timeout`` is used.

Returns:
    A user :class:`~torch.distributed.rpc.RRef` instance to the result
    value. Use the blocking API :meth:`torch.distributed.rpc.RRef.to_here`
    to retrieve the result value locally.

.. warning ::
    The ``remote`` API does not copy storages of argument tensors until
    sending them over the wire, which could be done by a different thread
    depending on the RPC backend type. The caller should make sure that the
    contents of those tensors stay intact until the returned RRef is
    confirmed by the owner, which can be checked using the
    :meth:`torch.distributed.rpc.RRef.confirmed_by_owner` API.

.. warning ::
    Errors such as timeouts for the ``remote`` API are handled on a
    best-effort basis. This means that when remote calls initiated by
    ``remote`` fail, such as with a timeout error, we take a best-effort
    approach to error handling. This means that errors are handled and set
    on the resulting RRef on an asynchronous basis. If the RRef has not been
    used by the application before this handling (such as ``to_here`` or
    fork call), then future uses of the ``RRef`` will appropriately raise
    errors. However, it is possible that the user application will use the
    ``RRef`` before the errors are handled. In this case, errors may not be
    raised as they have not yet been handled.

Example::

    Make sure that ``MASTER_ADDR`` and ``MASTER_PORT`` are set properly
    on both workers. Refer to :meth:`~torch.distributed.init_process_group`
    API for more details. For example,

    export MASTER_ADDR=localhost
    export MASTER_PORT=5678

    Then run the following code in two different processes:

    >>> # xdoctest: +SKIP
    >>> # On worker 0:
    >>> import torch
    >>> import torch.distributed.rpc as rpc
    >>> rpc.init_rpc("worker0", rank=0, world_size=2)
    >>> rref1 = rpc.remote("worker1", torch.add, args=(torch.ones(2), 3))
    >>> rref2 = rpc.remote("worker1", torch.add, args=(torch.ones(2), 1))
    >>> x = rref1.to_here() + rref2.to_here()
    >>> rpc.shutdown()

    >>> # On worker 1:
    >>> import torch.distributed.rpc as rpc
    >>> rpc.init_rpc("worker1", rank=1, world_size=2)
    >>> rpc.shutdown()

    Below is an example of running a TorchScript function using RPC.

    >>> # On both workers:
    >>> @torch.jit.script
    >>> def my_script_add(tensor: torch.Tensor, scalar: int):
    >>>    return torch.add(tensor, scalar)

    >>> # On worker 0:
    >>> import torch.distributed.rpc as rpc
    >>> rpc.init_rpc("worker0", rank=0, world_size=2)
    >>> rref = rpc.remote("worker1", my_script_add, args=(torch.ones(2), 3))
    >>> rref.to_here()
    >>> rpc.shutdown()

    >>> # On worker 1:
    >>> import torch.distributed.rpc as rpc
    >>> rpc.init_rpc("worker1", rank=1, world_size=2)
    >>> rpc.shutdown()
ztorch.distributed.rpc_remoterM   _wrapped_async_rpc_functionN)ri   _C_log_api_usage_oncejit	_builtins_find_builtinr   _get_should_profile_enable_rpc_profilerr"   REMOTEhasattrr   r   ScriptFunctionr   r   rS   _jit_internal_qualified_namer-   	serializer!   r   autograd_profiler_enabled_call_end_callbacks_on_future_get_future_set_profiling_future)r   r6   r4   r5   rm   qualified_namedst_worker_infoshould_profilectx_managerrfis_async_execwrappedr   pickled_python_udftensorsr   s                   r/   r%   r%   !  s   P 
HH  !?@YY((66t<N%b)O(*N&k.@.@/K 
t!r&CD66G'599#;#;<<%);?CID eii6677-$$##33D9	
  D -=,F,F$f--) -W}D >>335555>!>2243C3C3EFC&&s+I 
L KM 
L Ks   <E'G--
G<r   c           	         [        U5      (       d  [        S5      e[        R                  R                  R                  U5      n[        U 5      n[        5       n[        XXU5      n	U	 n
U(       a  UOSnU(       a  UO0 n[        US5      nU(       a7  UR                  n[        U[        R                  R                  5      (       a  UnUb  [        XvU/UQ70 UD6nO[        U[        R                  R                  5      (       a8  [        UR                  [        R                   R#                  U5      UUUU5      nO.[$        R'                  [)        XU5      5      u  p[+        X~XU5      nU(       a;  [        R,                  R/                  5       (       d   eU
c   eU
R1                  U5      nS S S 5        U$ ! , (       d  f       W$ = f)Nzfunction should be callable.rM   r   )callable	TypeErrorri   r   r   r   r   r   r   r   r   r   r   r   r   rS   r   r   r-   r   r!   r   r   r   r   )r   r6   rpc_typer4   r5   r   r   r   r   r   r   r   r   r   r   r   s                   r/   _invoke_rpcr     s    D>>677YY((66t<N%b)O(*N&K 
t!r&CD66G'599#;#;<<%%?CGMC eii6677)$$##33D9C -=,F,F$f--) )W=C >>335555>!> 2237CO 
P JQ 
P Js   )EF99
Gc                     [         R                  R                  S5        [        X[        R
                  X#U5      nUR                  5       $ )aB
  
Make a blocking RPC call to run function ``func`` on worker ``to``. RPC
messages are sent and received in parallel to execution of Python code. This
method is thread-safe.

Args:
    to (str or WorkerInfo or int): name/rank/``WorkerInfo`` of the destination worker.
    func (Callable): a callable function, such as Python callables, builtin
                     operators (e.g. :meth:`~torch.add`) and annotated
                     TorchScript functions.
    args (tuple): the argument tuple for the ``func`` invocation.
    kwargs (dict): is a dictionary of keyword arguments for the ``func``
                   invocation.
    timeout (float, optional): timeout in seconds to use for this RPC. If
                               the RPC does not complete in this amount of
                               time, an exception indicating it has
                               timed out will be raised. A value of 0
                               indicates an infinite timeout, i.e. a timeout
                               error will never be raised. If not provided,
                               the default value set during initialization
                               or with ``_set_rpc_timeout`` is used.

Returns:
    Returns the result of running ``func`` with ``args`` and ``kwargs``.

Example::
    Make sure that ``MASTER_ADDR`` and ``MASTER_PORT`` are set properly
    on both workers. Refer to :meth:`~torch.distributed.init_process_group`
    API for more details. For example,

    export MASTER_ADDR=localhost
    export MASTER_PORT=5678

    Then run the following code in two different processes:

    >>> # xdoctest: +SKIP
    >>> # On worker 0:
    >>> import torch
    >>> import torch.distributed.rpc as rpc
    >>> rpc.init_rpc("worker0", rank=0, world_size=2)
    >>> ret = rpc.rpc_sync("worker1", torch.add, args=(torch.ones(2), 3))
    >>> rpc.shutdown()

    >>> # On worker 1:
    >>> import torch.distributed.rpc as rpc
    >>> rpc.init_rpc("worker1", rank=1, world_size=2)
    >>> rpc.shutdown()

    Below is an example of running a TorchScript function using RPC.

    >>> # On both workers:
    >>> @torch.jit.script
    >>> def my_script_add(tensor: torch.Tensor, scalar: int):
    >>>    return torch.add(tensor, scalar)

    >>> # On worker 0:
    >>> import torch.distributed.rpc as rpc
    >>> rpc.init_rpc("worker0", rank=0, world_size=2)
    >>> ret = rpc.rpc_sync("worker1", my_script_add, args=(torch.ones(2), 3))
    >>> rpc.shutdown()

    >>> # On worker 1:
    >>> import torch.distributed.rpc as rpc
    >>> rpc.init_rpc("worker1", rank=1, world_size=2)
    >>> rpc.shutdown()

ztorch.distributed.rpc_sync)ri   r   r   r   r"   SYNCrw   r   r6   r4   r5   rm   r   s         r/   r&   r&     s:    J 
HH  !=>
b 0 0$
HC88:r9   c                     [         R                  R                  S5        [        X[        R
                  X#U5      n[        [        S5      (       a  [        R                  R                  U5        U$ )a(  
Make a non-blocking RPC call to run function ``func`` on worker ``to``. RPC
messages are sent and received in parallel to execution of Python code. This
method is thread-safe. This method will immediately return a
:class:`~torch.futures.Future` that can be awaited on.

Args:
    to (str or WorkerInfo or int): name/rank/``WorkerInfo`` of the destination worker.
    func (Callable): a callable function, such as Python callables, builtin
                     operators (e.g. :meth:`~torch.add`) and annotated
                     TorchScript functions.
    args (tuple): the argument tuple for the ``func`` invocation.
    kwargs (dict): is a dictionary of keyword arguments for the ``func``
                   invocation.
    timeout (float, optional): timeout in seconds to use for this RPC. If
                               the RPC does not complete in this amount of
                               time, an exception indicating it has
                               timed out will be raised. A value of 0
                               indicates an infinite timeout, i.e. a timeout
                               error will never be raised. If not provided,
                               the default value set during initialization
                               or with ``_set_rpc_timeout`` is used.


Returns:
    Returns a :class:`~torch.futures.Future` object that can be waited
    on. When completed, the return value of ``func`` on ``args`` and
    ``kwargs`` can be retrieved from the :class:`~torch.futures.Future`
    object.

.. warning ::
    Using GPU tensors as arguments or return values of ``func`` is not
    supported since we don't support sending GPU tensors over the wire. You
    need to explicitly copy GPU tensors to CPU before using them as
    arguments or return values of ``func``.

.. warning ::
    The ``rpc_async`` API does not copy storages of argument tensors until
    sending them over the wire, which could be done by a different thread
    depending on the RPC backend type. The caller should make sure that the
    contents of those tensors stay intact until the returned
    :class:`~torch.futures.Future` completes.

Example::
    Make sure that ``MASTER_ADDR`` and ``MASTER_PORT`` are set properly
    on both workers. Refer to :meth:`~torch.distributed.init_process_group`
    API for more details. For example,

    export MASTER_ADDR=localhost
    export MASTER_PORT=5678

    Then run the following code in two different processes:

    >>> # xdoctest: +SKIP
    >>> # On worker 0:
    >>> import torch
    >>> import torch.distributed.rpc as rpc
    >>> rpc.init_rpc("worker0", rank=0, world_size=2)
    >>> fut1 = rpc.rpc_async("worker1", torch.add, args=(torch.ones(2), 3))
    >>> fut2 = rpc.rpc_async("worker1", min, args=(1, 2))
    >>> result = fut1.wait() + fut2.wait()
    >>> rpc.shutdown()

    >>> # On worker 1:
    >>> import torch.distributed.rpc as rpc
    >>> rpc.init_rpc("worker1", rank=1, world_size=2)
    >>> rpc.shutdown()

    Below is an example of running a TorchScript function using RPC.

    >>> # On both workers:
    >>> @torch.jit.script
    >>> def my_script_add(tensor: torch.Tensor, scalar: int):
    >>>    return torch.add(tensor, scalar)

    >>> # On worker 0:
    >>> import torch.distributed.rpc as rpc
    >>> rpc.init_rpc("worker0", rank=0, world_size=2)
    >>> fut = rpc.rpc_async("worker1", my_script_add, args=(torch.ones(2), 3))
    >>> ret = fut.wait()
    >>> rpc.shutdown()

    >>> # On worker 1:
    >>> import torch.distributed.rpc as rpc
    >>> rpc.init_rpc("worker1", rank=1, world_size=2)
    >>> rpc.shutdown()
ztorch.distributed.rpc_asyncrh   )
ri   r   r   r   r"   ASYNCr   rg   rh   ry   r   s         r/   r'   r'   ?  sU    r 
HH  !>?
b 1 14
IC -00%%,,S1Jr9   c                      [         R                  R                  R                  n [         R                  R                  5       =(       a5    [         R                  R                  R                  5       U R                  :H  $ r@   )	ri   r   	_profilerActiveProfilerTyper   r   	_autograd_profiler_typeLEGACY)r   s    r/   r   r     sW     ++>>((* 	MHH--/3E3L3LLr9   c                    [         R                  " 5       nU (       a  UcU  [        U[        R                  R
                  5      (       a  [        R                  R                  U5      OUR                  nOUn[        UU[        5       R                  UR                  5      n[        R                  " U5        [        R                  R                  R!                  U5      nU$ r@   )
contextlibnullcontextr   ri   r   r   r   r   rJ   r   r$   rS   r   set_current_profiling_keyr   profilerrecord_function)r   r   r6   r   r   r   	func_namerpc_profiling_keys           r/   r   r     s     ((*K ! dEII$<$<== ##33D9&&  'I4""  	
 	778IJnn--==>OPr9   r@   )T)hcollectionsr   r:   inspectloggingrB   typingr   r   r   r   ri   torch._C._distributed_rpcr   r   r	   r
   r   r   r   r   r   r   r   r   r   r   r   r   r   r   torch.futuresr   _utilsr   r   	constantsr   r   internalr   r    r!   r"   __all__	getLoggerrH   r   r   r-   contextmanagerr0   r<   r)   rZ   rN   __annotations__RLockrY   rO   dictrv   r   defaultdictrP   rW   ra   re   localrg   rl   floatr   r   r   r#   r   r$   r   boolr   r   r   GenericWithOneTypeVarr(   r   	__class__r   r*   
getmembersr   r   
startswithr   r   replacer+   setattrr%   r   r&   r'   r   r   rM   r9   r/   <module>r     s         7 7     ( ! J B 
 
		8	$  (  	1 	1
0 0& "e 3s8 #!) *, c3h ,=H=T=T> !;#:#: 
((   OO%  . .8 "&9J V# V# V#r E E" 6  " $< J JZ#* : :*B
$ 
 -t8< CL
  vay'!* 	671: 	. #--f5Kc""{i'? 	40I X"XX  !!&(DI
  Y7JD+z*1 66 t5F V Vt "$EV7=B7t  @Q F F FR !$8I \ \~A  
	v'')>)H)H 	
	60H 	
s   %I) )1JJ