a
    hr                     @   s   d dl Z d dlmZmZ d dlmZ d dlmZ d dlm	Z	m
Z
 d dlmZ dd Zd	d
 ZG dd de
ZG dd de	ZdS )    N)Client_get_global_client)Worker)
filesystem)AbstractBufferedFileAbstractFileSysteminfer_storage_optionsc                 C   s(   | d u rt  S t| tr| S t| S d S N)r   
isinstancer   )client r   I/var/www/auris/lib/python3.9/site-packages/fsspec/implementations/dask.py_get_client
   s
    
r   c                   C   s
   t tjS r
   )boolr   Z
_instancesr   r   r   r   
_in_worker   s    r   c                       sp   e Zd ZdZd fdd	Zedd Zdd Zd	d
 Zdd Z	dd Z
dd Zdd ZdddZdd Z  ZS )DaskWorkerFileSystema)  View files accessible to a worker as any other remote file-system

    When instances are run on the worker, uses the real filesystem. When
    run on the client, they call the worker to provide information or data.

    **Warning** this implementation is experimental, and read-only for now.
    Nc                    sT   t  jf i | |d u |d u A s*td|| _|| _d | _|| _|| _|   d S )NzKPlease provide one of filesystem instance (fs) or target_protocol, not both)	super__init__
ValueErrortarget_protocoltarget_optionsworkerr   fs_determine_worker)selfr   r   r   r   kwargs	__class__r   r   r   !   s    zDaskWorkerFileSystem.__init__c                 C   s:   t | }d|v r2d|v r2d|d  d|d  iS i S d S )Nhostportr   :r   )pathsor   r   r   _get_kwargs_from_urls1   s    z*DaskWorkerFileSystem._get_kwargs_from_urlsc                 C   sT   t  r2d| _| jd u rPt| jfi | jp(i | _nd| _t| j| _t	| | _
d S )NTF)r   r   r   r   r   r   r   r   daskZdelayedrfsr   r   r   r   r   9   s    

z&DaskWorkerFileSystem._determine_workerc                 O   s4   | j r| jj|i | n| jj|i |  d S r
   )r   r   mkdirr&   computer   argsr   r   r   r   r(   E   s    zDaskWorkerFileSystem.mkdirc                 O   s4   | j r| jj|i | n| jj|i |  d S r
   )r   r   rmr&   r)   r*   r   r   r   r,   K   s    zDaskWorkerFileSystem.rmc                 O   s4   | j r| jj|i | n| jj|i |  d S r
   )r   r   copyr&   r)   r*   r   r   r   r-   Q   s    zDaskWorkerFileSystem.copyc                 O   s4   | j r| jj|i | n| jj|i |  d S r
   )r   r   mvr&   r)   r*   r   r   r   r.   W   s    zDaskWorkerFileSystem.mvc                 O   s2   | j r| jj|i |S | jj|i | S d S r
   )r   r   lsr&   r)   r*   r   r   r   r/   ]   s    zDaskWorkerFileSystem.lsrbTc              	   K   sD   | j r$| jj|f||||d|S tf | |||||d|S d S )N)mode
block_size
autocommitcache_options)r   r"   r1   r2   r3   r4   )r   r   _openDaskFile)r   r"   r1   r2   r3   r4   r   r   r   r   r5   c   s*    		zDaskWorkerFileSystem._openc                 C   sf   | j rL| ||(}|| ||| W  d    S 1 s@0    Y  n| j|||| S d S r
   )r   r5   seekreadr&   fetch_ranger)   )r   r"   r1   startendfr   r   r   r9      s
    
.z DaskWorkerFileSystem.fetch_range)NNNN)r0   NTN)__name__
__module____qualname____doc__r   staticmethodr$   r   r(   r,   r-   r.   r/   r5   r9   __classcell__r   r   r   r   r      s"   	 
	    
r   c                       s8   e Zd Zd fdd	ZdddZdd Zd	d
 Z  ZS )r6   r0   c                    s&   |dkrt dt jf i | d S )Nr0   z1Remote dask files can only be opened in "rb" mode)r   r   r   )r   r1   r   r   r   r   r      s    zDaskFile.__init__Fc                 C   s   d S r
   r   )r   finalr   r   r   _upload_chunk   s    zDaskFile._upload_chunkc                 C   s   dS )zCreate remote file/uploadNr   r'   r   r   r   _initiate_upload   s    zDaskFile._initiate_uploadc                 C   s   | j | j| j||S )z*Get the specified set of bytes from remote)r   r9   r"   r1   )r   r:   r;   r   r   r   _fetch_range   s    zDaskFile._fetch_range)r0   )F)r=   r>   r?   r   rD   rE   rF   rB   r   r   r   r   r6      s   
r6   )r%   Zdistributed.clientr   r   Zdistributed.workerr   Zfsspecr   Zfsspec.specr   r   Zfsspec.utilsr	   r   r   r   r6   r   r   r   r   <module>   s   
q