+\h"RdZddlmZddlZddlZddlZddlZddlZddlZddl Z ddl Z ddl Z ddl m Z ddlmZmZmZmZmZmZddlmZmZmZmZddlmZmZddlmZmZdd l m!Z!dd l"m#Z#dd l$m%Z%m&Z&m'Z'm(Z(m)Z)m*Z*m+Z+m,Z,m-Z-dd l.m/Z/dd l0m1Z1m2Z2m3Z3ddl4m5Z5m6Z6m7Z7m8Z8m9Z9ddl:m;Z;ddlm?Z?m@Z@mAZAmBZBmCZCddlDmEZEmFZFmGZGmHZHmIZIerddlJmKZKddlLmMZMddlNmOZOmPZPdZQeRe eSjZUddZVGddZWGddZX d dZYd!dZZ d" d#dZ[y)$z>cBeZdZdZd/dZd0dZd1dZ d2 d3dZ d4dZ d5 d6dZ d5 d6d Z d7 d8d Z d9 d:d Z d9 d:d Z d;d Zd;dZddZ d?dZd@dZd@dZdAdZdBdZdBdZdCdDdZdEdZd0dZd0dZedFdZdGdZdHdZ dIdZ!dJd Z"d0d!Z#dKd"Z$dLd#Z%dLd$Z&d0d%Z'd0d&Z(dMd'Z)dMd(Z*dNd)Z+d0d*Z,dOd+Z-dPd,Z.dQd-Z/dRd.Z0y)STopologyz*Monitor a topology of one or more servers.c  |j|_|jj|_|jduxr|jj|_|jduxr|jj |_d|_d|_ |j s |jrtjd|_tjtjr*t!tt"j$|j|jrJ|jJ|jj'|jj(|jff||_t-|j/|j1|j2dd|}||_t-t6j8iddd|j*}|jrV|jJ|jj'|jj:||j4|jfftjtjrHt!tt"j<|jt?|t?|j4|j@D]}|j rK|jJ|jj'|jjB||jfftjtjs~t!tt"jD|j|d|dtG|jI|_%d|_&d|_'tQ|_)tU|jRtVr|j*jXnd|_-i|_.d|_/d|_0tc|_2|j s |jr|jJd fd }tgjhtjjltjjn|d }tqjr|j|jt||_ |jwd|_<|j*jz1|j*j|st||j*|_<g|_@y) Nd)maxsizemessage topologyIdrHrIpreviousDescriptionnewDescriptionr)rHrI serverHost serverPortFc"KtSwN)rA)weaksr@targetz!Topology.__init__..targets+D11s pymongo_events_thread)interval min_intervalrSname)returnbool)A _topology_id _pool_options_event_listeners _listenersenabled_for_server_publish_serverenabled_for_topology _publish_tp_events_Topology__events_executorr9Queuer" isEnabledForloggingDEBUGr$r%START_TOPOLOGYputpublish_topology_opened _settingsr0get_topology_typeget_server_descriptionsreplica_set_name _descriptionr/Unknown$publish_topology_description_changedTOPOLOGY_CHANGEreprseedspublish_server_opened START_SERVERlistserver_descriptions_seed_addresses_opened_closedr!_lockr _IS_SYNCcondition_class _condition_servers_pid_max_cluster_timer _session_poolrAsyncPeriodicExecutorr EVENTS_QUEUE_FREQUENCYMIN_HEARTBEAT_INTERVALweakrefrefcloseopen _srv_monitorfqdn load_balancedr_monitor_tasks)selftopology_settingstopology_description initial_tdseedrSexecutorrRs @r@__init__zTopology.__init__ks-::+99JJ#d:at?a?a??$6_4??;_;_ &*   4#3#3 ;;s3DL  $ $W]] 3 *99,,    <<+ ++ LL  dooEEHYHYG[\ ]*2  / / 1  5 5 7  . .     1(  ! !2tT4    <<+ ++ LL  OOHH!2!2D4E4EF   $ $W]] 3 *::,,$($4#D$5$56  &++ D##||///   $//"G"G$PTPaPaIb!cd((7 .;;#00#Aw#Aw   $$8$L$L$NO  ') 1 JJ(66 13 #' 8</1   4#3#3<<+ ++ 2)>>66#::, H;;t||X^^>   *4>>3O3O *4 @D 24cKtj}|j||_n||jk7r||_tjdddk\r dt fi}nddi}t j di||j4d{|jjD]}|jd{|jjdddd{|j4d{|jd{dddd{y77p7F#1d{7swYVxYw7H727$#1d{7swYyxYww)aStart monitoring, or restart after a fork. No effect if called multiple times. .. warning:: Topology is shared among multiple threads and is protected by mutual exclusion. Using Topology from a process other than the one that initialized it will emit a warning and may result in deadlock. To prevent this from happening, AsyncMongoClient must be created after any forking. N) skip_file_prefixes stacklevel)zAsyncMongoClient opened before fork. May not be entirely fork-safe, proceed with caution. See PyMongo's documentation for details: https://dochub.mongodb.org/core/pymongo-fork-deadlock)osgetpidrsys version_info _pymongo_dirwarningswarnr|rvaluesrrreset_ensure_opened)rpidkwargsservers r@rz Topology.opensOiik 99 DI DII DI#w..@&* MMH    zz + +"mm224)F ,,.(()""((*  + +:: ( (%%' ' ' ( ( ( +) + + + + ( ' ( ( ( (sB E% D/ E%1D5D1 D5! E%,D3-E%E E%EE E E%)E*E%1D53E%5E;D> <EE% EE%E"E E"E%c^tj}||jjS|SrQ)r remainingrkserver_selection_timeout)rtimeouts r@get_server_selection_timeoutz%Topology.get_server_selection_timeouts(//# ?>>:: :rNc K||j}n|}ts$|jr|jd{|j4d{|j |||||d{}|Dcgc]+}t t|j|j-c}cdddd{S7~7k7Pcc}w7#1d{7swYyxYww)aReturn a list of Servers matching selector, or time out. :param selector: function that takes a list of Servers and returns a subset of them. :param operation: The name of the operation that the server is being selected for. :param server_selection_timeout: maximum seconds to wait. If not provided, the default value common.SERVER_SELECTION_TIMEOUT is used. :param address: optional server address to select. Calls self.open() if needed. Raises exc:`ServerSelectionTimeoutError` after `server_selection_timeout` if no matching servers are found. N) rr}rcleanup_monitorsr|_select_servers_loopr rget_server_by_addressaddress) rselector operationrr operation_idserver_timeoutrxsds r@select_serverszTopology.select_serverss. $ +!>>@N5ND//'') ) )::  (,(A(A.)\7)#  PcIKVT77 CD     * #     s{;CB<CB>CC /C0C 80C(C * C6C7C>CC C C CC CCcKtj}||z}d}tjtj rLt ttj||||j|jjj|jj|||jj} | s|dk(s||kDrtjtj r\t ttj ||||j|jjj|j#|t%|j#|d|d|j|spt ttj&||||j|jjjt)d|tjz z d }|j+d {|j-t/|j0t2j4d {|jj7tj}|jj|||jj} | s|jj7| S77w) z7select_servers() guts. Hold the lock when calling this.F)rHrr operationIdtopologyDescriptionclientId)custom_selectorr)rHrrrrrfailurez , Timeout: zs, Topology Description: i)rHrrrrrremainingTimeMSTN)time monotonicr#rerfrgr$r&STARTED description_topology_settingsrZroapply_selectorrkserver_selectorFAILED_error_messagerWAITINGintr_request_check_allrrr rcheck_compatible) rrrrrrnowend_timelogged_waitingrxs r@rzTopology._select_servers_loop.sVnn= # 0 0 ? (5==!#($($4$4))<<II #//>> gt~~/M/M? &!|sX~+88G0 = D D!)"+$0,0,<,>;K0K(L$M "&%%' ' '  # # % #4??F4Q4QR R R    . . 0.."C"&"3"3"B"B'4>>3Q3Q#C# S&Z **,"" ( Ss+G:K<J==;K8J?9A&K!K?KcK|j|||||d{}t||}t|dk(r|dStj|d\}} |j j | j j kr|S| S7mw)NrMrr)r_filter_serverslenrandomsamplepooloperation_count) rrrrrdeprioritized_serversrserversserver1server2s r@_select_serverzTopology._select_serverzs++ i!97L  "'+@A w<1 1: !==!4 << ' '7<<+G+G GNN sB B A.B c K|j||||||d{}tjr)tj|jj t jtjr|tt tj||||j|jjj|jjd|jjd |S7w)zALike select_servers, but choose a random server if several match.rNrrM)rHrrrrrrNrO)rr get_timeoutset_rttrmin_round_trip_timer#rerfrgr$r& SUCCEEDEDrrZr)rrrrrrrrs r@ select_serverzTopology.select_servers**   $  !% +       MM&,,@@ A # 0 0 ? (5??!#($($4$4))<<II!--55a8!--55a8  - sDDC#DcRK|jt||||d{S7w)a=Return a Server for "address", reconnecting if necessary. If the server's type is not known, request an immediate check of all servers. Time out after "server_selection_timeout" if the server cannot be reached. :param address: A (host, port) pair. :param operation: The name of the operation that the server is being selected for. :param server_selection_timeout: maximum seconds to wait. If not provided, the default value common.SERVER_SELECTION_TIMEOUT is used. :param operation_id: The unique id of the current operation being performed. Defaults to None if not provided. Calls self.open() if needed. Raises exc:`ServerSelectionTimeoutError` after `server_selection_timeout` if no matching servers are found. rN)rr*)rrrrrs r@select_server_by_addressz!Topology.select_server_by_addresss92''   $ % (    s '%'c K|j}|j|j}t||ryt |j|}|j s)|j rf|jtjk(rI|jj|j}|r"|jjd{||k(}|jrY|sW|jJ|jj!|j"j$|||j|j&ff||_|j)d{|j*rX|sV|jJ|jj!|j"j,||j|j&fft/j0t2j4rJ|sHt7t.t8j:|j&t=|t=|j|j>r|jtj@k(ri|jjtBvrM|j>jEd{tFs%|jHjK|j>|jLjOy7 77Pw)ziProcess a new ServerDescription on an opened topology. Hold the lock when calling this. NrJ)(ro_server_descriptionsr_is_stale_server_descriptionr2 is_readableis_server_type_known topology_typer/Singlergetrreadyr_rbrir]"publish_server_description_changedrZ_update_serversrarqr"rerfrgr$r%rrrsrrpr.rr}rappendr notify_all) rserver_description reset_poolinterrupt_connectionstd_oldsd_oldnew_tdrsuppress_events r@_process_changezTopology._process_changesO"",,-?-G-GH '0B C -d.?.?AST  ) )  3 38L8LP]PdPd8d]]&&'9'A'ABFkk'')))#55   <<+ ++ LL  OOFF/1C1K1KTM^M^_ #""$$$   N<<+ ++ LL  OOHHT..0A0AB   $ $W]] 3N *::,,$(L#D$5$56       M$9$9 9!!//7MM##))+ + +##**4+<+<= ""$Y* %4 ,s9CKKBK KD7KKA KKKcK|j4d{|jr@|jj|jr|j |||d{dddd{|rM|j j|j}|r%|jj|d{yyy77f7X#1d{7swYhxYw7"w)z>Process a new ServerDescription after an hello call completes.N)r) r|rzro has_serverrrrrrr)rrrrrs r@ on_changezTopology.on_changes:: b b|| 1 1 < <=O=W=W X**+=zK`aaa b b ]]&&'9'A'ABFkk''>S'TTT  bb b b b bUsdC!CC!AC C C $ C!/C0A C!<C=C!C C! CC CC!c PK|j}|jtvryt|j||_|j d{|j rV|j J|j j|jj||j|jfftjtjrIttt j"|jt%|t%|jyy7ӭw)z_Process a new seedlist on an opened topology. Hold the lock when calling this. NrJ)rorr.r1rrarbrir]rqrZr"rerfrgr$r%rrrs)rseedlistrs r@_process_srv_updatezTopology._process_srv_update0s""   '= = EdFWFWYab""$$$   <<+ ++ LL  OOHHT..0A0AB   $ $W]] 3 *::,,$(L#D$5$56   4 %sAD&D$CD&cK|j4d{|jr|j|d{dddd{y7:77 #1d{7swYyxYww)z?Process a new list of nodes obtained from scanning SRV records.N)r|rzr)rrs r@ on_srv_updatezTopology.on_srv_updateLs]:: 9 9||..x888 9 9 98 9 9 9 9sSA)AA)!AAA A)A A)AA)A&A A&"A)c8|jj|S)aJGet a Server or None. Returns the current version of the server immediately, even if it's Unknown or absent from the topology. Only use this in unittests. In driver code, use select_server_by_address, since then you're assured a recent view of the server's type and wire protocol version. )rrrrs r@rzTopology.get_server_by_addressSs}}  ))rc||jvSrQ)rrs r@rzTopology.has_server]s$--''rcNK|j4d{|jj}|tjk7r dddd{yt |j djcdddd{S7v7>7 #1d{7swYyxYww)z!Return primary's address or None.Nr)r|rorr/ReplicaSetWithPrimaryr-_new_selectionr)rrs r@ get_primaryzTopology.get_primary`s:: N N --;;M C CC N N N ,D,?,?,AB1EMM  N N N N N N N NsbB%B B%+B B% B B%%B8 B%BB% B%B%B"B B"B%cK|j4d{|jj}|tjtj fvrt cdddd{St||jDchc]}|jc}cdddd{S77Occ}w7#1d{7swYyxYww)z+Return set of replica set member addresses.N) r|rorr/rReplicaSetNoPrimarysetiterr r)rrrrs r@_get_replica_set_membersz!Topology._get_replica_set_membersjs :: P P --;;M3311%u  P P P*.ht7J7J7L.M)NO2BJJO P P P PP P P P PsuCB3CAB> C%B5&C+!B> B7B>! C-B<.C5C7B><C>CC C CcHK|jtd{S7w)z"Return set of secondary addresses.N)rr,rs r@get_secondarieszTopology.get_secondariesys223LMMMM " "cHK|jtd{S7w)z Return set of arbiter addresses.N)rr+rs r@ get_arbiterszTopology.get_arbiters}s223JKKKKrc|jS)z1Return a document, the highest seen $clusterTime.rrs r@max_cluster_timezTopology.max_cluster_times%%%rc\|r*|jr|d|jdkDr||_yyy)N clusterTimerr cluster_times r@_receive_cluster_time_no_lockz&Topology._receive_cluster_time_no_locks= ** .1G1G 1VV)5&W rcK|j4d{|j|dddd{y7&7#1d{7swYyxYwwrQ)r|rrs r@receive_cluster_timezTopology.receive_cluster_timesJ:: = =  . .| < = = = = = = =s<A:A> A<AAAA A AcK|j4d{|jt|j|d{dddd{y7C77 #1d{7swYyxYww)z=Wake all monitors, wait for at least one to check its server.N)r|rrr)r wait_times r@request_check_allzTopology.request_check_alls`:: ? ?  # # %"4??I> > > ? ? ? > ? ? ? ?sVA2AA2*AAA A2AA2AA2A/#A& $A/+A2c|jjtjk(r|jjS|jj S)z~Return a list of all data-bearing servers. This includes any server that might be selected for an operation. )rorr/r known_serversreadable_serversrs r@data_bearing_serverszTopology.data_bearing_serverssB    * *m.B.B B$$22 2  111rc RKg}|j4d{|jD]P}|j|j}|j ||j j jfRdddd{|D])\}} |j j|d{+y775#1d{7swYExYw7 #t$rD}t|d|dd}|j|jj|d{7d}~wwxYww)NrF) r|r&rrrrgen get_overallremove_stale_socketsr _ErrorContext handle_errorr)rrrr generationexcctxs r@ update_poolzTopology.update_pools:: H H//1 Hrzz2 (C(C(EFG H H H #*  FJ kk66zBBB   H H H H HC #CJtD''(:(:(B(BCHHH sD'B<D'A$C= D'B> D'C4C5C9D'>D'CC CD'C D$ 8DDDD$$D'c xK|j4d{|j}|jjD]F}|j d{t r"|j j|jH|jj|_|jjjD](\}}||jvs||j|_ *|jrM|jj d{t s%|j j|jd|_d|_dddd{|j r|j"Jt%t&j(i|jj*|jj,|jj.|jj0|_|j"j3|j4j6|j|j8ff|j"j3|j4j:|j8fft=j>t@jBrrtEt<tFjH|j8tKtK|jtEt<tFjL|j8|jNs |j rf|jPj |jPjSdd{tUtWjX|j"yy77E7z74#1d{7swYExYw7Pw)zClear pools and terminate monitors. Topology does not reopen on demand. Any further operations will raise :exc:`~.errors.InvalidOperation`. NFTrJrGrM)-r|rorrrr}rr_monitorrrxitemsrrrzr{rarbr0r/rprnmax_set_versionmax_election_idrrir]rqrZpublish_topology_closedr"rerfrgr$r%rrrs STOP_TOPOLOGYr_rcjoinrArr)rold_tdrrrs r@rzTopology.closes ::  &&F--..0 @lln$$''..v? @ !% 1 1 7 7 9D #00DDFLLN < dmm+9;DMM'*6 <   ''--///''..t/@/@A DLDL)  .   <<+ ++ 3%%!!22!!11!!11!!44 !D  LL  OOHH))))  LL  dooEEHYHYG[\ ]  $ $W]] 3 *::,,$(L#D$5$56   &8&F&FSWSdSd    4#3#3  " " ( ( *((--a0 0 0 T\\!: ; $4q %0    v 1sN:NN:=N"N N" BN"#?N""N#=N" N:+N,G;N:'N8(/N:N"N"N:"N5(N+ )N50 N:c|jSrQ)rors r@rzTopology.descriptions   rc6|jjS)z"Pop all session ids from the pool.)rpop_allrs r@pop_all_sessionszTopology.pop_all_sessionss!!))++rc8|jj|S)z>Start or resume a server session, or raise ConfigurationError.)rget_server_session)rsession_timeout_minutess r@r?zTopology.get_server_sessions!!445LMMrc:|jj|yrQ)rreturn_server_session)rserver_sessions r@rBzTopology.return_server_sessions 00@rc@tj|jS)zmA Selection object, initially including all known servers. Hold the lock when calling this. )r)from_topology_descriptionrors r@r zTopology._new_selection s 2243D3DEErc K|jr td|jsd|_|jd{|js |j r|j j|jr6|jjtvr|jj|jjrG|jt|j dt#d|j$ddd{|j&j)D]}|jd{y77?7 w)z[Start monitors, or restart after a fork. Hold the lock when calling this. z'Cannot use AsyncMongoClient after closeTNrrM )ok serviceIdmaxWireVersion)r{rrzrrar_rcrrrrr.rkrrr(ryrrZrrrrs r@rzTopology._ensure_openeds  <<"#LM M||DL&&( ( (4#7#7&&++-  d&6&6&D&DH^&^!!&&(~~++**%,,Q/QT5F5FZ\]^mm**, F++-   ) ) s6>EECEE4EEEEEc|jj|}|y|jj|j|j ry|j j}|j}d}|rAt|dr5t|jtr|jjd}t||S)NTdetailstopologyVersion)rr_poolstale_generationsock_generation service_idrtopology_versionerrorhasattr isinstancerMdict _is_stale_error_topology_version)rrerr_ctxrcur_tvrTerror_tvs r@_is_stale_errorzTopology._is_stale_error2s""7+ > << ( ()@)@'BTBT U##44  WUI.%--. ==,,->?/AArcK|j||ry|j|}|j}|j}|jj r|s |j syt|tr |j ryt|tryt|ttfrCt|dr |j}n0t|trdnd}|jjd|}|t j"vr|t j$v}|jj s$|j't)||d{|s|j*dkr|j-|d{|j/y|j sT|jj s$|j't)||d{|j-|d{yyt|t0rt|t2ry|jj s$|j't)||d{|j-|d{|j4j7yy717 777C7,w)Ncodei{'rT)r\rrTrRrkrcompleted_handshakerVrrrrrUr^rMrr _NOT_PRIMARY_CODES_SHUTDOWN_CODESrr(max_wire_versionr request_checkrrr2 cancel_check) rrrYrrTrRerr_codedefaultis_shutting_downs r@ _handle_errorzTopology._handle_errorFs    1 w' '' >> ' ' 7C^C^  e^ ,1L1L  z *  1AB Cuf% ::$.e_#E%4 ==,,VW=><<<#+~/M/M#M ~~33../@PU/VWWW#(@(@A(E ,,z222$$&00~~33../@PU/VWWWll:... 1 0 1%!67>>//**+# $ 1 1"oo *0 g&kkm## $}}W5AAMM 57 g&22>>1--055HHXXX= Y@ $DMM$7$7$9: +OGV$$//8lln$$''..v? !!'*  +$Y%sEDH?H8A H?5H? H; A H?H?*H=+AH?;H?=H?cz|jj||jj|jS)N) client_id)rk pool_class pool_optionsrZrs r@ruz Topology._create_pool_for_servers5~~(( T^^00D>)) ~~(( )44CTCT)  rc0|jjtjtjfv}|rd}n,|jjtj k(rd}nd}|jj r|tur|ryd|zSd|d|dSt|jj}t|jjj}|s-|r&d j||jjSd |zS|d jtfd |d dD}|r=d|zS|r)t!|j#|j$sd|zSt'Sdj)d|DS)zeFormat an error message if server selection fails. Hold the lock when calling this. zreplica set membersmongosesrzNo primary available for writeszNo %s available for writeszNo z match selector ""z)No {} available for replica set name "{}"zNo %s availablerc3<K|]}|jk(ywrQr_).0rrTs r@ z*Topology._error_message..sGv||u,GsrMNzNo %s found yetz\Could not reach any servers in %s. Replica set is configured with internal hostnames or IPs?,c3`K|]&}|jst|j(ywrQ)rTstr)rrs r@rz*Topology._error_message..sXf6<<FLL 1Xs..)rorr/rr Shardedr$r-rwrxrformatrkrnrTallr  intersectionryrr8)rris_replica_set server_plural addressesrsamerTs @r@rzTopology._error_messages **88  / /  - -=   1M    , , 0E0E E&M%M    * *33!<7-GG]O+)rz __class____name__ro)rmsgs r@__repr__zTopology.__repr__"s>||C4>>**+1SE$2C2C1FaHHrc|j}tt|j|j|j |j fS)zDThe properties to use for AsyncMongoClient/Topology equality checks.)rktuplesortedrtrnrsrv_service_name)rtss r@eq_propszTopology.eq_props(s8 ^^fRXX&')<)tuple[tuple[_Address, ...], Optional[str], Optional[str], str])robjectrXrY)rXr)1r __module__ __qualname____doc__rrrrrrrrrrrrrrr rrrrrrr"r&r0rpropertyrr=r?rBr rr\rjr,rrrurtrrrrrrrr@rCrChs4j4X%(N59&*&* '2''#2 ' $ ' $ ' 'RJ#2J#J# J# $ J# $ J# !J#`59&*8<&*2#2  $  6 $ 259&*8<&* 2  #2  $  6 $   L37&*    #0  $    H!&+ C%-C%C% $ C%  C%P!&+ U-UU $ U  U289*(N P8 P  PNL& 6=? 2"ArsC" HHDDR@*.     -8>5 4>(() "K%K%\%%$8+87R8 85OS " "5K " "r