import inspect
import os
import warnings
from concurrent.futures import Future
from enum import Enum
from typing import cast, Optional, Union

from typing_extensions import deprecated

import torch
import torch.distributed as dist
from torch.distributed._state_dict_utils import (
    _copy_state_dict,
    _create_cpu_state_dict,
)
from torch.distributed.checkpoint._async_executor import _AsyncCheckpointExecutor
from torch.distributed.checkpoint._async_process_executor import (
    _ProcessBasedAsyncCheckpointExecutor,
)
from torch.distributed.checkpoint._async_thread_executor import (
    _ThreadBasedAsyncCheckpointExecutor,
)
from torch.distributed.checkpoint._storage_utils import _storage_setup
from torch.distributed.checkpoint.default_planner import DefaultSavePlanner
from torch.distributed.checkpoint.logger import _dcp_method_logger
from torch.distributed.checkpoint.metadata import Metadata, STATE_DICT_TYPE
from torch.distributed.checkpoint.planner import SavePlan, SavePlanner
from torch.distributed.checkpoint.staging import AsyncStager
from torch.distributed.checkpoint.stateful import Stateful
from torch.distributed.checkpoint.storage import StorageWriter
from torch.distributed.distributed_c10d import _get_default_group

from .utils import _api_bc_check, _DistWrapper, _profile


__all__ = ["save_state_dict", "save", "async_save", "AsyncCheckpointerType"]


class AsyncCheckpointerType(Enum):
    """Enum for async checkpointer type."""

    THREAD = "thread"
    PROCESS = "process"


@deprecated(
    "`save_state_dict` is deprecated and will be removed in future versions. "
    "Please use `save` instead.",
    category=FutureWarning,
)
def save_state_dict(
    state_dict: STATE_DICT_TYPE,
    storage_writer: StorageWriter,
    process_group: Optional[dist.ProcessGroup] = None,
    coordinator_rank: int = 0,
    no_dist: bool = False,
    planner: Optional[SavePlanner] = None,
) -> Metadata:
    """This method is deprecated. Please switch to 'save'."""
    storage_writer.reset()

    # Delegate to the same internal implementation used by `save`.
    with _profile():
        return _save_state_dict(
            state_dict,
            storage_writer,
            process_group,
            coordinator_rank,
            no_dist,
            planner,
        )


@_dcp_method_logger(log_exceptions=True)
@_api_bc_check
def save(
    state_dict: STATE_DICT_TYPE,
    *,
    checkpoint_id: Union[str, os.PathLike, None] = None,
    storage_writer: Optional[StorageWriter] = None,
    planner: Optional[SavePlanner] = None,
    process_group: Optional[dist.ProcessGroup] = None,
    no_dist: bool = False,
) -> Metadata:
    """
    Save a distributed model in SPMD style.

    This function is different from ``torch.save()`` as it handles
    ``ShardedTensor`` and ``DTensor`` by having each rank save only its local shards.

    For each ``Stateful`` object (having both a ``state_dict`` and a ``load_state_dict``),
    save will call ``state_dict`` before serialization.
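
    .. note::
        A minimal sketch of this contract, assuming a user-defined ``AppState``
        class (not part of this API) and reusing the ``/checkpoint/1`` path from
        the example further down:

        >>> # xdoctest: +SKIP
        >>> from torch.distributed.checkpoint.stateful import Stateful
        >>>
        >>> class AppState(Stateful):
        >>>     def __init__(self):
        >>>         self.step = 0
        >>>     def state_dict(self):
        >>>         return {"step": self.step}
        >>>     def load_state_dict(self, state_dict):
        >>>         self.step = state_dict["step"]
        >>>
        >>> # save() calls AppState.state_dict() before anything is written.
        >>> torch.distributed.checkpoint.save(
        >>>     state_dict={"app": AppState()},
        >>>     checkpoint_id="/checkpoint/1",
        >>> )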

    .. warning::
        There are no guarantees of backwards compatibility across PyTorch versions
        for saved state_dicts.

    .. warning::
        If using the `process_group` argument, make sure that only its ranks
        call ``save`` and that all data in ``state_dict`` belongs to it.

    .. note::
        When saving a checkpoint for FSDP's `ShardingStrategy.HYBRID_SHARD`, only one
        of the shard groups should call ``save``, and the corresponding process
        group needs to be passed in, as sketched below.
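
        In the sketch below, ``replicate_group`` and ``shard_group`` are assumed
        to come from the user's own HYBRID_SHARD setup (they are not provided by
        this function), and the writer mirrors the example further down:

        >>> # xdoctest: +SKIP
        >>> # Only the ranks of the first replica write the checkpoint.
        >>> if torch.distributed.get_rank(replicate_group) == 0:
        >>>     torch.distributed.checkpoint.save(
        >>>         state_dict=state_dict,
        >>>         storage_writer=fs_storage_writer,
        >>>         process_group=shard_group,
        >>>     )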

    .. note::
        If no process group is available, this function assumes the intention is to save the
        state_dict in the local process.

    .. note::
        Rank 0 is assumed to be the coordinator rank.


    Args:
        state_dict (Dict[str, Any]): The state_dict to save.
        checkpoint_id (Union[str, os.PathLike, None]):
            The ID of this checkpoint instance. The meaning of the checkpoint_id
            depends on the storage. It can be a path to a folder or to a file.
            It can also be a key if the storage is a key-value store.
            (Default: ``None``)
        storage_writer (Optional[StorageWriter]):
            Instance of StorageWriter used to perform writes. If this is not
            specified, DCP will automatically infer the writer based on the
            checkpoint_id. If checkpoint_id is also None, an exception will
            be raised. (Default: ``None``)
        planner (Optional[SavePlanner]):
            Instance of SavePlanner. If this is not specified, the default
            planner will be used. (Default: ``None``)
        process_group (Optional[ProcessGroup]):
            ProcessGroup to be used for cross-rank synchronization.
            (Default: ``None``)
        no_dist (bool):
            If ``True``, this function will assume the intent is to save
            a checkpoint without using cross-rank synchronization.
            (Default: ``False``)

    Returns:
        Metadata: Metadata object for the saved checkpoint.

    Example:
        >>> # xdoctest: +SKIP
        >>> my_model = MyModule()

        >>> state_dict = {"model": my_model}

        >>> fs_storage_writer = torch.distributed.checkpoint.FileSystemWriter(
        ...     "/checkpoint/1"
        ... )
        >>> torch.distributed.checkpoint.save(
        >>>     state_dict=state_dict,
        >>>     storage_writer=fs_storage_writer,
        >>> )

    .. note::
        ``save`` uses collectives to coordinate writes across ranks.
        For NCCL-based process groups, internal tensor representations of
        objects must be moved to the GPU device before communication takes place.
        In this case, the device used is given by ``torch.cuda.current_device()``
        and it is the user's responsibility to ensure that this is set so that
        each rank has an individual GPU, via ``torch.cuda.set_device()``.
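
    .. note::
        A minimal sketch of that device setup, assuming a ``torchrun``-style
        launch where ``LOCAL_RANK`` is set in the environment (the backend string
        is one common choice, not a requirement of this function):

        >>> # xdoctest: +SKIP
        >>> import os
        >>> import torch
        >>> import torch.distributed as dist
        >>>
        >>> torch.cuda.set_device(int(os.environ["LOCAL_RANK"]))  # one GPU per rank
        >>> dist.init_process_group(backend="cpu:gloo,cuda:nccl")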
    """
    torch._C._log_api_usage_once("torch.distributed.checkpoint.save")

    no_dist = no_dist or (not dist.is_available()) or (not dist.is_initialized())
    if no_dist:
        warnings.warn(
            "torch.distributed is disabled, unavailable or uninitialized, "
            "assuming the intent is to save in a single process."
        )

    with _profile():
        storage_writer = cast(
            StorageWriter, _storage_setup(storage_writer, checkpoint_id, reader=False)
        )

        return _save_state_dict(
            state_dict=_stateful_to_state_dict(state_dict),
            storage_writer=storage_writer,
            process_group=process_group,
            no_dist=no_dist,
            planner=planner,
        )


@_dcp_method_logger(log_exceptions=True)
def async_save(
    state_dict: STATE_DICT_TYPE,
    *,
    checkpoint_id: Union[str, os.PathLike, None] = None,
    storage_writer: Optional[StorageWriter] = None,
    planner: Optional[SavePlanner] = None,
    process_group: Optional[dist.ProcessGroup] = None,
    async_checkpointer_type: AsyncCheckpointerType = AsyncCheckpointerType.THREAD,
) -> Future:
    """Asynchronous version of ``save``. This function first stages the ``state_dict`` onto the
    staging storage (which defaults to CPU memory), and then calls ``save`` in a separate thread.

    .. warning::
        This feature is experimental and subject to change.

    Args:
        state_dict (Dict[str, Any]): The state_dict to save.
        checkpoint_id (Union[str, os.PathLike, None]):
            The ID of this checkpoint instance. The meaning of the checkpoint_id
            depends on the storage. It can be a path to a folder or to a file.
            It can also be a key if the storage is a key-value store.
            (Default: ``None``)
        storage_writer (Optional[StorageWriter]):
            Instance of StorageWriter used to perform 'stage' and 'save'. If
            this is not specified, DCP will automatically infer the writer based on the
            checkpoint_id. If checkpoint_id is also None, an exception will
            be raised. (Default: ``None``)
        planner (Optional[SavePlanner]):
            Instance of SavePlanner. If this is not specified, the default
            planner will be used. (Default: ``None``)
        process_group (Optional[ProcessGroup]):
            ProcessGroup to be used for cross-rank synchronization.
            (Default: ``None``)
        async_checkpointer_type (AsyncCheckpointerType):
            Whether the checkpoint is written by a background thread
            (``AsyncCheckpointerType.THREAD``) or by a separate process
            (``AsyncCheckpointerType.PROCESS``).
            (Default: ``AsyncCheckpointerType.THREAD``)

    Returns:
        Future: A future holding the resultant Metadata object from `save`.

    Example:
        >>> # xdoctest: +SKIP
        >>> my_model = MyModule()

        >>> state_dict = {"model": my_model}

        >>> fs_storage_writer = torch.distributed.checkpoint.FileSystemWriter(
        ...     "/checkpoint/1"
        ... )
        >>> checkpoint_future = torch.distributed.checkpoint.async_save(
        >>>     state_dict=state_dict,
        >>>     storage_writer=fs_storage_writer,
        >>> )
        >>>
        >>> # ... do some work ...
        >>>
        >>> checkpoint_future.result()
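        >>>
        >>> # Hedged variant: the names above are reused, and the checkpoint is
        >>> # written from a separate process instead of a background thread.
        >>> from torch.distributed.checkpoint.state_dict_saver import AsyncCheckpointerType
        >>> checkpoint_future = torch.distributed.checkpoint.async_save(
        >>>     state_dict=state_dict,
        >>>     storage_writer=fs_storage_writer,
        >>>     async_checkpointer_type=AsyncCheckpointerType.PROCESS,
        >>> )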

    """
    torch._C._log_api_usage_once("torch.distributed.checkpoint.async_save")

    if dist.is_available() and dist.is_initialized():
        pg = process_group or _get_default_group()
        assert (
            torch.device("cpu") in pg._device_types  # type: ignore[attr-defined]
        ), "A CPU backend must be enabled for async save; try initializing process group with 'cpu:gloo,cuda:nccl'"

    storage_writer = cast(
        StorageWriter, _storage_setup(storage_writer, checkpoint_id, reader=False)
    )

    state_dict = _stateful_to_state_dict(state_dict)
    if isinstance(storage_writer, AsyncStager):
        staged_state_dict = storage_writer.stage(state_dict)
    else:  # provides bwc for storage_writers not implementing AsyncStager
        staged_state_dict = _create_cpu_state_dict(state_dict)
        _copy_state_dict(state_dict, staged_state_dict, type_check=False)

    executor: _AsyncCheckpointExecutor = (
        _ProcessBasedAsyncCheckpointExecutor()
        if async_checkpointer_type == AsyncCheckpointerType.PROCESS
        else _ThreadBasedAsyncCheckpointExecutor()
    )
    f: Future = executor.execute_save(
        staged_state_dict,
        checkpoint_id=checkpoint_id,
        storage_writer=storage_writer,
        planner=planner,
        process_group=process_group,
    )
    if (
        isinstance(storage_writer, AsyncStager)
        and storage_writer.should_synchronize_after_execute
    ):
        storage_writer.synchronize_staging()

    return f


def _stateful_to_state_dict(state_dict: STATE_DICT_TYPE) -> STATE_DICT_TYPE:
    """Creates a shallow copy of `state_dict` where `state_dict` is called for each Stateful object."""
    stateful_state_dict = {}
    for key, elem in state_dict.items():
        stateful_state_dict[key] = (
            elem.state_dict() if isinstance(elem, Stateful) else elem
        )
    return stateful_state_dict


def _save_state_dict(
    state_dict: STATE_DICT_TYPE,
    storage_writer: StorageWriter,
    process_group: Optional[dist.ProcessGroup] = None,
    coordinator_rank: int = 0,
    no_dist: bool = False,
    planner: Optional[SavePlanner] = None,
) -> Metadata:
    torch._C._log_api_usage_once("torch.distributed.checkpoint.save_state_dict")

    distW = _DistWrapper(process_group, not no_dist, coordinator_rank)
    if planner is None:
        planner = DefaultSavePlanner()
    assert planner is not None

    global_metadata = None

    ckpt_kwargs = {}
    if (ckpt_id := getattr(storage_writer, "checkpoint_id", None)) is not None:
        ckpt_kwargs["checkpoint_id"] = ckpt_id
        ckpt_kwargs["process_group"] = distW.group

    @_dcp_method_logger(**ckpt_kwargs)
    def local_step():
        assert planner is not None
        storage_meta = storage_writer.storage_meta()
        if "storage_meta" not in inspect.signature(planner.set_up_planner).parameters:
            warnings.warn(
                "The function definition for SavePlanner.set_up_planner has been updated"
                " to include the storage_meta argument. Please update your implementation"
                " to include this parameter."
            )
            planner.set_up_planner(state_dict, distW.is_coordinator)  # type: ignore[call-arg, arg-type]
        else:
            planner.set_up_planner(
                state_dict=state_dict,
                storage_meta=storage_meta,
                is_coordinator=distW.is_coordinator,
            )
        storage_writer.set_up_storage_writer(distW.is_coordinator)

        local_plan = planner.create_local_plan()
        local_plan = storage_writer.prepare_local_plan(local_plan)
        return local_plan

    @_dcp_method_logger(**ckpt_kwargs)
    def global_step(all_local_plans):
        nonlocal global_metadata

        assert planner is not None
        all_local_plans, global_metadata = planner.create_global_plan(all_local_plans)
        all_local_plans = storage_writer.prepare_global_plan(all_local_plans)
        return all_local_plans

    # Plan on every rank, then reduce to a single coordinated plan.
    central_plan: SavePlan = distW.reduce_scatter("plan", local_step, global_step)

    @_dcp_method_logger(**ckpt_kwargs)
    def write_data():
        assert planner is not None
        final_local_plan = planner.finish_plan(central_plan)
        all_writes = storage_writer.write_data(final_local_plan, planner)

        all_writes.wait()
        return all_writes.value()

    @_dcp_method_logger(**ckpt_kwargs)
    def finish_checkpoint(all_results):
        assert global_metadata is not None
        storage_writer.finish(metadata=global_metadata, results=all_results)
        return global_metadata

    return distW.all_reduce("write", write_data, finish_checkpoint)