
    ZTh                         U d dl Z d dlmZ d dlmZ d dlmZmZmZm	Z	m
Z
mZ d dlmZ d dlmZmZ d dlmZ d dlmZmZ d	Z ed
e      Zeg df   Zeed<   eeee   egdf   Zeed<    G d dee         Zy)    N)contextmanager)TracebackType)CallableGenericListOptionalTypeTypeVar)!IngestWithConfigDataStreamRequest)Self	TypeAlias)_IngestionServiceImpl)FlowFlowOrderedChannelValuesi  T)boundFlushCallbackOnErrorCallbackc            	       J   e Zd ZU dZee   ed<   eed<   eed<   e	e
   ed<   e	ej                     ed<   e	ej                     ed<   e	e   ed<   d	ed
e	e   de	e
   de	e   fdZdefdZde	ee      de	e   de	e   defdZdefdZdefdZd Zd Zd Zd Zd Ze d        Z!y)BufferedIngestionServicez
    See `sift_py.ingestion.service.IngestionService.buffered_ingestion`
    for more information and how to leverage buffered ingestion.
    _buffer_buffer_size_ingestion_service_flush_interval_sec_flush_timer_lock	_on_erroringestion_servicebuffer_sizeflush_interval_secon_errorc                     g | _         |xs t        | _        || _        || _        d | _        |r1|| _        t        j                         | _	        | j                          y d | _        d | _	        y N)r   DEFAULT_BUFFER_SIZEr   r   r   r   r   	threadingLockr   _start_flush_timer)selfr   r   r    r!   s        ]/home/www/backend.miabetepe.com/venv/lib/python3.12/site-packages/sift_py/ingestion/buffer.py__init__z!BufferedIngestionService.__init__$   sd     '>+>"3! '9D$")DJ##%'+D$DJ    returnc                     | S r#    r(   s    r)   	__enter__z"BufferedIngestionService.__enter__9   s    r+   exc_typeexc_valexc_tbc                     | j                          |G| j                  )| j                  || j                  | j                         || j                          || j                          y)NT)_cancel_flush_timerr   r   flush)r(   r1   r2   r3   s       r)   __exit__z!BufferedIngestionService.__exit__<   s[     	  "~~)wdjjA M 

MJJLr+   flowsc                 r   | j                         5  d}t        | j                  t        | j                        z
  t        |            }|t        |      k  r||| D ]J  }|d   }|d   }|d   }| j
                  j                  |||      }| j                  j                  |       L t        | j                        | j                  k\  r| j                          |}t        || j                  t        | j                        z
  z   t        |            }|t        |      k  rddd       y# 1 sw Y   yxY w)z
        Ingests flows in batches for each request generated from a flow.
        See `sift_py.ingestion.service.IngestionService.create_ingestion_request`
        for more information.
        r   	flow_name	timestampchannel_valuesr:   r;   r<   N)		_use_lockminr   lenr   r   create_ingestion_requestappend_flush	r(   r8   
lhs_cursor
rhs_cursorflowr:   r;   r<   reqs	            r)   ingest_flowsz%BufferedIngestionService.ingest_flowsP   s.    ^^ 	J!!C$55E
J
 s5z)!*Z8 
-D $[ 1I $[ 1I%)*:%;N11JJ"+"+'5 K C
 LL'',
- t||$(9(99KKM'
 $"3"3c$,,6G"GHJ
# s5z)	 	 	   DD--D6c                 r   | j                         5  d}t        | j                  t        | j                        z
  t        |            }|t        |      k  r||| D ]J  }|d   }|d   }|d   }| j
                  j                  |||      }| j                  j                  |       L t        | j                        | j                  k\  r| j                          |}t        || j                  t        | j                        z
  z   t        |            }|t        |      k  rddd       y# 1 sw Y   yxY w)z
        Ingests flows in batches and performs client-side validations for each request
        generated from a flow. See `sift_py.ingestion.service.IngestionService.try_create_ingestion_request`
        for more information.
        r   r:   r;   r<   r=   N)	r>   r?   r   r@   r   r   try_create_ingestion_requestrB   rC   rD   s	            r)   try_ingest_flowsz)BufferedIngestionService.try_ingest_flowss   s.    ^^ 	J!!C$55E
J
 s5z)!*Z8 
-D $[ 1I $[ 1I%)*:%;N11NN"+"+'5 O C
 LL'',
- t||$(9(99KKM'
 $"3"3c$,,6G"GHJ
# s5z)	 	 	rJ   c                     | j                   rB| j                  r6| j                  5  | j                          ddd       | j                          y| j                          y# 1 sw Y   +xY w)z:
        Flush and ingest all requests in buffer.
        N)r   r   rC   _restart_flush_timerr/   s    r)   r6   zBufferedIngestionService.flush   sM    
  %%'KKM	 s   A  A)c                     t        | j                        dkD  r> | j                  j                  | j                    | j                  j	                          y y )Nr   )r@   r   r   ingestclearr/   s    r)   rC   zBufferedIngestionService._flush   sB    t||q *D##**DLL9LL  !r+   c                     | j                   rJt        j                  | j                   | j                        | _        | j                  j                          y y r#   )r   r%   Timerr6   r   startr/   s    r)   r'   z+BufferedIngestionService._start_flush_timer   s?    ## )0H0H$** UD##% $r+   c                 `    | j                   r"| j                   j                          d | _         y y r#   )r   cancelr/   s    r)   r5   z,BufferedIngestionService._cancel_flush_timer   s*    $$& $D r+   c                 D    | j                          | j                          y r#   )r5   r'   r/   s    r)   rO   z-BufferedIngestionService._restart_flush_timer   s      "!r+   c              #     K   	 | j                   r| j                   j                          d  | j                   r| j                   j                          y y # | j                   r| j                   j                          w w xY wwr#   )r   acquirereleaser/   s    r)   r>   z"BufferedIngestionService._use_lock   s]     	%zz

""$zz

""$ tzz

""$ s   B*A (B)A??BN)"__name__
__module____qualname____doc__r   r   __annotations__intr   r   floatr%   rT   r&   r   r*   r   r0   r	   BaseExceptionr   boolr7   r   rI   r   rM   r6   rC   r'   r5   rO   r   r>   r.   r+   r)   r   r      s   
 344!%(9??++INN##(( c] %UO	
 ?+*4 4./ -( '	
 
(!#; !F!t !F
!
&
%
" % %r+   r   )r%   
contextlibr   typesr   typingr   r   r   r   r	   r
   sift.ingest.v1.ingest_pb2r   typing_extensionsr   r   "sift_py.ingestion._internal.ingestr   sift_py.ingestion.flowr   r   r$   r   r   r`   rc   r   r   r.   r+   r)   <module>rl      s~     %  C C G - D A C,-#BH-y -%D:;]KTQ 
g%wqz g%r+   