notify.go 23 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801
  1. package gokb
  2. // Package kb is a pure Go Kingbase driver for the database/sql package.
  3. // This module contains support for Kingbase LISTEN/NOTIFY.
  4. import (
  5. "errors"
  6. "fmt"
  7. "sync"
  8. "sync/atomic"
  9. "time"
  10. )
  11. // Notification represents a single notification from the database.
  12. type Notification struct {
  13. // Process ID (PID) of the notifying kingbase backend.
  14. BePid int
  15. // Name of the channel the notification was sent on.
  16. Channel string
  17. // Payload, or the empty string if unspecified.
  18. Extra string
  19. }
  20. func recvNotification(r *readBuf) *Notification {
  21. bePid := r.int32()
  22. channel := r.string()
  23. extra := r.string()
  24. return &Notification{bePid, channel, extra}
  25. }
  26. const (
  27. connStateIdle int32 = iota
  28. connStateExpectResponse
  29. connStateExpectReadyForQuery
  30. )
  31. type message struct {
  32. typ byte
  33. err error
  34. }
  35. var errListenerConnClosed = errors.New("kb: ListenerConn has been closed")
  36. // ListenerConn is a low-level interface for waiting for notifications. You
  37. // should use Listener instead.
  38. type ListenerConn struct {
  39. // guards cn and err
  40. connectionLock sync.Mutex
  41. cn *conn
  42. err error
  43. connState int32
  44. // the sending goroutine will be holding this lock
  45. senderLock sync.Mutex
  46. notificationChan chan<- *Notification
  47. replyChan chan message
  48. }
  49. // NewListenerConn creates a new ListenerConn. Use NewListener instead.
  50. func NewListenerConn(name string, notificationChan chan<- *Notification) (*ListenerConn, error) {
  51. return newDialListenerConn(defaultDialer{}, name, notificationChan)
  52. }
  53. func newDialListenerConn(d Dialer, name string, c chan<- *Notification) (*ListenerConn, error) {
  54. cn, err := DialOpen(d, name)
  55. if err != nil {
  56. return nil, err
  57. }
  58. l := &ListenerConn{
  59. cn: cn.(*conn),
  60. notificationChan: c,
  61. connState: connStateIdle,
  62. replyChan: make(chan message, 2),
  63. }
  64. go l.listenerConnMain()
  65. return l, nil
  66. }
  67. // We can only allow one goroutine at a time to be running a query on the
  68. // connection for various reasons, so the goroutine sending on the connection
  69. // must be holding senderLock.
  70. //
  71. // Returns an error if an unrecoverable error has occurred and the ListenerConn
  72. // should be abandoned.
  73. func (l *ListenerConn) acquireSenderLock() error {
  74. // we must acquire senderLock first to avoid deadlocks; see ExecSimpleQuery
  75. l.senderLock.Lock()
  76. l.connectionLock.Lock()
  77. err := l.err
  78. l.connectionLock.Unlock()
  79. if err != nil {
  80. l.senderLock.Unlock()
  81. return err
  82. }
  83. return nil
  84. }
  85. func (l *ListenerConn) releaseSenderLock() {
  86. l.senderLock.Unlock()
  87. }
  88. // setState advances the protocol state to newState. Returns false if moving
  89. // to that state from the current state is not allowed.
  90. func (l *ListenerConn) setState(newState int32) bool {
  91. var expectedState int32
  92. switch newState {
  93. case connStateIdle:
  94. expectedState = connStateExpectReadyForQuery
  95. case connStateExpectResponse:
  96. expectedState = connStateIdle
  97. case connStateExpectReadyForQuery:
  98. expectedState = connStateExpectResponse
  99. default:
  100. panic(fmt.Sprintf("unexpected listenerConnState %d", newState))
  101. }
  102. return atomic.CompareAndSwapInt32(&l.connState, expectedState, newState)
  103. }
  104. // Main logic is here: receive messages from the kingbase backend, forward
  105. // notifications and query replies and keep the internal state in sync with the
  106. // protocol state. Returns when the connection has been lost, is about to go
  107. // away or should be discarded because we couldn't agree on the state with the
  108. // server backend.
  109. func (l *ListenerConn) listenerConnLoop() (err error) {
  110. defer errRecoverNoErrBadConn(&err)
  111. r := &readBuf{}
  112. for {
  113. t, err := l.cn.recvMessage(r)
  114. if err != nil {
  115. return err
  116. }
  117. switch t {
  118. case 'A':
  119. // recvNotification copies all the data so we don't need to worry
  120. // about the scratch buffer being overwritten.
  121. l.notificationChan <- recvNotification(r)
  122. case 'T', 'D':
  123. // only used by tests; ignore
  124. case 'E':
  125. // We might receive an ErrorResponse even when not in a query; it
  126. // is expected that the server will close the connection after
  127. // that, but we should make sure that the error we display is the
  128. // one from the stray ErrorResponse, not io.ErrUnexpectedEOF.
  129. if !l.setState(connStateExpectReadyForQuery) {
  130. return parseError(r)
  131. }
  132. l.replyChan <- message{t, parseError(r)}
  133. case 'C', 'I':
  134. if !l.setState(connStateExpectReadyForQuery) {
  135. // protocol out of sync
  136. return fmt.Errorf("unexpected CommandComplete")
  137. }
  138. // ExecSimpleQuery doesn't need to know about this message
  139. case 'Z':
  140. if !l.setState(connStateIdle) {
  141. // protocol out of sync
  142. return fmt.Errorf("unexpected ReadyForQuery")
  143. }
  144. l.replyChan <- message{t, nil}
  145. case 'S':
  146. // ignore
  147. case 'N':
  148. if n := l.cn.noticeHandler; n != nil {
  149. n(parseError(r))
  150. }
  151. default:
  152. return fmt.Errorf("unexpected message %q from server in listenerConnLoop", t)
  153. }
  154. }
  155. }
  156. // This is the main routine for the goroutine receiving on the database
  157. // connection. Most of the main logic is in listenerConnLoop.
  158. func (l *ListenerConn) listenerConnMain() {
  159. err := l.listenerConnLoop()
  160. // listenerConnLoop terminated; we're done, but we still have to clean up.
  161. // Make sure nobody tries to start any new queries by making sure the err
  162. // pointer is set. It is important that we do not overwrite its value; a
  163. // connection could be closed by either this goroutine or one sending on
  164. // the connection -- whoever closes the connection is assumed to have the
  165. // more meaningful error message (as the other one will probably get
  166. // net.errClosed), so that goroutine sets the error we expose while the
  167. // other error is discarded. If the connection is lost while two
  168. // goroutines are operating on the socket, it probably doesn't matter which
  169. // error we expose so we don't try to do anything more complex.
  170. l.connectionLock.Lock()
  171. if l.err == nil {
  172. l.err = err
  173. }
  174. l.cn.Close()
  175. l.connectionLock.Unlock()
  176. // There might be a query in-flight; make sure nobody's waiting for a
  177. // response to it, since there's not going to be one.
  178. close(l.replyChan)
  179. // let the listener know we're done
  180. close(l.notificationChan)
  181. // this ListenerConn is done
  182. }
  183. // Listen sends a LISTEN query to the server. See ExecSimpleQuery.
  184. func (l *ListenerConn) Listen(channel string) (bool, error) {
  185. return l.ExecSimpleQuery("LISTEN " + QuoteIdentifier(channel))
  186. }
  187. // Unlisten sends an UNLISTEN query to the server. See ExecSimpleQuery.
  188. func (l *ListenerConn) Unlisten(channel string) (bool, error) {
  189. return l.ExecSimpleQuery("UNLISTEN " + QuoteIdentifier(channel))
  190. }
  191. // UnlistenAll sends an `UNLISTEN *` query to the server. See ExecSimpleQuery.
  192. func (l *ListenerConn) UnlistenAll() (bool, error) {
  193. return l.ExecSimpleQuery("UNLISTEN *")
  194. }
  195. // Ping the remote server to make sure it's alive. Non-nil error means the
  196. // connection has failed and should be abandoned.
  197. func (l *ListenerConn) Ping() error {
  198. sent, err := l.ExecSimpleQuery("")
  199. if !sent {
  200. return err
  201. }
  202. if err != nil {
  203. // shouldn't happen
  204. panic(err)
  205. }
  206. return nil
  207. }
  208. // Attempt to send a query on the connection. Returns an error if sending the
  209. // query failed, and the caller should initiate closure of this connection.
  210. // The caller must be holding senderLock (see acquireSenderLock and
  211. // releaseSenderLock).
  212. func (l *ListenerConn) sendSimpleQuery(q string) (err error) {
  213. defer errRecoverNoErrBadConn(&err)
  214. // must set connection state before sending the query
  215. if !l.setState(connStateExpectResponse) {
  216. panic("two queries running at the same time")
  217. }
  218. // Can't use l.cn.writeBuf here because it uses the scratch buffer which
  219. // might get overwritten by listenerConnLoop.
  220. b := &writeBuf{
  221. buf: []byte("Q\x00\x00\x00\x00"),
  222. pos: 1,
  223. }
  224. b.string(q)
  225. l.cn.send(b)
  226. return nil
  227. }
  228. // ExecSimpleQuery executes a "simple query" (i.e. one with no bindable
  229. // parameters) on the connection. The possible return values are:
  230. // 1) "executed" is true; the query was executed to completion on the
  231. // database server. If the query failed, err will be set to the error
  232. // returned by the database, otherwise err will be nil.
  233. // 2) If "executed" is false, the query could not be executed on the remote
  234. // server. err will be non-nil.
  235. //
  236. // After a call to ExecSimpleQuery has returned an executed=false value, the
  237. // connection has either been closed or will be closed shortly thereafter, and
  238. // all subsequently executed queries will return an error.
  239. func (l *ListenerConn) ExecSimpleQuery(q string) (executed bool, err error) {
  240. if err = l.acquireSenderLock(); err != nil {
  241. return false, err
  242. }
  243. defer l.releaseSenderLock()
  244. err = l.sendSimpleQuery(q)
  245. if err != nil {
  246. // We can't know what state the protocol is in, so we need to abandon
  247. // this connection.
  248. l.connectionLock.Lock()
  249. // Set the error pointer if it hasn't been set already; see
  250. // listenerConnMain.
  251. if l.err == nil {
  252. l.err = err
  253. }
  254. l.connectionLock.Unlock()
  255. l.cn.c.Close()
  256. return false, err
  257. }
  258. // now we just wait for a reply..
  259. for {
  260. m, ok := <-l.replyChan
  261. if !ok {
  262. // We lost the connection to server, don't bother waiting for a
  263. // a response. err should have been set already.
  264. l.connectionLock.Lock()
  265. err := l.err
  266. l.connectionLock.Unlock()
  267. return false, err
  268. }
  269. switch m.typ {
  270. case 'Z':
  271. // sanity check
  272. if m.err != nil {
  273. panic("m.err != nil")
  274. }
  275. // done; err might or might not be set
  276. return true, err
  277. case 'E':
  278. // sanity check
  279. if m.err == nil {
  280. panic("m.err == nil")
  281. }
  282. // server responded with an error; ReadyForQuery to follow
  283. err = m.err
  284. default:
  285. return false, fmt.Errorf("unknown response for simple query: %q", m.typ)
  286. }
  287. }
  288. }
  289. // Close closes the connection.
  290. func (l *ListenerConn) Close() error {
  291. l.connectionLock.Lock()
  292. if l.err != nil {
  293. l.connectionLock.Unlock()
  294. return errListenerConnClosed
  295. }
  296. l.err = errListenerConnClosed
  297. l.connectionLock.Unlock()
  298. // We can't send anything on the connection without holding senderLock.
  299. // Simply close the net.Conn to wake up everyone operating on it.
  300. return l.cn.c.Close()
  301. }
  302. // Err returns the reason the connection was closed. It is not safe to call
  303. // this function until l.Notify has been closed.
  304. func (l *ListenerConn) Err() error {
  305. return l.err
  306. }
  307. var errListenerClosed = errors.New("kb: Listener has been closed")
  308. // ErrChannelAlreadyOpen is returned from Listen when a channel is already
  309. // open.
  310. var ErrChannelAlreadyOpen = errors.New("kb: channel is already open")
  311. // ErrChannelNotOpen is returned from Unlisten when a channel is not open.
  312. var ErrChannelNotOpen = errors.New("kb: channel is not open")
  313. // ListenerEventType is an enumeration of listener event types.
  314. type ListenerEventType int
  315. const (
  316. // ListenerEventConnected is emitted only when the database connection
  317. // has been initially initialized. The err argument of the callback
  318. // will always be nil.
  319. ListenerEventConnected ListenerEventType = iota
  320. // ListenerEventDisconnected is emitted after a database connection has
  321. // been lost, either because of an error or because Close has been
  322. // called. The err argument will be set to the reason the database
  323. // connection was lost.
  324. ListenerEventDisconnected
  325. // ListenerEventReconnected is emitted after a database connection has
  326. // been re-established after connection loss. The err argument of the
  327. // callback will always be nil. After this event has been emitted, a
  328. // nil kb.Notification is sent on the Listener.Notify channel.
  329. ListenerEventReconnected
  330. // ListenerEventConnectionAttemptFailed is emitted after a connection
  331. // to the database was attempted, but failed. The err argument will be
  332. // set to an error describing why the connection attempt did not
  333. // succeed.
  334. ListenerEventConnectionAttemptFailed
  335. )
  336. // EventCallbackType is the event callback type. See also ListenerEventType
  337. // constants' documentation.
  338. type EventCallbackType func(event ListenerEventType, err error)
  339. // Listener provides an interface for listening to notifications from a
  340. // Kingbase database. For general usage information, see section
  341. // "Notifications".
  342. //
  343. // Listener can safely be used from concurrently running goroutines.
  344. type Listener struct {
  345. // Channel for receiving notifications from the database. In some cases a
  346. // nil value will be sent. See section "Notifications" above.
  347. Notify chan *Notification
  348. name string
  349. minReconnectInterval time.Duration
  350. maxReconnectInterval time.Duration
  351. dialer Dialer
  352. eventCallback EventCallbackType
  353. lock sync.Mutex
  354. isClosed bool
  355. reconnectCond *sync.Cond
  356. cn *ListenerConn
  357. connNotificationChan <-chan *Notification
  358. channels map[string]struct{}
  359. }
  360. // NewListener creates a new database connection dedicated to LISTEN / NOTIFY.
  361. //
  362. // name should be set to a connection string to be used to establish the
  363. // database connection (see section "Connection String Parameters" above).
  364. //
  365. // minReconnectInterval controls the duration to wait before trying to
  366. // re-establish the database connection after connection loss. After each
  367. // consecutive failure this interval is doubled, until maxReconnectInterval is
  368. // reached. Successfully completing the connection establishment procedure
  369. // resets the interval back to minReconnectInterval.
  370. //
  371. // The last parameter eventCallback can be set to a function which will be
  372. // called by the Listener when the state of the underlying database connection
  373. // changes. This callback will be called by the goroutine which dispatches the
  374. // notifications over the Notify channel, so you should try to avoid doing
  375. // potentially time-consuming operations from the callback.
  376. func NewListener(name string,
  377. minReconnectInterval time.Duration,
  378. maxReconnectInterval time.Duration,
  379. eventCallback EventCallbackType) *Listener {
  380. return NewDialListener(defaultDialer{}, name, minReconnectInterval, maxReconnectInterval, eventCallback)
  381. }
  382. // NewDialListener is like NewListener but it takes a Dialer.
  383. func NewDialListener(d Dialer,
  384. name string,
  385. minReconnectInterval time.Duration,
  386. maxReconnectInterval time.Duration,
  387. eventCallback EventCallbackType) *Listener {
  388. l := &Listener{
  389. name: name,
  390. minReconnectInterval: minReconnectInterval,
  391. maxReconnectInterval: maxReconnectInterval,
  392. dialer: d,
  393. eventCallback: eventCallback,
  394. channels: make(map[string]struct{}),
  395. Notify: make(chan *Notification, 32),
  396. }
  397. l.reconnectCond = sync.NewCond(&l.lock)
  398. go l.listenerMain()
  399. return l
  400. }
  401. // NotificationChannel returns the notification channel for this listener.
  402. // This is the same channel as Notify, and will not be recreated during the
  403. // life time of the Listener.
  404. func (l *Listener) NotificationChannel() <-chan *Notification {
  405. return l.Notify
  406. }
  407. // Listen starts listening for notifications on a channel. Calls to this
  408. // function will block until an acknowledgement has been received from the
  409. // server. Note that Listener automatically re-establishes the connection
  410. // after connection loss, so this function may block indefinitely if the
  411. // connection can not be re-established.
  412. //
  413. // Listen will only fail in three conditions:
  414. // 1) The channel is already open. The returned error will be
  415. // ErrChannelAlreadyOpen.
  416. // 2) The query was executed on the remote server, but Kingbase returned an
  417. // error message in response to the query. The returned error will be a
  418. // kb.Error containing the information the server supplied.
  419. // 3) Close is called on the Listener before the request could be completed.
  420. //
  421. // The channel name is case-sensitive.
  422. func (l *Listener) Listen(channel string) error {
  423. l.lock.Lock()
  424. defer l.lock.Unlock()
  425. if l.isClosed {
  426. return errListenerClosed
  427. }
  428. // The server allows you to issue a LISTEN on a channel which is already
  429. // open, but it seems useful to be able to detect this case to spot for
  430. // mistakes in application logic. If the application genuinely does't
  431. // care, it can check the exported error and ignore it.
  432. _, exists := l.channels[channel]
  433. if exists {
  434. return ErrChannelAlreadyOpen
  435. }
  436. if l.cn != nil {
  437. // If gotResponse is true but error is set, the query was executed on
  438. // the remote server, but resulted in an error. This should be
  439. // relatively rare, so it's fine if we just pass the error to our
  440. // caller. However, if gotResponse is false, we could not complete the
  441. // query on the remote server and our underlying connection is about
  442. // to go away, so we only add relname to l.channels, and wait for
  443. // resync() to take care of the rest.
  444. gotResponse, err := l.cn.Listen(channel)
  445. if gotResponse && err != nil {
  446. return err
  447. }
  448. }
  449. l.channels[channel] = struct{}{}
  450. for l.cn == nil {
  451. l.reconnectCond.Wait()
  452. // we let go of the mutex for a while
  453. if l.isClosed {
  454. return errListenerClosed
  455. }
  456. }
  457. return nil
  458. }
  459. // Unlisten removes a channel from the Listener's channel list. Returns
  460. // ErrChannelNotOpen if the Listener is not listening on the specified channel.
  461. // Returns immediately with no error if there is no connection. Note that you
  462. // might still get notifications for this channel even after Unlisten has
  463. // returned.
  464. //
  465. // The channel name is case-sensitive.
  466. func (l *Listener) Unlisten(channel string) error {
  467. l.lock.Lock()
  468. defer l.lock.Unlock()
  469. if l.isClosed {
  470. return errListenerClosed
  471. }
  472. // Similarly to LISTEN, this is not an error in Kingbase, but it seems
  473. // useful to distinguish from the normal conditions.
  474. _, exists := l.channels[channel]
  475. if !exists {
  476. return ErrChannelNotOpen
  477. }
  478. if l.cn != nil {
  479. // Similarly to Listen (see comment in that function), the caller
  480. // should only be bothered with an error if it came from the backend as
  481. // a response to our query.
  482. gotResponse, err := l.cn.Unlisten(channel)
  483. if gotResponse && err != nil {
  484. return err
  485. }
  486. }
  487. // Don't bother waiting for resync if there's no connection.
  488. delete(l.channels, channel)
  489. return nil
  490. }
  491. // UnlistenAll removes all channels from the Listener's channel list. Returns
  492. // immediately with no error if there is no connection. Note that you might
  493. // still get notifications for any of the deleted channels even after
  494. // UnlistenAll has returned.
  495. func (l *Listener) UnlistenAll() error {
  496. l.lock.Lock()
  497. defer l.lock.Unlock()
  498. if l.isClosed {
  499. return errListenerClosed
  500. }
  501. if l.cn != nil {
  502. // Similarly to Listen (see comment in that function), the caller
  503. // should only be bothered with an error if it came from the backend as
  504. // a response to our query.
  505. gotResponse, err := l.cn.UnlistenAll()
  506. if gotResponse && err != nil {
  507. return err
  508. }
  509. }
  510. // Don't bother waiting for resync if there's no connection.
  511. l.channels = make(map[string]struct{})
  512. return nil
  513. }
  514. // Ping the remote server to make sure it's alive. Non-nil return value means
  515. // that there is no active connection.
  516. func (l *Listener) Ping() error {
  517. l.lock.Lock()
  518. defer l.lock.Unlock()
  519. if l.isClosed {
  520. return errListenerClosed
  521. }
  522. if l.cn == nil {
  523. return errors.New("no connection")
  524. }
  525. return l.cn.Ping()
  526. }
  527. // Clean up after losing the server connection. Returns l.cn.Err(), which
  528. // should have the reason the connection was lost.
  529. func (l *Listener) disconnectCleanup() error {
  530. l.lock.Lock()
  531. defer l.lock.Unlock()
  532. // sanity check; can't look at Err() until the channel has been closed
  533. select {
  534. case _, ok := <-l.connNotificationChan:
  535. if ok {
  536. panic("connNotificationChan not closed")
  537. }
  538. default:
  539. panic("connNotificationChan not closed")
  540. }
  541. err := l.cn.Err()
  542. l.cn.Close()
  543. l.cn = nil
  544. return err
  545. }
  546. // Synchronize the list of channels we want to be listening on with the server
  547. // after the connection has been established.
  548. func (l *Listener) resync(cn *ListenerConn, notificationChan <-chan *Notification) error {
  549. doneChan := make(chan error)
  550. go func(notificationChan <-chan *Notification) {
  551. for channel := range l.channels {
  552. // If we got a response, return that error to our caller as it's
  553. // going to be more descriptive than cn.Err().
  554. gotResponse, err := cn.Listen(channel)
  555. if gotResponse && err != nil {
  556. doneChan <- err
  557. return
  558. }
  559. // If we couldn't reach the server, wait for notificationChan to
  560. // close and then return the error message from the connection, as
  561. // per ListenerConn's interface.
  562. if err != nil {
  563. for range notificationChan {
  564. }
  565. doneChan <- cn.Err()
  566. return
  567. }
  568. }
  569. doneChan <- nil
  570. }(notificationChan)
  571. // Ignore notifications while synchronization is going on to avoid
  572. // deadlocks. We have to send a nil notification over Notify anyway as
  573. // we can't possibly know which notifications (if any) were lost while
  574. // the connection was down, so there's no reason to try and process
  575. // these messages at all.
  576. for {
  577. select {
  578. case _, ok := <-notificationChan:
  579. if !ok {
  580. notificationChan = nil
  581. }
  582. case err := <-doneChan:
  583. return err
  584. }
  585. }
  586. }
  587. // caller should NOT be holding l.lock
  588. func (l *Listener) closed() bool {
  589. l.lock.Lock()
  590. defer l.lock.Unlock()
  591. return l.isClosed
  592. }
  593. func (l *Listener) connect() error {
  594. notificationChan := make(chan *Notification, 32)
  595. cn, err := newDialListenerConn(l.dialer, l.name, notificationChan)
  596. if err != nil {
  597. return err
  598. }
  599. l.lock.Lock()
  600. defer l.lock.Unlock()
  601. err = l.resync(cn, notificationChan)
  602. if err != nil {
  603. cn.Close()
  604. return err
  605. }
  606. l.cn = cn
  607. l.connNotificationChan = notificationChan
  608. l.reconnectCond.Broadcast()
  609. return nil
  610. }
  611. // Close disconnects the Listener from the database and shuts it down.
  612. // Subsequent calls to its methods will return an error. Close returns an
  613. // error if the connection has already been closed.
  614. func (l *Listener) Close() error {
  615. l.lock.Lock()
  616. defer l.lock.Unlock()
  617. if l.isClosed {
  618. return errListenerClosed
  619. }
  620. if l.cn != nil {
  621. l.cn.Close()
  622. }
  623. l.isClosed = true
  624. // Unblock calls to Listen()
  625. l.reconnectCond.Broadcast()
  626. return nil
  627. }
  628. func (l *Listener) emitEvent(event ListenerEventType, err error) {
  629. if l.eventCallback != nil {
  630. l.eventCallback(event, err)
  631. }
  632. }
  633. // Main logic here: maintain a connection to the server when possible, wait
  634. // for notifications and emit events.
  635. func (l *Listener) listenerConnLoop() {
  636. var nextReconnect time.Time
  637. reconnectInterval := l.minReconnectInterval
  638. for {
  639. for {
  640. err := l.connect()
  641. if err == nil {
  642. break
  643. }
  644. if l.closed() {
  645. return
  646. }
  647. l.emitEvent(ListenerEventConnectionAttemptFailed, err)
  648. time.Sleep(reconnectInterval)
  649. reconnectInterval *= 2
  650. if reconnectInterval > l.maxReconnectInterval {
  651. reconnectInterval = l.maxReconnectInterval
  652. }
  653. }
  654. if nextReconnect.IsZero() {
  655. l.emitEvent(ListenerEventConnected, nil)
  656. } else {
  657. l.emitEvent(ListenerEventReconnected, nil)
  658. l.Notify <- nil
  659. }
  660. reconnectInterval = l.minReconnectInterval
  661. nextReconnect = time.Now().Add(reconnectInterval)
  662. for {
  663. notification, ok := <-l.connNotificationChan
  664. if !ok {
  665. // lost connection, loop again
  666. break
  667. }
  668. l.Notify <- notification
  669. }
  670. err := l.disconnectCleanup()
  671. if l.closed() {
  672. return
  673. }
  674. l.emitEvent(ListenerEventDisconnected, err)
  675. time.Sleep(time.Until(nextReconnect))
  676. }
  677. }
  678. func (l *Listener) listenerMain() {
  679. l.listenerConnLoop()
  680. close(l.Notify)
  681. }