a
    hF?                  
   @   s  d dl Z d dlZd dlmZmZ d dlZd dlmZ d dlm	Z	m
Z
 g dZe eZdZG dd dZG d	d
 d
eZeedejZd ZG dd dZG dd dZdd Zdeedf eeeef  eeeedf  eeeef  eee ee f dddZee dddZdS )    N)AnyOptionalmap_aggregate)tree_flattentree_unflatten)TensorChunkSpecsplit_args_kwargs_into_chunksmerge_chunksFc                   @   s   e Zd ZdZdd ZdS )_CustomReducera$  
    Custom reducer class that can be used to specify a custom operation that
    reduces losses of multiple microbatches into one value.

    Example:
    >>> # xdoctest: +SKIP
    >>> sum_reducer = _CustomReducer(
    >>>     torch.tensor(0.0),
    >>>     lambda a, b: a + b
    >>> )
    c                 C   s   || _ || _d S N)
init_value	reduce_fn)selfr   r    r   U/var/www/auris/lib/python3.9/site-packages/torch/distributed/pipelining/microbatch.py__init__)   s    z_CustomReducer.__init__N)__name__
__module____qualname____doc__r   r   r   r   r   r      s   r   c                   @   s   e Zd ZdS )_LossReducerNr   r   r   r   r   r   r   r   .   s   r   g        c                   @   sf   e Zd ZU dZdd Zeed< dd Zdd Ze	e
ed	f d
ddZe	eeef d
ddZdS )r   z2
    Class used to specify chunking of inputs
    c                 C   s
   || _ d S r   	split_dim)r   r   r   r   r   r   >   s    zTensorChunkSpec.__init__r   c                 C   s    | j j d| j j d| j dS )N.())	__class__r   r   r   r   r   r   r   __repr__C   s    zTensorChunkSpec.__repr__c                 C   s   d| j  dS )NzTensorChunkSpec(r   r   r   r   r   r   __str__H   s    zTensorChunkSpec.__str__.)
chunk_dimsc                 C   s   t | dd }|S )a  
        A helper for creating a tuple of `TensorChunkSpec` from a tuple of chunk
        dimensions (int's).
        Example:
            >>> # xdoctest: +SKIP
            >>> # There are three positional arguments to the model, and
            >>> # we are chunking them along dimension 0, 0 and 1, respectively
            >>> args_chunk_spec = TensorChunkSpec.from_tuple((0, 0, 1))
        c                 S   s   t | S r   r   dimr   r   r   <lambda>Z       z,TensorChunkSpec.from_tuple.<locals>.<lambda>r   )r"   args_chunk_specr   r   r   
from_tupleK   s
    zTensorChunkSpec.from_tuplec                 C   s   t | dd }|S )a\  
        A helper for creating a dictionary of `TensorChunkSpec` from a
        dictionary of chunk dimensions (int's).
        Example:
            >>> # xdoctest: +SKIP
            >>> # Chunk dimension 0 for the "id" argument, 1 for the "mask" argument
            >>> kwargs_chunk_spec = TensorChunkSpec.from_dict({"id": 0, "mask": 1})
        c                 S   s   t | S r   r#   r$   r   r   r   r&   l   r'   z+TensorChunkSpec.from_dict.<locals>.<lambda>r   )r"   kwargs_chunk_specr   r   r   	from_dict^   s
    zTensorChunkSpec.from_dictN)r   r   r   r   r   int__annotations__r    r!   staticmethodtupler)   dictstrr+   r   r   r   r   r   9   s   


r   c                   @   s   e Zd ZdS )
_ReplicateNr   r   r   r   r   r2   r   s   r2   c           !         s  i }g }|}d}t | t |ksDJ dt|   dt|  |  D ]\}}t|\}	}
||
 || }|dusJ t|\}}t |	t |krtd| d| g }t|	|D ]f\}}|tu st	|t
js||g|  qt	|trt	|t
jsJ | d||j}||k rr|rVtd| d	| d
| d |}ntd| d| d| dt
|||j}trg }d}|D ]f}t
|}|||j }tdddg|j }t||||j< |||< || |||j7 }q|| n
|| d}qtd| q|||< qLg }t|D ]B i }| D ]$\}} fdd|D }|||< qN|| q>g }|D ]V}i }t |t |ksJ t| |D ]\\}}} t|| ||< q|| q|S )aW  
    Given a dictionary of args, and a dictionary of chunking specs, shard the
    args according to the chunking specs.

    Args:
        args_dict: Dictionary of args
        args_chunk_spec: Dictionary of chunking specs
        num_chunks: Number of chunks to shard the args into

    Returns:
        args_split: List of sharded args
    Tzargs_dict.keys() = z args_chunk_spec.keys() = NzArgument value z9 did not have the same number of values as as chunk spec z is not a tensorz%Tensor size on chunking dimension is z', downsizing the number of chunks from z to r   zArg z% on chunking dimension has a size of z$, smaller than the number of chunks z. PiPPy cannot reduce the number of chunks because other arguments have bigger chunk-dimension sizes. Please adjust your num_chunks setting.r   FzUnrecognized chunk spec: c                    s   g | ]}|  qS r   r   ).0Zv_flat	chunk_idxr   r   
<listcomp>   r'   z'_shard_dict_of_args.<locals>.<listcomp>)lenlistkeysitemsr   append
ValueErrorzipr2   
isinstancetorchZTensorr   sizer   loggerwarningRuntimeErrortensor_split_debug_mask_minibatchesZ
zeros_likeslicendim	TypeErrorranger   )!Z	args_dictr(   Z
num_chunksZargs_sharded_replicatedZ	arg_specsreal_num_chunksZfirst_tensorZarg_keyargZflatspec
chunk_specZchunk_spec_flat_Zsharded_arg_flatvZchunk_vZv_split_dim_sizeZchunk_tensorsZexpanded_chunksZsplit_dim_idxZchunk_tensornew_valZ	upper_idxslice_indicesZchunks_flat
chunk_argskeyZarg_single_chunk
args_splitchunkZper_chunk_argsZarg_specr   r4   r   _shard_dict_of_argsv   s    






rV   .)argskwargschunksr(   r*   returnc           	      C   s   |du ri }|du r&t tft|  }|du r>t|t t}ttt| tt||}t|}t|||}t||k rt|}ttt| tt||}t|t|krtdt| dt| dd |D }||fS )a  
    Given a sequence of args and kwargs, split them into a number of chunks
    according to  their respective chunking specs.

    Args:
        args: Tuple of args
        kwargs: Dict of kwargs
        chunks: Number of chunks to split the args and kwargs into
        args_chunk_spec: chunking specs for args, in same shape as args
        kwargs_chunk_spec: chunking specs for kwargs, in same shape as kwargs

    Returns:
        args_split: List of sharded args
        kwargs_split: List of sharded kwargs
    Nz;args and kwargs are split into different number of chunks: z, c                    s*   g | ]" t  fd dtt D qS )c                 3   s   | ]} | V  qd S r   r   )r3   irR   r   r   	<genexpr>W  r'   z;split_args_kwargs_into_chunks.<locals>.<listcomp>.<genexpr>)r/   rI   r7   )r3   r   r\   r   r6   V  s   z1split_args_kwargs_into_chunks.<locals>.<listcomp>)r   DEFAULT_CHUNK_DIMr7   r0   fromkeysrV   	enumeraterC   )	rW   rX   rY   r(   r*   Zargs_split_dictrJ   Zkwargs_splitrT   r   r   r   r	      sH    8



r	   )rY   c                    s@  |durt |\}}n"t | d \}}ttgt| }g | D ]>}t |\}}t|t|krttd| d| | q@g }t|D ]\ }	t|	tr fddttD }
t	r|
d j
}|
dd D ]}|j
|ksJ qtjtj|dd	it|
|	jd
}g }d}t|
t|ks.J t|
|D ]T\}}|||	j }tdddg|j }t||||	j< || }|| |}q8n|
}|tj||	jd qt|	tr|	j}ttD ]}|	||   }q|| qd   }tdtD ]}|   |ksJ q|| qt||S )z
    Given a list of chunks, merge them into a single value according to
    the chunk spec.

    Args:
        chunks: list of chunks
        chunk_spec: Chunking spec for the chunks

    Returns:
        value: Merged value
    Nr   zChunk z did not match chunk spec c                    s   g | ]}|   qS r   r   )r3   r5   Zarg_idxZchunks_flattenedr   r   r6     s   z merge_chunks.<locals>.<listcomp>   Zdevicemeta)sectionsr%   r$   )r   r   r^   r7   r<   r;   r`   r>   rI   rE   shaper?   rD   emptyr   r=   r@   rF   rG   catr   r   r   r   )rY   rM   Zspec_flattenedZflatten_specZchunk0_flatrU   Zchunk_flattenedrN   Zargs_flattenedrK   Zpartial_valuesZoverall_shapevalZmeta_chunksZvalues_to_catZchunk_start_idxZpartial_valueZ
meta_chunkZchunk_end_idxrQ   ZslicedZreduced_valr5   valuer   ra   r   r
   ^  sb    -



r
   )NN) loggingoperatortypingr   r   r?   Ztorch.fx.noder   Ztorch.utils._pytreer   r   __all__	getLoggerr   rA   rE   r   r   ZtensoraddZsum_reducerr^   r   r2   rV   r/   r0   r1   r,   r8   r	   r
   r   r   r   r   <module>   s8   
9   
j