1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
use {channel, Poll, Events, Token};
use event::Evented;
use deprecated::{Handler, NotifyError};
use event_imp::{Event, Ready, PollOpt};
use timer::{self, Timer, Timeout};
use std::{io, fmt, usize};
use std::default::Default;
use std::time::Duration;

#[derive(Debug, Default, Clone)]
pub struct EventLoopBuilder {
    config: Config,
}

/// `EventLoop` configuration details
#[derive(Clone, Debug)]
struct Config {
    // == Notifications ==
    notify_capacity: usize,
    messages_per_tick: usize,

    // == Timer ==
    timer_tick: Duration,
    timer_wheel_size: usize,
    timer_capacity: usize,
}

impl Default for Config {
    fn default() -> Config {
        // Default EventLoop configuration values
        Config {
            notify_capacity: 4_096,
            messages_per_tick: 256,
            timer_tick: Duration::from_millis(100),
            timer_wheel_size: 1_024,
            timer_capacity: 65_536,
        }
    }
}

impl EventLoopBuilder {
    /// Construct a new `EventLoopBuilder` with the default configuration
    /// values.
    pub fn new() -> EventLoopBuilder {
        EventLoopBuilder::default()
    }

    /// Sets the maximum number of messages that can be buffered on the event
    /// loop's notification channel before a send will fail.
    ///
    /// The default value for this is 4096.
    pub fn notify_capacity(&mut self, capacity: usize) -> &mut Self {
        self.config.notify_capacity = capacity;
        self
    }

    /// Sets the maximum number of messages that can be processed on any tick of
    /// the event loop.
    ///
    /// The default value for this is 256.
    pub fn messages_per_tick(&mut self, messages: usize) -> &mut Self {
        self.config.messages_per_tick = messages;
        self
    }

    pub fn timer_tick(&mut self, val: Duration) -> &mut Self {
        self.config.timer_tick = val;
        self
    }

    pub fn timer_wheel_size(&mut self, size: usize) -> &mut Self {
        self.config.timer_wheel_size = size;
        self
    }

    pub fn timer_capacity(&mut self, cap: usize) -> &mut Self {
        self.config.timer_capacity = cap;
        self
    }

    /// Constructs a new `EventLoop` using the configured values. The
    /// `EventLoop` will not be running.
    pub fn build<H: Handler>(self) -> io::Result<EventLoop<H>> {
        EventLoop::configured(self.config)
    }
}

/// Single threaded IO event loop.
pub struct EventLoop<H: Handler> {
    run: bool,
    poll: Poll,
    events: Events,
    timer: Timer<H::Timeout>,
    notify_tx: channel::SyncSender<H::Message>,
    notify_rx: channel::Receiver<H::Message>,
    config: Config,
}

// Token used to represent notifications
const NOTIFY: Token = Token(usize::MAX - 1);
const TIMER: Token = Token(usize::MAX - 2);

impl<H: Handler> EventLoop<H> {

    /// Constructs a new `EventLoop` using the default configuration values.
    /// The `EventLoop` will not be running.
    pub fn new() -> io::Result<EventLoop<H>> {
        EventLoop::configured(Config::default())
    }

    fn configured(config: Config) -> io::Result<EventLoop<H>> {
        // Create the IO poller
        let poll = Poll::new()?;

        let timer = timer::Builder::default()
            .tick_duration(config.timer_tick)
            .num_slots(config.timer_wheel_size)
            .capacity(config.timer_capacity)
            .build();

        // Create cross thread notification queue
        let (tx, rx) = channel::sync_channel(config.notify_capacity);

        // Register the notification wakeup FD with the IO poller
        poll.register(&rx, NOTIFY, Ready::readable(), PollOpt::edge() | PollOpt::oneshot())?;
        poll.register(&timer, TIMER, Ready::readable(), PollOpt::edge())?;

        Ok(EventLoop {
            run: true,
            poll,
            timer,
            notify_tx: tx,
            notify_rx: rx,
            config,
            events: Events::with_capacity(1024),
        })
    }

    /// Returns a sender that allows sending messages to the event loop in a
    /// thread-safe way, waking up the event loop if needed.
    ///
    /// # Implementation Details
    ///
    /// Each [EventLoop](#) contains a lock-free queue with a pre-allocated
    /// buffer size. The size can be changed by modifying
    /// [EventLoopConfig.notify_capacity](struct.EventLoopConfig.html#method.notify_capacity).
    /// When a message is sent to the EventLoop, it is first pushed on to the
    /// queue. Then, if the EventLoop is currently running, an atomic flag is
    /// set to indicate that the next loop iteration should be started without
    /// waiting.
    ///
    /// If the loop is blocked waiting for IO events, then it is woken up. The
    /// strategy for waking up the event loop is platform dependent. For
    /// example, on a modern Linux OS, eventfd is used. On older OSes, a pipe
    /// is used.
    ///
    /// The strategy of setting an atomic flag if the event loop is not already
    /// sleeping allows avoiding an expensive wakeup operation if at all possible.
    pub fn channel(&self) -> Sender<H::Message> {
        Sender::new(self.notify_tx.clone())
    }

    /// Schedules a timeout after the requested time interval. When the
    /// duration has been reached,
    /// [Handler::timeout](trait.Handler.html#method.timeout) will be invoked
    /// passing in the supplied token.
    ///
    /// Returns a handle to the timeout that can be used to cancel the timeout
    /// using [#clear_timeout](#method.clear_timeout).
    pub fn timeout(&mut self, token: H::Timeout, delay: Duration) -> timer::Result<Timeout> {
        self.timer.set_timeout(delay, token)
    }

    /// If the supplied timeout has not been triggered, cancel it such that it
    /// will not be triggered in the future.
    pub fn clear_timeout(&mut self, timeout: &Timeout) -> bool {
        self.timer.cancel_timeout(&timeout).is_some()
    }

    /// Tells the event loop to exit after it is done handling all events in the
    /// current iteration.
    pub fn shutdown(&mut self) {
        self.run = false;
    }

    /// Indicates whether the event loop is currently running. If it's not it has either
    /// stopped or is scheduled to stop on the next tick.
    pub fn is_running(&self) -> bool {
        self.run
    }

    /// Registers an IO handle with the event loop.
    pub fn register<E: ?Sized>(&mut self, io: &E, token: Token, interest: Ready, opt: PollOpt) -> io::Result<()>
        where E: Evented
    {
        self.poll.register(io, token, interest, opt)
    }

    /// Re-Registers an IO handle with the event loop.
    pub fn reregister<E: ?Sized>(&mut self, io: &E, token: Token, interest: Ready, opt: PollOpt) -> io::Result<()>
        where E: Evented
    {
        self.poll.reregister(io, token, interest, opt)
    }

    /// Keep spinning the event loop indefinitely, and notify the handler whenever
    /// any of the registered handles are ready.
    pub fn run(&mut self, handler: &mut H) -> io::Result<()> {
        self.run = true;

        while self.run {
            // Execute ticks as long as the event loop is running
            self.run_once(handler, None)?;
        }

        Ok(())
    }

    /// Deregisters an IO handle with the event loop.
    ///
    /// Both kqueue and epoll will automatically clear any pending events when closing a
    /// file descriptor (socket). In that case, this method does not need to be called
    /// prior to dropping a connection from the slab.
    ///
    /// Warning: kqueue effectively builds in deregister when using edge-triggered mode with
    /// oneshot. Calling `deregister()` on the socket will cause a TcpStream error.
    pub fn deregister<E: ?Sized>(&mut self, io: &E) -> io::Result<()> where E: Evented {
        self.poll.deregister(io)
    }

    /// Spin the event loop once, with a given timeout (forever if `None`),
    /// and notify the handler if any of the registered handles become ready
    /// during that time.
    pub fn run_once(&mut self, handler: &mut H, timeout: Option<Duration>) -> io::Result<()> {
        trace!("event loop tick");

        // Check the registered IO handles for any new events. Each poll
        // is for one second, so a shutdown request can last as long as
        // one second before it takes effect.
        let events = match self.io_poll(timeout) {
            Ok(e) => e,
            Err(err) => {
                if err.kind() == io::ErrorKind::Interrupted {
                    handler.interrupted(self);
                    0
                } else {
                    return Err(err);
                }
            }
        };

        self.io_process(handler, events);
        handler.tick(self);
        Ok(())
    }

    #[inline]
    fn io_poll(&mut self, timeout: Option<Duration>) -> io::Result<usize> {
        self.poll.poll(&mut self.events, timeout)
    }

    // Process IO events that have been previously polled
    fn io_process(&mut self, handler: &mut H, cnt: usize) {
        let mut i = 0;

        trace!("io_process(..); cnt={}; len={}", cnt, self.events.len());

        // Iterate over the notifications. Each event provides the token
        // it was registered with (which usually represents, at least, the
        // handle that the event is about) as well as information about
        // what kind of event occurred (readable, writable, signal, etc.)
        while i < cnt {
            let evt = self.events.get(i).unwrap();

            trace!("event={:?}; idx={:?}", evt, i);

            match evt.token() {
                NOTIFY => self.notify(handler),
                TIMER => self.timer_process(handler),
                _ => self.io_event(handler, evt)
            }

            i += 1;
        }
    }

    fn io_event(&mut self, handler: &mut H, evt: Event) {
        handler.ready(self, evt.token(), evt.readiness());
    }

    fn notify(&mut self, handler: &mut H) {
        for _ in 0..self.config.messages_per_tick {
            match self.notify_rx.try_recv() {
                Ok(msg) => handler.notify(self, msg),
                _ => break,
            }
        }

        // Re-register
        let _ = self.poll.reregister(&self.notify_rx, NOTIFY, Ready::readable(), PollOpt::edge() | PollOpt::oneshot());
    }

    fn timer_process(&mut self, handler: &mut H) {
        while let Some(t) = self.timer.poll() {
            handler.timeout(self, t);
        }
    }
}

impl<H: Handler> fmt::Debug for EventLoop<H> {
    fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
        fmt.debug_struct("EventLoop")
            .field("run", &self.run)
            .field("poll", &self.poll)
            .field("config", &self.config)
            .finish()
    }
}

/// Sends messages to the EventLoop from other threads.
pub struct Sender<M> {
    tx: channel::SyncSender<M>
}

impl<M> fmt::Debug for Sender<M> {
    fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
        write!(fmt, "Sender<?> {{ ... }}")
    }
}

impl<M> Clone for Sender <M> {
    fn clone(&self) -> Sender<M> {
        Sender { tx: self.tx.clone() }
    }
}

impl<M> Sender<M> {
    fn new(tx: channel::SyncSender<M>) -> Sender<M> {
        Sender { tx }
    }

    pub fn send(&self, msg: M) -> Result<(), NotifyError<M>> {
        self.tx.try_send(msg)?;
        Ok(())
    }
}