Ë
    ZTÐh2  ã                  óº   — d dl mZ d dlmZ d dlmZmZmZmZ d dlm	Z	m
Z
 d dlmZ d dlmZ d dlmZ d dlmZmZ d d	lmZ d d
lmZ d dlmZmZmZ  G d„ de«      Zy)é    )Úannotations)Údatetime)ÚDictÚListÚOptionalÚUnion)Ú IngestWithConfigDataChannelValueÚ!IngestWithConfigDataStreamRequest)ÚIngestionConfig)ÚSiftChannel)Ú_IngestionServiceImpl)ÚBufferedIngestionServiceÚOnErrorCallback)ÚChannelValue)ÚTelemetryConfig)ÚFlowÚ
FlowConfigÚFlowOrderedChannelValuesc                  óv  ‡ — e Zd ZU dZded<   ded<   ded<   ded	<   d
ed<   d
ed<   ded<   ded<   	 	 	 d	 	 	 	 	 	 	 	 	 dˆ fd„Zdˆ fd„Z	 	 	 	 	 d 	 	 	 	 	 	 	 	 	 	 	 	 	 d!ˆ fd„Zˆ fd„Z	 	 	 	 	 	 	 	 d"ˆ fd„Z		 	 	 	 	 	 	 	 d#ˆ fd„Z
d$ˆ fd„Zd%ˆ fd„Z	 	 	 d&	 	 	 	 	 	 	 d'd„Zd(ˆ fd„Zd)ˆ fd„Zd(ˆ fd„Zd)ˆ fd„Zˆ xZS )*ÚIngestionServicea&  
    A fully configured service that, when instantiated, is ready to start ingesting data.

    - `transport_channel`: A gRPC transport channel. Prefer to use `SiftChannel`.
    - `ingestion_config`: The underlying strongly-typed ingestion config. Users of this service don't need to be concerned with this.
    - `asset_name`: The name of the asset to telemeter.
    - `flow_configs_by_name`: A mapping of flow config name to the actual flow config.
    - `run_id`: The ID of the optional run to associated ingested data with.
    - `organization_id`: ID of the organization of the user.
    - `end_stream_on_error`:
        By default any errors that may occur during ingestion API-side are produced asynchronously and ingestion
        won't be interrupted. The errors produced are surfaced on the user errors page. Setting this field to `True`
        will ensure that any errors that occur during ingestion is returned immediately, terminating the stream. This
        is useful for debugging purposes.
    - `lazy_flow_creation`:
        By default, the entire telemetry config is processed when the service is initialized, and if needed, the config and all flow info
        is sent to Sift. In the event a sufficiently large telemetry config is provided which is too large to send in one single
        gRPC message, the ingestion service will instead use a lazy flow ingestion method, which sets this boolean to True. This method
        registers individual flows the first time they are ingested. Initializing with `force_lazy_flow_creation` will force this behavior
        for any telemetry flow size. If a sufficently large telemetry config is being sent, and lazy flow ingestion behavior is not desired,
        the list of flows must be broken up beforehand and sent through the service's create flow methods.
    r   Útransport_channelr   Úingestion_configÚstrÚ
asset_namezDict[str, FlowConfig]Úflow_configs_by_nameúOptional[str]Úrun_idÚorganization_idÚboolÚend_stream_on_errorÚlazy_flow_creationc                ó.   •— t         ‰|   |||||¬«       y )N)ÚchannelÚconfigr   r    Úforce_lazy_flow_creation)ÚsuperÚ__init__)Úselfr#   r$   r   r    r%   Ú	__class__s         €ú^/home/www/backend.miabetepe.com/venv/lib/python3.12/site-packages/sift_py/ingestion/service.pyr'   zIngestionService.__init__5   s&   ø€ ô 	‰ÑØØØØ 3Ø%=ð 	õ 	
ó    c                ó   •— t        ‰|   |Ž  y)zi
        This method performs the actual data ingestion given a list of data ingestion requests.
        N)r&   Úingest)r(   Úrequestsr)   s     €r*   r-   zIngestionService.ingestE   s   ø€ ô 	‰‰˜Ò!r+   c           	     ó0   •— t         ‰|   |||||||«       y)zå
        Retrieve an existing run or create one to use during this period of ingestion.

        Include `force_new=True` to force the creation of a new run, which will allow creation of a new run using an existing name.
        N)r&   Ú
attach_run)	r(   r#   Úrun_nameÚdescriptionr   ÚtagsÚmetadataÚ	force_newr)   s	           €r*   r0   zIngestionService.attach_runK   s!   ø€ ô 	‰ÑØX˜{¨O¸TÀ8ÈYõ	
r+   c                ó"   •— t         ‰|   «        y)z
        Detach run from this period of ingestion. Subsequent data ingested won't be associated with
        the run being detached.
        N)r&   Ú
detach_run)r(   r)   s    €r*   r7   zIngestionService.detach_run^   s   ø€ ô
 	‰ÑÕr+   c                ó&   •— t         ‰|   |||«      S )aà  
        Creates an `IngestWithConfigDataStreamRequest`, i.e. a flow, given a `flow_name` and a
        list of `ChannelValue` objects. Channels that appear in the flow config but not in the
        `channel_values` will be assigned an empty value.

        This function will perform validation checks to ensure that the values provided in the dictionary; this
        includes:
          - Making sure the flow exists
          - Making sure that the there are no unexpected channels provided for the given flow
          - Making sure the channel value is the expected type
          - Making sure that the timestamp is in UTC

        If any of the above validations fail then a `IngestionValidationError` will be raised.

        If for performance reasons you'd prefer to skip the validation checks, or perhaps you did the
        validations on your own, prefer to use `create_ingestion_request`. Any errors that occur during
        ingestion will be handled by the Sift API.
        )r&   Útry_create_ingestion_request©r(   Ú	flow_nameÚ	timestampÚchannel_valuesr)   s       €r*   r9   z-IngestionService.try_create_ingestion_requeste   s   ø€ ô0 ‰wÑ3°I¸yÈ.ÓYÐYr+   c                ó&   •— t         ‰|   |||«      S )aÆ  
        Unlike `try_create_ingestion_request`, this skips argument validations. Useful for when user has already done their own
        argument validation or if they require low-latency execution time client-side.

        If there are errors that occur during ingestion and the `end_stream_on_error` attribute is set to `False`,
        the data ingestion stream will skip over them and errors instead will be produced asynchronously and become
        available in the UI application in the errors page. If `end_stream_on_error` is set to `True`, then the
        data ingestion stream will be terminated if an error is encountered during ingestion.

        These are some things to look out for when using this method instead of `try_create_ingestion_request`:
        - Values in `channel_values` must appear in the same order its corresponding channel appears in the flow config
          associated with the `flow_name`.
        - The length of `channel_values` is expected to match the length of the channel configs list of the flow config
          associated with `flow_name`. `sift_py.ingestion.channel.empty_value()` may be used if you require empty values.
        - The `timestamp` must be in UTC.
        )r&   Úcreate_ingestion_requestr:   s       €r*   r?   z)IngestionService.create_ingestion_request   s   ø€ ô, ‰wÑ/°	¸9ÀnÓUÐUr+   c                ó   •— t        ‰|   |Ž S )z½
        Combines the requests creation step and ingestion into a single call.
        See `create_ingestion_request` for information about how client-side validations are handled.
        )r&   Úingest_flows©r(   Úflowsr)   s     €r*   rA   zIngestionService.ingest_flows—   s   ø€ ô
 ‰wÑ# UÐ+Ð+r+   c                ó   •— t        ‰|   |Ž S )zÁ
        Combines the requests creation step and ingestion into a single call.
        See `try_create_ingestion_request` for information about how client-side validations are handled.
        )r&   Útry_ingest_flowsrB   s     €r*   rE   z!IngestionService.try_ingest_flowsž   s   ø€ ô
 ‰wÑ'¨Ð/Ð/r+   c                ó   — t        | |||«      S )aJ  
        This method automates buffering requests and streams them in batches. It is recommended to be used
        in a with-block. Failure to put this in a with-block may result in some data not being ingested unless
        the caller explicitly calls `sift_py.ingestion.buffer.BufferedIngestionService.flush` before the returned
        instance of `sift_py.ingestion.buffer.BufferedIngestionService` goes out of scope. Once the with-block
        is exited then a final call to the aforementioned `flush` method  will be made to ingest the remaining data.

        Buffered ingestion works by automatically flushing and ingesting data into Sift whenever the buffer is filled.
        The size of the buffer is configured via the `buffer_size` argument and defaults to `sift_py.ingestion.buffer.DEFAULT_BUFFER_SIZE`.

        It is also possible to configure buffered ingestion to periodically flush the buffer regardless of whether or not the buffer
        is filled. The interval between flushes is set via the `flush_interval_sec` argument which is the number of seconds between each flush.
        If a flush were to occur due to the buffer being filled, then the timer will restart. If `flush_interval_sec` is `None`, then flushes will only
        occur once the buffer is filled and at the end of the scope of the with-block.

        If an error were to occur that would cause the context manager to call `__exit__`, one last attempt to flush the buffer will be made
        before the error is re-raised for the caller to handle. If the caller would instead like to customize `__exit__` behavior in the case
        of an error, they can make use of the `on_error` argument whose type signature is a function where the first argument is the error,
        the second is the buffer containing the uningested request, and the third argument being a function where, when called, will attempt
        to flush the buffer.

        Example usage:

        ```python
        # With client-side validations
        with ingestion_service.buffered_ingestion() as buffered_ingestion:
            for _ in range(10_000):
                buffered_ingestion.try_ingest_flows({
                    "flow_name": "readings",
                    "timestamp": datetime.now(timezone.utc),
                    "channel_values": [
                        {
                            "channel_name": "my-channel",
                    ],
                })

        # Without client-side validations and a custom buffer size
        with ingestion_service.buffered_ingestion(2_000) as buffered_ingestion:
            for _ in range(6_000):
                buffered_ingestion.ingest_flows({
                    "flow_name": "readings",
                    "timestamp": datetime.now(timezone.utc),
                    "channel_values": [double_value(3)]
                })

        # With default buffer size and periodic flushes of 3.2 seconds
        with ingestion_service.buffered_ingestion(flush_interval_sec=3.2) as buffered_ingestion:
            for _ in range(6_000):
                buffered_ingestion.ingest_flows({
                    "flow_name": "readings",
                    "timestamp": datetime.now(timezone.utc),
                    "channel_values": [double_value(3)]
                })

        # Custom code to run when error
        def on_error_calback(err, buffer, flush):
            # Save contents of buffer to disk
            ...
            # Try once more to flush the buffer
            flush()

        with ingestion_service.buffered_ingestion(on_error=on_error_calback) as buffered_ingestion:
            ...
        ```
        )r   )r(   Úbuffer_sizeÚflush_interval_secÚon_errors       r*   Úbuffered_ingestionz#IngestionService.buffered_ingestion¥   s   € ôN (¨¨kÐ;MÈxÓXÐXr+   c                ó   •— t        ‰|   |Ž  y)z±
        Like `try_create_new_flow` but will not raise an `IngestionValidationError` if there already exists
        a flow with the name of the `flow_config` argument.
        N©r&   Úcreate_flow©r(   Úflow_configr)   s     €r*   rM   zIngestionService.create_flowî   s   ø€ ô
 	‰Ñ˜[Ò)r+   c                ó   •— t        ‰|   |Ž  y)z$
        See `create_flow`.
        NrL   ©r(   Úflow_configsr)   s     €r*   Úcreate_flowszIngestionService.create_flowsõ   s   ø€ ô 	‰Ñ˜\Ò*r+   c                ó   •— t        ‰|   |Ž  y)zµ
        Tries to create a new flow at runtime. Will raise an `IngestionValidationError` if there already exists
        a flow with the name of the `flow_config` argument.
        N©r&   Útry_create_flowrN   s     €r*   rV   z IngestionService.try_create_flowû   s   ø€ ô
 	‰Ñ Ò-r+   c                ó   •— t        ‰|   |Ž  y)z)
        See `try_create_flows`.
        NrU   rQ   s     €r*   Útry_create_flowsz!IngestionService.try_create_flows  s   ø€ ô 	‰Ñ Ò.r+   )NFF)
r#   r   r$   r   r   r   r    r   r%   r   )r.   r
   )NNNNF)r#   r   r1   r   r2   r   r   r   r3   zOptional[List[str]]r4   z,Optional[Dict[str, Union[str, float, bool]]]r5   r   )r;   r   r<   r   r=   zAUnion[List[ChannelValue], List[IngestWithConfigDataChannelValue]]Úreturnr
   )r;   r   r<   r   r=   z&List[IngestWithConfigDataChannelValue]rY   r
   )rC   r   )rC   r   )NNN)rG   zOptional[int]rH   zOptional[float]rI   zOptional[OnErrorCallback]rY   r   )rO   r   )rR   r   )Ú__name__Ú
__module__Ú__qualname__Ú__doc__Ú__annotations__r'   r-   r0   r7   r9   r?   rA   rE   rJ   rM   rS   rV   rX   Ú__classcell__)r)   s   @r*   r   r      s¢  ø… ñð. #Ó"Ø%Ó%ØƒOØ/Ó/ØÓØ"Ó"ØÓØÓð !%Ø$)Ø).ð
àð
ð  ð
ð ð	
ð
 "ð
ð #'õ
õ "ð &*Ø)-Ø$(ØAEØð
àð
ð ð
ð #ð	
ð
 'ð
ð "ð
ð ?ð
ð õ
ô&ðZàðZð ðZð Zð	Zð
 
+õZð4VàðVð ðVð ?ð	Vð
 
+õVõ0,õ0ð &*Ø.2Ø.2ð	GYà"ðGYð ,ðGYð ,ð	GYð
 
"óGYõR*õ+õ.÷/ñ /r+   r   N)Ú
__future__r   r   Útypingr   r   r   r   Úsift.ingest.v1.ingest_pb2r	   r
   Ú/sift.ingestion_configs.v2.ingestion_configs_pb2r   Úsift_py.grpc.transportr   Ú"sift_py.ingestion._internal.ingestr   Úsift_py.ingestion.bufferr   r   Úsift_py.ingestion.channelr   Ú"sift_py.ingestion.config.telemetryr   Úsift_py.ingestion.flowr   r   r   r   © r+   r*   ú<module>rk      s@   ðÝ "å ß .Ó .÷õ Lå .Ý Dß NÝ 2Ý >ß MÑ Môr/Ð,õ r/r+   