o
    wZhB0                     @   s   d dl Z d dlmZ d dlmZmZ d dlmZmZm	Z	m
Z
 d dlZd dlmZ d dlmZmZ d dlmZ g dZe
d	d
dZdefddZedG dd dee ZedG dd deZedG dd dee ZdS )    N)defaultdict)IteratorSized)AnyCallableOptionalTypeVar)functional_datapipe)	DataChunkIterDataPipe)_check_unpickable_fn)BatcherIterDataPipeGrouperIterDataPipeUnBatcherIterDataPipe_T_coT)	covariantnamec                 C   sN   | dv rt jd|  d|  dtdd ttjjjjj	| S t
dt d|  )	N)ZSHARDING_PRIORITIESZShardingFilterIterDataPipe`zc` from `torch.utils.data.datapipes.iter.grouping` is going to be removed in PyTorch 2.1Please use `z5` from the `torch.utils.data.datapipes.iter.sharding`   )category
stacklevelzmodule z has no attribute )warningswarnFutureWarninggetattrtorchutilsdataZ	datapipesiterZshardingAttributeError__name__)r    r!   W/var/www/auris/lib/python3.10/site-packages/torch/utils/data/datapipes/iter/grouping.py__getattr__   s   r#   batchc                       sz   e Zd ZU dZeed< eed< eed< defdededede	e ddf
 fd	d
Z
dee fddZdefddZ  ZS )r   a2  
    Creates mini-batches of data (functional name: ``batch``).

    An outer dimension will be added as ``batch_size`` if ``drop_last`` is set to ``True``, or ``length % batch_size`` for the
    last batch if ``drop_last`` is set to ``False``.

    Args:
        datapipe: Iterable DataPipe being batched
        batch_size: The size of each batch
        drop_last: Option to drop the last batch if it's not full
        wrapper_class: wrapper to apply onto each batch (type ``List``) before yielding,
            defaults to ``DataChunk``

    Example:
        >>> # xdoctest: +SKIP
        >>> from torchdata.datapipes.iter import IterableWrapper
        >>> dp = IterableWrapper(range(10))
        >>> dp = dp.batch(batch_size=3, drop_last=True)
        >>> list(dp)
        [[0, 1, 2], [3, 4, 5], [6, 7, 8]]
    datapipe
batch_size	drop_lastFwrapper_classreturnNc                    s6   |dksJ dt    || _|| _|| _|| _d S )Nr   z+Batch size is required to be larger than 0!)super__init__r%   r&   r'   r(   )selfr%   r&   r'   r(   	__class__r!   r"   r+   A   s   

zBatcherIterDataPipe.__init__c                 c   sd    g }| j D ]}|| t|| jkr| |V  g }qt|dkr.| js0| |V  d S d S d S Nr   )r%   appendlenr&   r(   r'   )r,   r$   xr!   r!   r"   __iter__O   s   

zBatcherIterDataPipe.__iter__c                 C   sP   t | jtr| jrt| j| j S t| j| j d | j S tt| j d)N   z# instance doesn't have valid length)	
isinstancer%   r   r'   r1   r&   	TypeErrortyper    r,   r!   r!   r"   __len__Z   s
   zBatcherIterDataPipe.__len__)r    
__module____qualname____doc__r   __annotations__intboolr
   r7   r+   r   r3   r9   __classcell__r!   r!   r-   r"   r   %   s(   
 r   Zunbatchc                   @   s4   e Zd ZdZddedefddZdd Zd	d
 ZdS )r   a   
    Undos batching of data (functional name: ``unbatch``).

    In other words, it flattens the data up to the specified level within a batched DataPipe.

    Args:
        datapipe: Iterable DataPipe being un-batched
        unbatch_level: Defaults to ``1`` (only flattening the top level). If set to ``2``,
            it will flatten the top two levels, and ``-1`` will flatten the entire DataPipe.

    Example:
        >>> # xdoctest: +SKIP
        >>> from torchdata.datapipes.iter import IterableWrapper
        >>> source_dp = IterableWrapper([[[0, 1], [2]], [[3, 4], [5]], [[6]]])
        >>> dp1 = source_dp.unbatch()
        >>> list(dp1)
        [[0, 1], [2], [3, 4], [5], [6]]
        >>> dp2 = source_dp.unbatch(unbatch_level=2)
        >>> list(dp2)
        [0, 1, 2, 3, 4, 5, 6]
    r4   r%   unbatch_levelc                 C   s   || _ || _d S N)r%   rA   )r,   r%   rA   r!   r!   r"   r+   |   s   
zUnBatcherIterDataPipe.__init__c                 c   s(    | j D ]}| j|| jdE d H  qd S )NrA   )r%   _diverA   )r,   elementr!   r!   r"   r3      s   
zUnBatcherIterDataPipe.__iter__c                 c   s    |dk r	t d|dkr*t|ttfr%|D ]}| j|ddE d H  qd S |V  d S |dkr3|V  d S t|ttfrM|D ]}| j||d dE d H  q<d S td| j d)Nz unbatch_level must be -1 or >= 0rC   r   r4   zunbatch_level z" exceeds the depth of the DataPipe)
ValueErrorr5   listr
   rD   
IndexErrorrA   )r,   rE   rA   itemr!   r!   r"   rD      s$   

zUnBatcherIterDataPipe._diveN)r4   )	r    r:   r;   r<   r   r>   r+   r3   rD   r!   r!   r!   r"   r   d   s
    r   groupbyc                   @   s   e Zd ZdZdddddddee deegef ded	e	d
e
e	 de
e	 defddZdd Zdd ZdddZdd Zdd Zdd ZdS )r   a!
  
    Groups data from IterDataPipe by keys from ``group_key_fn``, yielding a ``DataChunk`` with batch size up to ``group_size``.

    (functional name: ``groupby``).

    The samples are read sequentially from the source ``datapipe``, and a batch of samples belonging to the same group
    will be yielded as soon as the size of the batch reaches ``group_size``. When the buffer is full,
    the DataPipe will yield the largest batch with the same key, provided that its size is larger
    than ``guaranteed_group_size``. If its size is smaller, it will be dropped if ``drop_remaining=True``.

    After iterating through the entirety of source ``datapipe``, everything not dropped due to the buffer capacity
    will be yielded from the buffer, even if the group sizes are smaller than ``guaranteed_group_size``.

    Args:
        datapipe: Iterable datapipe to be grouped
        group_key_fn: Function used to generate group key from the data of the source datapipe
        keep_key: Option to yield the matching key along with the items in a tuple,
            resulting in `(key, [items])` otherwise returning [items]
        buffer_size: The size of buffer for ungrouped data
        group_size: The max size of each group, a batch is yielded as soon as it reaches this size
        guaranteed_group_size: The guaranteed minimum group size to be yielded in case the buffer is full
        drop_remaining: Specifies if the group smaller than ``guaranteed_group_size`` will be dropped from buffer
            when the buffer is full

    Example:
        >>> import os
        >>> # xdoctest: +SKIP
        >>> from torchdata.datapipes.iter import IterableWrapper
        >>> def group_fn(file):
        ...     return os.path.basename(file).split(".")[0]
        >>> source_dp = IterableWrapper(["a.png", "b.png", "a.json", "b.json", "a.jpg", "c.json"])
        >>> dp0 = source_dp.groupby(group_key_fn=group_fn)
        >>> list(dp0)
        [['a.png', 'a.json', 'a.jpg'], ['b.png', 'b.json'], ['c.json']]
        >>> # A group is yielded as soon as its size equals to `group_size`
        >>> dp1 = source_dp.groupby(group_key_fn=group_fn, group_size=2)
        >>> list(dp1)
        [['a.png', 'a.json'], ['b.png', 'b.json'], ['a.jpg'], ['c.json']]
        >>> # Scenario where `buffer` is full, and group 'a' needs to be yielded since its size > `guaranteed_group_size`
        >>> dp2 = source_dp.groupby(group_key_fn=group_fn, buffer_size=3, group_size=3, guaranteed_group_size=2)
        >>> list(dp2)
        [['a.png', 'a.json'], ['b.png', 'b.json'], ['a.jpg'], ['c.json']]
    Fi'  N)keep_keybuffer_size
group_sizeguaranteed_group_sizedrop_remainingr%   group_key_fnrL   rM   rN   rO   rP   c                C   s   t | || _|| _|| _|| _tt| _d| _|| _	d | _
|d ur7|d ur7d|  k r1|ks4J  J || _
|d urP|d urKd|  k rJ|ksMJ  J || _
|| _t| _d S r/   )r   r%   rQ   rL   max_buffer_sizer   rH   buffer_elementscurr_buffer_sizerN   rO   rP   r
   r(   )r,   r%   rQ   rL   rM   rN   rO   rP   r!   r!   r"   r+      s"   
$
zGrouperIterDataPipe.__init__c                 C   s   d }d}d }| j  D ]}t| j | |krt| j | }|}q| jd ur7|| jk r7| js7tdt| j | | jd u sA|| jkrF| j | }|  j|8  _| j |= |S )Nr   zFailed to group items)rS   keysr1   rO   rP   RuntimeErrorstrrT   )r,   Zbiggest_keyZbiggest_sizeresult_to_yieldZfindkeyr!   r!   r"   _remove_biggest_key   s*   




z'GrouperIterDataPipe._remove_biggest_keyc                 c   s"   | j D ]d}| |}| j| | |  jd7  _| jd urK| jt| j| krK| | j| }| jr8||fn|V  |  jt| j| 8  _| j|= | j| j	krh| 
 }|d urh| |}| jre||fn|V  qt| j D ]}| | j|}|  jt|8  _| jr||fn|V  qpd S )Nr4   )r%   rQ   rS   r0   rT   rN   r1   r(   rL   rR   rY   tuplerU   pop)r,   r2   keyresultrX   r!   r!   r"   r3     s.   


zGrouperIterDataPipe.__iter__r)   c                 C   s   d| _ tt| _d S r/   )rT   r   rH   rS   r8   r!   r!   r"   reset  s   zGrouperIterDataPipe.resetc              
   C   sD   | j | j| j| j| j| j| j| j| j| j	f
}t
jd ur t
|S |S rB   )r%   rQ   rL   rR   rN   rO   rP   r(   _valid_iterator_id_number_of_samples_yieldedr   Zgetstate_hookr,   stater!   r!   r"   __getstate__   s   

z GrouperIterDataPipe.__getstate__c                 C   s@   |\
| _ | _| _| _| _| _| _| _| _| _	d| _
tt| _d S r/   )r%   rQ   rL   rR   rN   rO   rP   r(   r_   r`   rT   r   rH   rS   ra   r!   r!   r"   __setstate__1  s   z GrouperIterDataPipe.__setstate__c                 C   s   | j   d S rB   )rS   clearr8   r!   r!   r"   __del__A  s   zGrouperIterDataPipe.__del__)r)   N)r    r:   r;   r<   r   r   r   r   r?   r>   r   r+   rY   r3   r^   rc   rd   rf   r!   r!   r!   r"   r      s8    1	

r   )r   collectionsr   collections.abcr   r   typingr   r   r   r   Z(torch.utils.data.datapipes.iter.shardingr   Z%torch.utils.data.datapipes._decoratorr	   Z#torch.utils.data.datapipes.datapiper
   r   Z'torch.utils.data.datapipes.utils.commonr   __all__r   rW   r#   r   r   r   r!   r!   r!   r"   <module>   s"   >4