+\hRdZddlmZddlZddlZddlZddlZddlZddlZddl Z ddl Z ddl Z ddl m Z ddlmZmZmZmZmZmZddlmZmZmZmZddlmZmZmZmZmZm Z m!Z!m"Z"m#Z#ddl$m%Z%dd l&m'Z'm(Z(m)Z)dd l*m+Z+m,Z,m-Z-m.Z.m/Z/dd l0m1Z1dd l2m3Z3dd l4m5Z5m6Z6m7Z7m8Z8m9Z9ddl:m;Z;mZ>m?Z?ddl@mAZAddlBmCZCddlDmEZEmFZFmGZGmHZHmIZIerddlJmKZKddlLmMZMddlNmOZOmPZPdZQeRe eSjZUddZVGddZWGddZX d dZYd!dZZ d" d#dZ[y)$z>c:eZdZdZd/dZd0dZd1dZ d2 d3dZ d4dZ d5 d6dZ d5 d6d Z d7 d8d Z d9 d:d Z d9 d:d Z d;d Zd;dZddZd?dZd@dZd@dZdAdZdBdZdBdZdCdDdZdEdZd0dZd0dZedFdZdGdZdHdZ dIdZ!dJd Z"d0d!Z#dKd"Z$dLd#Z%dLd$Z&d0d%Z'd0d&Z(dMd'Z)dMd(Z*dNd)Z+d0d*Z,dOd+Z-dPd,Z.dQd-Z/dRd.Z0y)STopologyz*Monitor a topology of one or more servers.c  |j|_|jj|_|jduxr|jj|_|jduxr|jj |_d|_d|_ |j s |jrtjd|_tjtjr*t!tt"j$|j|jrJ|jJ|jj'|jj(|jff||_t-|j/|j1|j2dd|}||_t-t6j8iddd|j*}|jrV|jJ|jj'|jj:||j4|jfftjtjrHt!tt"j<|jt?|t?|j4|j@D]}|j rK|jJ|jj'|jjB||jfftjtjs~t!tt"jD|j|d|dtG|jI|_%d|_&d|_'tQ|_)tU|jRtVr|j*jXnd|_-i|_.d|_/d|_0tc|_2|j s |jr|jJd fd }tgjhtjjltjjn|d }tqjr|j|jt||_ |jwd|_<|j*jz1|j*j|st||j*|_<g|_@y) Nd)maxsizemessage topologyIdrHrIpreviousDescriptionnewDescriptionr)rHrI serverHost serverPortFctSN)rA)weaksr@targetz!Topology.__init__..targets+D11pymongo_events_thread)interval min_intervalrSname)returnbool)A _topology_id _pool_options_event_listeners _listenersenabled_for_server_publish_serverenabled_for_topology _publish_tp_events_Topology__events_executorr9Queuer isEnabledForloggingDEBUGrrSTART_TOPOLOGYputpublish_topology_opened _settingsr0get_topology_typeget_server_descriptionsreplica_set_name _descriptionr/Unknown$publish_topology_description_changedTOPOLOGY_CHANGEreprseedspublish_server_opened START_SERVERlistserver_descriptions_seed_addresses_opened_closedr_lockr_IS_SYNCcondition_class _condition_servers_pid_max_cluster_timer) _session_poolrPeriodicExecutorr EVENTS_QUEUE_FREQUENCYMIN_HEARTBEAT_INTERVALweakrefrefcloseopen _srv_monitorfqdn load_balancedr+_monitor_tasks)selftopology_settingstopology_description initial_tdseedrSexecutorrRs @r@__init__zTopology.__init__ks-::+99JJ#d:at?a?a??$6_4??;_;_ &*   4#3#3 ;;s3DL  $ $W]] 3 *99,,    <<+ ++ LL  dooEEHYHYG[\ ]*2  / / 1  5 5 7  . .     1(  ! !2tT4    <<+ ++ LL  OOHH!2!2D4E4EF   $ $W]] 3 *::,,$($4#D$5$56  &++ D##||///   $//"G"G$PTPaPaIb!cd((7 .;;#00#Aw#Aw   $$8$L$L$NO  !^ + JJ(66 13 #' 8</1   4#3#3<<+ ++ 2)9966#::, H;;t||X^^>   *4>>3O3O *4 @D 24rTc*tj}|j||_n||jk7r||_tjdddk\r dt fi}nddi}t j di||j5|jjD]}|j|jjddd|j5|jdddy#1swY/xYw#1swYyxYw)aStart monitoring, or restart after a fork. No effect if called multiple times. .. warning:: Topology is shared among multiple threads and is protected by mutual exclusion. Using Topology from a process other than the one that initialized it will emit a warning and may result in deadlock. To prevent this from happening, MongoClient must be created after any forking. N) skip_file_prefixes stacklevel)zMongoClient opened before fork. May not be entirely fork-safe, proceed with caution. See PyMongo's documentation for details: https://dochub.mongodb.org/core/pymongo-fork-deadlock)osgetpidrsys version_info _pymongo_dirwarningswarnr}rvaluesrrreset_ensure_opened)rpidkwargsservers r@rz Topology.opensiik 99 DI DII DI#w..@&* MMH     +"mm224#FLLN#""((*  +ZZ "    ! " " + + " "sA C=#D =D Dc^tj}||jjS|SrQ)r remainingrlserver_selection_timeout)rtimeouts r@get_server_selection_timeoutz%Topology.get_server_selection_timeouts(//# ?>>:: :rTNc V||j}n|}ts|jr|j|j5|j |||||}|Dcgc]+}t t|j|j-c}cdddScc}w#1swYyxYw)aReturn a list of Servers matching selector, or time out. :param selector: function that takes a list of Servers and returns a subset of them. :param operation: The name of the operation that the server is being selected for. :param server_selection_timeout: maximum seconds to wait. If not provided, the default value common.SERVER_SELECTION_TIMEOUT is used. :param address: optional server address to select. Calls self.open() if needed. Raises exc:`ServerSelectionTimeoutError` after `server_selection_timeout` if no matching servers are found. N) rr~rcleanup_monitorsr}_select_servers_loopr r-get_server_by_addressaddress) rselector operationrr operation_idserver_timeoutrysds r@select_serverszTopology.select_serverss. $ +!>>@N5ND//  ! ! # ZZ "&";";.)\7#  PcIKVT77 CD       sB0BBBB(cVtj}||z}d}tjtj rLt ttj||||j|jjj|jj|||jj} | s|dk(s||kDrtjtj r\t ttj ||||j|jjj|j#|t%|j#|d|d|j|spt ttj&||||j|jjjt)d|tjz z d }|j+|j-t/|j0t2j4|jj7tj}|jj|||jj} | s|jj7| S) z7select_servers() guts. Hold the lock when calling this.F)rHrr operationIdtopologyDescriptionclientId)custom_selectorr)rHrrrrrfailurez , Timeout: zs, Topology Description: i)rHrrrrrremainingTimeMST)time monotonicrrfrgrhrr STARTED description_topology_settingsr[rpapply_selectorrlserver_selectorFAILED_error_messagerWAITINGintr_request_check_allrrr rcheck_compatible) rrrrrrnowend_timelogged_waitingrys r@rzTopology._select_servers_loop.s:nn= # 0 0 ? (5==!#($($4$4))<<II #//>> gt~~/M/M? &!|sX~+88G0 = D D!)"+$0,0,<,>;K0K(L$M "&    !  # # % t(E(E F    . . 0.."C"&"3"3"B"B'4>>3Q3Q#C# S&Z **,""rTc|j|||||}t||}t|dk(r|dStj|d\}} |j j | j j kr|S| S)NrMrr)r_filter_serverslenrandomsamplepooloperation_count) rrrrrdeprioritized_serversrserversserver1server2s r@_select_serverzTopology._select_serverzs%% i!97L "'+@A w<1 1: !==!4 << ' '7<<+G+G GNNrTc |j||||||}tjr)tj|jj t jtjr|tt tj||||j|jjj|jjd|jjd |S)zALike select_servers, but choose a random server if several match.rrrM)rHrrrrrrNrO)rr get_timeoutset_rttrmin_round_trip_timerrfrgrhrr SUCCEEDEDrr[r)rrrrrrrrs r@ select_serverzTopology.select_servers$$   $  !% %      MM&,,@@ A # 0 0 ? (5??!#($($4$4))<<II!--55a8!--55a8  rTc6|jt||||S)a=Return a Server for "address", reconnecting if necessary. If the server's type is not known, request an immediate check of all servers. Time out after "server_selection_timeout" if the server cannot be reached. :param address: A (host, port) pair. :param operation: The name of the operation that the server is being selected for. :param server_selection_timeout: maximum seconds to wait. If not provided, the default value common.SERVER_SELECTION_TIMEOUT is used. :param operation_id: The unique id of the current operation being performed. Defaults to None if not provided. Calls self.open() if needed. Raises exc:`ServerSelectionTimeoutError` after `server_selection_timeout` if no matching servers are found. r)rr$)rrrrrs r@select_server_by_addressz!Topology.select_server_by_addresss+2!!   $ % "  rTc l|j}|j|j}t||ryt |j|}|j s)|j r^|jtjk(rA|jj|j}|r|jj||k(}|jrY|sW|jJ|jj!|j"j$|||j|j&ff||_|j)|j*rX|sV|jJ|jj!|j"j,||j|j&fft/j0t2j4rJ|sHt7t.t8j:|j&t=|t=|j|j>r~|jtj@k(ra|jjtBvrE|j>jEtFs%|jHjK|j>|jLjOy)ziProcess a new ServerDescription on an opened topology. Hold the lock when calling this. NrJ)(rp_server_descriptionsr_is_stale_server_descriptionr2 is_readableis_server_type_known topology_typer/Singlergetrreadyr`rcrjr^"publish_server_description_changedr[_update_serversrbrrrrfrgrhrrrsrtrrqr.rr~rappendr notify_all) rserver_description reset_poolinterrupt_connectionstd_oldsd_oldnew_tdrsuppress_events r@_process_changezTopology._process_changes'"",,-?-G-GH '0B C -d.?.?AST  ) )  3 38L8LP]PdPd8d]]&&'9'A'ABF !!##55   <<+ ++ LL  OOFF/1C1K1KTM^M^_ #    N<<+ ++ LL  OOHHT..0A0AB   $ $W]] 3N *::,,$(L#D$5$56       M$9$9 9!!//7MM    # # %##**4+<+<= ""$rTc\|j5|jr8|jj|jr|j |||ddd|rE|j j|j}|r|jj|yyy#1swYQxYw)z>Process a new ServerDescription after an hello call completes.N)r) r}r{rp has_serverrrrrrr)rrrrrs r@ on_changezTopology.on_changesZZ \|| 1 1 < <=O=W=W X$$%7EZ[ \ ]]&&'9'A'ABF !!8M!N  \ \s AB""B+c 4|j}|jtvryt|j||_|j |j rV|j J|j j|jj||j|jfftjtjrIttt j"|jt%|t%|jyy)z_Process a new seedlist on an opened topology. Hold the lock when calling this. NrJ)rprr.r1rrbrcrjr^rrr[rrfrgrhrrrsrt)rseedlistrs r@_process_srv_updatezTopology._process_srv_update0s""   '= = EdFWFWYab    <<+ ++ LL  OOHHT..0A0AB   $ $W]] 3 *::,,$(L#D$5$56   4rTc|j5|jr|j|dddy#1swYyxYw)z?Process a new list of nodes obtained from scanning SRV records.N)r}r{r)rrs r@ on_srv_updatezTopology.on_srv_updateLs5ZZ 3||((2 3 3 3s4=c8|jj|S)aJGet a Server or None. Returns the current version of the server immediately, even if it's Unknown or absent from the topology. Only use this in unittests. In driver code, use select_server_by_address, since then you're assured a recent view of the server's type and wire protocol version. )rrrrs r@rzTopology.get_server_by_addressSs}}  ))rTc||jvSrQ)rrs r@rzTopology.has_server]s$--''rTc|j5|jj}|tjk7r dddyt |j djcdddS#1swYyxYw)z!Return primary's address or None.Nr)r}rprr/ReplicaSetWithPrimaryr'_new_selectionr)rrs r@ get_primaryzTopology.get_primary`spZZ N --;;M C CC N N ,D,?,?,AB1EMM  N N Ns+A0%A00A9cT|j5|jj}|tjtj fvrt cdddSt||jDchc]}|jc}cdddScc}w#1swYyxYw)z+Return set of replica set member addresses.N) r}rprr/rReplicaSetNoPrimarysetiterr r)rrrrs r@_get_replica_set_membersz!Topology._get_replica_set_membersjsZZ P --;;M3311%u  P P*.ht7J7J7L.M)NO2BJJO P PP P Ps$AB!B:B BBB'c,|jtS)z"Return set of secondary addresses.)rr&rs r@get_secondarieszTopology.get_secondariesws,,-FGGrTc,|jtS)z Return set of arbiter addresses.)rr%rs r@ get_arbiterszTopology.get_arbiters{s,,-DEErTc|jS)z1Return a document, the highest seen $clusterTime.rrs r@max_cluster_timezTopology.max_cluster_times%%%rTc\|r*|jr|d|jdkDr||_yyy)N clusterTimerr cluster_times r@_receive_cluster_time_no_lockz&Topology._receive_cluster_time_no_locks= ** .1G1G 1VV)5&W rTch|j5|j|dddy#1swYyxYwrQ)r}rrs r@receive_cluster_timezTopology.receive_cluster_times, ZZ =  . .| < = = =s(1c|j5|jt|j|dddy#1swYyxYw)z=Wake all monitors, wait for at least one to check its server.N)r}rrr)r wait_times r@request_check_allzTopology.request_check_alls8 ZZ 3  # # % t 2 3 3 3s '=Ac|jjtjk(r|jjS|jj S)z~Return a list of all data-bearing servers. This includes any server that might be selected for an operation. )rprr/r known_serversreadable_serversrs r@data_bearing_serverszTopology.data_bearing_serverssB    * *m.B.B B$$22 2  111rTc g}|j5|jD]P}|j|j}|j ||j j jfR ddd|D]!\}} |j j|#y#1swY0xYw#t$r;}t|d|dd}|j|jj|d}~wwxYw)NrF) r}r%rrrrgen get_overallremove_stale_socketsr _ErrorContext handle_errorr)rrrr generationexcctxs r@ update_poolzTopology.update_pools ZZ H//1 Hrzz2 (C(C(EFG H H #*  FJ  00<   H H  #CJtD!!&"4"4"<"}|j t r|j j|j@|jj|_|jjjD](\}}||jvs||j|_ *|jrE|jj t s%|j j|jd|_d|_ddd|j r|j"Jt%t&j(i|jj*|jj,|jj.|jj0|_|j"j3|j4j6|j|j8ff|j"j3|j4j:|j8fft=j>t@jBrrtEt<tFjH|j8tKtK|jtEt<tFjL|j8|jNs |j r^|jPj |jPjSdtUtWjX|j"yy#1swY(xYw)zClear pools and terminate monitors. Topology does not reopen on demand. Any further operations will raise :exc:`~.errors.InvalidOperation`. FTNrJrGrM)-r}rprrrr~rr_monitorrryitemsrrr{r|rbrcr0r/rqromax_set_versionmax_election_idrrjr^rrr[publish_topology_closedrrfrgrhrrrsrt STOP_TOPOLOGYr`rdjoinrArr)rold_tdrrrs r@rzTopology.closes ZZ &&F--..0 @ ''..v? @ !% 1 1 7 7 9D #00DDFLLN < dmm+9;DMM'*6 <   !!'')''..t/@/@A DLDL) .   <<+ ++ 3%%!!22!!11!!11!!44 !D  LL  OOHH))))  LL  dooEEHYHYG[\ ]  $ $W]] 3 *::,,$(L#D$5$56   &8&F&FSWSdSd    4#3#3  " " ( ( *  " " ' ' * T\\!: ; $4q  sAM,BM,A5M,,M6c|jSrQ)rprs r@rzTopology.descriptions   rTc6|jjS)z"Pop all session ids from the pool.)rpop_allrs r@pop_all_sessionszTopology.pop_all_sessionss!!))++rTc8|jj|S)z>Start or resume a server session, or raise ConfigurationError.)rget_server_session)rsession_timeout_minutess r@r>zTopology.get_server_sessions!!445LMMrTc:|jj|yrQ)rreturn_server_session)rserver_sessions r@rAzTopology.return_server_sessions 00@rTc@tj|jS)zmA Selection object, initially including all known servers. Hold the lock when calling this. )r#from_topology_descriptionrprs r@r zTopology._new_selections 2243D3DEErTc h|jr td|jsd|_|j|js |j r|j j|jr6|jjtvr|jj|jjr?|jt|j dt#d|j$dd|j&j)D]}|jy)z[Start monitors, or restart after a fork. Hold the lock when calling this. z"Cannot use MongoClient after closeTrrM )ok serviceIdmaxWireVersionN)r|rr{rrbr`rdrrrrr.rlrrr"rzrr[rrrrs r@rzTopology._ensure_openeds <<"#GH H||DL  "4#7#7&&++-  d&6&6&D&DH^&^!!&&(~~++$$%,,Q/QT5F5FZ\]^mm**, F KKM rTc|jj|}|y|jj|j|j ry|j j}|j}d}|rAt|dr5t|jtr|jjd}t||S)NTdetailstopologyVersion)rr_poolstale_generationsock_generation service_idrtopology_versionerrorhasattr isinstancerLdict _is_stale_error_topology_version)rrerr_ctxrcur_tvrSerror_tvs r@_is_stale_errorzTopology._is_stale_error0s""7+ > << ( ()@)@'BTBT U##44  WUI.%--. ==,,->?/AArTc|j||ry|j|}|j}|j}|jj r|s |j syt|tr |j ryt|tryt|ttfr#t|dr |j}n0t|trdnd}|jjd|}|t j"vrw|t j$v}|jj s|j't)|||s|j*dkr|j-||j/y|j sD|jj s|j't)|||j-|yyt|t0rot|t2ry|jj s|j't)|||j-||j4j7yy)Ncodei{'rS)r[rrSrQrlrcompleted_handshakerUrrrrrTr]rLrr _NOT_PRIMARY_CODES_SHUTDOWN_CODESrr"max_wire_versionr request_checkrrr1 cancel_check) rrrXrrSrQerr_codedefaultis_shutting_downs r@ _handle_errorzTopology._handle_errorDs    1 w' '' >> ' ' 7C^C^  e^ ,1L1L  z *  1AB Cuf% ::$.e_#E%4 ==,,VW=><<<#+~/M/M#M ~~33(():7%)PQ#(@(@A(ELL,$$&00~~33(():7%)PQ Z( 1 0 1%!67>>//$$%6we%LM LL $ OO ( ( *2rTcj|j5|j||dddy#1swYyxYw)zHandle an application error. May reset the server to Unknown, clear the pool, and request an immediate check depending on the error and the context. N)r}ri)rrrXs r@r+zTopology.handle_errors0 ZZ 1   w 0 1 1 1s)2cb|jjD]}|jy)z3Wake all monitors. Hold the lock when calling this.N)rrrdrJs r@rzTopology._request_check_alls*mm**, #F  " #rTc <|jjjD]S\}}||jvr|jj |||j ||j}d}|jr+|jtj|j}t||j|||j|j|}||j|<|j|j|j j"}||j|_||j"k7s"|j|j$j'|j"Vt)|jjD]w\}}|jj+|r"|j-t.s%|j0j3|j4|jj7|yy)zrSync our Servers from TopologyDescription.server_descriptions. Hold the lock while calling this. )rtopologyrrN)rrmonitor topology_id listenersevents)rpryr2rrl monitor_class_create_pool_for_monitorr`rcrrr-_create_pool_for_serverr[r^rr is_writablerupdate_is_writablerxrrr~rrr1pop)rrrrnrRr was_writables r@rzTopology._update_serverss  ,,@@BHHJ SKGRdmm+..66')!66w?&*nn 7''DLL,D";;t||4D')55g># $ 1 1"oo *0 g&  $}}W5AAMM 57 g&22>>1MM'*//BB2>>R= S@ $DMM$7$7$9: +OGV$$//8 ''..v? !!'*  +rTcz|jj||jj|jS)N) client_id)rl pool_class pool_optionsr[rs r@rtz Topology._create_pool_for_servers5~~(( T^^00D>)) ~~(( )44CTCT)  rTc0|jjtjtjfv}|rd}n,|jjtj k(rd}nd}|jj r|tur|ryd|zSd|d|dSt|jj}t|jjj}|s-|r&d j||jjSd |zS|d jtfd |d dD}|r=d|zS|r)t!|j#|j$sd|zSt'Sdj)d|DS)zeFormat an error message if server selection fails. Hold the lock when calling this. zreplica set membersmongosesrzNo primary available for writeszNo %s available for writeszNo z match selector ""z)No {} available for replica set name "{}"zNo %s availablerc3<K|]}|jk(ywrQr^).0rrSs r@ z*Topology._error_message..sGv||u,GsrMNzNo %s found yetz\Could not reach any servers in %s. Replica set is configured with internal hostnames or IPs?,c3`K|]&}|jst|j(ywrQ)rSstr)rrs r@rz*Topology._error_message..sXf6<<FLL 1Xs..)rprr/rr Shardedr#r'rxryrformatrlrorSallr  intersectionrzrr7)rris_replica_set server_plural addressesrsamerSs @r@rzTopology._error_messages **88  / /  - -=   1M    , , 0E0E E&M%M    * *33!<7-GG]O+)r{ __class____name__rp)rmsgs r@__repr__zTopology.__repr__ s>||C4>>**+1SE$2C2C1FaHHrTc|j}tt|j|j|j |j fS)z?The properties to use for MongoClient/Topology equality checks.)rltuplesortedrurorsrv_service_name)rtss r@eq_propszTopology.eq_props&s8 ^^fRXX&')<)tuple[tuple[_Address, ...], Optional[str], Optional[str], str])robjectrYrZ)rYr)1r __module__ __qualname____doc__rrrrrrrrrrrrrrr rrrrrrr!r%r/rpropertyrr<r>rAr rr[rir+rrrtrsrrrrrrrTr@rCrChs4j4X%"N59&*&* '2''#2 ' $ ' $ ' 'RJ#2J#J# J# $ J# $ J# !J#`59&*8<&*2#2  $  6 $ 259&*8<&* 2  #2  $  6 $   L37&*    #0  $    H!&+ C%-C%C% $ C%  C%P!&+ O-OO $ O  O283*(N PHF& 6=3 2"ArsC" HHDD     -8R?)-=5 4>(() "I%I%X%%$8+87R8 85OS " "5K " "rT