123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379 |
- // Copyright 2015 go-dockerclient authors. All rights reserved.
- // Use of this source code is governed by a BSD-style
- // license that can be found in the LICENSE file.
- package docker
- import (
- "encoding/json"
- "errors"
- "fmt"
- "io"
- "math"
- "net"
- "net/http"
- "net/http/httputil"
- "sync"
- "sync/atomic"
- "time"
- )
- // APIEvents represents events coming from the Docker API
- // The fields in the Docker API changed in API version 1.22, and
- // events for more than images and containers are now fired off.
- // To maintain forward and backward compatibility, go-dockerclient
- // replicates the event in both the new and old format as faithfully as possible.
- //
- // For events that only exist in 1.22 in later, `Status` is filled in as
- // `"Type:Action"` instead of just `Action` to allow for older clients to
- // differentiate and not break if they rely on the pre-1.22 Status types.
- //
- // The transformEvent method can be consulted for more information about how
- // events are translated from new/old API formats
- type APIEvents struct {
- // New API Fields in 1.22
- Action string `json:"action,omitempty"`
- Type string `json:"type,omitempty"`
- Actor APIActor `json:"actor,omitempty"`
- // Old API fields for < 1.22
- Status string `json:"status,omitempty"`
- ID string `json:"id,omitempty"`
- From string `json:"from,omitempty"`
- // Fields in both
- Time int64 `json:"time,omitempty"`
- TimeNano int64 `json:"timeNano,omitempty"`
- }
// APIActor describes the object an event refers to: its identifier plus a
// free-form set of string attributes (for example the "image" attribute that
// transformEvent maps to and from the legacy From field).
type APIActor struct {
	ID         string            `json:"id,omitempty"`
	Attributes map[string]string `json:"attributes,omitempty"`
}
- type eventMonitoringState struct {
- sync.RWMutex
- sync.WaitGroup
- enabled bool
- lastSeen int64
- C chan *APIEvents
- errC chan error
- listeners []chan<- *APIEvents
- }
const (
	// maxMonitorConnRetries is the number of additional connection attempts
	// connectWithRetry makes after the first attempt fails.
	maxMonitorConnRetries = 5

	// retryInitialWaitTime is the base back-off delay, in milliseconds. It
	// is an untyped floating-point constant so it can be multiplied directly
	// with the result of math.Pow.
	retryInitialWaitTime = 10.0
)
- var (
- // ErrNoListeners is the error returned when no listeners are available
- // to receive an event.
- ErrNoListeners = errors.New("no listeners present to receive event")
- // ErrListenerAlreadyExists is the error returned when the listerner already
- // exists.
- ErrListenerAlreadyExists = errors.New("listener already exists for docker events")
- // EOFEvent is sent when the event listener receives an EOF error.
- EOFEvent = &APIEvents{
- Type: "EOF",
- Status: "EOF",
- }
- )
- // AddEventListener adds a new listener to container events in the Docker API.
- //
- // The parameter is a channel through which events will be sent.
- func (c *Client) AddEventListener(listener chan<- *APIEvents) error {
- var err error
- if !c.eventMonitor.isEnabled() {
- err = c.eventMonitor.enableEventMonitoring(c)
- if err != nil {
- return err
- }
- }
- err = c.eventMonitor.addListener(listener)
- if err != nil {
- return err
- }
- return nil
- }
- // RemoveEventListener removes a listener from the monitor.
- func (c *Client) RemoveEventListener(listener chan *APIEvents) error {
- err := c.eventMonitor.removeListener(listener)
- if err != nil {
- return err
- }
- if len(c.eventMonitor.listeners) == 0 {
- c.eventMonitor.disableEventMonitoring()
- }
- return nil
- }
- func (eventState *eventMonitoringState) addListener(listener chan<- *APIEvents) error {
- eventState.Lock()
- defer eventState.Unlock()
- if listenerExists(listener, &eventState.listeners) {
- return ErrListenerAlreadyExists
- }
- eventState.Add(1)
- eventState.listeners = append(eventState.listeners, listener)
- return nil
- }
- func (eventState *eventMonitoringState) removeListener(listener chan<- *APIEvents) error {
- eventState.Lock()
- defer eventState.Unlock()
- if listenerExists(listener, &eventState.listeners) {
- var newListeners []chan<- *APIEvents
- for _, l := range eventState.listeners {
- if l != listener {
- newListeners = append(newListeners, l)
- }
- }
- eventState.listeners = newListeners
- eventState.Add(-1)
- }
- return nil
- }
- func (eventState *eventMonitoringState) closeListeners() {
- for _, l := range eventState.listeners {
- close(l)
- eventState.Add(-1)
- }
- eventState.listeners = nil
- }
- func listenerExists(a chan<- *APIEvents, list *[]chan<- *APIEvents) bool {
- for _, b := range *list {
- if b == a {
- return true
- }
- }
- return false
- }
- func (eventState *eventMonitoringState) enableEventMonitoring(c *Client) error {
- eventState.Lock()
- defer eventState.Unlock()
- if !eventState.enabled {
- eventState.enabled = true
- atomic.StoreInt64(&eventState.lastSeen, 0)
- eventState.C = make(chan *APIEvents, 100)
- eventState.errC = make(chan error, 1)
- go eventState.monitorEvents(c)
- }
- return nil
- }
- func (eventState *eventMonitoringState) disableEventMonitoring() error {
- eventState.Lock()
- defer eventState.Unlock()
- eventState.closeListeners()
- eventState.Wait()
- if eventState.enabled {
- eventState.enabled = false
- close(eventState.C)
- close(eventState.errC)
- }
- return nil
- }
- func (eventState *eventMonitoringState) monitorEvents(c *Client) {
- var err error
- for eventState.noListeners() {
- time.Sleep(10 * time.Millisecond)
- }
- if err = eventState.connectWithRetry(c); err != nil {
- // terminate if connect failed
- eventState.disableEventMonitoring()
- return
- }
- for eventState.isEnabled() {
- timeout := time.After(100 * time.Millisecond)
- select {
- case ev, ok := <-eventState.C:
- if !ok {
- return
- }
- if ev == EOFEvent {
- eventState.disableEventMonitoring()
- return
- }
- eventState.updateLastSeen(ev)
- go eventState.sendEvent(ev)
- case err = <-eventState.errC:
- if err == ErrNoListeners {
- eventState.disableEventMonitoring()
- return
- } else if err != nil {
- defer func() { go eventState.monitorEvents(c) }()
- return
- }
- case <-timeout:
- continue
- }
- }
- }
- func (eventState *eventMonitoringState) connectWithRetry(c *Client) error {
- var retries int
- eventState.RLock()
- eventChan := eventState.C
- errChan := eventState.errC
- eventState.RUnlock()
- err := c.eventHijack(atomic.LoadInt64(&eventState.lastSeen), eventChan, errChan)
- for ; err != nil && retries < maxMonitorConnRetries; retries++ {
- waitTime := int64(retryInitialWaitTime * math.Pow(2, float64(retries)))
- time.Sleep(time.Duration(waitTime) * time.Millisecond)
- eventState.RLock()
- eventChan = eventState.C
- errChan = eventState.errC
- eventState.RUnlock()
- err = c.eventHijack(atomic.LoadInt64(&eventState.lastSeen), eventChan, errChan)
- }
- return err
- }
- func (eventState *eventMonitoringState) noListeners() bool {
- eventState.RLock()
- defer eventState.RUnlock()
- return len(eventState.listeners) == 0
- }
- func (eventState *eventMonitoringState) isEnabled() bool {
- eventState.RLock()
- defer eventState.RUnlock()
- return eventState.enabled
- }
- func (eventState *eventMonitoringState) sendEvent(event *APIEvents) {
- eventState.RLock()
- defer eventState.RUnlock()
- eventState.Add(1)
- defer eventState.Done()
- if eventState.enabled {
- if len(eventState.listeners) == 0 {
- eventState.errC <- ErrNoListeners
- return
- }
- for _, listener := range eventState.listeners {
- listener <- event
- }
- }
- }
- func (eventState *eventMonitoringState) updateLastSeen(e *APIEvents) {
- eventState.Lock()
- defer eventState.Unlock()
- if atomic.LoadInt64(&eventState.lastSeen) < e.Time {
- atomic.StoreInt64(&eventState.lastSeen, e.Time)
- }
- }
- func (c *Client) eventHijack(startTime int64, eventChan chan *APIEvents, errChan chan error) error {
- uri := "/events"
- if startTime != 0 {
- uri += fmt.Sprintf("?since=%d", startTime)
- }
- protocol := c.endpointURL.Scheme
- address := c.endpointURL.Path
- if protocol != "unix" {
- protocol = "tcp"
- address = c.endpointURL.Host
- }
- var dial net.Conn
- var err error
- if c.TLSConfig == nil {
- dial, err = c.Dialer.Dial(protocol, address)
- } else {
- dial, err = tlsDialWithDialer(c.Dialer, protocol, address, c.TLSConfig)
- }
- if err != nil {
- return err
- }
- conn := httputil.NewClientConn(dial, nil)
- req, err := http.NewRequest("GET", uri, nil)
- if err != nil {
- return err
- }
- res, err := conn.Do(req)
- if err != nil {
- return err
- }
- go func(res *http.Response, conn *httputil.ClientConn) {
- defer conn.Close()
- defer res.Body.Close()
- decoder := json.NewDecoder(res.Body)
- for {
- var event APIEvents
- if err = decoder.Decode(&event); err != nil {
- if err == io.EOF || err == io.ErrUnexpectedEOF {
- c.eventMonitor.RLock()
- if c.eventMonitor.enabled && c.eventMonitor.C == eventChan {
- // Signal that we're exiting.
- eventChan <- EOFEvent
- }
- c.eventMonitor.RUnlock()
- break
- }
- errChan <- err
- }
- if event.Time == 0 {
- continue
- }
- if !c.eventMonitor.isEnabled() || c.eventMonitor.C != eventChan {
- return
- }
- transformEvent(&event)
- eventChan <- &event
- }
- }(res, conn)
- return nil
- }
- // transformEvent takes an event and determines what version it is from
- // then populates both versions of the event
- func transformEvent(event *APIEvents) {
- // if event version is <= 1.21 there will be no Action and no Type
- if event.Action == "" && event.Type == "" {
- event.Action = event.Status
- event.Actor.ID = event.ID
- event.Actor.Attributes = map[string]string{}
- switch event.Status {
- case "delete", "import", "pull", "push", "tag", "untag":
- event.Type = "image"
- default:
- event.Type = "container"
- if event.From != "" {
- event.Actor.Attributes["image"] = event.From
- }
- }
- } else {
- if event.Status == "" {
- if event.Type == "image" || event.Type == "container" {
- event.Status = event.Action
- } else {
- // Because just the Status has been overloaded with different Types
- // if an event is not for an image or a container, we prepend the type
- // to avoid problems for people relying on actions being only for
- // images and containers
- event.Status = event.Type + ":" + event.Action
- }
- }
- if event.ID == "" {
- event.ID = event.Actor.ID
- }
- if event.From == "" {
- event.From = event.Actor.Attributes["image"]
- }
- }
- }
|