matrix_sdk_base/store/
send_queue.rs

1// Copyright 2024 The Matrix.org Foundation C.I.C.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! All data types related to the send queue.
16
17use std::{collections::BTreeMap, fmt, ops::Deref};
18
19use as_variant::as_variant;
20use ruma::{
21    MilliSecondsSinceUnixEpoch, OwnedDeviceId, OwnedEventId, OwnedTransactionId, OwnedUserId,
22    TransactionId, UInt,
23    events::{
24        AnyMessageLikeEventContent, MessageLikeEventContent as _, RawExt as _,
25        room::{MediaSource, message::RoomMessageEventContent},
26    },
27    serde::Raw,
28};
29use serde::{Deserialize, Serialize};
30
31use crate::media::MediaRequestParameters;
32
/// A thin wrapper to serialize an [`AnyMessageLikeEventContent`].
///
/// The raw content is stored next to its event type: both are needed by
/// [`SerializableEventContent::deserialize`] to get the typed content back.
#[derive(Clone, Serialize, Deserialize)]
pub struct SerializableEventContent {
    /// The raw content of the event.
    event: Raw<AnyMessageLikeEventContent>,
    /// The type of the event, handed to `deserialize_with_type` when
    /// deserializing `event`.
    event_type: String,
}
39
40#[cfg(not(tarpaulin_include))]
41impl fmt::Debug for SerializableEventContent {
42    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
43        // Don't include the event in the debug display.
44        f.debug_struct("SerializedEventContent")
45            .field("event_type", &self.event_type)
46            .finish_non_exhaustive()
47    }
48}
49
50impl SerializableEventContent {
51    /// Create a [`SerializableEventContent`] from a raw
52    /// [`AnyMessageLikeEventContent`] along with its type.
53    pub fn from_raw(event: Raw<AnyMessageLikeEventContent>, event_type: String) -> Self {
54        Self { event_type, event }
55    }
56
57    /// Create a [`SerializableEventContent`] from an
58    /// [`AnyMessageLikeEventContent`].
59    pub fn new(event: &AnyMessageLikeEventContent) -> Result<Self, serde_json::Error> {
60        Ok(Self::from_raw(Raw::new(event)?, event.event_type().to_string()))
61    }
62
63    /// Convert a [`SerializableEventContent`] back into a
64    /// [`AnyMessageLikeEventContent`].
65    pub fn deserialize(&self) -> Result<AnyMessageLikeEventContent, serde_json::Error> {
66        self.event.deserialize_with_type(&self.event_type)
67    }
68
69    /// Returns the raw event content along with its type.
70    ///
71    /// Useful for callers manipulating custom events.
72    pub fn raw(&self) -> (&Raw<AnyMessageLikeEventContent>, &str) {
73        (&self.event, &self.event_type)
74    }
75}
76
/// The kind of a send queue request.
///
/// A request is either about sending an event ([`Self::Event`]), or about
/// uploading some media content ([`Self::MediaUpload`]).
#[derive(Clone, Debug, Serialize, Deserialize)]
pub enum QueuedRequestKind {
    /// An event to be sent via the send queue.
    Event {
        /// The content of the message-like event we'd like to send.
        content: SerializableEventContent,
    },

    /// Content to upload on the media server.
    ///
    /// The bytes must be stored in the media cache, and are identified by the
    /// cache key.
    MediaUpload {
        /// Content type of the media to be uploaded.
        ///
        /// Stored as a `String` because `Mime`, which we'd really want to use
        /// here, is not serializable.
        content_type: String,

        /// The cache key used to retrieve the media's bytes in the event cache
        /// store.
        cache_key: MediaRequestParameters,

        /// An optional media source for a thumbnail already uploaded.
        thumbnail_source: Option<MediaSource>,

        /// The transaction id of the media event this upload relates to.
        related_to: OwnedTransactionId,

        /// Accumulated list of infos for previously uploaded files and
        /// thumbnails if used during a gallery transaction. Otherwise empty.
        ///
        /// Defaults to an empty list when missing from the serialized data.
        #[cfg(feature = "unstable-msc4274")]
        #[serde(default)]
        accumulated: Vec<AccumulatedSentMediaInfo>,
    },
}
114
115impl From<SerializableEventContent> for QueuedRequestKind {
116    fn from(content: SerializableEventContent) -> Self {
117        Self::Event { content }
118    }
119}
120
/// A request to be sent with a send queue.
///
/// `Debug` is implemented by hand for this type, so as to hide the content of
/// the request from debug logs.
#[derive(Clone)]
pub struct QueuedRequest {
    /// The kind of queued request we're going to send.
    pub kind: QueuedRequestKind,

    /// Unique transaction id for the queued request, acting as a key.
    pub transaction_id: OwnedTransactionId,

    /// Error returned when the request couldn't be sent and is stuck in the
    /// unrecoverable state.
    ///
    /// `None` if the request is in the queue, waiting to be sent. When this is
    /// `Some`, [`QueuedRequest::is_wedged`] returns `true`.
    pub error: Option<QueueWedgeError>,

    /// At which priority should this be handled?
    ///
    /// The bigger the value, the higher the priority at which this request
    /// should be handled.
    pub priority: usize,

    /// The time that the request was originally attempted.
    pub created_at: MilliSecondsSinceUnixEpoch,
}
145
146impl QueuedRequest {
147    /// Returns `Some` if the queued request is about sending an event.
148    pub fn as_event(&self) -> Option<&SerializableEventContent> {
149        as_variant!(&self.kind, QueuedRequestKind::Event { content } => content)
150    }
151
152    /// True if the request couldn't be sent because of an unrecoverable API
153    /// error. See [`Self::error`] for more details on the reason.
154    pub fn is_wedged(&self) -> bool {
155        self.error.is_some()
156    }
157}
158
/// An unrecoverable error that happened while sending a request via the send
/// queue.
///
/// It is a serializable representation of a client error; see the `From`
/// implementation for more details. These errors can't be retried
/// automatically, but some manual action may be taken before trying to send
/// again. Otherwise, the only solution is to delete the local event.
#[derive(Clone, Debug, Serialize, Deserialize, thiserror::Error)]
pub enum QueueWedgeError {
    /// This error occurs when there are some insecure devices in the room, and
    /// the current encryption setting prohibits sharing with them.
    #[error("There are insecure devices in the room")]
    InsecureDevices {
        /// The insecure devices, as a map from a user id to the ids of that
        /// user's insecure devices.
        user_device_map: BTreeMap<OwnedUserId, Vec<OwnedDeviceId>>,
    },

    /// This error occurs when a previously verified user is not verified
    /// anymore, and the current encryption setting prohibits sharing when it
    /// happens.
    #[error("Some users that were previously verified are not anymore")]
    IdentityViolations {
        /// The users that are expected to be verified but are not.
        users: Vec<OwnedUserId>,
    },

    /// It is required to set up cross-signing and properly verify the current
    /// session before sending.
    #[error("Own verification is required")]
    CrossVerificationRequired,

    /// Media content was cached in the media store, but has disappeared before
    /// we could upload it.
    #[error("Media content disappeared")]
    MissingMediaContent,

    /// We tried to upload some media content with an unknown mime type.
    #[error("Invalid mime type '{mime_type}' for media")]
    InvalidMimeType {
        /// The observed mime type that's expected to be invalid.
        mime_type: String,
    },

    /// Other errors.
    #[error("Other unrecoverable error: {msg}")]
    GenericApiError {
        /// Description of the error.
        msg: String,
    },
}
208
/// The specific user intent that characterizes a [`DependentQueuedRequest`].
#[derive(Clone, Debug, Serialize, Deserialize)]
pub enum DependentQueuedRequestKind {
    /// The event should be edited.
    EditEvent {
        /// The new content for the event.
        new_content: SerializableEventContent,
    },

    /// The event should be redacted/aborted/removed.
    RedactEvent,

    /// The event should be reacted to, with the given key.
    ReactEvent {
        /// Key used for the reaction.
        key: String,
    },

    /// Upload a file or thumbnail depending on another file or thumbnail
    /// upload.
    ///
    /// The serialized name `UploadFileWithThumbnail` is also accepted as an
    /// alias for this variant when deserializing.
    #[serde(alias = "UploadFileWithThumbnail")]
    UploadFileOrThumbnail {
        /// Content type for the file or thumbnail.
        content_type: String,

        /// Media request necessary to retrieve the file or thumbnail itself.
        cache_key: MediaRequestParameters,

        /// The media transaction id this upload relates to.
        related_to: OwnedTransactionId,

        /// Whether the depended upon request was a thumbnail or a file upload.
        ///
        /// When missing from the serialized data, this defaults to `true` (see
        /// `default_parent_is_thumbnail_upload`).
        #[serde(default = "default_parent_is_thumbnail_upload")]
        parent_is_thumbnail_upload: bool,
    },

    /// Finish an upload by updating references to the media cache and sending
    /// the final media event with the remote MXC URIs.
    FinishUpload {
        /// Local echo for the event (containing the local MXC URIs).
        ///
        /// `Box` the local echo so that it reduces the size of the whole enum.
        local_echo: Box<RoomMessageEventContent>,

        /// Transaction id for the file upload.
        file_upload: OwnedTransactionId,

        /// Information about the thumbnail, if present.
        thumbnail_info: Option<FinishUploadThumbnailInfo>,
    },

    /// Finish a gallery upload by updating references to the media cache and
    /// sending the final gallery event with the remote MXC URIs.
    #[cfg(feature = "unstable-msc4274")]
    FinishGallery {
        /// Local echo for the event (containing the local MXC URIs).
        ///
        /// `Box` the local echo so that it reduces the size of the whole enum.
        local_echo: Box<RoomMessageEventContent>,

        /// Metadata about the gallery items, one entry per media in the
        /// gallery.
        item_infos: Vec<FinishGalleryItemInfo>,
    },
}
273
/// Serde default for the `parent_is_thumbnail_upload` field of
/// [`DependentQueuedRequestKind::UploadFileOrThumbnail`].
///
/// Before that field was introduced, the only possible case was a file upload
/// following a thumbnail upload. Data serialized without the field is thus
/// assumed to describe that case, hence the `true` value.
fn default_parent_is_thumbnail_upload() -> bool {
    true
}
280
/// Detailed record about a thumbnail used when finishing a media upload.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct FinishUploadThumbnailInfo {
    /// Transaction id for the thumbnail upload.
    pub txn: OwnedTransactionId,
    /// Thumbnail's width.
    ///
    /// Used previously, kept for backwards compatibility. Skipped when
    /// serializing a `None` value, and set to `None` when missing from the
    /// serialized data.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub width: Option<UInt>,
    /// Thumbnail's height.
    ///
    /// Used previously, kept for backwards compatibility. Skipped when
    /// serializing a `None` value, and set to `None` when missing from the
    /// serialized data.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub height: Option<UInt>,
}
297
/// Detailed record about a file and its thumbnail.
///
/// When finishing a gallery upload, one [`FinishGalleryItemInfo`] will be used
/// for each media in the gallery.
#[cfg(feature = "unstable-msc4274")]
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct FinishGalleryItemInfo {
    /// Transaction id for the file upload.
    pub file_upload: OwnedTransactionId,
    /// Information about the thumbnail, if present.
    pub thumbnail_info: Option<FinishUploadThumbnailInfo>,
}
309
/// A transaction id identifying a [`DependentQueuedRequest`] rather than its
/// parent [`QueuedRequest`].
///
/// This thin wrapper adds some safety to some APIs, making it possible to
/// distinguish between the parent's `TransactionId` and the dependent event's
/// own `TransactionId`.
///
/// Because of `#[serde(transparent)]`, it is serialized exactly like the
/// transaction id it wraps.
#[repr(transparent)]
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
#[serde(transparent)]
pub struct ChildTransactionId(OwnedTransactionId);
320
321impl ChildTransactionId {
322    /// Returns a new [`ChildTransactionId`].
323    #[allow(clippy::new_without_default)]
324    pub fn new() -> Self {
325        Self(TransactionId::new())
326    }
327}
328
329impl Deref for ChildTransactionId {
330    type Target = TransactionId;
331
332    fn deref(&self) -> &Self::Target {
333        &self.0
334    }
335}
336
337impl From<String> for ChildTransactionId {
338    fn from(val: String) -> Self {
339        Self(val.into())
340    }
341}
342
343impl From<ChildTransactionId> for OwnedTransactionId {
344    fn from(val: ChildTransactionId) -> Self {
345        val.0
346    }
347}
348
349impl From<OwnedTransactionId> for ChildTransactionId {
350    fn from(val: OwnedTransactionId) -> Self {
351        Self(val)
352    }
353}
354
/// Information about a media (and its thumbnail) that has been sent to a
/// homeserver.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct SentMediaInfo {
    /// File that was uploaded by this request.
    ///
    /// If the request related to a thumbnail upload, this contains the
    /// thumbnail media source.
    pub file: MediaSource,

    /// Optional thumbnail previously uploaded, when uploading a file.
    ///
    /// When uploading a thumbnail, this is set to `None`.
    pub thumbnail: Option<MediaSource>,

    /// Accumulated list of infos for previously uploaded files and thumbnails
    /// if used during a gallery transaction. Otherwise empty.
    ///
    /// Defaults to an empty list when missing from the serialized data.
    #[cfg(feature = "unstable-msc4274")]
    #[serde(default)]
    pub accumulated: Vec<AccumulatedSentMediaInfo>,
}
376
/// Accumulated information about a media (and its thumbnail) that has been
/// sent to a homeserver.
///
/// It has the same `file` and `thumbnail` fields as [`SentMediaInfo`], without
/// the accumulated list.
#[cfg(feature = "unstable-msc4274")]
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct AccumulatedSentMediaInfo {
    /// File that was uploaded by this request.
    ///
    /// If the request related to a thumbnail upload, this contains the
    /// thumbnail media source.
    pub file: MediaSource,

    /// Optional thumbnail previously uploaded, when uploading a file.
    ///
    /// When uploading a thumbnail, this is set to `None`.
    pub thumbnail: Option<MediaSource>,
}
393
#[cfg(feature = "unstable-msc4274")]
impl From<AccumulatedSentMediaInfo> for SentMediaInfo {
    /// Moves the file and thumbnail sources over, and starts with an empty
    /// list of accumulated infos.
    fn from(value: AccumulatedSentMediaInfo) -> Self {
        let AccumulatedSentMediaInfo { file, thumbnail } = value;
        Self { file, thumbnail, accumulated: Vec::new() }
    }
}
400
/// A unique key (identifier) indicating that a transaction has been
/// successfully sent to the server.
///
/// The child transactions depending on it can now be resolved.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub enum SentRequestKey {
    /// The parent transaction returned an event when it succeeded.
    Event(OwnedEventId),

    /// The parent transaction returned an uploaded resource URL.
    Media(SentMediaInfo),
}
413
414impl SentRequestKey {
415    /// Converts the current parent key into an event id, if possible.
416    pub fn into_event_id(self) -> Option<OwnedEventId> {
417        as_variant!(self, Self::Event)
418    }
419
420    /// Converts the current parent key into information about a sent media, if
421    /// possible.
422    pub fn into_media(self) -> Option<SentMediaInfo> {
423        as_variant!(self, Self::Media)
424    }
425}
426
/// A request to be sent, depending on a [`QueuedRequest`] to be sent first.
///
/// Depending on whether the parent request has been sent or not, this will
/// either update the local echo in the storage, or materialize an equivalent
/// request implementing the user intent to the homeserver.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct DependentQueuedRequest {
    /// Unique identifier for this dependent queued request.
    ///
    /// Useful for deletion.
    pub own_transaction_id: ChildTransactionId,

    /// The kind of user intent.
    pub kind: DependentQueuedRequestKind,

    /// Transaction id for the parent's local echo / used in the server request.
    ///
    /// Note: this is the transaction id used for the depended-on request, i.e.
    /// the one that was originally sent and that's being modified with this
    /// dependent request.
    pub parent_transaction_id: OwnedTransactionId,

    /// If the parent request has been sent, the parent's request identifier
    /// returned by the server once the local echo has been sent out.
    ///
    /// `None` as long as the parent request hasn't been sent.
    pub parent_key: Option<SentRequestKey>,

    /// The time that the request was originally attempted.
    pub created_at: MilliSecondsSinceUnixEpoch,
}
456
457impl DependentQueuedRequest {
458    /// Does the dependent request represent a new event that is *not*
459    /// aggregated, aka it is going to be its own item in a timeline?
460    pub fn is_own_event(&self) -> bool {
461        match self.kind {
462            DependentQueuedRequestKind::EditEvent { .. }
463            | DependentQueuedRequestKind::RedactEvent
464            | DependentQueuedRequestKind::ReactEvent { .. }
465            | DependentQueuedRequestKind::UploadFileOrThumbnail { .. } => {
466                // These are all aggregated events, or non-visible items (file upload producing
467                // a new MXC ID).
468                false
469            }
470            DependentQueuedRequestKind::FinishUpload { .. } => {
471                // This one graduates into a new media event.
472                true
473            }
474            #[cfg(feature = "unstable-msc4274")]
475            DependentQueuedRequestKind::FinishGallery { .. } => {
476                // This one graduates into a new gallery event.
477                true
478            }
479        }
480    }
481}
482
483#[cfg(not(tarpaulin_include))]
484impl fmt::Debug for QueuedRequest {
485    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
486        // Hide the content from the debug log.
487        f.debug_struct("QueuedRequest")
488            .field("transaction_id", &self.transaction_id)
489            .field("is_wedged", &self.is_wedged())
490            .finish_non_exhaustive()
491    }
492}