embassy_time/
timer.rs

1use core::future::{poll_fn, Future};
2use core::pin::Pin;
3use core::task::{Context, Poll};
4
5use futures_util::stream::FusedStream;
6use futures_util::Stream;
7
8use crate::{Duration, Instant};
9
/// Error returned by [`with_timeout`] and [`with_deadline`] on timeout.
///
/// This is a zero-sized marker type, so it is `Copy` and can be compared, cloned and
/// printed (via `Display` or `Debug`) at no cost.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[cfg_attr(feature = "defmt", derive(defmt::Format))]
pub struct TimeoutError;

impl core::fmt::Display for TimeoutError {
    /// Writes a short, human-readable description of the error.
    fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
        f.write_str("Timeout")
    }
}
14
15/// Runs a given future with a timeout.
16///
17/// If the future completes before the timeout, its output is returned. Otherwise, on timeout,
18/// work on the future is stopped (`poll` is no longer called), the future is dropped and `Err(TimeoutError)` is returned.
19pub fn with_timeout<F: Future>(timeout: Duration, fut: F) -> TimeoutFuture<F> {
20    TimeoutFuture {
21        timer: Timer::after(timeout),
22        fut,
23    }
24}
25
26/// Runs a given future with a deadline time.
27///
28/// If the future completes before the deadline, its output is returned. Otherwise, on timeout,
29/// work on the future is stopped (`poll` is no longer called), the future is dropped and `Err(TimeoutError)` is returned.
30pub fn with_deadline<F: Future>(at: Instant, fut: F) -> TimeoutFuture<F> {
31    TimeoutFuture {
32        timer: Timer::at(at),
33        fut,
34    }
35}
36
37/// Provides functions to run a given future with a timeout or a deadline.
38pub trait WithTimeout: Sized {
39    /// Output type of the future.
40    type Output;
41
42    /// Runs a given future with a timeout.
43    ///
44    /// If the future completes before the timeout, its output is returned. Otherwise, on timeout,
45    /// work on the future is stopped (`poll` is no longer called), the future is dropped and `Err(TimeoutError)` is returned.
46    fn with_timeout(self, timeout: Duration) -> TimeoutFuture<Self>;
47
48    /// Runs a given future with a deadline time.
49    ///
50    /// If the future completes before the deadline, its output is returned. Otherwise, on timeout,
51    /// work on the future is stopped (`poll` is no longer called), the future is dropped and `Err(TimeoutError)` is returned.
52    fn with_deadline(self, at: Instant) -> TimeoutFuture<Self>;
53}
54
55impl<F: Future> WithTimeout for F {
56    type Output = F::Output;
57
58    fn with_timeout(self, timeout: Duration) -> TimeoutFuture<Self> {
59        with_timeout(timeout, self)
60    }
61
62    fn with_deadline(self, at: Instant) -> TimeoutFuture<Self> {
63        with_deadline(at, self)
64    }
65}
66
67/// Future for the [`with_timeout`] and [`with_deadline`] functions.
68#[must_use = "futures do nothing unless you `.await` or poll them"]
69pub struct TimeoutFuture<F> {
70    timer: Timer,
71    fut: F,
72}
73
74impl<F: Unpin> Unpin for TimeoutFuture<F> {}
75
76impl<F: Future> Future for TimeoutFuture<F> {
77    type Output = Result<F::Output, TimeoutError>;
78
79    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
80        let this = unsafe { self.get_unchecked_mut() };
81        let fut = unsafe { Pin::new_unchecked(&mut this.fut) };
82        let timer = unsafe { Pin::new_unchecked(&mut this.timer) };
83        if let Poll::Ready(x) = fut.poll(cx) {
84            return Poll::Ready(Ok(x));
85        }
86        if let Poll::Ready(_) = timer.poll(cx) {
87            return Poll::Ready(Err(TimeoutError));
88        }
89        Poll::Pending
90    }
91}
92
93/// A future that completes at a specified [Instant](struct.Instant.html).
94#[must_use = "futures do nothing unless you `.await` or poll them"]
95pub struct Timer {
96    expires_at: Instant,
97    yielded_once: bool,
98}
99
100impl Timer {
101    /// Expire at specified [Instant](struct.Instant.html)
102    pub fn at(expires_at: Instant) -> Self {
103        Self {
104            expires_at,
105            yielded_once: false,
106        }
107    }
108
109    /// Expire after specified [Duration](struct.Duration.html).
110    /// This can be used as a `sleep` abstraction.
111    ///
112    /// Example:
113    /// ``` no_run
114    /// use embassy_time::{Duration, Timer};
115    ///
116    /// #[embassy_executor::task]
117    /// async fn demo_sleep_seconds() {
118    ///     // suspend this task for one second.
119    ///     Timer::after(Duration::from_secs(1)).await;
120    /// }
121    /// ```
122    pub fn after(duration: Duration) -> Self {
123        Self {
124            expires_at: Instant::now() + duration,
125            yielded_once: false,
126        }
127    }
128
129    /// Expire after the specified number of ticks.
130    ///
131    /// This method is a convenience wrapper for calling `Timer::after(Duration::from_ticks())`.
132    /// For more details, refer to [`Timer::after()`] and [`Duration::from_ticks()`].
133    #[inline]
134    pub fn after_ticks(ticks: u64) -> Self {
135        Self::after(Duration::from_ticks(ticks))
136    }
137
138    /// Expire after the specified number of nanoseconds.
139    ///
140    /// This method is a convenience wrapper for calling `Timer::after(Duration::from_nanos())`.
141    /// For more details, refer to [`Timer::after()`] and [`Duration::from_nanos()`].
142    #[inline]
143    pub fn after_nanos(nanos: u64) -> Self {
144        Self::after(Duration::from_nanos(nanos))
145    }
146
147    /// Expire after the specified number of microseconds.
148    ///
149    /// This method is a convenience wrapper for calling `Timer::after(Duration::from_micros())`.
150    /// For more details, refer to [`Timer::after()`] and [`Duration::from_micros()`].
151    #[inline]
152    pub fn after_micros(micros: u64) -> Self {
153        Self::after(Duration::from_micros(micros))
154    }
155
156    /// Expire after the specified number of milliseconds.
157    ///
158    /// This method is a convenience wrapper for calling `Timer::after(Duration::from_millis())`.
159    /// For more details, refer to [`Timer::after`] and [`Duration::from_millis()`].
160    #[inline]
161    pub fn after_millis(millis: u64) -> Self {
162        Self::after(Duration::from_millis(millis))
163    }
164
165    /// Expire after the specified number of seconds.
166    ///
167    /// This method is a convenience wrapper for calling `Timer::after(Duration::from_secs())`.
168    /// For more details, refer to [`Timer::after`] and [`Duration::from_secs()`].
169    #[inline]
170    pub fn after_secs(secs: u64) -> Self {
171        Self::after(Duration::from_secs(secs))
172    }
173}
174
175impl Unpin for Timer {}
176
177impl Future for Timer {
178    type Output = ();
179    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
180        if self.yielded_once && self.expires_at <= Instant::now() {
181            Poll::Ready(())
182        } else {
183            embassy_time_driver::schedule_wake(self.expires_at.as_ticks(), cx.waker());
184            self.yielded_once = true;
185            Poll::Pending
186        }
187    }
188}
189
190/// Asynchronous stream that yields every Duration, indefinitely.
191///
192/// This stream will tick at uniform intervals, even if blocking work is performed between ticks.
193///
194/// For instance, consider the following code fragment.
195/// ``` no_run
196/// use embassy_time::{Duration, Timer};
197/// # fn foo() {}
198///
199/// #[embassy_executor::task]
200/// async fn ticker_example_0() {
201///     loop {
202///         foo();
203///         Timer::after(Duration::from_secs(1)).await;
204///     }
205/// }
206/// ```
207///
208/// This fragment will not call `foo` every second.
209/// Instead, it will call it every second + the time it took to previously call `foo`.
210///
211/// Example using ticker, which will consistently call `foo` once a second.
212///
213/// ``` no_run
214/// use embassy_time::{Duration, Ticker};
215/// # fn foo(){}
216///
217/// #[embassy_executor::task]
218/// async fn ticker_example_1() {
219///     let mut ticker = Ticker::every(Duration::from_secs(1));
220///     loop {
221///         foo();
222///         ticker.next().await;
223///     }
224/// }
225/// ```
226///
227/// ## Cancel safety
228/// It is safe to cancel waiting for the next tick,
229/// meaning no tick is lost if the Future is dropped.
230pub struct Ticker {
231    expires_at: Instant,
232    duration: Duration,
233}
234
235impl Ticker {
236    /// Creates a new ticker that ticks at the specified duration interval.
237    pub fn every(duration: Duration) -> Self {
238        let expires_at = Instant::now() + duration;
239        Self { expires_at, duration }
240    }
241
242    /// Resets the ticker back to its original state.
243    /// This causes the ticker to go back to zero, even if the current tick isn't over yet.
244    pub fn reset(&mut self) {
245        self.expires_at = Instant::now() + self.duration;
246    }
247
248    /// Reset the ticker at the deadline.
249    /// If the deadline is in the past, the ticker will fire instantly.
250    pub fn reset_at(&mut self, deadline: Instant) {
251        self.expires_at = deadline + self.duration;
252    }
253
254    /// Resets the ticker, after the specified duration has passed.
255    /// If the specified duration is zero, the next tick will be after the duration of the ticker.
256    pub fn reset_after(&mut self, after: Duration) {
257        self.expires_at = Instant::now() + after + self.duration;
258    }
259
260    /// Waits for the next tick.
261    ///
262    /// ## Cancel safety
263    /// The produced Future is cancel safe, meaning no tick is lost if the Future is dropped.
264    pub fn next(&mut self) -> impl Future<Output = ()> + Send + Sync + '_ {
265        poll_fn(|cx| {
266            if self.expires_at <= Instant::now() {
267                let dur = self.duration;
268                self.expires_at += dur;
269                Poll::Ready(())
270            } else {
271                embassy_time_driver::schedule_wake(self.expires_at.as_ticks(), cx.waker());
272                Poll::Pending
273            }
274        })
275    }
276}
277
278impl Unpin for Ticker {}
279
280impl Stream for Ticker {
281    type Item = ();
282    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
283        if self.expires_at <= Instant::now() {
284            let dur = self.duration;
285            self.expires_at += dur;
286            Poll::Ready(Some(()))
287        } else {
288            embassy_time_driver::schedule_wake(self.expires_at.as_ticks(), cx.waker());
289            Poll::Pending
290        }
291    }
292}
293
294impl FusedStream for Ticker {
295    fn is_terminated(&self) -> bool {
296        // `Ticker` keeps yielding values until dropped, it never terminates.
297        false
298    }
299}