matrix_sdk_sqlite/
state_store.rs

1use std::{
2    borrow::Cow,
3    collections::{BTreeMap, BTreeSet, HashMap},
4    fmt, iter,
5    path::Path,
6    sync::Arc,
7};
8
9use async_trait::async_trait;
10use deadpool_sqlite::{Object as SqliteAsyncConn, Pool as SqlitePool, Runtime};
11use matrix_sdk_base::{
12    deserialized_responses::{DisplayName, RawAnySyncOrStrippedState, SyncOrStrippedState},
13    store::{
14        migration_helpers::RoomInfoV1, ChildTransactionId, DependentQueuedRequest,
15        DependentQueuedRequestKind, QueueWedgeError, QueuedRequest, QueuedRequestKind,
16        RoomLoadSettings, SentRequestKey, ThreadSubscription,
17    },
18    MinimalRoomMemberEvent, RoomInfo, RoomMemberships, RoomState, StateChanges, StateStore,
19    StateStoreDataKey, StateStoreDataValue, ROOM_VERSION_FALLBACK, ROOM_VERSION_RULES_FALLBACK,
20};
21use matrix_sdk_store_encryption::StoreCipher;
22use ruma::{
23    canonical_json::{redact, RedactedBecause},
24    events::{
25        presence::PresenceEvent,
26        receipt::{Receipt, ReceiptThread, ReceiptType},
27        room::{
28            create::RoomCreateEventContent,
29            member::{StrippedRoomMemberEvent, SyncRoomMemberEvent},
30        },
31        AnyGlobalAccountDataEvent, AnyRoomAccountDataEvent, AnySyncStateEvent,
32        GlobalAccountDataEventType, RoomAccountDataEventType, StateEventType,
33    },
34    serde::Raw,
35    CanonicalJsonObject, EventId, MilliSecondsSinceUnixEpoch, OwnedEventId, OwnedRoomId,
36    OwnedTransactionId, OwnedUserId, RoomId, TransactionId, UInt, UserId,
37};
38use rusqlite::{OptionalExtension, Transaction};
39use serde::{Deserialize, Serialize};
40use tokio::fs;
41use tracing::{debug, warn};
42
43use crate::{
44    error::{Error, Result},
45    utils::{
46        repeat_vars, EncryptableStore, Key, SqliteAsyncConnExt, SqliteKeyValueStoreAsyncConnExt,
47        SqliteKeyValueStoreConnExt,
48    },
49    OpenStoreError, SqliteStoreConfig,
50};
51
/// Names of the database tables.
///
/// These are passed as the table argument of `encode_key` when encoding keys
/// for a given table.
mod keys {
    // Tables
    pub const KV_BLOB: &str = "kv_blob";
    pub const ROOM_INFO: &str = "room_info";
    pub const STATE_EVENT: &str = "state_event";
    pub const GLOBAL_ACCOUNT_DATA: &str = "global_account_data";
    pub const ROOM_ACCOUNT_DATA: &str = "room_account_data";
    pub const MEMBER: &str = "member";
    pub const PROFILE: &str = "profile";
    pub const RECEIPT: &str = "receipt";
    pub const DISPLAY_NAME: &str = "display_name";
    pub const SEND_QUEUE: &str = "send_queue_events";
    pub const DEPENDENTS_SEND_QUEUE: &str = "dependent_send_queue_events";
    pub const THREAD_SUBSCRIPTIONS: &str = "thread_subscriptions";
}
67
/// The filename used for the SQLite database file used by the state store.
pub const DATABASE_NAME: &str = "matrix-sdk-state.sqlite3";

/// Identifier of the latest database version.
///
/// This is used to figure out whether the SQLite database requires a
/// migration. Every new SQL migration should imply a bump of this number, and
/// changes in the [`SqliteStateStore::run_migrations`] function.
const DATABASE_VERSION: u8 = 13;
77
/// An SQLite-based state store.
#[derive(Clone)]
pub struct SqliteStateStore {
    /// Cipher used to encrypt private data; `None` when the store was opened
    /// without a passphrase.
    store_cipher: Option<Arc<StoreCipher>>,
    /// Pool of SQLite connections used to access the database.
    pool: SqlitePool,
}
84
85#[cfg(not(tarpaulin_include))]
86impl fmt::Debug for SqliteStateStore {
87    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
88        f.debug_struct("SqliteStateStore").finish_non_exhaustive()
89    }
90}
91
92impl SqliteStateStore {
93    /// Open the SQLite-based state store at the given path using the given
94    /// passphrase to encrypt private data.
95    pub async fn open(
96        path: impl AsRef<Path>,
97        passphrase: Option<&str>,
98    ) -> Result<Self, OpenStoreError> {
99        Self::open_with_config(SqliteStoreConfig::new(path).passphrase(passphrase)).await
100    }
101
102    /// Open the SQLite-based state store with the config open config.
103    pub async fn open_with_config(config: SqliteStoreConfig) -> Result<Self, OpenStoreError> {
104        let SqliteStoreConfig { path, passphrase, pool_config, runtime_config } = config;
105
106        fs::create_dir_all(&path).await.map_err(OpenStoreError::CreateDir)?;
107
108        let mut config = deadpool_sqlite::Config::new(path.join(DATABASE_NAME));
109        config.pool = Some(pool_config);
110
111        let pool = config.create_pool(Runtime::Tokio1)?;
112
113        let this = Self::open_with_pool(pool, passphrase.as_deref()).await?;
114        this.pool.get().await?.apply_runtime_config(runtime_config).await?;
115
116        Ok(this)
117    }
118
119    /// Create an SQLite-based state store using the given SQLite database pool.
120    /// The given passphrase will be used to encrypt private data.
121    pub async fn open_with_pool(
122        pool: SqlitePool,
123        passphrase: Option<&str>,
124    ) -> Result<Self, OpenStoreError> {
125        let conn = pool.get().await?;
126
127        let mut version = conn.db_version().await?;
128
129        if version == 0 {
130            init(&conn).await?;
131            version = 1;
132        }
133
134        let store_cipher = match passphrase {
135            Some(p) => Some(Arc::new(conn.get_or_create_store_cipher(p).await?)),
136            None => None,
137        };
138        let this = Self { store_cipher, pool };
139        this.run_migrations(&conn, version, None).await?;
140
141        Ok(this)
142    }
143
144    /// Run database migrations from the given `from` version to the given `to`
145    /// version
146    ///
147    /// If `to` is `None`, the current database version will be used.
148    async fn run_migrations(&self, conn: &SqliteAsyncConn, from: u8, to: Option<u8>) -> Result<()> {
149        let to = to.unwrap_or(DATABASE_VERSION);
150
151        if from < to {
152            debug!(version = from, new_version = to, "Upgrading database");
153        } else {
154            return Ok(());
155        }
156
157        if from < 2 && to >= 2 {
158            let this = self.clone();
159            conn.with_transaction(move |txn| {
160                // Create new table.
161                txn.execute_batch(include_str!(
162                    "../migrations/state_store/002_a_create_new_room_info.sql"
163                ))?;
164
165                // Migrate data to new table.
166                for data in txn
167                    .prepare("SELECT data FROM room_info")?
168                    .query_map((), |row| row.get::<_, Vec<u8>>(0))?
169                {
170                    let data = data?;
171                    let room_info: RoomInfoV1 = this.deserialize_json(&data)?;
172
173                    let room_id = this.encode_key(keys::ROOM_INFO, room_info.room_id());
174                    let state = this
175                        .encode_key(keys::ROOM_INFO, serde_json::to_string(&room_info.state())?);
176                    txn.prepare_cached(
177                        "INSERT OR REPLACE INTO new_room_info (room_id, state, data)
178                         VALUES (?, ?, ?)",
179                    )?
180                    .execute((room_id, state, data))?;
181                }
182
183                // Replace old table.
184                txn.execute_batch(include_str!(
185                    "../migrations/state_store/002_b_replace_room_info.sql"
186                ))?;
187
188                txn.set_db_version(2)?;
189                Result::<_, Error>::Ok(())
190            })
191            .await?;
192        }
193
194        // Migration to v3: RoomInfo format has changed.
195        if from < 3 && to >= 3 {
196            let this = self.clone();
197            conn.with_transaction(move |txn| {
198                // Migrate data .
199                for data in txn
200                    .prepare("SELECT data FROM room_info")?
201                    .query_map((), |row| row.get::<_, Vec<u8>>(0))?
202                {
203                    let data = data?;
204                    let room_info_v1: RoomInfoV1 = this.deserialize_json(&data)?;
205
206                    // Get the `m.room.create` event from the room state.
207                    let room_id = this.encode_key(keys::STATE_EVENT, room_info_v1.room_id());
208                    let event_type =
209                        this.encode_key(keys::STATE_EVENT, StateEventType::RoomCreate.to_string());
210                    let create_res = txn
211                        .prepare(
212                            "SELECT stripped, data FROM state_event
213                             WHERE room_id = ? AND event_type = ?",
214                        )?
215                        .query_row([room_id, event_type], |row| {
216                            Ok((row.get::<_, bool>(0)?, row.get::<_, Vec<u8>>(1)?))
217                        })
218                        .optional()?;
219
220                    let create = create_res.and_then(|(stripped, data)| {
221                        let create = if stripped {
222                            SyncOrStrippedState::<RoomCreateEventContent>::Stripped(
223                                this.deserialize_json(&data).ok()?,
224                            )
225                        } else {
226                            SyncOrStrippedState::Sync(this.deserialize_json(&data).ok()?)
227                        };
228                        Some(create)
229                    });
230
231                    let migrated_room_info = room_info_v1.migrate(create.as_ref());
232
233                    let data = this.serialize_json(&migrated_room_info)?;
234                    let room_id = this.encode_key(keys::ROOM_INFO, migrated_room_info.room_id());
235                    txn.prepare_cached("UPDATE room_info SET data = ? WHERE room_id = ?")?
236                        .execute((data, room_id))?;
237                }
238
239                txn.set_db_version(3)?;
240                Result::<_, Error>::Ok(())
241            })
242            .await?;
243        }
244
245        if from < 4 && to >= 4 {
246            conn.with_transaction(move |txn| {
247                // Create new table.
248                txn.execute_batch(include_str!("../migrations/state_store/003_send_queue.sql"))?;
249                txn.set_db_version(4)
250            })
251            .await?;
252        }
253
254        if from < 5 && to >= 5 {
255            conn.with_transaction(move |txn| {
256                // Create new table.
257                txn.execute_batch(include_str!(
258                    "../migrations/state_store/004_send_queue_with_roomid_value.sql"
259                ))?;
260                txn.set_db_version(4)
261            })
262            .await?;
263        }
264
265        if from < 6 && to >= 6 {
266            conn.with_transaction(move |txn| {
267                // Create new table.
268                txn.execute_batch(include_str!(
269                    "../migrations/state_store/005_send_queue_dependent_events.sql"
270                ))?;
271                txn.set_db_version(6)
272            })
273            .await?;
274        }
275
276        if from < 7 && to >= 7 {
277            conn.with_transaction(move |txn| {
278                // Drop media table.
279                txn.execute_batch(include_str!("../migrations/state_store/006_drop_media.sql"))?;
280                txn.set_db_version(7)
281            })
282            .await?;
283        }
284
285        if from < 8 && to >= 8 {
286            // Replace all existing wedged events with a generic error.
287            let error = QueueWedgeError::GenericApiError {
288                msg: "local echo failed to send in a previous session".into(),
289            };
290            let default_err = self.serialize_value(&error)?;
291
292            conn.with_transaction(move |txn| {
293                // Update send queue table to persist the wedge reason if any.
294                txn.execute_batch(include_str!("../migrations/state_store/007_a_send_queue_wedge_reason.sql"))?;
295
296                // Migrate the data, add a generic error for currently wedged events
297
298                for wedged_entries in txn
299                    .prepare("SELECT room_id, transaction_id FROM send_queue_events WHERE wedged = 1")?
300                    .query_map((), |row| {
301                        Ok(
302                            (row.get::<_, Vec<u8>>(0)?,row.get::<_, String>(1)?)
303                        )
304                    })? {
305
306                    let (room_id, transaction_id) = wedged_entries?;
307
308                    txn.prepare_cached("UPDATE send_queue_events SET wedge_reason = ? WHERE room_id = ? AND transaction_id = ?")?
309                        .execute((default_err.clone(), room_id, transaction_id))?;
310                }
311
312
313                // Clean up the table now that data is migrated
314                txn.execute_batch(include_str!("../migrations/state_store/007_b_send_queue_clean.sql"))?;
315
316                txn.set_db_version(8)
317            })
318                .await?;
319        }
320
321        if from < 9 && to >= 9 {
322            conn.with_transaction(move |txn| {
323                // Run the migration.
324                txn.execute_batch(include_str!("../migrations/state_store/008_send_queue.sql"))?;
325                txn.set_db_version(9)
326            })
327            .await?;
328        }
329
330        if from < 10 && to >= 10 {
331            conn.with_transaction(move |txn| {
332                // Run the migration.
333                txn.execute_batch(include_str!(
334                    "../migrations/state_store/009_send_queue_priority.sql"
335                ))?;
336                txn.set_db_version(10)
337            })
338            .await?;
339        }
340
341        if from < 11 && to >= 11 {
342            conn.with_transaction(move |txn| {
343                // Run the migration.
344                txn.execute_batch(include_str!(
345                    "../migrations/state_store/010_send_queue_enqueue_time.sql"
346                ))?;
347                txn.set_db_version(11)
348            })
349            .await?;
350        }
351
352        if from < 12 && to >= 12 {
353            // Defragment the DB and optimize its size on the filesystem.
354            // This should have been run in the migration for version 7, to reduce the size
355            // of the DB as we removed the media cache.
356            conn.vacuum().await?;
357            conn.set_kv("version", vec![12]).await?;
358        }
359
360        if from < 13 && to >= 13 {
361            conn.with_transaction(move |txn| {
362                // Run the migration.
363                txn.execute_batch(include_str!(
364                    "../migrations/state_store/011_thread_subscriptions.sql"
365                ))?;
366                txn.set_db_version(13)
367            })
368            .await?;
369        }
370
371        Ok(())
372    }
373
374    fn encode_state_store_data_key(&self, key: StateStoreDataKey<'_>) -> Key {
375        let key_s = match key {
376            StateStoreDataKey::SyncToken => Cow::Borrowed(StateStoreDataKey::SYNC_TOKEN),
377            StateStoreDataKey::ServerInfo => Cow::Borrowed(StateStoreDataKey::SERVER_INFO),
378            StateStoreDataKey::Filter(f) => {
379                Cow::Owned(format!("{}:{f}", StateStoreDataKey::FILTER))
380            }
381            StateStoreDataKey::UserAvatarUrl(u) => {
382                Cow::Owned(format!("{}:{u}", StateStoreDataKey::USER_AVATAR_URL))
383            }
384            StateStoreDataKey::RecentlyVisitedRooms(b) => {
385                Cow::Owned(format!("{}:{b}", StateStoreDataKey::RECENTLY_VISITED_ROOMS))
386            }
387            StateStoreDataKey::UtdHookManagerData => {
388                Cow::Borrowed(StateStoreDataKey::UTD_HOOK_MANAGER_DATA)
389            }
390            StateStoreDataKey::ComposerDraft(room_id, thread_root) => {
391                if let Some(thread_root) = thread_root {
392                    Cow::Owned(format!(
393                        "{}:{room_id}:{thread_root}",
394                        StateStoreDataKey::COMPOSER_DRAFT
395                    ))
396                } else {
397                    Cow::Owned(format!("{}:{room_id}", StateStoreDataKey::COMPOSER_DRAFT))
398                }
399            }
400            StateStoreDataKey::SeenKnockRequests(room_id) => {
401                Cow::Owned(format!("{}:{room_id}", StateStoreDataKey::SEEN_KNOCK_REQUESTS))
402            }
403        };
404
405        self.encode_key(keys::KV_BLOB, &*key_s)
406    }
407
408    fn encode_presence_key(&self, user_id: &UserId) -> Key {
409        self.encode_key(keys::KV_BLOB, format!("presence:{user_id}"))
410    }
411
412    fn encode_custom_key(&self, key: &[u8]) -> Key {
413        let mut full_key = b"custom:".to_vec();
414        full_key.extend(key);
415        self.encode_key(keys::KV_BLOB, full_key)
416    }
417
418    async fn acquire(&self) -> Result<SqliteAsyncConn> {
419        Ok(self.pool.get().await?)
420    }
421
422    fn remove_maybe_stripped_room_data(
423        &self,
424        txn: &Transaction<'_>,
425        room_id: &RoomId,
426        stripped: bool,
427    ) -> rusqlite::Result<()> {
428        let state_event_room_id = self.encode_key(keys::STATE_EVENT, room_id);
429        txn.remove_room_state_events(&state_event_room_id, Some(stripped))?;
430
431        let member_room_id = self.encode_key(keys::MEMBER, room_id);
432        txn.remove_room_members(&member_room_id, Some(stripped))
433    }
434}
435
436impl EncryptableStore for SqliteStateStore {
437    fn get_cypher(&self) -> Option<&StoreCipher> {
438        self.store_cipher.as_deref()
439    }
440}
441
442/// Initialize the database.
443async fn init(conn: &SqliteAsyncConn) -> Result<()> {
444    // First turn on WAL mode, this can't be done in the transaction, it fails with
445    // the error message: "cannot change into wal mode from within a transaction".
446    conn.execute_batch("PRAGMA journal_mode = wal;").await?;
447    conn.with_transaction(|txn| {
448        txn.execute_batch(include_str!("../migrations/state_store/001_init.sql"))?;
449        txn.set_db_version(1)?;
450
451        Ok(())
452    })
453    .await
454}
455
/// Synchronous helpers for writing to and removing from the state store
/// tables.
///
/// All keys and values are passed as raw bytes.
trait SqliteConnectionStateStoreExt {
    /// Insert or replace the value stored under `key` in the `kv_blob` table.
    fn set_kv_blob(&self, key: &[u8], value: &[u8]) -> rusqlite::Result<()>;

    /// Insert or replace the global account data of the given event type.
    fn set_global_account_data(&self, event_type: &[u8], data: &[u8]) -> rusqlite::Result<()>;

    /// Insert or replace the account data of the given event type for the
    /// given room.
    fn set_room_account_data(
        &self,
        room_id: &[u8],
        event_type: &[u8],
        data: &[u8],
    ) -> rusqlite::Result<()>;
    /// Remove all the account data of the given room.
    fn remove_room_account_data(&self, room_id: &[u8]) -> rusqlite::Result<()>;

    /// Insert or replace the room info of the given room.
    fn set_room_info(&self, room_id: &[u8], state: &[u8], data: &[u8]) -> rusqlite::Result<()>;
    /// Get the room info of the given room, if any.
    fn get_room_info(&self, room_id: &[u8]) -> rusqlite::Result<Option<Vec<u8>>>;
    /// Remove the room info of the given room.
    fn remove_room_info(&self, room_id: &[u8]) -> rusqlite::Result<()>;

    /// Insert or replace a state event of the given room.
    fn set_state_event(
        &self,
        room_id: &[u8],
        event_type: &[u8],
        state_key: &[u8],
        stripped: bool,
        event_id: Option<&[u8]>,
        data: &[u8],
    ) -> rusqlite::Result<()>;
    /// Get the state event with the given event ID in the given room, if any.
    fn get_state_event_by_id(
        &self,
        room_id: &[u8],
        event_id: &[u8],
    ) -> rusqlite::Result<Option<Vec<u8>>>;
    /// Remove the state events of the given room.
    ///
    /// If `stripped` is `Some()`, only the state events with that stripped
    /// state are removed.
    fn remove_room_state_events(
        &self,
        room_id: &[u8],
        stripped: Option<bool>,
    ) -> rusqlite::Result<()>;

    /// Insert or replace a member of the given room.
    fn set_member(
        &self,
        room_id: &[u8],
        user_id: &[u8],
        membership: &[u8],
        stripped: bool,
        data: &[u8],
    ) -> rusqlite::Result<()>;
    /// Remove the members of the given room.
    ///
    /// If `stripped` is `Some()`, only the members with that stripped state
    /// are removed.
    fn remove_room_members(&self, room_id: &[u8], stripped: Option<bool>) -> rusqlite::Result<()>;

    /// Insert or replace the profile of the given user in the given room.
    fn set_profile(&self, room_id: &[u8], user_id: &[u8], data: &[u8]) -> rusqlite::Result<()>;
    /// Remove all the profiles of the given room.
    fn remove_room_profiles(&self, room_id: &[u8]) -> rusqlite::Result<()>;
    /// Remove the profile of the given user in the given room.
    fn remove_room_profile(&self, room_id: &[u8], user_id: &[u8]) -> rusqlite::Result<()>;

    /// Insert or replace a receipt of the given room.
    fn set_receipt(
        &self,
        room_id: &[u8],
        user_id: &[u8],
        receipt_type: &[u8],
        thread_id: &[u8],
        event_id: &[u8],
        data: &[u8],
    ) -> rusqlite::Result<()>;
    /// Remove all the receipts of the given room.
    fn remove_room_receipts(&self, room_id: &[u8]) -> rusqlite::Result<()>;

    /// Insert or replace the data stored for the given display name in the
    /// given room.
    fn set_display_name(&self, room_id: &[u8], name: &[u8], data: &[u8]) -> rusqlite::Result<()>;
    /// Remove the data stored for the given display name in the given room.
    fn remove_display_name(&self, room_id: &[u8], name: &[u8]) -> rusqlite::Result<()>;
    /// Remove all the display names of the given room.
    fn remove_room_display_names(&self, room_id: &[u8]) -> rusqlite::Result<()>;
    /// Remove all the send queue events of the given room.
    fn remove_room_send_queue(&self, room_id: &[u8]) -> rusqlite::Result<()>;
    /// Remove all the dependent send queue events of the given room.
    fn remove_room_dependent_send_queue(&self, room_id: &[u8]) -> rusqlite::Result<()>;
}
524
525impl SqliteConnectionStateStoreExt for rusqlite::Connection {
526    fn set_kv_blob(&self, key: &[u8], value: &[u8]) -> rusqlite::Result<()> {
527        self.execute("INSERT OR REPLACE INTO kv_blob VALUES (?, ?)", (key, value))?;
528        Ok(())
529    }
530
531    fn set_global_account_data(&self, event_type: &[u8], data: &[u8]) -> rusqlite::Result<()> {
532        self.prepare_cached(
533            "INSERT OR REPLACE INTO global_account_data (event_type, data)
534             VALUES (?, ?)",
535        )?
536        .execute((event_type, data))?;
537        Ok(())
538    }
539
540    fn set_room_account_data(
541        &self,
542        room_id: &[u8],
543        event_type: &[u8],
544        data: &[u8],
545    ) -> rusqlite::Result<()> {
546        self.prepare_cached(
547            "INSERT OR REPLACE INTO room_account_data (room_id, event_type, data)
548             VALUES (?, ?, ?)",
549        )?
550        .execute((room_id, event_type, data))?;
551        Ok(())
552    }
553
554    fn remove_room_account_data(&self, room_id: &[u8]) -> rusqlite::Result<()> {
555        self.prepare(
556            "DELETE FROM room_account_data
557             WHERE room_id = ?",
558        )?
559        .execute((room_id,))?;
560        Ok(())
561    }
562
563    fn set_room_info(&self, room_id: &[u8], state: &[u8], data: &[u8]) -> rusqlite::Result<()> {
564        self.prepare_cached(
565            "INSERT OR REPLACE INTO room_info (room_id, state, data)
566             VALUES (?, ?, ?)",
567        )?
568        .execute((room_id, state, data))?;
569        Ok(())
570    }
571
572    fn get_room_info(&self, room_id: &[u8]) -> rusqlite::Result<Option<Vec<u8>>> {
573        self.query_row("SELECT data FROM room_info WHERE room_id = ?", (room_id,), |row| row.get(0))
574            .optional()
575    }
576
577    /// Remove the room info for the given room.
578    fn remove_room_info(&self, room_id: &[u8]) -> rusqlite::Result<()> {
579        self.prepare_cached("DELETE FROM room_info WHERE room_id = ?")?.execute((room_id,))?;
580        Ok(())
581    }
582
583    fn set_state_event(
584        &self,
585        room_id: &[u8],
586        event_type: &[u8],
587        state_key: &[u8],
588        stripped: bool,
589        event_id: Option<&[u8]>,
590        data: &[u8],
591    ) -> rusqlite::Result<()> {
592        self.prepare_cached(
593            "INSERT OR REPLACE
594             INTO state_event (room_id, event_type, state_key, stripped, event_id, data)
595             VALUES (?, ?, ?, ?, ?, ?)",
596        )?
597        .execute((room_id, event_type, state_key, stripped, event_id, data))?;
598        Ok(())
599    }
600
601    fn get_state_event_by_id(
602        &self,
603        room_id: &[u8],
604        event_id: &[u8],
605    ) -> rusqlite::Result<Option<Vec<u8>>> {
606        self.query_row(
607            "SELECT data FROM state_event WHERE room_id = ? AND event_id = ?",
608            (room_id, event_id),
609            |row| row.get(0),
610        )
611        .optional()
612    }
613
614    /// Remove state events for the given room.
615    ///
616    /// If `stripped` is `Some()`, only removes state events for the given
617    /// stripped state. Otherwise, state events are removed regardless of the
618    /// stripped state.
619    fn remove_room_state_events(
620        &self,
621        room_id: &[u8],
622        stripped: Option<bool>,
623    ) -> rusqlite::Result<()> {
624        if let Some(stripped) = stripped {
625            self.prepare_cached("DELETE FROM state_event WHERE room_id = ? AND stripped = ?")?
626                .execute((room_id, stripped))?;
627        } else {
628            self.prepare_cached("DELETE FROM state_event WHERE room_id = ?")?
629                .execute((room_id,))?;
630        }
631        Ok(())
632    }
633
634    fn set_member(
635        &self,
636        room_id: &[u8],
637        user_id: &[u8],
638        membership: &[u8],
639        stripped: bool,
640        data: &[u8],
641    ) -> rusqlite::Result<()> {
642        self.prepare_cached(
643            "INSERT OR REPLACE
644             INTO member (room_id, user_id, membership, stripped, data)
645             VALUES (?, ?, ?, ?, ?)",
646        )?
647        .execute((room_id, user_id, membership, stripped, data))?;
648        Ok(())
649    }
650
651    /// Remove members for the given room.
652    ///
653    /// If `stripped` is `Some()`, only removes members for the given stripped
654    /// state. Otherwise, members are removed regardless of the stripped state.
655    fn remove_room_members(&self, room_id: &[u8], stripped: Option<bool>) -> rusqlite::Result<()> {
656        if let Some(stripped) = stripped {
657            self.prepare_cached("DELETE FROM member WHERE room_id = ? AND stripped = ?")?
658                .execute((room_id, stripped))?;
659        } else {
660            self.prepare_cached("DELETE FROM member WHERE room_id = ?")?.execute((room_id,))?;
661        }
662        Ok(())
663    }
664
665    fn set_profile(&self, room_id: &[u8], user_id: &[u8], data: &[u8]) -> rusqlite::Result<()> {
666        self.prepare_cached(
667            "INSERT OR REPLACE
668             INTO profile (room_id, user_id, data)
669             VALUES (?, ?, ?)",
670        )?
671        .execute((room_id, user_id, data))?;
672        Ok(())
673    }
674
675    fn remove_room_profiles(&self, room_id: &[u8]) -> rusqlite::Result<()> {
676        self.prepare("DELETE FROM profile WHERE room_id = ?")?.execute((room_id,))?;
677        Ok(())
678    }
679
680    fn remove_room_profile(&self, room_id: &[u8], user_id: &[u8]) -> rusqlite::Result<()> {
681        self.prepare("DELETE FROM profile WHERE room_id = ? AND user_id = ?")?
682            .execute((room_id, user_id))?;
683        Ok(())
684    }
685
686    fn set_receipt(
687        &self,
688        room_id: &[u8],
689        user_id: &[u8],
690        receipt_type: &[u8],
691        thread: &[u8],
692        event_id: &[u8],
693        data: &[u8],
694    ) -> rusqlite::Result<()> {
695        self.prepare_cached(
696            "INSERT OR REPLACE
697             INTO receipt (room_id, user_id, receipt_type, thread, event_id, data)
698             VALUES (?, ?, ?, ?, ?, ?)",
699        )?
700        .execute((room_id, user_id, receipt_type, thread, event_id, data))?;
701        Ok(())
702    }
703
704    fn remove_room_receipts(&self, room_id: &[u8]) -> rusqlite::Result<()> {
705        self.prepare("DELETE FROM receipt WHERE room_id = ?")?.execute((room_id,))?;
706        Ok(())
707    }
708
709    fn set_display_name(&self, room_id: &[u8], name: &[u8], data: &[u8]) -> rusqlite::Result<()> {
710        self.prepare_cached(
711            "INSERT OR REPLACE
712             INTO display_name (room_id, name, data)
713             VALUES (?, ?, ?)",
714        )?
715        .execute((room_id, name, data))?;
716        Ok(())
717    }
718
719    fn remove_display_name(&self, room_id: &[u8], name: &[u8]) -> rusqlite::Result<()> {
720        self.prepare("DELETE FROM display_name WHERE room_id = ? AND name = ?")?
721            .execute((room_id, name))?;
722        Ok(())
723    }
724
725    fn remove_room_display_names(&self, room_id: &[u8]) -> rusqlite::Result<()> {
726        self.prepare("DELETE FROM display_name WHERE room_id = ?")?.execute((room_id,))?;
727        Ok(())
728    }
729
730    fn remove_room_send_queue(&self, room_id: &[u8]) -> rusqlite::Result<()> {
731        self.prepare("DELETE FROM send_queue_events WHERE room_id = ?")?.execute((room_id,))?;
732        Ok(())
733    }
734
735    fn remove_room_dependent_send_queue(&self, room_id: &[u8]) -> rusqlite::Result<()> {
736        self.prepare("DELETE FROM dependent_send_queue_events WHERE room_id = ?")?
737            .execute((room_id,))?;
738        Ok(())
739    }
740}
741
742#[async_trait]
743trait SqliteObjectStateStoreExt: SqliteAsyncConnExt {
744    async fn get_kv_blob(&self, key: Key) -> Result<Option<Vec<u8>>> {
745        Ok(self
746            .query_row("SELECT value FROM kv_blob WHERE key = ?", (key,), |row| row.get(0))
747            .await
748            .optional()?)
749    }
750
751    async fn get_kv_blobs(&self, keys: Vec<Key>) -> Result<Vec<Vec<u8>>> {
752        let keys_length = keys.len();
753
754        self.chunk_large_query_over(keys, Some(keys_length), |txn, keys| {
755            let sql_params = repeat_vars(keys.len());
756            let sql = format!("SELECT value FROM kv_blob WHERE key IN ({sql_params})");
757
758            let params = rusqlite::params_from_iter(keys);
759
760            Ok(txn
761                .prepare(&sql)?
762                .query(params)?
763                .mapped(|row| row.get(0))
764                .collect::<Result<_, _>>()?)
765        })
766        .await
767    }
768
    /// Store `value` under `key` in the `kv_blob` table.
    ///
    /// NOTE(review): presumably replaces an existing value, like the
    /// synchronous `set_kv_blob` helper; confirm against the implementor.
    async fn set_kv_blob(&self, key: Key, value: Vec<u8>) -> Result<()>;
770
771    async fn delete_kv_blob(&self, key: Key) -> Result<()> {
772        self.execute("DELETE FROM kv_blob WHERE key = ?", (key,)).await?;
773        Ok(())
774    }
775
776    async fn get_room_infos(&self, room_id: Option<Key>) -> Result<Vec<Vec<u8>>> {
777        Ok(match room_id {
778            None => {
779                self.prepare("SELECT data FROM room_info", move |mut stmt| {
780                    stmt.query_map((), |row| row.get(0))?.collect()
781                })
782                .await?
783            }
784
785            Some(room_id) => {
786                self.prepare("SELECT data FROM room_info WHERE room_id = ?", move |mut stmt| {
787                    stmt.query((room_id,))?.mapped(|row| row.get(0)).collect()
788                })
789                .await?
790            }
791        })
792    }
793
794    async fn get_maybe_stripped_state_events_for_keys(
795        &self,
796        room_id: Key,
797        event_type: Key,
798        state_keys: Vec<Key>,
799    ) -> Result<Vec<(bool, Vec<u8>)>> {
800        self.chunk_large_query_over(state_keys, None, move |txn, state_keys: Vec<Key>| {
801            let sql_params = repeat_vars(state_keys.len());
802            let sql = format!(
803                "SELECT stripped, data FROM state_event
804                 WHERE room_id = ? AND event_type = ? AND state_key IN ({sql_params})"
805            );
806
807            let params = rusqlite::params_from_iter(
808                [room_id.clone(), event_type.clone()].into_iter().chain(state_keys),
809            );
810
811            Ok(txn
812                .prepare(&sql)?
813                .query(params)?
814                .mapped(|row| Ok((row.get(0)?, row.get(1)?)))
815                .collect::<Result<_, _>>()?)
816        })
817        .await
818    }
819
820    async fn get_maybe_stripped_state_events(
821        &self,
822        room_id: Key,
823        event_type: Key,
824    ) -> Result<Vec<(bool, Vec<u8>)>> {
825        Ok(self
826            .prepare(
827                "SELECT stripped, data FROM state_event
828                 WHERE room_id = ? AND event_type = ?",
829                |mut stmt| {
830                    stmt.query((room_id, event_type))?
831                        .mapped(|row| Ok((row.get(0)?, row.get(1)?)))
832                        .collect()
833                },
834            )
835            .await?)
836    }
837
838    async fn get_profiles(
839        &self,
840        room_id: Key,
841        user_ids: Vec<Key>,
842    ) -> Result<Vec<(Vec<u8>, Vec<u8>)>> {
843        let user_ids_length = user_ids.len();
844
845        self.chunk_large_query_over(user_ids, Some(user_ids_length), move |txn, user_ids| {
846            let sql_params = repeat_vars(user_ids.len());
847            let sql = format!(
848                "SELECT user_id, data FROM profile WHERE room_id = ? AND user_id IN ({sql_params})"
849            );
850
851            let params = rusqlite::params_from_iter(iter::once(room_id.clone()).chain(user_ids));
852
853            Ok(txn
854                .prepare(&sql)?
855                .query(params)?
856                .mapped(|row| Ok((row.get(0)?, row.get(1)?)))
857                .collect::<Result<_, _>>()?)
858        })
859        .await
860    }
861
862    async fn get_user_ids(&self, room_id: Key, memberships: Vec<Key>) -> Result<Vec<Vec<u8>>> {
863        let res = if memberships.is_empty() {
864            self.prepare("SELECT data FROM member WHERE room_id = ?", |mut stmt| {
865                stmt.query((room_id,))?.mapped(|row| row.get(0)).collect()
866            })
867            .await?
868        } else {
869            self.chunk_large_query_over(memberships, None, move |txn, memberships| {
870                let sql_params = repeat_vars(memberships.len());
871                let sql = format!(
872                    "SELECT data FROM member WHERE room_id = ? AND membership IN ({sql_params})"
873                );
874
875                let params =
876                    rusqlite::params_from_iter(iter::once(room_id.clone()).chain(memberships));
877
878                Ok(txn
879                    .prepare(&sql)?
880                    .query(params)?
881                    .mapped(|row| row.get(0))
882                    .collect::<Result<_, _>>()?)
883            })
884            .await?
885        };
886
887        Ok(res)
888    }
889
890    async fn get_global_account_data(&self, event_type: Key) -> Result<Option<Vec<u8>>> {
891        Ok(self
892            .query_row(
893                "SELECT data FROM global_account_data WHERE event_type = ?",
894                (event_type,),
895                |row| row.get(0),
896            )
897            .await
898            .optional()?)
899    }
900
901    async fn get_room_account_data(
902        &self,
903        room_id: Key,
904        event_type: Key,
905    ) -> Result<Option<Vec<u8>>> {
906        Ok(self
907            .query_row(
908                "SELECT data FROM room_account_data WHERE room_id = ? AND event_type = ?",
909                (room_id, event_type),
910                |row| row.get(0),
911            )
912            .await
913            .optional()?)
914    }
915
916    async fn get_display_names(
917        &self,
918        room_id: Key,
919        names: Vec<Key>,
920    ) -> Result<Vec<(Vec<u8>, Vec<u8>)>> {
921        let names_length = names.len();
922
923        self.chunk_large_query_over(names, Some(names_length), move |txn, names| {
924            let sql_params = repeat_vars(names.len());
925            let sql = format!(
926                "SELECT name, data FROM display_name WHERE room_id = ? AND name IN ({sql_params})"
927            );
928
929            let params = rusqlite::params_from_iter(iter::once(room_id.clone()).chain(names));
930
931            Ok(txn
932                .prepare(&sql)?
933                .query(params)?
934                .mapped(|row| Ok((row.get(0)?, row.get(1)?)))
935                .collect::<Result<_, _>>()?)
936        })
937        .await
938    }
939
940    async fn get_user_receipt(
941        &self,
942        room_id: Key,
943        receipt_type: Key,
944        thread: Key,
945        user_id: Key,
946    ) -> Result<Option<Vec<u8>>> {
947        Ok(self
948            .query_row(
949                "SELECT data FROM receipt
950                 WHERE room_id = ? AND receipt_type = ? AND thread = ? and user_id = ?",
951                (room_id, receipt_type, thread, user_id),
952                |row| row.get(0),
953            )
954            .await
955            .optional()?)
956    }
957
958    async fn get_event_receipts(
959        &self,
960        room_id: Key,
961        receipt_type: Key,
962        thread: Key,
963        event_id: Key,
964    ) -> Result<Vec<Vec<u8>>> {
965        Ok(self
966            .prepare(
967                "SELECT data FROM receipt
968                 WHERE room_id = ? AND receipt_type = ? AND thread = ? and event_id = ?",
969                |mut stmt| {
970                    stmt.query((room_id, receipt_type, thread, event_id))?
971                        .mapped(|row| row.get(0))
972                        .collect()
973                },
974            )
975            .await?)
976    }
977}
978
979#[async_trait]
980impl SqliteObjectStateStoreExt for SqliteAsyncConn {
981    async fn set_kv_blob(&self, key: Key, value: Vec<u8>) -> Result<()> {
982        Ok(self.interact(move |conn| conn.set_kv_blob(&key, &value)).await.unwrap()?)
983    }
984}
985
986#[async_trait]
987impl StateStore for SqliteStateStore {
988    type Error = Error;
989
990    async fn get_kv_data(&self, key: StateStoreDataKey<'_>) -> Result<Option<StateStoreDataValue>> {
991        self.acquire()
992            .await?
993            .get_kv_blob(self.encode_state_store_data_key(key))
994            .await?
995            .map(|data| {
996                Ok(match key {
997                    StateStoreDataKey::SyncToken => {
998                        StateStoreDataValue::SyncToken(self.deserialize_value(&data)?)
999                    }
1000                    StateStoreDataKey::ServerInfo => {
1001                        StateStoreDataValue::ServerInfo(self.deserialize_value(&data)?)
1002                    }
1003                    StateStoreDataKey::Filter(_) => {
1004                        StateStoreDataValue::Filter(self.deserialize_value(&data)?)
1005                    }
1006                    StateStoreDataKey::UserAvatarUrl(_) => {
1007                        StateStoreDataValue::UserAvatarUrl(self.deserialize_value(&data)?)
1008                    }
1009                    StateStoreDataKey::RecentlyVisitedRooms(_) => {
1010                        StateStoreDataValue::RecentlyVisitedRooms(self.deserialize_value(&data)?)
1011                    }
1012                    StateStoreDataKey::UtdHookManagerData => {
1013                        StateStoreDataValue::UtdHookManagerData(self.deserialize_value(&data)?)
1014                    }
1015                    StateStoreDataKey::ComposerDraft(_, _) => {
1016                        StateStoreDataValue::ComposerDraft(self.deserialize_value(&data)?)
1017                    }
1018                    StateStoreDataKey::SeenKnockRequests(_) => {
1019                        StateStoreDataValue::SeenKnockRequests(self.deserialize_value(&data)?)
1020                    }
1021                })
1022            })
1023            .transpose()
1024    }
1025
1026    async fn set_kv_data(
1027        &self,
1028        key: StateStoreDataKey<'_>,
1029        value: StateStoreDataValue,
1030    ) -> Result<()> {
1031        let serialized_value = match key {
1032            StateStoreDataKey::SyncToken => self.serialize_value(
1033                &value.into_sync_token().expect("Session data not a sync token"),
1034            )?,
1035            StateStoreDataKey::ServerInfo => self.serialize_value(
1036                &value.into_server_info().expect("Session data not containing server info"),
1037            )?,
1038            StateStoreDataKey::Filter(_) => {
1039                self.serialize_value(&value.into_filter().expect("Session data not a filter"))?
1040            }
1041            StateStoreDataKey::UserAvatarUrl(_) => self.serialize_value(
1042                &value.into_user_avatar_url().expect("Session data not an user avatar url"),
1043            )?,
1044            StateStoreDataKey::RecentlyVisitedRooms(_) => self.serialize_value(
1045                &value.into_recently_visited_rooms().expect("Session data not breadcrumbs"),
1046            )?,
1047            StateStoreDataKey::UtdHookManagerData => self.serialize_value(
1048                &value.into_utd_hook_manager_data().expect("Session data not UtdHookManagerData"),
1049            )?,
1050            StateStoreDataKey::ComposerDraft(_, _) => self.serialize_value(
1051                &value.into_composer_draft().expect("Session data not a composer draft"),
1052            )?,
1053            StateStoreDataKey::SeenKnockRequests(_) => self.serialize_value(
1054                &value
1055                    .into_seen_knock_requests()
1056                    .expect("Session data is not a set of seen knock request ids"),
1057            )?,
1058        };
1059
1060        self.acquire()
1061            .await?
1062            .set_kv_blob(self.encode_state_store_data_key(key), serialized_value)
1063            .await
1064    }
1065
1066    async fn remove_kv_data(&self, key: StateStoreDataKey<'_>) -> Result<()> {
1067        self.acquire().await?.delete_kv_blob(self.encode_state_store_data_key(key)).await
1068    }
1069
1070    async fn save_changes(&self, changes: &StateChanges) -> Result<()> {
1071        let changes = changes.to_owned();
1072        let this = self.clone();
1073        self.acquire()
1074            .await?
1075            .with_transaction(move |txn| {
1076                let StateChanges {
1077                    sync_token,
1078                    account_data,
1079                    presence,
1080                    profiles,
1081                    profiles_to_delete,
1082                    state,
1083                    room_account_data,
1084                    room_infos,
1085                    receipts,
1086                    redactions,
1087                    stripped_state,
1088                    ambiguity_maps,
1089                } = changes;
1090
1091                if let Some(sync_token) = sync_token {
1092                    let key = this.encode_state_store_data_key(StateStoreDataKey::SyncToken);
1093                    let value = this.serialize_value(&sync_token)?;
1094                    txn.set_kv_blob(&key, &value)?;
1095                }
1096
1097                for (event_type, event) in account_data {
1098                    let event_type =
1099                        this.encode_key(keys::GLOBAL_ACCOUNT_DATA, event_type.to_string());
1100                    let data = this.serialize_json(&event)?;
1101                    txn.set_global_account_data(&event_type, &data)?;
1102                }
1103
1104                for (room_id, events) in room_account_data {
1105                    let room_id = this.encode_key(keys::ROOM_ACCOUNT_DATA, room_id);
1106                    for (event_type, event) in events {
1107                        let event_type =
1108                            this.encode_key(keys::ROOM_ACCOUNT_DATA, event_type.to_string());
1109                        let data = this.serialize_json(&event)?;
1110                        txn.set_room_account_data(&room_id, &event_type, &data)?;
1111                    }
1112                }
1113
1114                for (user_id, event) in presence {
1115                    let key = this.encode_presence_key(&user_id);
1116                    let value = this.serialize_json(&event)?;
1117                    txn.set_kv_blob(&key, &value)?;
1118                }
1119
1120                for (room_id, room_info) in room_infos {
1121                    let stripped = room_info.state() == RoomState::Invited;
1122                    // Remove non-stripped data for stripped rooms and vice-versa.
1123                    this.remove_maybe_stripped_room_data(txn, &room_id, !stripped)?;
1124
1125                    let room_id = this.encode_key(keys::ROOM_INFO, room_id);
1126                    let state = this
1127                        .encode_key(keys::ROOM_INFO, serde_json::to_string(&room_info.state())?);
1128                    let data = this.serialize_json(&room_info)?;
1129                    txn.set_room_info(&room_id, &state, &data)?;
1130                }
1131
1132                for (room_id, user_ids) in profiles_to_delete {
1133                    let room_id = this.encode_key(keys::PROFILE, room_id);
1134                    for user_id in user_ids {
1135                        let user_id = this.encode_key(keys::PROFILE, user_id);
1136                        txn.remove_room_profile(&room_id, &user_id)?;
1137                    }
1138                }
1139
1140                for (room_id, state_event_types) in state {
1141                    let profiles = profiles.get(&room_id);
1142                    let encoded_room_id = this.encode_key(keys::STATE_EVENT, &room_id);
1143
1144                    for (event_type, state_events) in state_event_types {
1145                        let encoded_event_type =
1146                            this.encode_key(keys::STATE_EVENT, event_type.to_string());
1147
1148                        for (state_key, raw_state_event) in state_events {
1149                            let encoded_state_key = this.encode_key(keys::STATE_EVENT, &state_key);
1150                            let data = this.serialize_json(&raw_state_event)?;
1151
1152                            let event_id: Option<String> =
1153                                raw_state_event.get_field("event_id").ok().flatten();
1154                            let encoded_event_id =
1155                                event_id.as_ref().map(|e| this.encode_key(keys::STATE_EVENT, e));
1156
1157                            txn.set_state_event(
1158                                &encoded_room_id,
1159                                &encoded_event_type,
1160                                &encoded_state_key,
1161                                false,
1162                                encoded_event_id.as_deref(),
1163                                &data,
1164                            )?;
1165
1166                            if event_type == StateEventType::RoomMember {
1167                                let member_event = match raw_state_event
1168                                    .deserialize_as_unchecked::<SyncRoomMemberEvent>()
1169                                {
1170                                    Ok(ev) => ev,
1171                                    Err(e) => {
1172                                        debug!(event_id, "Failed to deserialize member event: {e}");
1173                                        continue;
1174                                    }
1175                                };
1176
1177                                let encoded_room_id = this.encode_key(keys::MEMBER, &room_id);
1178                                let user_id = this.encode_key(keys::MEMBER, &state_key);
1179                                let membership = this
1180                                    .encode_key(keys::MEMBER, member_event.membership().as_str());
1181                                let data = this.serialize_value(&state_key)?;
1182
1183                                txn.set_member(
1184                                    &encoded_room_id,
1185                                    &user_id,
1186                                    &membership,
1187                                    false,
1188                                    &data,
1189                                )?;
1190
1191                                if let Some(profile) =
1192                                    profiles.and_then(|p| p.get(member_event.state_key()))
1193                                {
1194                                    let room_id = this.encode_key(keys::PROFILE, &room_id);
1195                                    let user_id = this.encode_key(keys::PROFILE, &state_key);
1196                                    let data = this.serialize_json(&profile)?;
1197                                    txn.set_profile(&room_id, &user_id, &data)?;
1198                                }
1199                            }
1200                        }
1201                    }
1202                }
1203
1204                for (room_id, stripped_state_event_types) in stripped_state {
1205                    let encoded_room_id = this.encode_key(keys::STATE_EVENT, &room_id);
1206
1207                    for (event_type, stripped_state_events) in stripped_state_event_types {
1208                        let encoded_event_type =
1209                            this.encode_key(keys::STATE_EVENT, event_type.to_string());
1210
1211                        for (state_key, raw_stripped_state_event) in stripped_state_events {
1212                            let encoded_state_key = this.encode_key(keys::STATE_EVENT, &state_key);
1213                            let data = this.serialize_json(&raw_stripped_state_event)?;
1214                            txn.set_state_event(
1215                                &encoded_room_id,
1216                                &encoded_event_type,
1217                                &encoded_state_key,
1218                                true,
1219                                None,
1220                                &data,
1221                            )?;
1222
1223                            if event_type == StateEventType::RoomMember {
1224                                let member_event = match raw_stripped_state_event
1225                                    .deserialize_as_unchecked::<StrippedRoomMemberEvent>(
1226                                ) {
1227                                    Ok(ev) => ev,
1228                                    Err(e) => {
1229                                        debug!("Failed to deserialize stripped member event: {e}");
1230                                        continue;
1231                                    }
1232                                };
1233
1234                                let room_id = this.encode_key(keys::MEMBER, &room_id);
1235                                let user_id = this.encode_key(keys::MEMBER, &state_key);
1236                                let membership = this.encode_key(
1237                                    keys::MEMBER,
1238                                    member_event.content.membership.as_str(),
1239                                );
1240                                let data = this.serialize_value(&state_key)?;
1241
1242                                txn.set_member(&room_id, &user_id, &membership, true, &data)?;
1243                            }
1244                        }
1245                    }
1246                }
1247
1248                for (room_id, receipt_event) in receipts {
1249                    let room_id = this.encode_key(keys::RECEIPT, room_id);
1250
1251                    for (event_id, receipt_types) in receipt_event {
1252                        let encoded_event_id = this.encode_key(keys::RECEIPT, &event_id);
1253
1254                        for (receipt_type, receipt_users) in receipt_types {
1255                            let receipt_type =
1256                                this.encode_key(keys::RECEIPT, receipt_type.as_str());
1257
1258                            for (user_id, receipt) in receipt_users {
1259                                let encoded_user_id = this.encode_key(keys::RECEIPT, &user_id);
1260                                // We cannot have a NULL primary key so we rely on serialization
1261                                // instead of the string representation.
1262                                let thread = this.encode_key(
1263                                    keys::RECEIPT,
1264                                    rmp_serde::to_vec_named(&receipt.thread)?,
1265                                );
1266                                let data = this.serialize_json(&ReceiptData {
1267                                    receipt,
1268                                    event_id: event_id.clone(),
1269                                    user_id,
1270                                })?;
1271
1272                                txn.set_receipt(
1273                                    &room_id,
1274                                    &encoded_user_id,
1275                                    &receipt_type,
1276                                    &thread,
1277                                    &encoded_event_id,
1278                                    &data,
1279                                )?;
1280                            }
1281                        }
1282                    }
1283                }
1284
1285                for (room_id, redactions) in redactions {
1286                    let make_redaction_rules = || {
1287                        let encoded_room_id = this.encode_key(keys::ROOM_INFO, &room_id);
1288                        txn.get_room_info(&encoded_room_id)
1289                            .ok()
1290                            .flatten()
1291                            .and_then(|v| this.deserialize_json::<RoomInfo>(&v).ok())
1292                            .map(|info| info.room_version_rules_or_default())
1293                            .unwrap_or_else(|| {
1294                                warn!(
1295                                    ?room_id,
1296                                    "Unable to get the room version rules, defaulting to rules for room version {ROOM_VERSION_FALLBACK}"
1297                                );
1298                                ROOM_VERSION_RULES_FALLBACK
1299                            }).redaction
1300                    };
1301
1302                    let encoded_room_id = this.encode_key(keys::STATE_EVENT, &room_id);
1303                    let mut redaction_rules = None;
1304
1305                    for (event_id, redaction) in redactions {
1306                        let event_id = this.encode_key(keys::STATE_EVENT, event_id);
1307
1308                        if let Some(Ok(raw_event)) = txn
1309                            .get_state_event_by_id(&encoded_room_id, &event_id)?
1310                            .map(|value| this.deserialize_json::<Raw<AnySyncStateEvent>>(&value))
1311                        {
1312                            let event = raw_event.deserialize()?;
1313                            let redacted = redact(
1314                                raw_event.deserialize_as::<CanonicalJsonObject>()?,
1315                                redaction_rules.get_or_insert_with(make_redaction_rules),
1316                                Some(RedactedBecause::from_raw_event(&redaction)?),
1317                            )
1318                            .map_err(Error::Redaction)?;
1319                            let data = this.serialize_json(&redacted)?;
1320
1321                            let event_type =
1322                                this.encode_key(keys::STATE_EVENT, event.event_type().to_string());
1323                            let state_key = this.encode_key(keys::STATE_EVENT, event.state_key());
1324
1325                            txn.set_state_event(
1326                                &encoded_room_id,
1327                                &event_type,
1328                                &state_key,
1329                                false,
1330                                Some(&event_id),
1331                                &data,
1332                            )?;
1333                        }
1334                    }
1335                }
1336
1337                for (room_id, display_names) in ambiguity_maps {
1338                    let room_id = this.encode_key(keys::DISPLAY_NAME, room_id);
1339
1340                    for (name, user_ids) in display_names {
1341                        let encoded_name = this.encode_key(
1342                            keys::DISPLAY_NAME,
1343                            name.as_normalized_str().unwrap_or_else(|| name.as_raw_str()),
1344                        );
1345                        let data = this.serialize_json(&user_ids)?;
1346
1347                        if user_ids.is_empty() {
1348                            txn.remove_display_name(&room_id, &encoded_name)?;
1349
1350                            // We can't do a migration to merge the previously distinct buckets of
1351                            // user IDs since the display names themselves are hashed before they
1352                            // are persisted in the store. So the store will always retain two
1353                            // buckets: one for raw display names and one for normalised ones.
1354                            //
1355                            // We therefore do the next best thing, which is a sort of a soft
1356                            // migration: we fetch both the raw and normalised buckets, then merge
1357                            // the user IDs contained in them into a separate, temporary merged
1358                            // bucket. The SDK then operates on the merged buckets exclusively. See
1359                            // the comment in `get_users_with_display_names` for details.
1360                            //
1361                            // If the merged bucket is empty, that must mean that both the raw and
1362                            // normalised buckets were also empty, so we can remove both from the
1363                            // store.
1364                            let raw_name = this.encode_key(keys::DISPLAY_NAME, name.as_raw_str());
1365                            txn.remove_display_name(&room_id, &raw_name)?;
1366                        } else {
1367                            // We only create new buckets with the normalized display name.
1368                            txn.set_display_name(&room_id, &encoded_name, &data)?;
1369                        }
1370                    }
1371                }
1372
1373                Ok::<_, Error>(())
1374            })
1375            .await?;
1376
1377        Ok(())
1378    }
1379
1380    async fn get_presence_event(&self, user_id: &UserId) -> Result<Option<Raw<PresenceEvent>>> {
1381        self.acquire()
1382            .await?
1383            .get_kv_blob(self.encode_presence_key(user_id))
1384            .await?
1385            .map(|data| self.deserialize_json(&data))
1386            .transpose()
1387    }
1388
1389    async fn get_presence_events(
1390        &self,
1391        user_ids: &[OwnedUserId],
1392    ) -> Result<Vec<Raw<PresenceEvent>>> {
1393        if user_ids.is_empty() {
1394            return Ok(Vec::new());
1395        }
1396
1397        let user_ids = user_ids.iter().map(|u| self.encode_presence_key(u)).collect();
1398        self.acquire()
1399            .await?
1400            .get_kv_blobs(user_ids)
1401            .await?
1402            .into_iter()
1403            .map(|data| self.deserialize_json(&data))
1404            .collect()
1405    }
1406
1407    async fn get_state_event(
1408        &self,
1409        room_id: &RoomId,
1410        event_type: StateEventType,
1411        state_key: &str,
1412    ) -> Result<Option<RawAnySyncOrStrippedState>> {
1413        Ok(self
1414            .get_state_events_for_keys(room_id, event_type, &[state_key])
1415            .await?
1416            .into_iter()
1417            .next())
1418    }
1419
1420    async fn get_state_events(
1421        &self,
1422        room_id: &RoomId,
1423        event_type: StateEventType,
1424    ) -> Result<Vec<RawAnySyncOrStrippedState>> {
1425        let room_id = self.encode_key(keys::STATE_EVENT, room_id);
1426        let event_type = self.encode_key(keys::STATE_EVENT, event_type.to_string());
1427        self.acquire()
1428            .await?
1429            .get_maybe_stripped_state_events(room_id, event_type)
1430            .await?
1431            .into_iter()
1432            .map(|(stripped, data)| {
1433                let ev = if stripped {
1434                    RawAnySyncOrStrippedState::Stripped(self.deserialize_json(&data)?)
1435                } else {
1436                    RawAnySyncOrStrippedState::Sync(self.deserialize_json(&data)?)
1437                };
1438
1439                Ok(ev)
1440            })
1441            .collect()
1442    }
1443
1444    async fn get_state_events_for_keys(
1445        &self,
1446        room_id: &RoomId,
1447        event_type: StateEventType,
1448        state_keys: &[&str],
1449    ) -> Result<Vec<RawAnySyncOrStrippedState>, Self::Error> {
1450        if state_keys.is_empty() {
1451            return Ok(Vec::new());
1452        }
1453
1454        let room_id = self.encode_key(keys::STATE_EVENT, room_id);
1455        let event_type = self.encode_key(keys::STATE_EVENT, event_type.to_string());
1456        let state_keys = state_keys.iter().map(|k| self.encode_key(keys::STATE_EVENT, k)).collect();
1457        self.acquire()
1458            .await?
1459            .get_maybe_stripped_state_events_for_keys(room_id, event_type, state_keys)
1460            .await?
1461            .into_iter()
1462            .map(|(stripped, data)| {
1463                let ev = if stripped {
1464                    RawAnySyncOrStrippedState::Stripped(self.deserialize_json(&data)?)
1465                } else {
1466                    RawAnySyncOrStrippedState::Sync(self.deserialize_json(&data)?)
1467                };
1468
1469                Ok(ev)
1470            })
1471            .collect()
1472    }
1473
1474    async fn get_profile(
1475        &self,
1476        room_id: &RoomId,
1477        user_id: &UserId,
1478    ) -> Result<Option<MinimalRoomMemberEvent>> {
1479        let room_id = self.encode_key(keys::PROFILE, room_id);
1480        let user_ids = vec![self.encode_key(keys::PROFILE, user_id)];
1481
1482        self.acquire()
1483            .await?
1484            .get_profiles(room_id, user_ids)
1485            .await?
1486            .into_iter()
1487            .next()
1488            .map(|(_, data)| self.deserialize_json(&data))
1489            .transpose()
1490    }
1491
1492    async fn get_profiles<'a>(
1493        &self,
1494        room_id: &RoomId,
1495        user_ids: &'a [OwnedUserId],
1496    ) -> Result<BTreeMap<&'a UserId, MinimalRoomMemberEvent>> {
1497        if user_ids.is_empty() {
1498            return Ok(BTreeMap::new());
1499        }
1500
1501        let room_id = self.encode_key(keys::PROFILE, room_id);
1502        let mut user_ids_map = user_ids
1503            .iter()
1504            .map(|u| (self.encode_key(keys::PROFILE, u), u.as_ref()))
1505            .collect::<BTreeMap<_, _>>();
1506        let user_ids = user_ids_map.keys().cloned().collect();
1507
1508        self.acquire()
1509            .await?
1510            .get_profiles(room_id, user_ids)
1511            .await?
1512            .into_iter()
1513            .map(|(user_id, data)| {
1514                Ok((
1515                    user_ids_map
1516                        .remove(user_id.as_slice())
1517                        .expect("returned user IDs were requested"),
1518                    self.deserialize_json(&data)?,
1519                ))
1520            })
1521            .collect()
1522    }
1523
1524    async fn get_user_ids(
1525        &self,
1526        room_id: &RoomId,
1527        membership: RoomMemberships,
1528    ) -> Result<Vec<OwnedUserId>> {
1529        let room_id = self.encode_key(keys::MEMBER, room_id);
1530        let memberships = membership
1531            .as_vec()
1532            .into_iter()
1533            .map(|m| self.encode_key(keys::MEMBER, m.as_str()))
1534            .collect();
1535        self.acquire()
1536            .await?
1537            .get_user_ids(room_id, memberships)
1538            .await?
1539            .iter()
1540            .map(|data| self.deserialize_value(data))
1541            .collect()
1542    }
1543
1544    async fn get_room_infos(&self, room_load_settings: &RoomLoadSettings) -> Result<Vec<RoomInfo>> {
1545        self.acquire()
1546            .await?
1547            .get_room_infos(match room_load_settings {
1548                RoomLoadSettings::All => None,
1549                RoomLoadSettings::One(room_id) => Some(self.encode_key(keys::ROOM_INFO, room_id)),
1550            })
1551            .await?
1552            .into_iter()
1553            .map(|data| self.deserialize_json(&data))
1554            .collect()
1555    }
1556
1557    async fn get_users_with_display_name(
1558        &self,
1559        room_id: &RoomId,
1560        display_name: &DisplayName,
1561    ) -> Result<BTreeSet<OwnedUserId>> {
1562        let room_id = self.encode_key(keys::DISPLAY_NAME, room_id);
1563        let names = vec![self.encode_key(
1564            keys::DISPLAY_NAME,
1565            display_name.as_normalized_str().unwrap_or_else(|| display_name.as_raw_str()),
1566        )];
1567
1568        Ok(self
1569            .acquire()
1570            .await?
1571            .get_display_names(room_id, names)
1572            .await?
1573            .into_iter()
1574            .next()
1575            .map(|(_, data)| self.deserialize_json(&data))
1576            .transpose()?
1577            .unwrap_or_default())
1578    }
1579
1580    async fn get_users_with_display_names<'a>(
1581        &self,
1582        room_id: &RoomId,
1583        display_names: &'a [DisplayName],
1584    ) -> Result<HashMap<&'a DisplayName, BTreeSet<OwnedUserId>>> {
1585        let mut result = HashMap::new();
1586
1587        if display_names.is_empty() {
1588            return Ok(result);
1589        }
1590
1591        let room_id = self.encode_key(keys::DISPLAY_NAME, room_id);
1592        let mut names_map = display_names
1593            .iter()
1594            .flat_map(|display_name| {
1595                // We encode the display name as the `raw_str()` and the normalized string.
1596                //
1597                // This is for compatibility reasons since:
1598                //  1. Previously "Alice" and "alice" were considered to be distinct display
1599                //     names, while we now consider them to be the same so we need to merge the
1600                //     previously distinct buckets of user IDs.
1601                //  2. We can't do a migration to merge the previously distinct buckets of user
1602                //     IDs since the display names itself are hashed before they are persisted
1603                //     in the store.
1604                let raw =
1605                    (self.encode_key(keys::DISPLAY_NAME, display_name.as_raw_str()), display_name);
1606                let normalized = display_name.as_normalized_str().map(|normalized| {
1607                    (self.encode_key(keys::DISPLAY_NAME, normalized), display_name)
1608                });
1609
1610                iter::once(raw).chain(normalized.into_iter())
1611            })
1612            .collect::<BTreeMap<_, _>>();
1613        let names = names_map.keys().cloned().collect();
1614
1615        for (name, data) in
1616            self.acquire().await?.get_display_names(room_id, names).await?.into_iter()
1617        {
1618            let display_name =
1619                names_map.remove(name.as_slice()).expect("returned display names were requested");
1620            let user_ids: BTreeSet<_> = self.deserialize_json(&data)?;
1621
1622            result.entry(display_name).or_insert_with(BTreeSet::new).extend(user_ids);
1623        }
1624
1625        Ok(result)
1626    }
1627
1628    async fn get_account_data_event(
1629        &self,
1630        event_type: GlobalAccountDataEventType,
1631    ) -> Result<Option<Raw<AnyGlobalAccountDataEvent>>> {
1632        let event_type = self.encode_key(keys::GLOBAL_ACCOUNT_DATA, event_type.to_string());
1633        self.acquire()
1634            .await?
1635            .get_global_account_data(event_type)
1636            .await?
1637            .map(|value| self.deserialize_json(&value))
1638            .transpose()
1639    }
1640
1641    async fn get_room_account_data_event(
1642        &self,
1643        room_id: &RoomId,
1644        event_type: RoomAccountDataEventType,
1645    ) -> Result<Option<Raw<AnyRoomAccountDataEvent>>> {
1646        let room_id = self.encode_key(keys::ROOM_ACCOUNT_DATA, room_id);
1647        let event_type = self.encode_key(keys::ROOM_ACCOUNT_DATA, event_type.to_string());
1648        self.acquire()
1649            .await?
1650            .get_room_account_data(room_id, event_type)
1651            .await?
1652            .map(|value| self.deserialize_json(&value))
1653            .transpose()
1654    }
1655
1656    async fn get_user_room_receipt_event(
1657        &self,
1658        room_id: &RoomId,
1659        receipt_type: ReceiptType,
1660        thread: ReceiptThread,
1661        user_id: &UserId,
1662    ) -> Result<Option<(OwnedEventId, Receipt)>> {
1663        let room_id = self.encode_key(keys::RECEIPT, room_id);
1664        let receipt_type = self.encode_key(keys::RECEIPT, receipt_type.to_string());
1665        // We cannot have a NULL primary key so we rely on serialization instead of the
1666        // string representation.
1667        let thread = self.encode_key(keys::RECEIPT, rmp_serde::to_vec_named(&thread)?);
1668        let user_id = self.encode_key(keys::RECEIPT, user_id);
1669
1670        self.acquire()
1671            .await?
1672            .get_user_receipt(room_id, receipt_type, thread, user_id)
1673            .await?
1674            .map(|value| {
1675                self.deserialize_json::<ReceiptData>(&value).map(|d| (d.event_id, d.receipt))
1676            })
1677            .transpose()
1678    }
1679
1680    async fn get_event_room_receipt_events(
1681        &self,
1682        room_id: &RoomId,
1683        receipt_type: ReceiptType,
1684        thread: ReceiptThread,
1685        event_id: &EventId,
1686    ) -> Result<Vec<(OwnedUserId, Receipt)>> {
1687        let room_id = self.encode_key(keys::RECEIPT, room_id);
1688        let receipt_type = self.encode_key(keys::RECEIPT, receipt_type.to_string());
1689        // We cannot have a NULL primary key so we rely on serialization instead of the
1690        // string representation.
1691        let thread = self.encode_key(keys::RECEIPT, rmp_serde::to_vec_named(&thread)?);
1692        let event_id = self.encode_key(keys::RECEIPT, event_id);
1693
1694        self.acquire()
1695            .await?
1696            .get_event_receipts(room_id, receipt_type, thread, event_id)
1697            .await?
1698            .iter()
1699            .map(|value| {
1700                self.deserialize_json::<ReceiptData>(value).map(|d| (d.user_id, d.receipt))
1701            })
1702            .collect()
1703    }
1704
1705    async fn get_custom_value(&self, key: &[u8]) -> Result<Option<Vec<u8>>> {
1706        self.acquire().await?.get_kv_blob(self.encode_custom_key(key)).await
1707    }
1708
1709    async fn set_custom_value_no_read(&self, key: &[u8], value: Vec<u8>) -> Result<()> {
1710        let conn = self.acquire().await?;
1711        let key = self.encode_custom_key(key);
1712        conn.set_kv_blob(key, value).await?;
1713        Ok(())
1714    }
1715
1716    async fn set_custom_value(&self, key: &[u8], value: Vec<u8>) -> Result<Option<Vec<u8>>> {
1717        let conn = self.acquire().await?;
1718        let key = self.encode_custom_key(key);
1719        let previous = conn.get_kv_blob(key.clone()).await?;
1720        conn.set_kv_blob(key, value).await?;
1721        Ok(previous)
1722    }
1723
1724    async fn remove_custom_value(&self, key: &[u8]) -> Result<Option<Vec<u8>>> {
1725        let conn = self.acquire().await?;
1726        let key = self.encode_custom_key(key);
1727        let previous = conn.get_kv_blob(key.clone()).await?;
1728        if previous.is_some() {
1729            conn.delete_kv_blob(key).await?;
1730        }
1731        Ok(previous)
1732    }
1733
1734    async fn remove_room(&self, room_id: &RoomId) -> Result<()> {
1735        let this = self.clone();
1736        let room_id = room_id.to_owned();
1737
1738        let conn = self.acquire().await?;
1739
1740        conn.with_transaction(move |txn| -> Result<()> {
1741            let room_info_room_id = this.encode_key(keys::ROOM_INFO, &room_id);
1742            txn.remove_room_info(&room_info_room_id)?;
1743
1744            let state_event_room_id = this.encode_key(keys::STATE_EVENT, &room_id);
1745            txn.remove_room_state_events(&state_event_room_id, None)?;
1746
1747            let member_room_id = this.encode_key(keys::MEMBER, &room_id);
1748            txn.remove_room_members(&member_room_id, None)?;
1749
1750            let profile_room_id = this.encode_key(keys::PROFILE, &room_id);
1751            txn.remove_room_profiles(&profile_room_id)?;
1752
1753            let room_account_data_room_id = this.encode_key(keys::ROOM_ACCOUNT_DATA, &room_id);
1754            txn.remove_room_account_data(&room_account_data_room_id)?;
1755
1756            let receipt_room_id = this.encode_key(keys::RECEIPT, &room_id);
1757            txn.remove_room_receipts(&receipt_room_id)?;
1758
1759            let display_name_room_id = this.encode_key(keys::DISPLAY_NAME, &room_id);
1760            txn.remove_room_display_names(&display_name_room_id)?;
1761
1762            let send_queue_room_id = this.encode_key(keys::SEND_QUEUE, &room_id);
1763            txn.remove_room_send_queue(&send_queue_room_id)?;
1764
1765            let dependent_send_queue_room_id =
1766                this.encode_key(keys::DEPENDENTS_SEND_QUEUE, &room_id);
1767            txn.remove_room_dependent_send_queue(&dependent_send_queue_room_id)?;
1768
1769            let thread_subscriptions_room_id =
1770                this.encode_key(keys::THREAD_SUBSCRIPTIONS, &room_id);
1771            txn.execute(
1772                "DELETE FROM thread_subscriptions WHERE room_id = ?",
1773                (thread_subscriptions_room_id,),
1774            )?;
1775
1776            Ok(())
1777        })
1778        .await?;
1779
1780        conn.vacuum().await
1781    }
1782
1783    async fn save_send_queue_request(
1784        &self,
1785        room_id: &RoomId,
1786        transaction_id: OwnedTransactionId,
1787        created_at: MilliSecondsSinceUnixEpoch,
1788        content: QueuedRequestKind,
1789        priority: usize,
1790    ) -> Result<(), Self::Error> {
1791        let room_id_key = self.encode_key(keys::SEND_QUEUE, room_id);
1792        let room_id_value = self.serialize_value(&room_id.to_owned())?;
1793
1794        let content = self.serialize_json(&content)?;
1795        // The transaction id is used both as a key (in remove/update) and a value (as
1796        // it's useful for the callers), so we keep it as is, and neither hash
1797        // it (with encode_key) or encrypt it (through serialize_value). After
1798        // all, it carries no personal information, so this is considered fine.
1799
1800        let created_at_ts: u64 = created_at.0.into();
1801        self.acquire()
1802            .await?
1803            .with_transaction(move |txn| {
1804                txn.prepare_cached("INSERT INTO send_queue_events (room_id, room_id_val, transaction_id, content, priority, created_at) VALUES (?, ?, ?, ?, ?, ?)")?.execute((room_id_key, room_id_value, transaction_id.to_string(), content, priority, created_at_ts))?;
1805                Ok(())
1806            })
1807            .await
1808    }
1809
1810    async fn update_send_queue_request(
1811        &self,
1812        room_id: &RoomId,
1813        transaction_id: &TransactionId,
1814        content: QueuedRequestKind,
1815    ) -> Result<bool, Self::Error> {
1816        let room_id = self.encode_key(keys::SEND_QUEUE, room_id);
1817
1818        let content = self.serialize_json(&content)?;
1819        // See comment in [`Self::save_send_queue_event`] to understand why the
1820        // transaction id is neither encrypted or hashed.
1821        let transaction_id = transaction_id.to_string();
1822
1823        let num_updated = self.acquire()
1824            .await?
1825            .with_transaction(move |txn| {
1826                txn.prepare_cached("UPDATE send_queue_events SET wedge_reason = NULL, content = ? WHERE room_id = ? AND transaction_id = ?")?.execute((content, room_id, transaction_id))
1827            })
1828            .await?;
1829
1830        Ok(num_updated > 0)
1831    }
1832
1833    async fn remove_send_queue_request(
1834        &self,
1835        room_id: &RoomId,
1836        transaction_id: &TransactionId,
1837    ) -> Result<bool, Self::Error> {
1838        let room_id = self.encode_key(keys::SEND_QUEUE, room_id);
1839
1840        // See comment in `save_send_queue_event`.
1841        let transaction_id = transaction_id.to_string();
1842
1843        let num_deleted = self
1844            .acquire()
1845            .await?
1846            .with_transaction(move |txn| {
1847                txn.prepare_cached(
1848                    "DELETE FROM send_queue_events WHERE room_id = ? AND transaction_id = ?",
1849                )?
1850                .execute((room_id, &transaction_id))
1851            })
1852            .await?;
1853
1854        Ok(num_deleted > 0)
1855    }
1856
1857    async fn load_send_queue_requests(
1858        &self,
1859        room_id: &RoomId,
1860    ) -> Result<Vec<QueuedRequest>, Self::Error> {
1861        let room_id = self.encode_key(keys::SEND_QUEUE, room_id);
1862
1863        // Note: ROWID is always present and is an auto-incremented integer counter. We
1864        // want to maintain the insertion order, so we can sort using it.
1865        // Note 2: transaction_id is not encoded, see why in `save_send_queue_event`.
1866        let res: Vec<(String, Vec<u8>, Option<Vec<u8>>, usize, Option<u64>)> = self
1867            .acquire()
1868            .await?
1869            .prepare(
1870                "SELECT transaction_id, content, wedge_reason, priority, created_at FROM send_queue_events WHERE room_id = ? ORDER BY priority DESC, ROWID",
1871                |mut stmt| {
1872                    stmt.query((room_id,))?
1873                        .mapped(|row| Ok((row.get(0)?, row.get(1)?, row.get(2)?, row.get(3)?, row.get(4)?)))
1874                        .collect()
1875                },
1876            )
1877            .await?;
1878
1879        let mut requests = Vec::with_capacity(res.len());
1880        for entry in res {
1881            let created_at = entry
1882                .4
1883                .and_then(UInt::new)
1884                .map_or_else(MilliSecondsSinceUnixEpoch::now, MilliSecondsSinceUnixEpoch);
1885            requests.push(QueuedRequest {
1886                transaction_id: entry.0.into(),
1887                kind: self.deserialize_json(&entry.1)?,
1888                error: entry.2.map(|v| self.deserialize_value(&v)).transpose()?,
1889                priority: entry.3,
1890                created_at,
1891            });
1892        }
1893
1894        Ok(requests)
1895    }
1896
1897    async fn update_send_queue_request_status(
1898        &self,
1899        room_id: &RoomId,
1900        transaction_id: &TransactionId,
1901        error: Option<QueueWedgeError>,
1902    ) -> Result<(), Self::Error> {
1903        let room_id = self.encode_key(keys::SEND_QUEUE, room_id);
1904
1905        // See comment in `save_send_queue_event`.
1906        let transaction_id = transaction_id.to_string();
1907
1908        // Serialize the error to json bytes (encrypted if option is enabled) if set.
1909        let error_value = error.map(|e| self.serialize_value(&e)).transpose()?;
1910
1911        self.acquire()
1912            .await?
1913            .with_transaction(move |txn| {
1914                txn.prepare_cached("UPDATE send_queue_events SET wedge_reason = ? WHERE room_id = ? AND transaction_id = ?")?.execute((error_value, room_id, transaction_id))?;
1915                Ok(())
1916            })
1917            .await
1918    }
1919
1920    async fn load_rooms_with_unsent_requests(&self) -> Result<Vec<OwnedRoomId>, Self::Error> {
1921        // If the values were not encrypted, we could use `SELECT DISTINCT` here, but we
1922        // have to manually do the deduplication: indeed, for all X, encrypt(X)
1923        // != encrypted(X), since we use a nonce in the encryption process.
1924
1925        let res: Vec<Vec<u8>> = self
1926            .acquire()
1927            .await?
1928            .prepare("SELECT room_id_val FROM send_queue_events", |mut stmt| {
1929                stmt.query(())?.mapped(|row| row.get(0)).collect()
1930            })
1931            .await?;
1932
1933        // So we collect the results into a `BTreeSet` to perform the deduplication, and
1934        // then rejigger that into a vector.
1935        Ok(res
1936            .into_iter()
1937            .map(|entry| self.deserialize_value(&entry))
1938            .collect::<Result<BTreeSet<OwnedRoomId>, _>>()?
1939            .into_iter()
1940            .collect())
1941    }
1942
1943    async fn save_dependent_queued_request(
1944        &self,
1945        room_id: &RoomId,
1946        parent_txn_id: &TransactionId,
1947        own_txn_id: ChildTransactionId,
1948        created_at: MilliSecondsSinceUnixEpoch,
1949        content: DependentQueuedRequestKind,
1950    ) -> Result<()> {
1951        let room_id = self.encode_key(keys::DEPENDENTS_SEND_QUEUE, room_id);
1952        let content = self.serialize_json(&content)?;
1953
1954        // See comment in `save_send_queue_event`.
1955        let parent_txn_id = parent_txn_id.to_string();
1956        let own_txn_id = own_txn_id.to_string();
1957
1958        let created_at_ts: u64 = created_at.0.into();
1959        self.acquire()
1960            .await?
1961            .with_transaction(move |txn| {
1962                txn.prepare_cached(
1963                    r#"INSERT INTO dependent_send_queue_events
1964                         (room_id, parent_transaction_id, own_transaction_id, content, created_at)
1965                       VALUES (?, ?, ?, ?, ?)"#,
1966                )?
1967                .execute((
1968                    room_id,
1969                    parent_txn_id,
1970                    own_txn_id,
1971                    content,
1972                    created_at_ts,
1973                ))?;
1974                Ok(())
1975            })
1976            .await
1977    }
1978
1979    async fn update_dependent_queued_request(
1980        &self,
1981        room_id: &RoomId,
1982        own_transaction_id: &ChildTransactionId,
1983        new_content: DependentQueuedRequestKind,
1984    ) -> Result<bool> {
1985        let room_id = self.encode_key(keys::DEPENDENTS_SEND_QUEUE, room_id);
1986        let content = self.serialize_json(&new_content)?;
1987
1988        // See comment in `save_send_queue_event`.
1989        let own_txn_id = own_transaction_id.to_string();
1990
1991        let num_updated = self
1992            .acquire()
1993            .await?
1994            .with_transaction(move |txn| {
1995                txn.prepare_cached(
1996                    r#"UPDATE dependent_send_queue_events
1997                       SET content = ?
1998                       WHERE own_transaction_id = ?
1999                       AND room_id = ?"#,
2000                )?
2001                .execute((content, own_txn_id, room_id))
2002            })
2003            .await?;
2004
2005        if num_updated > 1 {
2006            return Err(Error::InconsistentUpdate);
2007        }
2008
2009        Ok(num_updated == 1)
2010    }
2011
2012    async fn mark_dependent_queued_requests_as_ready(
2013        &self,
2014        room_id: &RoomId,
2015        parent_txn_id: &TransactionId,
2016        parent_key: SentRequestKey,
2017    ) -> Result<usize> {
2018        let room_id = self.encode_key(keys::DEPENDENTS_SEND_QUEUE, room_id);
2019        let parent_key = self.serialize_value(&parent_key)?;
2020
2021        // See comment in `save_send_queue_event`.
2022        let parent_txn_id = parent_txn_id.to_string();
2023
2024        self.acquire()
2025            .await?
2026            .with_transaction(move |txn| {
2027                Ok(txn.prepare_cached(
2028                    "UPDATE dependent_send_queue_events SET parent_key = ? WHERE parent_transaction_id = ? and room_id = ?",
2029                )?
2030                .execute((parent_key, parent_txn_id, room_id))?)
2031            })
2032            .await
2033    }
2034
2035    async fn remove_dependent_queued_request(
2036        &self,
2037        room_id: &RoomId,
2038        txn_id: &ChildTransactionId,
2039    ) -> Result<bool> {
2040        let room_id = self.encode_key(keys::DEPENDENTS_SEND_QUEUE, room_id);
2041
2042        // See comment in `save_send_queue_event`.
2043        let txn_id = txn_id.to_string();
2044
2045        let num_deleted = self
2046            .acquire()
2047            .await?
2048            .with_transaction(move |txn| {
2049                txn.prepare_cached(
2050                    "DELETE FROM dependent_send_queue_events WHERE own_transaction_id = ? AND room_id = ?",
2051                )?
2052                .execute((txn_id, room_id))
2053            })
2054            .await?;
2055
2056        Ok(num_deleted > 0)
2057    }
2058
2059    async fn load_dependent_queued_requests(
2060        &self,
2061        room_id: &RoomId,
2062    ) -> Result<Vec<DependentQueuedRequest>> {
2063        let room_id = self.encode_key(keys::DEPENDENTS_SEND_QUEUE, room_id);
2064
2065        // Note: transaction_id is not encoded, see why in `save_send_queue_event`.
2066        let res: Vec<(String, String, Option<Vec<u8>>, Vec<u8>, Option<u64>)> = self
2067            .acquire()
2068            .await?
2069            .prepare(
2070                "SELECT own_transaction_id, parent_transaction_id, parent_key, content, created_at FROM dependent_send_queue_events WHERE room_id = ? ORDER BY ROWID",
2071                |mut stmt| {
2072                    stmt.query((room_id,))?
2073                        .mapped(|row| Ok((row.get(0)?, row.get(1)?, row.get(2)?, row.get(3)?, row.get(4)?)))
2074                        .collect()
2075                },
2076            )
2077            .await?;
2078
2079        let mut dependent_events = Vec::with_capacity(res.len());
2080        for entry in res {
2081            let created_at = entry
2082                .4
2083                .and_then(UInt::new)
2084                .map_or_else(MilliSecondsSinceUnixEpoch::now, MilliSecondsSinceUnixEpoch);
2085            dependent_events.push(DependentQueuedRequest {
2086                own_transaction_id: entry.0.into(),
2087                parent_transaction_id: entry.1.into(),
2088                parent_key: entry.2.map(|bytes| self.deserialize_value(&bytes)).transpose()?,
2089                kind: self.deserialize_json(&entry.3)?,
2090                created_at,
2091            });
2092        }
2093
2094        Ok(dependent_events)
2095    }
2096
2097    async fn upsert_thread_subscription(
2098        &self,
2099        room_id: &RoomId,
2100        thread_id: &EventId,
2101        subscription: ThreadSubscription,
2102    ) -> Result<(), Self::Error> {
2103        let room_id = self.encode_key(keys::THREAD_SUBSCRIPTIONS, room_id);
2104        let thread_id = self.encode_key(keys::THREAD_SUBSCRIPTIONS, thread_id);
2105        let status = subscription.as_str();
2106
2107        self.acquire()
2108            .await?
2109            .with_transaction(move |txn| {
2110                txn.prepare_cached(
2111                    "INSERT OR REPLACE INTO thread_subscriptions (room_id, event_id, status)
2112                         VALUES (?, ?, ?)",
2113                )?
2114                .execute((room_id, thread_id, status))
2115            })
2116            .await?;
2117        Ok(())
2118    }
2119
2120    async fn load_thread_subscription(
2121        &self,
2122        room_id: &RoomId,
2123        thread_id: &EventId,
2124    ) -> Result<Option<ThreadSubscription>, Self::Error> {
2125        let room_id = self.encode_key(keys::THREAD_SUBSCRIPTIONS, room_id);
2126        let thread_id = self.encode_key(keys::THREAD_SUBSCRIPTIONS, thread_id);
2127
2128        Ok(self
2129            .acquire()
2130            .await?
2131            .query_row(
2132                "SELECT status FROM thread_subscriptions WHERE room_id = ? AND event_id = ?",
2133                (room_id, thread_id),
2134                |row| row.get::<_, String>(0),
2135            )
2136            .await
2137            .optional()?
2138            .map(|data| {
2139                ThreadSubscription::from_value(&data).ok_or_else(|| Error::InvalidData {
2140                    details: format!("Invalid thread status: {data}"),
2141                })
2142            })
2143            .transpose()?)
2144    }
2145
2146    async fn remove_thread_subscription(
2147        &self,
2148        room_id: &RoomId,
2149        thread_id: &EventId,
2150    ) -> Result<(), Self::Error> {
2151        let room_id = self.encode_key(keys::THREAD_SUBSCRIPTIONS, room_id);
2152        let thread_id = self.encode_key(keys::THREAD_SUBSCRIPTIONS, thread_id);
2153
2154        self.acquire()
2155            .await?
2156            .execute(
2157                "DELETE FROM thread_subscriptions WHERE room_id = ? AND event_id = ?",
2158                (room_id, thread_id),
2159            )
2160            .await?;
2161
2162        Ok(())
2163    }
2164}
2165
/// Receipt payload as read back from the store.
///
/// The receipt getters deserialize it from JSON and return either
/// `(event_id, receipt)` or `(user_id, receipt)`, depending on the lookup.
#[derive(Debug, Clone, Serialize, Deserialize)]
struct ReceiptData {
    /// The receipt itself, returned by both receipt getters.
    receipt: Receipt,
    /// Event ID returned alongside the receipt when looking up by user.
    event_id: OwnedEventId,
    /// User ID returned alongside the receipt when looking up by event.
    user_id: OwnedUserId,
}
2172
#[cfg(test)]
mod tests {
    use std::sync::atomic::{AtomicU32, Ordering::SeqCst};

    use matrix_sdk_base::{statestore_integration_tests, StateStore, StoreError};
    use once_cell::sync::Lazy;
    use tempfile::{tempdir, TempDir};

    use super::SqliteStateStore;

    /// Temporary directory shared by every store opened in this module.
    static TMP_DIR: Lazy<TempDir> = Lazy::new(|| tempdir().unwrap());
    /// Counter used to give each store its own sub-directory of `TMP_DIR`.
    static NUM: AtomicU32 = AtomicU32::new(0);

    /// Opens a store, without passphrase, in its own sub-directory of
    /// `TMP_DIR`.
    async fn get_store() -> Result<impl StateStore, StoreError> {
        let index = NUM.fetch_add(1, SeqCst);
        let store_dir = TMP_DIR.path().join(index.to_string());
        let store_path = store_dir.to_str().unwrap();

        tracing::info!("using store @ {}", store_path);

        let store = SqliteStateStore::open(store_path, None).await.unwrap();
        Ok(store)
    }

    statestore_integration_tests!();
}
2197
#[cfg(test)]
mod encrypted_tests {
    use std::{
        path::PathBuf,
        sync::atomic::{AtomicU32, Ordering::SeqCst},
    };

    use matrix_sdk_base::{statestore_integration_tests, StateStore, StoreError};
    use matrix_sdk_test::async_test;
    use once_cell::sync::Lazy;
    use tempfile::{tempdir, TempDir};

    use super::SqliteStateStore;
    use crate::{utils::SqliteAsyncConnExt, SqliteStoreConfig};

    /// Temporary directory shared by every store opened in this module.
    static TMP_DIR: Lazy<TempDir> = Lazy::new(|| tempdir().unwrap());
    /// Counter used to give each store its own sub-directory of `TMP_DIR`.
    static NUM: AtomicU32 = AtomicU32::new(0);

    /// Returns the path of a distinct sub-directory of `TMP_DIR`.
    fn new_state_store_workspace() -> PathBuf {
        let index = NUM.fetch_add(1, SeqCst);
        TMP_DIR.path().join(index.to_string())
    }

    /// Opens a store, with a passphrase, in its own workspace.
    async fn get_store() -> Result<impl StateStore, StoreError> {
        let workspace = new_state_store_workspace();
        let workspace_path = workspace.to_str().unwrap();

        tracing::info!("using store @ {}", workspace_path);

        let store = SqliteStateStore::open(workspace_path, Some("default_test_password"))
            .await
            .unwrap();
        Ok(store)
    }

    #[async_test]
    async fn test_pool_size() {
        let config = SqliteStoreConfig::new(new_state_store_workspace()).pool_max_size(42);

        let store = SqliteStateStore::open_with_config(config).await.unwrap();

        // The configured maximum must be visible on the pool.
        assert_eq!(store.pool.status().max_size, 42);
    }

    #[async_test]
    async fn test_cache_size() {
        let config = SqliteStoreConfig::new(new_state_store_workspace()).cache_size(1500);

        let store = SqliteStateStore::open_with_config(config).await.unwrap();

        let conn = store.pool.get().await.unwrap();
        let reported_cache_size =
            conn.query_row("PRAGMA cache_size", (), |row| row.get::<_, i32>(0)).await.unwrap();

        // `SqliteStoreConfig` takes the size in bytes; check that it has been
        // converted to kibibytes. The value must also be negative, because it
        // _is_ a size in kibibytes and not a number of pages.
        assert_eq!(reported_cache_size, -(1500 / 1024));
    }

    #[async_test]
    async fn test_journal_size_limit() {
        let config = SqliteStoreConfig::new(new_state_store_workspace()).journal_size_limit(1500);

        let store = SqliteStateStore::open_with_config(config).await.unwrap();

        let conn = store.pool.get().await.unwrap();
        let reported_limit = conn
            .query_row("PRAGMA journal_size_limit", (), |row| row.get::<_, u32>(0))
            .await
            .unwrap();

        // `SqliteStoreConfig` takes the limit in bytes, and SQLite keeps it in
        // bytes.
        assert_eq!(reported_limit, 1500);
    }

    statestore_integration_tests!();
}
2278
2279#[cfg(test)]
2280mod migration_tests {
2281    use std::{
2282        path::{Path, PathBuf},
2283        sync::{
2284            atomic::{AtomicU32, Ordering::SeqCst},
2285            Arc,
2286        },
2287    };
2288
2289    use as_variant::as_variant;
2290    use deadpool_sqlite::Runtime;
2291    use matrix_sdk_base::{
2292        media::{MediaFormat, MediaRequestParameters},
2293        store::{
2294            ChildTransactionId, DependentQueuedRequestKind, RoomLoadSettings,
2295            SerializableEventContent,
2296        },
2297        sync::UnreadNotificationsCount,
2298        RoomState, StateStore,
2299    };
2300    use matrix_sdk_test::async_test;
2301    use once_cell::sync::Lazy;
2302    use ruma::{
2303        events::{
2304            room::{create::RoomCreateEventContent, message::RoomMessageEventContent, MediaSource},
2305            StateEventType,
2306        },
2307        room_id, server_name, user_id, EventId, MilliSecondsSinceUnixEpoch, OwnedTransactionId,
2308        RoomId, TransactionId, UserId,
2309    };
2310    use rusqlite::Transaction;
2311    use serde::{Deserialize, Serialize};
2312    use serde_json::json;
2313    use tempfile::{tempdir, TempDir};
2314    use tokio::fs;
2315
2316    use super::{init, keys, SqliteStateStore, DATABASE_NAME};
2317    use crate::{
2318        error::{Error, Result},
2319        utils::{EncryptableStore as _, SqliteAsyncConnExt, SqliteKeyValueStoreAsyncConnExt},
2320        OpenStoreError,
2321    };
2322
2323    static TMP_DIR: Lazy<TempDir> = Lazy::new(|| tempdir().unwrap());
2324    static NUM: AtomicU32 = AtomicU32::new(0);
2325    const SECRET: &str = "secret";
2326
2327    fn new_path() -> PathBuf {
2328        let name = NUM.fetch_add(1, SeqCst).to_string();
2329        TMP_DIR.path().join(name)
2330    }
2331
2332    async fn create_fake_db(path: &Path, version: u8) -> Result<SqliteStateStore> {
2333        fs::create_dir_all(&path).await.map_err(OpenStoreError::CreateDir).unwrap();
2334
2335        let config = deadpool_sqlite::Config::new(path.join(DATABASE_NAME));
2336        // use default pool config
2337
2338        let pool = config.create_pool(Runtime::Tokio1).unwrap();
2339        let conn = pool.get().await?;
2340
2341        init(&conn).await?;
2342
2343        let store_cipher = Some(Arc::new(conn.get_or_create_store_cipher(SECRET).await.unwrap()));
2344        let this = SqliteStateStore { store_cipher, pool };
2345        this.run_migrations(&conn, 1, Some(version)).await?;
2346
2347        Ok(this)
2348    }
2349
2350    fn room_info_v1_json(
2351        room_id: &RoomId,
2352        state: RoomState,
2353        name: Option<&str>,
2354        creator: Option<&UserId>,
2355    ) -> serde_json::Value {
2356        // Test with name set or not.
2357        let name_content = match name {
2358            Some(name) => json!({ "name": name }),
2359            None => json!({ "name": null }),
2360        };
2361        // Test with creator set or not.
2362        let create_content = match creator {
2363            Some(creator) => RoomCreateEventContent::new_v1(creator.to_owned()),
2364            None => RoomCreateEventContent::new_v11(),
2365        };
2366
2367        json!({
2368            "room_id": room_id,
2369            "room_type": state,
2370            "notification_counts": UnreadNotificationsCount::default(),
2371            "summary": {
2372                "heroes": [],
2373                "joined_member_count": 0,
2374                "invited_member_count": 0,
2375            },
2376            "members_synced": false,
2377            "base_info": {
2378                "dm_targets": [],
2379                "max_power_level": 100,
2380                "name": {
2381                    "Original": {
2382                        "content": name_content,
2383                    },
2384                },
2385                "create": {
2386                    "Original": {
2387                        "content": create_content,
2388                    }
2389                }
2390            },
2391        })
2392    }
2393
2394    #[async_test]
2395    pub async fn test_migrating_v1_to_v2() {
2396        let path = new_path();
2397        // Create and populate db.
2398        {
2399            let db = create_fake_db(&path, 1).await.unwrap();
2400            let conn = db.pool.get().await.unwrap();
2401
2402            let this = db.clone();
2403            conn.with_transaction(move |txn| {
2404                for i in 0..5 {
2405                    let room_id = RoomId::parse(format!("!room_{i}:localhost")).unwrap();
2406                    let (state, stripped) =
2407                        if i < 3 { (RoomState::Joined, false) } else { (RoomState::Invited, true) };
2408                    let info = room_info_v1_json(&room_id, state, None, None);
2409
2410                    let room_id = this.encode_key(keys::ROOM_INFO, room_id);
2411                    let data = this.serialize_json(&info)?;
2412
2413                    txn.prepare_cached(
2414                        "INSERT INTO room_info (room_id, stripped, data)
2415                         VALUES (?, ?, ?)",
2416                    )?
2417                    .execute((room_id, stripped, data))?;
2418                }
2419
2420                Result::<_, Error>::Ok(())
2421            })
2422            .await
2423            .unwrap();
2424        }
2425
2426        // This transparently migrates to the latest version.
2427        let store = SqliteStateStore::open(path, Some(SECRET)).await.unwrap();
2428
2429        // Check all room infos are there.
2430        assert_eq!(store.get_room_infos(&RoomLoadSettings::default()).await.unwrap().len(), 5);
2431    }
2432
2433    // Add a room in version 2 format of the state store.
2434    fn add_room_v2(
2435        this: &SqliteStateStore,
2436        txn: &Transaction<'_>,
2437        room_id: &RoomId,
2438        name: Option<&str>,
2439        create_creator: Option<&UserId>,
2440        create_sender: Option<&UserId>,
2441    ) -> Result<(), Error> {
2442        let room_info_json = room_info_v1_json(room_id, RoomState::Joined, name, create_creator);
2443
2444        let encoded_room_id = this.encode_key(keys::ROOM_INFO, room_id);
2445        let encoded_state =
2446            this.encode_key(keys::ROOM_INFO, serde_json::to_string(&RoomState::Joined)?);
2447        let data = this.serialize_json(&room_info_json)?;
2448
2449        txn.prepare_cached(
2450            "INSERT INTO room_info (room_id, state, data)
2451             VALUES (?, ?, ?)",
2452        )?
2453        .execute((encoded_room_id, encoded_state, data))?;
2454
2455        // Test with or without `m.room.create` event in the room state.
2456        let Some(create_sender) = create_sender else {
2457            return Ok(());
2458        };
2459
2460        let create_content = match create_creator {
2461            Some(creator) => RoomCreateEventContent::new_v1(creator.to_owned()),
2462            None => RoomCreateEventContent::new_v11(),
2463        };
2464
2465        let event_id = EventId::new(server_name!("dummy.local"));
2466        let create_event = json!({
2467            "content": create_content,
2468            "event_id": event_id,
2469            "sender": create_sender.to_owned(),
2470            "origin_server_ts": MilliSecondsSinceUnixEpoch::now(),
2471            "state_key": "",
2472            "type": "m.room.create",
2473            "unsigned": {},
2474        });
2475
2476        let encoded_room_id = this.encode_key(keys::STATE_EVENT, room_id);
2477        let encoded_event_type =
2478            this.encode_key(keys::STATE_EVENT, StateEventType::RoomCreate.to_string());
2479        let encoded_state_key = this.encode_key(keys::STATE_EVENT, "");
2480        let stripped = false;
2481        let encoded_event_id = this.encode_key(keys::STATE_EVENT, event_id);
2482        let data = this.serialize_json(&create_event)?;
2483
2484        txn.prepare_cached(
2485            "INSERT
2486             INTO state_event (room_id, event_type, state_key, stripped, event_id, data)
2487             VALUES (?, ?, ?, ?, ?, ?)",
2488        )?
2489        .execute((
2490            encoded_room_id,
2491            encoded_event_type,
2492            encoded_state_key,
2493            stripped,
2494            encoded_event_id,
2495            data,
2496        ))?;
2497
2498        Ok(())
2499    }
2500
2501    #[async_test]
2502    pub async fn test_migrating_v2_to_v3() {
2503        let path = new_path();
2504
2505        // Room A: with name, creator and sender.
2506        let room_a_id = room_id!("!room_a:dummy.local");
2507        let room_a_name = "Room A";
2508        let room_a_creator = user_id!("@creator:dummy.local");
2509        // Use a different sender to check that sender is used over creator in
2510        // migration.
2511        let room_a_create_sender = user_id!("@sender:dummy.local");
2512
2513        // Room B: without name, creator and sender.
2514        let room_b_id = room_id!("!room_b:dummy.local");
2515
2516        // Room C: only with sender.
2517        let room_c_id = room_id!("!room_c:dummy.local");
2518        let room_c_create_sender = user_id!("@creator:dummy.local");
2519
2520        // Create and populate db.
2521        {
2522            let db = create_fake_db(&path, 2).await.unwrap();
2523            let conn = db.pool.get().await.unwrap();
2524
2525            let this = db.clone();
2526            conn.with_transaction(move |txn| {
2527                add_room_v2(
2528                    &this,
2529                    txn,
2530                    room_a_id,
2531                    Some(room_a_name),
2532                    Some(room_a_creator),
2533                    Some(room_a_create_sender),
2534                )?;
2535                add_room_v2(&this, txn, room_b_id, None, None, None)?;
2536                add_room_v2(&this, txn, room_c_id, None, None, Some(room_c_create_sender))?;
2537
2538                Result::<_, Error>::Ok(())
2539            })
2540            .await
2541            .unwrap();
2542        }
2543
2544        // This transparently migrates to the latest version.
2545        let store = SqliteStateStore::open(path, Some(SECRET)).await.unwrap();
2546
2547        // Check all room infos are there.
2548        let room_infos = store.get_room_infos(&RoomLoadSettings::default()).await.unwrap();
2549        assert_eq!(room_infos.len(), 3);
2550
2551        let room_a = room_infos.iter().find(|r| r.room_id() == room_a_id).unwrap();
2552        assert_eq!(room_a.name(), Some(room_a_name));
2553        assert_eq!(room_a.creators(), Some(vec![room_a_create_sender.to_owned()]));
2554
2555        let room_b = room_infos.iter().find(|r| r.room_id() == room_b_id).unwrap();
2556        assert_eq!(room_b.name(), None);
2557        assert_eq!(room_b.creators(), None);
2558
2559        let room_c = room_infos.iter().find(|r| r.room_id() == room_c_id).unwrap();
2560        assert_eq!(room_c.name(), None);
2561        assert_eq!(room_c.creators(), Some(vec![room_c_create_sender.to_owned()]));
2562    }
2563
2564    #[async_test]
2565    pub async fn test_migrating_v7_to_v9() {
2566        let path = new_path();
2567
2568        let room_id = room_id!("!room_a:dummy.local");
2569        let wedged_event_transaction_id = TransactionId::new();
2570        let local_event_transaction_id = TransactionId::new();
2571
2572        // Create and populate db.
2573        {
2574            let db = create_fake_db(&path, 7).await.unwrap();
2575            let conn = db.pool.get().await.unwrap();
2576
2577            let wedge_tx = wedged_event_transaction_id.clone();
2578            let local_tx = local_event_transaction_id.clone();
2579
2580            conn.with_transaction(move |txn| {
2581                add_dependent_send_queue_event_v7(
2582                    &db,
2583                    txn,
2584                    room_id,
2585                    &local_tx,
2586                    ChildTransactionId::new(),
2587                    DependentQueuedRequestKind::RedactEvent,
2588                )?;
2589                add_send_queue_event_v7(&db, txn, &wedge_tx, room_id, true)?;
2590                add_send_queue_event_v7(&db, txn, &local_tx, room_id, false)?;
2591                Result::<_, Error>::Ok(())
2592            })
2593            .await
2594            .unwrap();
2595        }
2596
2597        // This transparently migrates to the latest version, which clears up all
2598        // requests and dependent requests.
2599        let store = SqliteStateStore::open(path, Some(SECRET)).await.unwrap();
2600
2601        let requests = store.load_send_queue_requests(room_id).await.unwrap();
2602        assert!(requests.is_empty());
2603
2604        let dependent_requests = store.load_dependent_queued_requests(room_id).await.unwrap();
2605        assert!(dependent_requests.is_empty());
2606    }
2607
2608    fn add_send_queue_event_v7(
2609        this: &SqliteStateStore,
2610        txn: &Transaction<'_>,
2611        transaction_id: &TransactionId,
2612        room_id: &RoomId,
2613        is_wedged: bool,
2614    ) -> Result<(), Error> {
2615        let content =
2616            SerializableEventContent::new(&RoomMessageEventContent::text_plain("Hello").into())?;
2617
2618        let room_id_key = this.encode_key(keys::SEND_QUEUE, room_id);
2619        let room_id_value = this.serialize_value(&room_id.to_owned())?;
2620
2621        let content = this.serialize_json(&content)?;
2622
2623        txn.prepare_cached("INSERT INTO send_queue_events (room_id, room_id_val, transaction_id, content, wedged) VALUES (?, ?, ?, ?, ?)")?
2624            .execute((room_id_key, room_id_value, transaction_id.to_string(), content, is_wedged))?;
2625
2626        Ok(())
2627    }
2628
2629    fn add_dependent_send_queue_event_v7(
2630        this: &SqliteStateStore,
2631        txn: &Transaction<'_>,
2632        room_id: &RoomId,
2633        parent_txn_id: &TransactionId,
2634        own_txn_id: ChildTransactionId,
2635        content: DependentQueuedRequestKind,
2636    ) -> Result<(), Error> {
2637        let room_id_value = this.serialize_value(&room_id.to_owned())?;
2638
2639        let parent_txn_id = parent_txn_id.to_string();
2640        let own_txn_id = own_txn_id.to_string();
2641        let content = this.serialize_json(&content)?;
2642
2643        txn.prepare_cached(
2644            "INSERT INTO dependent_send_queue_events
2645                         (room_id, parent_transaction_id, own_transaction_id, content)
2646                       VALUES (?, ?, ?, ?)",
2647        )?
2648        .execute((room_id_value, parent_txn_id, own_txn_id, content))?;
2649
2650        Ok(())
2651    }
2652
    /// Dependent request kind carrying the `UploadFileWithThumbnail` variant
    /// name.
    ///
    /// `test_dependent_queued_request_variant_renaming` serializes a value of
    /// this type and deserializes the result as a
    /// `DependentQueuedRequestKind`.
    #[derive(Clone, Debug, Serialize, Deserialize)]
    pub enum LegacyDependentQueuedRequestKind {
        UploadFileWithThumbnail {
            content_type: String,
            cache_key: MediaRequestParameters,
            related_to: OwnedTransactionId,
        },
    }
2661
2662    #[async_test]
2663    pub async fn test_dependent_queued_request_variant_renaming() {
2664        let path = new_path();
2665        let db = create_fake_db(&path, 7).await.unwrap();
2666
2667        let cache_key = MediaRequestParameters {
2668            format: MediaFormat::File,
2669            source: MediaSource::Plain("https://server.local/foobar".into()),
2670        };
2671        let related_to = TransactionId::new();
2672        let request = LegacyDependentQueuedRequestKind::UploadFileWithThumbnail {
2673            content_type: "image/png".to_owned(),
2674            cache_key,
2675            related_to: related_to.clone(),
2676        };
2677
2678        let data = db
2679            .serialize_json(&request)
2680            .expect("should be able to serialize legacy dependent request");
2681        let deserialized: DependentQueuedRequestKind = db.deserialize_json(&data).expect(
2682            "should be able to deserialize dependent request from legacy dependent request",
2683        );
2684
2685        as_variant!(deserialized, DependentQueuedRequestKind::UploadFileOrThumbnail { related_to: de_related_to, .. } => {
2686            assert_eq!(de_related_to, related_to);
2687        });
2688    }
2689}