o
    Zh~                     @  sP  d dl mZ d dlZd dlZd dlZd dlZd dlmZ d dlm	Z	m
Z
 d dlZd dlmZ d dlmZ ddlmZmZ dd	lmZ dd
lmZmZmZmZmZmZmZ eeZej !edZ"e
rhddl#m$Z$ d8ddZ%d8ddZ&d8ddZ'd9ddZ(d:ddZ)d;d d!Z*d"d# Z+d$d% Z,d8d&d'Z-d<d*d+Z.d=d-d.Z/d/d0 Z0d>d6d7Z1dS )?    )annotationsN)defaultdict)AnyTYPE_CHECKING)StorageWeakRef)
OrderedSet   )configir)WeakDep)contains_collectivecontains_waitfind_recursive_deps_of_nodefind_recursive_users_of_nodeis_collectiveis_fallback_opis_waitoverlap)BaseSchedulerNodesnodeslist[BaseSchedulerNode]returnc                 C  s   t | ddddS )z7
    Greedily schedules waits as late as possible.
    FTraise_comms
sink_waitsreorder_for_overlap_schedule_for_commr    r   D/var/www/auris/lib/python3.10/site-packages/torch/_inductor/comms.pyr   $      r   c                 C  s   t | ddddS )z8
    Greedily schedules comms as early as possible.
    TFr   r   r   r   r   r    r   -   r!   r   c                 C  s   t | ddddS )a  
    This achieves the following overall scheduling procedure:
        Step 1: Given that we've currently scheduled comm N, we now schedule all compute nodes
            that are required for comm N + 1 but do not depend on comm N, to run at the same time with comm N.
        Step 2: If all those compute nodes are sufficient to overlap comm N, we're done.
            Otherwise, we now need to look elsewhere to find compute that overlaps with comm N.
            We prioritize compute nodes that are needed sooner.
        Step 3: We schedule the compute nodes dependent on comm N and required for comm N + 1.
        Step 4: We schedule comm N + 1.
        Repeat this for subsequent comm nodes.
    Tr   r   r   r   r   r    reorder_compute_for_overlap6   s   r"   boolr   c                   s  i }i i i i 	t | D ]1\}}| D ]}|||< q| D ]}||< q#|| < | }	tj|	< d|	< |	|	< qd}
| D ]4}|rnt|rn|
| < |jD ]}|  }t| |
|< qW|
d7 }
qF|rzt	|rzd| < qFG 	fddd dd | D g t
tdd | D 
 D ]\}}t|dkrt | |D ]	}| | qqg  fdd	fd
d
fdd}trtj}|rt|r|| n| ts݈ D ]\}}t|dksJ d qS )a  
    Schedule `snodes` for various comm optimization objectives.

    Args:
        snodes: the nodes to be scheduled.
        raise_comms: whether to greedily schedule collectives as early as possible
        sink_wait: whether to greedily schedule waits as late as possible
        reorder_compute_for_overlap: whether to reorder compute nodes to
            optimize for compute/communication overlapping.

    Returns:
        The new schedule order.

    Some notes on the synergy between different options:
        - `raise_comms` provides more overlapping oppurtunies for `reorder_compute_for_overlap`.
        - When both `raise_comms` and `sink_waits` is `True`, `raise_comms` is prioritized.
    r   r   c                      s(   e Zd Zd fddZdd ZdS )	z$_schedule_for_comm.<locals>.Runnabler   Nonec                   s>   || _ tt| } |  }| | | f| _d S N)snodenextiterget_operation_namesget_namescore)selfr&   nameZ
fused_namename_to_fused_nodescores_0scores_1scores_2r   r    __init__   s   
z-_schedule_for_comm.<locals>.Runnable.__init__c                 S  s   | j |j k S r%   r+   )r,   otherr   r   r    __lt__   s   z+_schedule_for_comm.<locals>.Runnable.__lt__N)r   r$   )__name__
__module____qualname__r3   r6   r   r.   r   r    Runnable   s    
r:   c                 S  s"   i | ]}|t d d |jD qS )c                 s  s    | ]}|j V  qd S r%   )r-   ).0depr   r   r    	<genexpr>   s    z0_schedule_for_comm.<locals>.<dictcomp>.<genexpr>)r   Zunmet_dependenciesr;   r&   r   r   r    
<dictcomp>   s    z&_schedule_for_comm.<locals>.<dictcomp>c                 S  s   i | ]}|t |qS r   )estimate_op_runtimer>   r   r   r    r?      s    c                   sX    |  |  D ] }| D ]} |  | t|  dkr(t |  qq	dS )zU
        Schedules `snode` and put all unblocked nodes onto the ready queue.
        r   N)appendget_buffer_namesremovelenheapqheappush)r&   buf_name)r:   buffer_usersready	scheduled
unmet_depsr   r    schedule   s   
z$_schedule_for_comm.<locals>.schedulec                    s.   dd  D } t | dkrdS t| dd dS )zh
        Return the next node in the ready queue that's neither a collective or
        a wait.
        c                 S  s$   g | ]}t |jst|js|qS r   )r   r&   r   r;   xr   r   r    
<listcomp>   s    zI_schedule_for_comm.<locals>.get_overlapping_candidate.<locals>.<listcomp>r   Nc                 S  s   | j S r%   r4   rN   r   r   r    <lambda>   s    zG_schedule_for_comm.<locals>.get_overlapping_candidate.<locals>.<lambda>key)rD   min)
candidates)rI   r   r    get_overlapping_candidate   s   z5_schedule_for_comm.<locals>.get_overlapping_candidatec                   sx   t | sJ |  |  }|dkr5   }dur5| |j ||j 8 }|dkr5   }dust dS )z
        Schedules collective node `snode`, along with one or more compute nodes
        to overlap with it. The strategy is described in the comment of
        `reorder_compute_for_overlap`.
        r   N)r   rC   r&   rE   heapify)r&   Zcollective_cost	candidate)rV   rI   rL   snode_to_costr   r    schedule_collective_for_overlap   s   

z;_schedule_for_comm.<locals>.schedule_collective_for_overlapz;Detected unscheduled nodes. Nodes with unmet dependencies: )	enumeraterB   r)   r*   sysmaxsizer   	ancestorsrT   r   r   r   itemsrD   rE   rF   addheappopr&   )r   r   r   r   Zbuf_name_to_snodeidxr&   rG   Zop_name	node_nameZcomm_idxZancZanc_fused_namedepsr<   rZ   r   )r:   rH   rV   r/   rI   rL   rJ   r0   r1   r2   rY   rK   r    r   I   sh   &






r   nodesc                 C  st   t j s| S dd | D }tdt|D ]"}tt||  }||d   D ]}|| t	||d q)q| S )z
    Decide global ordering of comms, by just enforcing the ordering that's in the input graph
    (might not be the same ordering as the eager mode program).
    TODO: Come up with a better approach
    c                 S  s   g | ]}t |r|qS r   )r   )r;   nr   r   r    rO      s    z3decide_global_ordering_of_comms.<locals>.<listcomp>r   mutating_buf)
torchdistributedis_availablerangerD   r'   r(   rB   add_fake_depr   )re   name_to_bufr/   Z
comm_nodesirh   bufr   r   r    decide_global_ordering_of_comms   s   
rq   r&   r   floatc                 C  s2   t jdkr|  }|S tt jsJ t | }|S )z:
    Returns estimated op runtime in nanoseconds (ns)
    default)r	   r@   Zget_estimated_runtimecallable)r&   Zruntimer   r   r    r@      s   

r@   c                 C  s~   d}t | jtjrd| jj d}d}| j }t |tjr)d|j d|j d}| j	 p/d}| jj
j | | d| dS )N z ()z (size=z	, stride=)
isinstancenoder
   ZExternKernelOutZpython_kernel_nameZget_output_specZLayoutsizeZstrideZmaybe_get_name	__class__r7   )r&   detailZout_tensor_infoZlayoutrc   r   r   r    node_summary  s   
r|   c                 C  s   d}d }| D ]N}|d u r2t |r|t|7 }|j}nt|jr#td|t|7 }tt|  qt |r:tdt|jrJtt|  d }qtdt|  qtd|d d   d S )Ng        z8Wait is not expected when there is no collective runningzkFound two collectives running at the same time. `visualize_overlap` needs to be updated to handle this casez| zEst. runtime (ms): i  )r   r@   rx   r   AssertionErroroverlap_logdebugr|   )orderZtotal_est_runtimeZcur_comm_noder&   r   r   r    visualize_overlap  s0   

r   c                 C  s   | }t jD ]u}t|tr|t v rt | }tj dkrFt	d| d zt
| W n tyE } zt	t| W Y d }~nd }~ww ||}tj dkrzt	d| d zt
| W q tyy } zt	t| W Y d }~qd }~ww q|S )Nr   z.==== Visualize overlap before reordering pass z ====z-==== Visualize overlap after reordering pass )r	   Z'reorder_for_compute_comm_overlap_passesrw   strglobalsri   rj   Zget_rankr~   r   r   	Exception)r   r   per   r   r    $reorder_compute_and_comm_for_overlap0  s6   



r   graphtorch.fx.Graphc              
     s4  t | jtt tt  tD ]E\}}|jdkrV|jtjjj	j
krV|jd jdks9J d| d|jd  d|jd }|jd }|dkrO| | q | | q fdd	}tt }tD ]4\}}|jdkr|jtjjjj
kr|}|jd jdksJ d
 d|  d|r| | qgdd }dd D ]'}|jdkrt|jtjjr|jjjr||s͈|| rJ d| dq| D ]\}	t|	D ]|\}
}| }|jd u sJ |j\}|d }|
t|	d k r|	|
d  ntd }|| }tfdd|D r*J d d| d|  d|D ])}|jdkrT|jv rT|jtjjj	j
krTtfdd|jD }||_q,qq| D ]\}	t|	D ]\}
}| }| | qdq\D ] }|jdkr|jtjjj	j
kr|jd |v r| | qwdS )a  
    This FX graph pass replaces uses of FSDP2 unsharded params with their corresponding
    graph intermediates that were fsdp.copy_ into the unsharded params in the original graph.

    NOTE: Can only apply this pass to any of the FSDP2 unsharded params that have this pattern
    (or repetition of): `resize_(full) -> copy_ -> resize_(0)`. Because of this, for partial-graph case
    where `resize_(full) -> copy_` is in one graph and `resize_(0)` is in another graph, we can't
    remove these resize and copy ops and thus we will have worse performance there.

    In other words, "do we try to remove all the resize_(full) -> copy_ -> resize_(0) nodes for this unsharded param"
    is actually a per-unsharded-param decision, since for each unsharded param, we look at its resize sequence pattern
    (in `check_resize_pattern()`) to determine if its set of resize and copy nodes can be removed.
    Zcall_functionr   placeholderz1Resize can only operate on graph inputs, but got z# which is resizing non-graph-input 
r   c                   s    | g }  | g }t|t|ks)td|  dt| dt| d dS t||D ]$\}}||krRtd|  d|  d| d	|  d| d
  dS q.dS )NzH
Unequal number of resize-to-full and resize-to-0 nodes for graph input z:
z vs. zK.
Skipping `remove_fsdp2_unsharded_param_graph_input_usage` FX graph pass.
Fz
For graph input z: resize-to-full node z
 at index z 
happens after resize-to-0 node zd.
Skipping `remove_fsdp2_unsharded_param_graph_input_usage` FX graph pass for that unsharded param.
T)getrD   logwarningzip)graph_inputZresized_to_full_idxesZresized_to_0_idxesZresize_to_full_idxZresize_to_0_idx)&graph_input_to_resized_to_0_node_idxes)graph_input_to_resized_to_full_node_idxes	node_listr   r    check_resize_patternn  sF   	zLremove_fsdp2_unsharded_param_graph_input_usage.<locals>.check_resize_patternz\
Assumed all FSDP2 `unsharded_param`s to be graph input, but it's not true!
Offending node: z	. Graph: c                 S  s$   | j tjjjjkp| j tjjjjkS r%   )targetri   opsfsdpcopy_rs   inductorresize_storage_bytes_rx   r   r   r    is_allowed_mutation  s   zKremove_fsdp2_unsharded_param_graph_input_usage.<locals>.is_allowed_mutationc                   sd   t  jtjjrdd t jjjD ng }t fdd|D }tdd |D }t	||@ dkS )Nc                 S  s&   g | ]\}}|j d ur|j jr|qS r%   )Z
alias_infoZis_write)r;   ro   rN   r   r   r    rO     s
    zyremove_fsdp2_unsharded_param_graph_input_usage.<locals>.is_node_mutating_unsharded_param_or_its_alias.<locals>.<listcomp>c                   s$   g | ]}t  j| jd   qS val)r   argsmetauntyped_storage)r;   ro   r   r   r    rO     s    c                 S  s   g | ]}t |jd   qS r   )r   r   r   )r;   unsharded_paramr   r   r    rO     s    r   )
rw   r   ri   _ops
OpOverloadr[   _schema	argumentsr   rD   )rx   Zunsharded_paramsZmutated_arg_idxesZmutated_node_arg_storagesZstorages_of_unsharded_paramsr   r   r    -is_node_mutating_unsharded_param_or_its_alias  s"   	
zeremove_fsdp2_unsharded_param_graph_input_usage.<locals>.is_node_mutating_unsharded_param_or_its_aliaszdUser mutation on FSDP2 unsharded param is not allowed when Traceable FSDP2 is used. Violating node: c                 3  s    | ]	} |gV  qd S r%   r   )r;   rx   )r   r   r   r    r=     s
    

zAremove_fsdp2_unsharded_param_graph_input_usage.<locals>.<genexpr>z(Assumed no ops mutating unsharded param z in subgraph z, but it's not true!
Graph: c                 3  s     | ]}|u r
 n|V  qd S r%   r   )r;   arg)replacementr   r   r    r=     s
    
N)listre   r   r[   opr   ri   r   r   r   rs   r   rA   r   r   rw   r   r   r   Z
is_mutablekeysr_   rD   anytuple
erase_node)r   rb   rx   r   new_sizer   Z'unsharded_param_to_fsdp_copy_node_idxesZfsdp_copy_noder   Zfsdp_copy_node_idxesro   Zfsdp_copy_node_idx_Zsubgraph_start_idxZsubgraph_end_idxZsubgraph_nodesnew_argsr   )r   r   r   r   r   r   r    .remove_fsdp2_unsharded_param_graph_input_usageL  s   




%




$
r   r$   c           	        s  zdd l   j sJ  jjjr jjjsJ W n ttt	fy&   Y d S w ddl
m}m}m}m}m} 	  fdd}| }|| jjjj|tj| jjjj|d|d|d|d	|d
|d|d|d|d|d|dd dd fdd}||  ||  d S )Nr   r   )CallFunction
KeywordArgMatchPatternMatcherPassregister_graph_patternc                   sT   t | j}|D ] }|jtjkr'|jd j jjjj	u r'|jd dkr'| 
| qd S )Nr   r   )r   re   r   operatorgetitemr   r   r   all_gather_copy_inrs   r   )gr   rf   ri   r   r    remove_unused_getitem5  s   

z8reinplace_fsdp_all_gather.<locals>.remove_unused_getitemall_gather_inputsinp_split_sizesall_gather_input_numel
world_sizerankdtypedeviceitem_idx
group_size
group_namec                 S  s   | j d dkS )Nr   r   )kwargs)matchr   r   r    rQ   W  s    z+reinplace_fsdp_all_gather.<locals>.<lambda>)Z	pass_dictZextra_checkr   r   c                   sR    fdd}|  ||d |d |d |d |d |d |d	 |d
 |d g	 d S )Nc                    sX   | d d }| d }| d } j jjj| }|d }|d } j jjj||||d}|S )Nr   r   )out)r   r   r   rs   _c10d_functionalall_gather_into_tensor_out)r   Zcopy_in_argsr   r   r   r   Z	getitem_1all_gather_into_tensorr   r   r    replZ  s   

zEreinplace_fsdp_all_gather.<locals>.reinplace_all_gather.<locals>.replr   r   r   r   r   r   r   r   r   )Zreplace_by_example)r   r   r   r   r   r   r    reinplace_all_gatherB  s   z7reinplace_fsdp_all_gather.<locals>.reinplace_all_gather)r   r   )Z5torch.distributed.fsdp._fully_shard._fsdp_collectivesrj   rk   r   r   r   r   ImportErrorAttributeErrorr}   Zpattern_matcherr   r   r   r   r   rs   r   r   r   r   apply)	r   r   r   r   r   r   r   Z
graph_passr   r   r   r    reinplace_fsdp_all_gather  sP   



"r   c                 C  s2   t | tjjjtjjjfrJ t|  dd  S )N   )rw   ri   Z	_inductor	schedulerZFusedSchedulerNodeGroupedSchedulerNodeintr*   )r&   r   r   r    
get_op_idx  s   r   1list[torch._inductor.scheduler.BaseSchedulerNode]rn   4dict[str, torch._inductor.scheduler.SchedulerBuffer]r/   dict[str, BaseSchedulerNode]c              	     s  ddl m g }tt  }d}d}i }i }i fdd}	| D ]}
t|
jtjjj	j
drtfdd|
jD rd	}|
}t }t||| ttjjj	j
tjjjj
tjjjj
g t||| fd
dd t|dd d}t|}d}tt|D ]}|| }t|jtjjjj
r|d7 }|dkr|} nq}|d | }d }tt|d D ]}t||d  jtjr|d } nq|d usJ |	|d | }|	||d  }|||< q t|
jtjjjj
r5d	}|
}t }t||| t|dd d}d }tt|d D ]}t||d  jtjr|d } nq|d us!J |	|d | }|	||d  }|||< q tdks?J |rKt|dksKJ |rWt|dksWJ | D ]!}
|
 v rh|
  }
|
|v roqY||
 ||
 qYd }| D ]'\}}|d urtt|  }|! D ]}|"t#| |d q|}qd }| D ]'\}}|d urtt|  }|! D ]}|"t#| |d q|}q|S )Nr   )r   Fc                   s2    j | }| D ]}|| < q|| < |S r%   )r   creater*   )Zsnodes_to_groupZ
group_noder&   )r   snode_name_to_final_snoder   r    _create_group_node  s
   z:enforce_comm_ordering_for_fsdp.<locals>._create_group_node)r   c                 3  s(    | ]}t  | jtjjjjV  qd S r%   )r   rx   ri   r   r   r   rs   rM   )r/   r   r    r=     s    
z1enforce_comm_ordering_for_fsdp.<locals>.<genexpr>Tc                   s&   t | jpt | jo| jj v  S r%   )rw   ZNopKernelSchedulerNodeZExternKernelSchedulerNoderx   Zop_overloadrP   )allowed_opsr   r   r    rQ     s   
z0enforce_comm_ordering_for_fsdp.<locals>.<lambda>)Zcriteria_cbc                 S     t | S r%   r   rP   r   r   r    rQ         rR   r   c                 S  r   r%   r   rP   r   r   r    rQ      r   rg   )$ru   r   r   r   r   rx   ri   r   r   r   rs   r   r^   r   Zwait_tensorr   Zsplit_with_sizes_copyr   sortedrD   rl   r   rw   r
   Z_WaitKernelZ	chunk_catr*   rA   r`   r_   r'   r(   rB   Zget_outputsrm   r   )r   rn   r/   Z	new_orderrJ   Z	ag_existsZ	rs_existsZ$ag_grouped_node_to_wait_grouped_nodeZ$rs_grouped_node_to_wait_grouped_noder   r&   Zag_snodeZag_related_snode_setZag_related_snodesZend_idx_of_current_ag_blockZcopy_out_countro   Z	cur_snodeZwait_node_idxZag_group_nodeZag_wait_group_nodeZrs_snodeZrs_related_snode_setZrs_related_snodesZrs_group_nodeZrs_wait_group_nodeZprev_ag_waitZwait_group_noderh   oZprev_rs_waitr   )r   r/   r   r   r    enforce_comm_ordering_for_fsdp  s   









r   )r   r   r   r   )
r   r   r   r#   r   r#   r   r#   r   r   )re   r   r   r   )r&   r   r   rr   )r   r   )r   r   r   r$   )r   r   rn   r   r/   r   r   r   )2
__future__r   rE   loggingr   r\   collectionsr   typingr   r   ri   Z torch.multiprocessing.reductionsr   Ztorch.utils._ordered_setr   ru   r	   r
   dependenciesr   utilsr   r   r   r   r   r   r   	getLoggerr7   r   Z_loggingZgetArtifactLoggerr~   r   r   r   r   r"   r   rq   r@   r|   r   r   r   r   r   r   r   r   r   r    <module>   s@   $


	
	
 



 
Eo