o
    wZh                     @   st   d dl mZ d dlmZ d dlmZ d dlmZ ddgZG dd deZ	G dd	 d	eZ
ed
G dd de
ZdS )    )Sized)IntEnum)functional_datapipe)IterDataPipeSHARDING_PRIORITIESShardingFilterIterDataPipec                   @   s   e Zd ZdZdZdZdS )r            N)__name__
__module____qualname__DEFAULTZDISTRIBUTEDZMULTIPROCESSING r   r   W/var/www/auris/lib/python3.10/site-packages/torch/utils/data/datapipes/iter/sharding.pyr      s    c                   @   s"   e Zd ZdededefddZdS )_ShardingIterDataPipenum_of_instancesinstance_idsharding_groupc                 C   s   t N)NotImplementedErrorselfr   r   r   r   r   r   apply_sharding   s   z$_ShardingIterDataPipe.apply_shardingN)r   r   r   intr   r   r   r   r   r   r      s    r   Zsharding_filterc                   @   sF   e Zd ZdZddefddZejfddZdd	 Z	d
d Z
dd ZdS )r   ao  
    Wrapper that allows DataPipe to be sharded (functional name: ``sharding_filter``).

    After ``apply_sharding`` is called, each instance of the DataPipe (on different workers) will have every `n`-th element of the
    original DataPipe, where `n` equals to the number of instances.

    Args:
        source_datapipe: Iterable DataPipe that will be sharded
    Nsource_datapipec                 C   s*   || _ || _i | _d| _d| _|   d S )Nr   r   )r   sharding_group_filtergroupsr   r   _update_num_of_instances)r   r   r   r   r   r   __init__+   s   z#ShardingFilterIterDataPipe.__init__c                 C   sv   ||krt d| d| d|tjkr$t| jr#tj| jvr#tdn
tj| jv r.td||f| j|< |   d S )Nzinstance_id(z*) should be smaller than num_of_instances()z8ShardingFilter cannot mix DEFAULT and non DEFAULT groups)
ValueErrorr   r   lenr   RuntimeErrorr   r   r   r   r   r   3   s    
z)ShardingFilterIterDataPipe.apply_shardingc                    sd    fddt  j D }|  d _d _|D ]\}}  j j| 7  _  j|9  _qd S )Nc                    s*   g | ]} j d u s| j kr j| qS r   )r   r   ).0keyr   r   r   
<listcomp>H   s
    zGShardingFilterIterDataPipe._update_num_of_instances.<locals>.<listcomp>r   r   )sortedr   keysreverser   r   )r   Zsorted_sharding_groupsZgroup_num_of_instancesZgroup_instance_idr   r&   r   r   G   s   
z3ShardingFilterIterDataPipe._update_num_of_instancesc                 c   s0    t | jD ]\}}|| j | jkr|V  qd S r   )	enumerater   r   r   )r   iitemr   r   r   __iter__W   s   z#ShardingFilterIterDataPipe.__iter__c                 C   sP   t | jtrt| j| j | jt| j| j k rd S d S tt| j d)Nr   r   z# instance doesn't have valid length)	
isinstancer   r   r"   r   r   	TypeErrortyper   r&   r   r   r   __len__\   s   z"ShardingFilterIterDataPipe.__len__r   )r   r   r   __doc__r   r   r   r   r   r   r.   r2   r   r   r   r   r      s    
	
N)collections.abcr   enumr   Z%torch.utils.data.datapipes._decoratorr   Z#torch.utils.data.datapipes.datapiper   __all__r   r   r   r   r   r   r   <module>   s   
