o
    wZhi#                     @   s   d dl Z d dlmZ d dlmZmZ d dlmZmZm	Z	m
Z
mZ d dlmZ d dlmZ d dlmZ d dlmZ d d	lmZmZ d
dgZe
dddZedG dd dee Zdd ZedG dd
 d
eZdS )    N)
namedtuple)IteratorSized)AnyCallableOptionalTypeVarUnion)default_collate)functional_datapipe)dataframe_wrapper)IterDataPipe)_check_unpickable_fnvalidate_input_colCollatorIterDataPipeMapperIterDataPipe_T_coT)	covariantmapc                       sn   e Zd ZU dZeed< eed< 		ddededdf fddZdd	 Zde	e
 fd
dZdefddZ  ZS )r   a  
    Applies a function over each item from the source DataPipe (functional name: ``map``).

    The function can be any regular Python function or partial object. Lambda
    function is not recommended as it is not supported by pickle.

    Args:
        datapipe: Source Iterable DataPipe
        fn: Function being applied over each item
        input_col: Index or indices of data which ``fn`` is applied, such as:

            - ``None`` as default to apply ``fn`` to the data directly.
            - Integer(s) is used for list/tuple.
            - Key(s) is used for dict.

        output_col: Index of data where result of ``fn`` is placed. ``output_col`` can be specified
            only when ``input_col`` is not ``None``

            - ``None`` as default to replace the index that ``input_col`` specified; For ``input_col`` with
              multiple indices, the left-most one is used, and other indices will be removed.
            - Integer is used for list/tuple. ``-1`` represents to append result at the end.
            - Key is used for dict. New key is acceptable.

    Example:
        >>> # xdoctest: +SKIP
        >>> from torchdata.datapipes.iter import IterableWrapper, Mapper
        >>> def add_one(x):
        ...     return x + 1
        >>> dp = IterableWrapper(range(10))
        >>> map_dp_1 = dp.map(add_one)  # Invocation via functional form is preferred
        >>> list(map_dp_1)
        [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
        >>> # We discourage the usage of `lambda` functions as they are not serializable with `pickle`
        >>> # Use `functools.partial` or explicitly define the function instead
        >>> map_dp_2 = Mapper(dp, lambda x: x + 1)
        >>> list(map_dp_2)
        [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
    datapipefnNreturnc                    sz   t    || _t| || _|| _|d u r|d urtdt|tt	fr3t
|dkr/td|d }|| _t|| d S )Nz3`output_col` must be None when `input_col` is None.   z3`output_col` must be a single-element list or tupler   )super__init__r   r   r   	input_col
ValueError
isinstancelisttuplelen
output_colr   )selfr   r   r   r!   	__class__ W/var/www/auris/lib/python3.10/site-packages/torch/utils/data/datapipes/iter/callable.pyr   F   s   
zMapperIterDataPipe.__init__c                    s  | j d u r| jd u r|  S | j d u r|  }n"t| j ttfr4t fdd| j D }| j| }n|  | j  }t trHd}t  nd}| jd u rvt| j ttfrp| | j d < t| j dd  ddD ]} |= qin| | j < n| jdkr | n| | j< |rt S  S )	Nc                 3   s    | ]} | V  qd S Nr%   ).0coldatar%   r&   	<genexpr>d   s    z/MapperIterDataPipe._apply_fn.<locals>.<genexpr>TFr   r   )reverse)r   r!   r   r   r   r   sortedappend)r"   r+   resargsZt_flagidxr%   r*   r&   	_apply_fn]   s.   






zMapperIterDataPipe._apply_fnc                 c   s    | j D ]}| |V  qd S r'   )r   r4   )r"   r+   r%   r%   r&   __iter__   s   
zMapperIterDataPipe.__iter__c                 C   s*   t | jtrt| jS tt| j d)Nz# instance doesn't have valid length)r   r   r   r    	TypeErrortype__name__)r"   r%   r%   r&   __len__   s   
zMapperIterDataPipe.__len__)NN)r8   
__module____qualname____doc__r   __annotations__r   r   r4   r   r   r5   intr9   __classcell__r%   r%   r#   r&   r      s    
 '#c                 C   s   t |jdkrtd|d }t|}g }g }|  D ]
}||vr&tdq|D ]F}|| v r>t| | s9td| | }nzdd lm} |j	
 }W n ty\ }	 ztd|	d }	~	ww |t| ||| }
||
 q)td|}|| }|S )Nr   z%Only supports one DataFrame per batchr   zConversion keys missmatchz5Collate (DF)DataPipe requires callable as dict valuesz?unable to import default collation function from the TorchArrowZCollateResult)r    itemsRuntimeError
df_wrapperZget_columnskeyscallableZtorcharrow.pytorchZpytorchZrecZDefault	Exceptionr0   strr   )
conversionitemZdfZcolumns_nameZtuple_namesZtuple_valuesnameZcollation_fntapevalueZtpl_clsr   r%   r%   r&   _collate_helper   sD   


rM   Zcollatec                       sf   e Zd ZdZedfdedeedef e	ee
ef eeef f df dee ddf fdd	Z  ZS )
r   af  
    Collates samples from DataPipe to Tensor(s) by a custom collate function (functional name: ``collate``).

    By default, it uses :func:`torch.utils.data.default_collate`.

    .. note::
        While writing a custom collate function, you can import :func:`torch.utils.data.default_collate` for the
        default behavior and `functools.partial` to specify any additional arguments.

    Args:
        datapipe: Iterable DataPipe being collated
        collate_fn: Customized collate function to collect and combine data or a batch of data.
            Default function collates to Tensor(s) based on data type.

    Example:
        >>> # xdoctest: +SKIP
        >>> # Convert integer data to float Tensor
        >>> class MyIterDataPipe(torch.utils.data.IterDataPipe):
        ...     def __init__(self, start, end):
        ...         super(MyIterDataPipe).__init__()
        ...         assert end > start, "this example code only works with end >= start"
        ...         self.start = start
        ...         self.end = end
        ...
        ...     def __iter__(self):
        ...         return iter(range(self.start, self.end))
        ...
        ...     def __len__(self):
        ...         return self.end - self.start
        ...
        >>> ds = MyIterDataPipe(start=3, end=7)
        >>> print(list(ds))
        [3, 4, 5, 6]
        >>> def collate_fn(batch):
        ...     return torch.tensor(batch, dtype=torch.float)
        ...
        >>> collated_ds = CollateIterDataPipe(ds, collate_fn=collate_fn)
        >>> print(list(collated_ds))
        [tensor(3.), tensor(4.), tensor(5.), tensor(6.)]
    Nr   rG   .
collate_fnr   c                    sX   |d urt  j||d d S t|rt  j||d d S tt|}t  j||d d S )N)r   )r   r   rD   	functoolspartialrM   )r"   r   rG   rN   r#   r%   r&   r      s   
zCollatorIterDataPipe.__init__)r8   r:   r;   r<   r
   r   r	   r   r   dictrF   r   r   r?   r%   r%   r#   r&   r      s    .()rO   collectionsr   collections.abcr   r   typingr   r   r   r   r	   Ztorch.utils.data._utils.collater
   Z%torch.utils.data.datapipes._decoratorr   Z$torch.utils.data.datapipes.dataframer   rB   Z#torch.utils.data.datapipes.datapiper   Z'torch.utils.data.datapipes.utils.commonr   r   __all__r   r   rM   r   r%   r%   r%   r&   <module>   s$   o+