+\h/LdZddlmZddlZddlmZmZmZmZm Z m Z m Z ddl m Z mZddlmZddlmZddlmZmZdd lmZmZmZdd lmZdd lmZdd lmZm Z m!Z!m"Z"m#Z#dd l$m%Z%ddl&m'Z'm(Z(m)Z)dZ*e+gdZ,erddl-m.Z.ddl/m0Z0ddl1m2Z2ddl3m4Z4ddl5m6Z6ddZ7Gddee(Z8Gdde8e(Z9Gdde8e(Z:Gdde:e(Z;y) zAWatch changes on a collection, a database, or the entire cluster.) annotationsN) TYPE_CHECKINGAnyGenericMappingOptionalTypeUnion) CodecOptions _bson_to_dict)RawBSONDocument) Timestamp)_csotcommon)_AggregationCommand_CollectionAggregationCommand_DatabaseAggregationCommand)AsyncCommandCursor)validate_collation_or_none)ConnectionFailureCursorNotFoundInvalidOperationOperationFailure PyMongoError)_Op) _CollationIn _DocumentType _PipelineF)Y[ii)#i{'iP-iR-i{4i|4?iL4)AsyncClientSession)AsyncCollection) AsyncDatabase)AsyncMongoClient)AsyncConnectionct|ttfryt|trT|jy|jdk\xr|j dxs#|jdkxr|j tvSy)z5Return True if given a resumable change stream error.TF ResumableChangeStreamError) isinstancerrr_max_wire_versionhas_error_labelcode_RESUMABLE_GETMORE_ERRORS)excs e/root/niggaflix-v3/playground/venv/lib/python3.12/site-packages/pymongo/asynchronous/change_stream.py _resumabler7Ms|#)>:;#'(  (  ! !Q & \3+>+>?[+\S##a'QCHH8Q,Q S cZeZdZdZ d ddZddZeddZeddZddZ ddZ dd Z dd Z dd Z dd Zdd ZddZd dZed!dZej(d"dZeZed#dZej(d$dZd dZd%dZy)&AsyncChangeStreamaThe internal abstract base class for change stream cursors. Should not be called directly by application developers. Use :meth:`pymongo.asynchronous.collection.AsyncCollection.watch`, :meth:`pymongo.asynchronous.database.AsyncDatabase.watch`, or :meth:`pymongo.asynchronous.mongo_client.AsyncMongoClient.watch` instead. .. versionadded:: 3.6 .. seealso:: The MongoDB documentation on `changeStreams `_. Nc|g}tjd|}tjd|t|tjd|d|_|j |_|j jjr=d|_|j|j jt|_ n||_ tj||_||_| |_| du|_|du|_tj| xs||_||_||_||_||_| |_| |_d|_|jj8|_| |_y)Npipeline full_document batchSizeFT)document_class) codec_options)r validate_listvalidate_string_or_noner%validate_non_negative_integer_or_none_decode_customr@_orig_codec_options type_registry _decoder_map with_optionsr _targetcopydeepcopy _pipeline_full_document_full_document_before_change_uses_start_after_uses_resume_after _resume_token_max_await_time_ms _batch_size _collation_start_at_operation_time_session_comment_closed_timeout_show_expanded_events)selftargetr<r= resume_aftermax_await_time_ms batch_size collationstart_at_operation_timesession start_aftercommentfull_document_before_changeshow_expanded_eventss r6__init__zAsyncChangeStream.__init__fsR(  H'' H=&& F"9-44[*M#@F@T@T    - - : :"&D "..$22???_/DL"DLx0+,G)!,D!8".d":!]];+F,G"3%#(?%    -- %9"r8cJK|jd{|_y7 wN)_create_cursor_cursorr[s r6_initialize_cursorz$AsyncChangeStream._initialize_cursors!0022 2s #! #ct)z)The aggregation command class to be used.NotImplementedErrorrls r6_aggregation_command_classz,AsyncChangeStream._aggregation_command_classs "!r8ct)zjThe client against which the aggregation commands for this AsyncChangeStream will be run. rorls r6_clientzAsyncChangeStream._clients "!r8c.i}|j|j|d<|j|j|d<|j}||jr||d<n!||d<n|j|j|d<|j r|j |d<|S)z=Return the options dict for the $changeStream pipeline stage. fullDocumentfullDocumentBeforeChange startAfter resumeAfterstartAtOperationTimeshowExpandedEvents)rMrN resume_tokenrOrUrZ)r[optionsr{s r6_change_stream_optionsz(AsyncChangeStream._change_stream_optionss"$    *&*&9&9GN #  , , 8262S2SG. /((  #%%(4 %)5 &  * * 6.2.K.KG* +  % %,0,F,FG( )r8cvi}|j|j|d<|j|j|d<|S)z4Return the options dict for the aggregation command.maxAwaitTimeMSr>)rRrS)r[r|s r6_command_optionsz"AsyncChangeStream._command_optionssE  " " .(,(?(?G$ %    '#'#3#3GK r8cf|j}d|ig}|j|j|S)z@Return the full aggregation pipeline for this AsyncChangeStream.z $changeStream)r}extendrL)r[r| full_pipelines r6_aggregation_pipelinez'AsyncChangeStream._aggregation_pipelines5--/ /9: T^^,r8c|ddsd|dvr|dd|_y|j_|jdurP|jdurA|jdk\r1|j d|_|jt d|yyyyyy) aMCallback that caches the postBatchResumeToken or startAtOperationTime from a changeStream aggregate command response containing an empty batch of change documents. This is implemented as a callback because we need access to the wire version in order to determine whether to cache this value. cursor firstBatchpostBatchResumeTokenNFr operationTimez?Expected field 'operationTime' missing from command response : )rQrUrPrOmax_wire_versiongetr)r[resultconns r6_process_resultz!AsyncChangeStream._process_resultsh -%)99%+H%56L%M"--5++u4**e3))Q.06 ?0K-008*&&,Z19 /456 .r8c jK|j|jt|j|j ||j |j }|jj|j|jj||tjd{S7w)z~Run the full aggregation pipeline for this AsyncChangeStream and return the corresponding AsyncCommandCursor. )result_processorrd) operationN) rqrIrrrrrWrs_retryable_read get_cursor_read_preference_forr AGGREGATE)r[rbexplicit_sessioncmds r6_run_aggregation_cmdz&AsyncChangeStream._run_aggregation_cmds -- LL   & & (  ! ! # !11MM. \\11 NN LL - -g 6 mm 2    sB*B3,B1-B3cK|jj|jd4d{}|j||jdud{cdddd{S7<77 #1d{7swYyxYww)NFclose)rbr)rs _tmp_sessionrVr)r[ss r6rjz AsyncChangeStream._create_cursors~<<,,T]]%,H  A22DMM,E3       sV,BA+B#A1A-A1 B%A/&B-A1/B1B7A: 8B?BcK |jjd{|jd{|_y7"#t$rY+wxYw7w)z7Reestablish this change stream after a resumable error.N)rkrrrjrls r6_resumezAsyncChangeStream._resume sP ,,$$& & &"0022  '   2sAAAAAAA AA AAAAcbKd|_|jjd{y7w)zClose this AsyncChangeStream.TN)rXrkrrls r6rzAsyncChangeStream.closes# ll  """s %/-/c|Srirls r6 __aiter__zAsyncChangeStream.__aiter__s r8c@tj|jS)zThe cached resume token that will be used to resume after the most recently returned change. .. versionadded:: 3.9 )rJrKrQrls r6r{zAsyncChangeStream.resume_tokens}}T//00r8cK|jr)|jd{}||S|jr)t7w)a8Advance the cursor. This method blocks until the next change document is returned or an unrecoverable error is raised. This method is used when iterating over all changes in the cursor. For example:: try: resume_token = None pipeline = [{'$match': {'operationType': 'insert'}}] async with await db.collection.watch(pipeline) as stream: async for insert_change in stream: print(insert_change) resume_token = stream.resume_token except pymongo.errors.PyMongoError: # The AsyncChangeStream encountered an unrecoverable error or the # resume attempt failed to recreate the cursor. if resume_token is None: # There is no usable resume token because there was a # failure during AsyncChangeStream initialization. logging.error('...') else: # Use the interrupted AsyncChangeStream's resume token to create # a new AsyncChangeStream. The new stream will continue from the # last seen insert change without missing any events. async with await db.collection.watch( pipeline, resume_after=resume_token) as stream: async for insert_change in stream: print(insert_change) Raises :exc:`StopIteration` if this AsyncChangeStream is closed. N)alivetry_nextStopAsyncIteration)r[docs r6nextzAsyncChangeStream.next#s>Bjj 'C jj ! (s A>AAc|j S)zDoes this cursor have the potential to return more data? .. note:: Even if :attr:`alive` is ``True``, :meth:`next` can raise :exc:`StopIteration` and :meth:`try_next` can return ``None``. .. versionadded:: 3.8 )rXrls r6rzAsyncChangeStream.aliveMs<<r8cK|js.|jjs|jd{ |jj dd{}|jjsd|_|:|jj"|jj|_ d|_ |S |d}|jjs,|jjr|jj}d|_d|_||_ d|_ |j$r t'|j(|j*S|S77#t $rT}t |s|jd{7|jj dd{7}Yd}~Ld}~wwxYw#t $r6}t |s%|js|jd{7d}~wt$r|jd{7wxYw#t$r&|jd{7tddwxYww)aAdvance the cursor without blocking indefinitely. This method returns the next change document without waiting indefinitely for the next change. For example:: async with await db.collection.watch() as stream: while stream.alive: change = await stream.try_next() # Note that the AsyncChangeStream's resume token may be updated # even when no changes are returned. print("Current resume token: %r" % (stream.resume_token,)) if change is not None: print("Change document: %r" % (change,)) continue # We end up here when there are no recent changes. # Sleep for a while before trying again to avoid flooding # the server with getMore requests when no changes are # available. asyncio.sleep(10) If no change document is cached locally then this method runs a single getMore command. If the getMore yields any documents, the next document is returned, otherwise, if the getMore returns no documents (because there have been no changes) then ``None`` is returned. :return: The next change document or ``None`` when no document is available after running a single getMore or when the cursor is closed. .. versionadded:: 3.8 NTF_idzECannot provide resume functionality when the resume token is missing.)rXrkrr _try_nextrr7timeoutr BaseException_post_batch_resume_tokenrQrUKeyErrorr _has_nextrOrPrDr rawrE)r[changer5r{s r6rzAsyncChangeStream.try_nextXs@||DLL$6$6,,.   =#||55d;;"||!!DL > ||44@%)\\%J%J"04-M !%=L||%%'DLL,Q,Q<<@@L"'"&*(,%    T-E-EF F { ! < =!#lln$$#||55e<<<  =  c?3;;jjl""  **,    , **,  "W  s6I EI EEE"AI <HBI E F3F.>F?#F."F%#F.(F6.F33F66 H?*G0)G,*G00HHHI I 6H97I  I cK|Swrirrls r6 __aenter__zAsyncChangeStream.__aenter__s  sc@K|jd{y7wrir)r[exc_typeexc_valexc_tbs r6 __aexit__zAsyncChangeStream.__aexit__sjjls )NNN)r\zdUnion[AsyncMongoClient[_DocumentType], AsyncDatabase[_DocumentType], AsyncCollection[_DocumentType]]r<zOptional[_Pipeline]r= Optional[str]r]Optional[Mapping[str, Any]]r^ Optional[int]r_rr`zOptional[_CollationIn]razOptional[Timestamp]rbOptional[AsyncClientSession]rcrrdz Optional[Any]rerrfzOptional[bool]returnNone)rr)rzType[_AggregationCommand])rr+rzdict[str, Any])rzlist[dict[str, Any]])rzMapping[str, Any]rr,rr)rbrrboolrr)rr)rz AsyncChangeStream[_DocumentType])rr)rr)rr)rzOptional[_DocumentType])rrrrrrrr)__name__ __module__ __qualname____doc__rgrmpropertyrqrsr}rrrrrjrrrr{rapplyr __anext__rrrrrr8r6r:r:Zs 6"&59/3%5: 5:&5:%5:25:)5:"5:*5:"55:.5:15: !5:"&3#5:$-%5:& '5:n3"""" 02 3 GK  , 3# 11 [[%!%!NI    [[]]~r8r:c@eZdZUdZded<eddZeddZy) AsyncCollectionChangeStreamzA change stream that watches changes on a single collection. Should not be called directly by application developers. Use helper method :meth:`pymongo.asynchronous.collection.AsyncCollection.watch` instead. .. versionadded:: 3.7 zAsyncCollection[_DocumentType]rIctSri)rrls r6rqz6AsyncCollectionChangeStream._aggregation_command_classs,,r8cB|jjjSri)rIdatabaseclientrls r6rsz#AsyncCollectionChangeStream._clients||$$+++r8N)rz#Type[_CollectionAggregationCommand]rzAsyncMongoClient[_DocumentType]rrrr__annotations__rrqrsrr8r6rrs5,+ --,,r8rc@eZdZUdZded<eddZeddZy) AsyncDatabaseChangeStreamzA change stream that watches changes on all collections in a database. Should not be called directly by application developers. Use helper method :meth:`pymongo.asynchronous.database.AsyncDatabase.watch` instead. .. versionadded:: 3.7 zAsyncDatabase[_DocumentType]rIctSri)rrls r6rqz4AsyncDatabaseChangeStream._aggregation_command_classs**r8c.|jjSri)rIrrls r6rsz!AsyncDatabaseChangeStream._clients||"""r8N)rz!Type[_DatabaseAggregationCommand]rrrr8r6rrs5*) ++##r8rc$eZdZdZdfd ZxZS)AsyncClusterChangeStreamaA change stream that watches changes on all collections in the cluster. Should not be called directly by application developers. Use helper method :meth:`pymongo.asynchronous.mongo_client.AsyncMongoClient.watch` instead. .. versionadded:: 3.7 c.t|}d|d<|S)NTallChangesForCluster)superr})r[r| __class__s r6r}z/AsyncClusterChangeStream._change_stream_optionss '02*.&'r8r)rrrrr} __classcell__)rs@r6rrsr8r)r5rrr)rsH" NNN,)$! C8#BB &.F?;B9 b .bJ ,"3M"B,(# 1- @#( 8G r8