1use std::{
2 borrow::Cow,
3 collections::{BTreeMap, BTreeSet, HashMap},
4 fmt, iter,
5 path::Path,
6 sync::Arc,
7};
8
9use async_trait::async_trait;
10use deadpool_sqlite::{Object as SqliteAsyncConn, Pool as SqlitePool, Runtime};
11use matrix_sdk_base::{
12 deserialized_responses::{DisplayName, RawAnySyncOrStrippedState, SyncOrStrippedState},
13 store::{
14 migration_helpers::RoomInfoV1, ChildTransactionId, DependentQueuedRequest,
15 DependentQueuedRequestKind, QueueWedgeError, QueuedRequest, QueuedRequestKind,
16 RoomLoadSettings, SentRequestKey, ThreadSubscription,
17 },
18 MinimalRoomMemberEvent, RoomInfo, RoomMemberships, RoomState, StateChanges, StateStore,
19 StateStoreDataKey, StateStoreDataValue, ROOM_VERSION_FALLBACK, ROOM_VERSION_RULES_FALLBACK,
20};
21use matrix_sdk_store_encryption::StoreCipher;
22use ruma::{
23 canonical_json::{redact, RedactedBecause},
24 events::{
25 presence::PresenceEvent,
26 receipt::{Receipt, ReceiptThread, ReceiptType},
27 room::{
28 create::RoomCreateEventContent,
29 member::{StrippedRoomMemberEvent, SyncRoomMemberEvent},
30 },
31 AnyGlobalAccountDataEvent, AnyRoomAccountDataEvent, AnySyncStateEvent,
32 GlobalAccountDataEventType, RoomAccountDataEventType, StateEventType,
33 },
34 serde::Raw,
35 CanonicalJsonObject, EventId, MilliSecondsSinceUnixEpoch, OwnedEventId, OwnedRoomId,
36 OwnedTransactionId, OwnedUserId, RoomId, TransactionId, UInt, UserId,
37};
38use rusqlite::{OptionalExtension, Transaction};
39use serde::{Deserialize, Serialize};
40use tokio::fs;
41use tracing::{debug, warn};
42
43use crate::{
44 error::{Error, Result},
45 utils::{
46 repeat_vars, EncryptableStore, Key, SqliteAsyncConnExt, SqliteKeyValueStoreAsyncConnExt,
47 SqliteKeyValueStoreConnExt,
48 },
49 OpenStoreError, SqliteStoreConfig,
50};
51
52mod keys {
53 pub const KV_BLOB: &str = "kv_blob";
55 pub const ROOM_INFO: &str = "room_info";
56 pub const STATE_EVENT: &str = "state_event";
57 pub const GLOBAL_ACCOUNT_DATA: &str = "global_account_data";
58 pub const ROOM_ACCOUNT_DATA: &str = "room_account_data";
59 pub const MEMBER: &str = "member";
60 pub const PROFILE: &str = "profile";
61 pub const RECEIPT: &str = "receipt";
62 pub const DISPLAY_NAME: &str = "display_name";
63 pub const SEND_QUEUE: &str = "send_queue_events";
64 pub const DEPENDENTS_SEND_QUEUE: &str = "dependent_send_queue_events";
65 pub const THREAD_SUBSCRIPTIONS: &str = "thread_subscriptions";
66}
67
68pub const DATABASE_NAME: &str = "matrix-sdk-state.sqlite3";
70
71const DATABASE_VERSION: u8 = 13;
77
78#[derive(Clone)]
80pub struct SqliteStateStore {
81 store_cipher: Option<Arc<StoreCipher>>,
82 pool: SqlitePool,
83}
84
85#[cfg(not(tarpaulin_include))]
86impl fmt::Debug for SqliteStateStore {
87 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
88 f.debug_struct("SqliteStateStore").finish_non_exhaustive()
89 }
90}
91
92impl SqliteStateStore {
93 pub async fn open(
96 path: impl AsRef<Path>,
97 passphrase: Option<&str>,
98 ) -> Result<Self, OpenStoreError> {
99 Self::open_with_config(SqliteStoreConfig::new(path).passphrase(passphrase)).await
100 }
101
102 pub async fn open_with_config(config: SqliteStoreConfig) -> Result<Self, OpenStoreError> {
104 let SqliteStoreConfig { path, passphrase, pool_config, runtime_config } = config;
105
106 fs::create_dir_all(&path).await.map_err(OpenStoreError::CreateDir)?;
107
108 let mut config = deadpool_sqlite::Config::new(path.join(DATABASE_NAME));
109 config.pool = Some(pool_config);
110
111 let pool = config.create_pool(Runtime::Tokio1)?;
112
113 let this = Self::open_with_pool(pool, passphrase.as_deref()).await?;
114 this.pool.get().await?.apply_runtime_config(runtime_config).await?;
115
116 Ok(this)
117 }
118
119 pub async fn open_with_pool(
122 pool: SqlitePool,
123 passphrase: Option<&str>,
124 ) -> Result<Self, OpenStoreError> {
125 let conn = pool.get().await?;
126
127 let mut version = conn.db_version().await?;
128
129 if version == 0 {
130 init(&conn).await?;
131 version = 1;
132 }
133
134 let store_cipher = match passphrase {
135 Some(p) => Some(Arc::new(conn.get_or_create_store_cipher(p).await?)),
136 None => None,
137 };
138 let this = Self { store_cipher, pool };
139 this.run_migrations(&conn, version, None).await?;
140
141 Ok(this)
142 }
143
144 async fn run_migrations(&self, conn: &SqliteAsyncConn, from: u8, to: Option<u8>) -> Result<()> {
149 let to = to.unwrap_or(DATABASE_VERSION);
150
151 if from < to {
152 debug!(version = from, new_version = to, "Upgrading database");
153 } else {
154 return Ok(());
155 }
156
157 if from < 2 && to >= 2 {
158 let this = self.clone();
159 conn.with_transaction(move |txn| {
160 txn.execute_batch(include_str!(
162 "../migrations/state_store/002_a_create_new_room_info.sql"
163 ))?;
164
165 for data in txn
167 .prepare("SELECT data FROM room_info")?
168 .query_map((), |row| row.get::<_, Vec<u8>>(0))?
169 {
170 let data = data?;
171 let room_info: RoomInfoV1 = this.deserialize_json(&data)?;
172
173 let room_id = this.encode_key(keys::ROOM_INFO, room_info.room_id());
174 let state = this
175 .encode_key(keys::ROOM_INFO, serde_json::to_string(&room_info.state())?);
176 txn.prepare_cached(
177 "INSERT OR REPLACE INTO new_room_info (room_id, state, data)
178 VALUES (?, ?, ?)",
179 )?
180 .execute((room_id, state, data))?;
181 }
182
183 txn.execute_batch(include_str!(
185 "../migrations/state_store/002_b_replace_room_info.sql"
186 ))?;
187
188 txn.set_db_version(2)?;
189 Result::<_, Error>::Ok(())
190 })
191 .await?;
192 }
193
194 if from < 3 && to >= 3 {
196 let this = self.clone();
197 conn.with_transaction(move |txn| {
198 for data in txn
200 .prepare("SELECT data FROM room_info")?
201 .query_map((), |row| row.get::<_, Vec<u8>>(0))?
202 {
203 let data = data?;
204 let room_info_v1: RoomInfoV1 = this.deserialize_json(&data)?;
205
206 let room_id = this.encode_key(keys::STATE_EVENT, room_info_v1.room_id());
208 let event_type =
209 this.encode_key(keys::STATE_EVENT, StateEventType::RoomCreate.to_string());
210 let create_res = txn
211 .prepare(
212 "SELECT stripped, data FROM state_event
213 WHERE room_id = ? AND event_type = ?",
214 )?
215 .query_row([room_id, event_type], |row| {
216 Ok((row.get::<_, bool>(0)?, row.get::<_, Vec<u8>>(1)?))
217 })
218 .optional()?;
219
220 let create = create_res.and_then(|(stripped, data)| {
221 let create = if stripped {
222 SyncOrStrippedState::<RoomCreateEventContent>::Stripped(
223 this.deserialize_json(&data).ok()?,
224 )
225 } else {
226 SyncOrStrippedState::Sync(this.deserialize_json(&data).ok()?)
227 };
228 Some(create)
229 });
230
231 let migrated_room_info = room_info_v1.migrate(create.as_ref());
232
233 let data = this.serialize_json(&migrated_room_info)?;
234 let room_id = this.encode_key(keys::ROOM_INFO, migrated_room_info.room_id());
235 txn.prepare_cached("UPDATE room_info SET data = ? WHERE room_id = ?")?
236 .execute((data, room_id))?;
237 }
238
239 txn.set_db_version(3)?;
240 Result::<_, Error>::Ok(())
241 })
242 .await?;
243 }
244
245 if from < 4 && to >= 4 {
246 conn.with_transaction(move |txn| {
247 txn.execute_batch(include_str!("../migrations/state_store/003_send_queue.sql"))?;
249 txn.set_db_version(4)
250 })
251 .await?;
252 }
253
254 if from < 5 && to >= 5 {
255 conn.with_transaction(move |txn| {
256 txn.execute_batch(include_str!(
258 "../migrations/state_store/004_send_queue_with_roomid_value.sql"
259 ))?;
260 txn.set_db_version(4)
261 })
262 .await?;
263 }
264
265 if from < 6 && to >= 6 {
266 conn.with_transaction(move |txn| {
267 txn.execute_batch(include_str!(
269 "../migrations/state_store/005_send_queue_dependent_events.sql"
270 ))?;
271 txn.set_db_version(6)
272 })
273 .await?;
274 }
275
276 if from < 7 && to >= 7 {
277 conn.with_transaction(move |txn| {
278 txn.execute_batch(include_str!("../migrations/state_store/006_drop_media.sql"))?;
280 txn.set_db_version(7)
281 })
282 .await?;
283 }
284
285 if from < 8 && to >= 8 {
286 let error = QueueWedgeError::GenericApiError {
288 msg: "local echo failed to send in a previous session".into(),
289 };
290 let default_err = self.serialize_value(&error)?;
291
292 conn.with_transaction(move |txn| {
293 txn.execute_batch(include_str!("../migrations/state_store/007_a_send_queue_wedge_reason.sql"))?;
295
296 for wedged_entries in txn
299 .prepare("SELECT room_id, transaction_id FROM send_queue_events WHERE wedged = 1")?
300 .query_map((), |row| {
301 Ok(
302 (row.get::<_, Vec<u8>>(0)?,row.get::<_, String>(1)?)
303 )
304 })? {
305
306 let (room_id, transaction_id) = wedged_entries?;
307
308 txn.prepare_cached("UPDATE send_queue_events SET wedge_reason = ? WHERE room_id = ? AND transaction_id = ?")?
309 .execute((default_err.clone(), room_id, transaction_id))?;
310 }
311
312
313 txn.execute_batch(include_str!("../migrations/state_store/007_b_send_queue_clean.sql"))?;
315
316 txn.set_db_version(8)
317 })
318 .await?;
319 }
320
321 if from < 9 && to >= 9 {
322 conn.with_transaction(move |txn| {
323 txn.execute_batch(include_str!("../migrations/state_store/008_send_queue.sql"))?;
325 txn.set_db_version(9)
326 })
327 .await?;
328 }
329
330 if from < 10 && to >= 10 {
331 conn.with_transaction(move |txn| {
332 txn.execute_batch(include_str!(
334 "../migrations/state_store/009_send_queue_priority.sql"
335 ))?;
336 txn.set_db_version(10)
337 })
338 .await?;
339 }
340
341 if from < 11 && to >= 11 {
342 conn.with_transaction(move |txn| {
343 txn.execute_batch(include_str!(
345 "../migrations/state_store/010_send_queue_enqueue_time.sql"
346 ))?;
347 txn.set_db_version(11)
348 })
349 .await?;
350 }
351
352 if from < 12 && to >= 12 {
353 conn.vacuum().await?;
357 conn.set_kv("version", vec![12]).await?;
358 }
359
360 if from < 13 && to >= 13 {
361 conn.with_transaction(move |txn| {
362 txn.execute_batch(include_str!(
364 "../migrations/state_store/011_thread_subscriptions.sql"
365 ))?;
366 txn.set_db_version(13)
367 })
368 .await?;
369 }
370
371 Ok(())
372 }
373
374 fn encode_state_store_data_key(&self, key: StateStoreDataKey<'_>) -> Key {
375 let key_s = match key {
376 StateStoreDataKey::SyncToken => Cow::Borrowed(StateStoreDataKey::SYNC_TOKEN),
377 StateStoreDataKey::ServerInfo => Cow::Borrowed(StateStoreDataKey::SERVER_INFO),
378 StateStoreDataKey::Filter(f) => {
379 Cow::Owned(format!("{}:{f}", StateStoreDataKey::FILTER))
380 }
381 StateStoreDataKey::UserAvatarUrl(u) => {
382 Cow::Owned(format!("{}:{u}", StateStoreDataKey::USER_AVATAR_URL))
383 }
384 StateStoreDataKey::RecentlyVisitedRooms(b) => {
385 Cow::Owned(format!("{}:{b}", StateStoreDataKey::RECENTLY_VISITED_ROOMS))
386 }
387 StateStoreDataKey::UtdHookManagerData => {
388 Cow::Borrowed(StateStoreDataKey::UTD_HOOK_MANAGER_DATA)
389 }
390 StateStoreDataKey::ComposerDraft(room_id, thread_root) => {
391 if let Some(thread_root) = thread_root {
392 Cow::Owned(format!(
393 "{}:{room_id}:{thread_root}",
394 StateStoreDataKey::COMPOSER_DRAFT
395 ))
396 } else {
397 Cow::Owned(format!("{}:{room_id}", StateStoreDataKey::COMPOSER_DRAFT))
398 }
399 }
400 StateStoreDataKey::SeenKnockRequests(room_id) => {
401 Cow::Owned(format!("{}:{room_id}", StateStoreDataKey::SEEN_KNOCK_REQUESTS))
402 }
403 };
404
405 self.encode_key(keys::KV_BLOB, &*key_s)
406 }
407
408 fn encode_presence_key(&self, user_id: &UserId) -> Key {
409 self.encode_key(keys::KV_BLOB, format!("presence:{user_id}"))
410 }
411
412 fn encode_custom_key(&self, key: &[u8]) -> Key {
413 let mut full_key = b"custom:".to_vec();
414 full_key.extend(key);
415 self.encode_key(keys::KV_BLOB, full_key)
416 }
417
418 async fn acquire(&self) -> Result<SqliteAsyncConn> {
419 Ok(self.pool.get().await?)
420 }
421
422 fn remove_maybe_stripped_room_data(
423 &self,
424 txn: &Transaction<'_>,
425 room_id: &RoomId,
426 stripped: bool,
427 ) -> rusqlite::Result<()> {
428 let state_event_room_id = self.encode_key(keys::STATE_EVENT, room_id);
429 txn.remove_room_state_events(&state_event_room_id, Some(stripped))?;
430
431 let member_room_id = self.encode_key(keys::MEMBER, room_id);
432 txn.remove_room_members(&member_room_id, Some(stripped))
433 }
434}
435
436impl EncryptableStore for SqliteStateStore {
437 fn get_cypher(&self) -> Option<&StoreCipher> {
438 self.store_cipher.as_deref()
439 }
440}
441
442async fn init(conn: &SqliteAsyncConn) -> Result<()> {
444 conn.execute_batch("PRAGMA journal_mode = wal;").await?;
447 conn.with_transaction(|txn| {
448 txn.execute_batch(include_str!("../migrations/state_store/001_init.sql"))?;
449 txn.set_db_version(1)?;
450
451 Ok(())
452 })
453 .await
454}
455
456trait SqliteConnectionStateStoreExt {
457 fn set_kv_blob(&self, key: &[u8], value: &[u8]) -> rusqlite::Result<()>;
458
459 fn set_global_account_data(&self, event_type: &[u8], data: &[u8]) -> rusqlite::Result<()>;
460
461 fn set_room_account_data(
462 &self,
463 room_id: &[u8],
464 event_type: &[u8],
465 data: &[u8],
466 ) -> rusqlite::Result<()>;
467 fn remove_room_account_data(&self, room_id: &[u8]) -> rusqlite::Result<()>;
468
469 fn set_room_info(&self, room_id: &[u8], state: &[u8], data: &[u8]) -> rusqlite::Result<()>;
470 fn get_room_info(&self, room_id: &[u8]) -> rusqlite::Result<Option<Vec<u8>>>;
471 fn remove_room_info(&self, room_id: &[u8]) -> rusqlite::Result<()>;
472
473 fn set_state_event(
474 &self,
475 room_id: &[u8],
476 event_type: &[u8],
477 state_key: &[u8],
478 stripped: bool,
479 event_id: Option<&[u8]>,
480 data: &[u8],
481 ) -> rusqlite::Result<()>;
482 fn get_state_event_by_id(
483 &self,
484 room_id: &[u8],
485 event_id: &[u8],
486 ) -> rusqlite::Result<Option<Vec<u8>>>;
487 fn remove_room_state_events(
488 &self,
489 room_id: &[u8],
490 stripped: Option<bool>,
491 ) -> rusqlite::Result<()>;
492
493 fn set_member(
494 &self,
495 room_id: &[u8],
496 user_id: &[u8],
497 membership: &[u8],
498 stripped: bool,
499 data: &[u8],
500 ) -> rusqlite::Result<()>;
501 fn remove_room_members(&self, room_id: &[u8], stripped: Option<bool>) -> rusqlite::Result<()>;
502
503 fn set_profile(&self, room_id: &[u8], user_id: &[u8], data: &[u8]) -> rusqlite::Result<()>;
504 fn remove_room_profiles(&self, room_id: &[u8]) -> rusqlite::Result<()>;
505 fn remove_room_profile(&self, room_id: &[u8], user_id: &[u8]) -> rusqlite::Result<()>;
506
507 fn set_receipt(
508 &self,
509 room_id: &[u8],
510 user_id: &[u8],
511 receipt_type: &[u8],
512 thread_id: &[u8],
513 event_id: &[u8],
514 data: &[u8],
515 ) -> rusqlite::Result<()>;
516 fn remove_room_receipts(&self, room_id: &[u8]) -> rusqlite::Result<()>;
517
518 fn set_display_name(&self, room_id: &[u8], name: &[u8], data: &[u8]) -> rusqlite::Result<()>;
519 fn remove_display_name(&self, room_id: &[u8], name: &[u8]) -> rusqlite::Result<()>;
520 fn remove_room_display_names(&self, room_id: &[u8]) -> rusqlite::Result<()>;
521 fn remove_room_send_queue(&self, room_id: &[u8]) -> rusqlite::Result<()>;
522 fn remove_room_dependent_send_queue(&self, room_id: &[u8]) -> rusqlite::Result<()>;
523}
524
525impl SqliteConnectionStateStoreExt for rusqlite::Connection {
526 fn set_kv_blob(&self, key: &[u8], value: &[u8]) -> rusqlite::Result<()> {
527 self.execute("INSERT OR REPLACE INTO kv_blob VALUES (?, ?)", (key, value))?;
528 Ok(())
529 }
530
531 fn set_global_account_data(&self, event_type: &[u8], data: &[u8]) -> rusqlite::Result<()> {
532 self.prepare_cached(
533 "INSERT OR REPLACE INTO global_account_data (event_type, data)
534 VALUES (?, ?)",
535 )?
536 .execute((event_type, data))?;
537 Ok(())
538 }
539
540 fn set_room_account_data(
541 &self,
542 room_id: &[u8],
543 event_type: &[u8],
544 data: &[u8],
545 ) -> rusqlite::Result<()> {
546 self.prepare_cached(
547 "INSERT OR REPLACE INTO room_account_data (room_id, event_type, data)
548 VALUES (?, ?, ?)",
549 )?
550 .execute((room_id, event_type, data))?;
551 Ok(())
552 }
553
554 fn remove_room_account_data(&self, room_id: &[u8]) -> rusqlite::Result<()> {
555 self.prepare(
556 "DELETE FROM room_account_data
557 WHERE room_id = ?",
558 )?
559 .execute((room_id,))?;
560 Ok(())
561 }
562
563 fn set_room_info(&self, room_id: &[u8], state: &[u8], data: &[u8]) -> rusqlite::Result<()> {
564 self.prepare_cached(
565 "INSERT OR REPLACE INTO room_info (room_id, state, data)
566 VALUES (?, ?, ?)",
567 )?
568 .execute((room_id, state, data))?;
569 Ok(())
570 }
571
572 fn get_room_info(&self, room_id: &[u8]) -> rusqlite::Result<Option<Vec<u8>>> {
573 self.query_row("SELECT data FROM room_info WHERE room_id = ?", (room_id,), |row| row.get(0))
574 .optional()
575 }
576
577 fn remove_room_info(&self, room_id: &[u8]) -> rusqlite::Result<()> {
579 self.prepare_cached("DELETE FROM room_info WHERE room_id = ?")?.execute((room_id,))?;
580 Ok(())
581 }
582
583 fn set_state_event(
584 &self,
585 room_id: &[u8],
586 event_type: &[u8],
587 state_key: &[u8],
588 stripped: bool,
589 event_id: Option<&[u8]>,
590 data: &[u8],
591 ) -> rusqlite::Result<()> {
592 self.prepare_cached(
593 "INSERT OR REPLACE
594 INTO state_event (room_id, event_type, state_key, stripped, event_id, data)
595 VALUES (?, ?, ?, ?, ?, ?)",
596 )?
597 .execute((room_id, event_type, state_key, stripped, event_id, data))?;
598 Ok(())
599 }
600
601 fn get_state_event_by_id(
602 &self,
603 room_id: &[u8],
604 event_id: &[u8],
605 ) -> rusqlite::Result<Option<Vec<u8>>> {
606 self.query_row(
607 "SELECT data FROM state_event WHERE room_id = ? AND event_id = ?",
608 (room_id, event_id),
609 |row| row.get(0),
610 )
611 .optional()
612 }
613
614 fn remove_room_state_events(
620 &self,
621 room_id: &[u8],
622 stripped: Option<bool>,
623 ) -> rusqlite::Result<()> {
624 if let Some(stripped) = stripped {
625 self.prepare_cached("DELETE FROM state_event WHERE room_id = ? AND stripped = ?")?
626 .execute((room_id, stripped))?;
627 } else {
628 self.prepare_cached("DELETE FROM state_event WHERE room_id = ?")?
629 .execute((room_id,))?;
630 }
631 Ok(())
632 }
633
634 fn set_member(
635 &self,
636 room_id: &[u8],
637 user_id: &[u8],
638 membership: &[u8],
639 stripped: bool,
640 data: &[u8],
641 ) -> rusqlite::Result<()> {
642 self.prepare_cached(
643 "INSERT OR REPLACE
644 INTO member (room_id, user_id, membership, stripped, data)
645 VALUES (?, ?, ?, ?, ?)",
646 )?
647 .execute((room_id, user_id, membership, stripped, data))?;
648 Ok(())
649 }
650
651 fn remove_room_members(&self, room_id: &[u8], stripped: Option<bool>) -> rusqlite::Result<()> {
656 if let Some(stripped) = stripped {
657 self.prepare_cached("DELETE FROM member WHERE room_id = ? AND stripped = ?")?
658 .execute((room_id, stripped))?;
659 } else {
660 self.prepare_cached("DELETE FROM member WHERE room_id = ?")?.execute((room_id,))?;
661 }
662 Ok(())
663 }
664
665 fn set_profile(&self, room_id: &[u8], user_id: &[u8], data: &[u8]) -> rusqlite::Result<()> {
666 self.prepare_cached(
667 "INSERT OR REPLACE
668 INTO profile (room_id, user_id, data)
669 VALUES (?, ?, ?)",
670 )?
671 .execute((room_id, user_id, data))?;
672 Ok(())
673 }
674
675 fn remove_room_profiles(&self, room_id: &[u8]) -> rusqlite::Result<()> {
676 self.prepare("DELETE FROM profile WHERE room_id = ?")?.execute((room_id,))?;
677 Ok(())
678 }
679
680 fn remove_room_profile(&self, room_id: &[u8], user_id: &[u8]) -> rusqlite::Result<()> {
681 self.prepare("DELETE FROM profile WHERE room_id = ? AND user_id = ?")?
682 .execute((room_id, user_id))?;
683 Ok(())
684 }
685
686 fn set_receipt(
687 &self,
688 room_id: &[u8],
689 user_id: &[u8],
690 receipt_type: &[u8],
691 thread: &[u8],
692 event_id: &[u8],
693 data: &[u8],
694 ) -> rusqlite::Result<()> {
695 self.prepare_cached(
696 "INSERT OR REPLACE
697 INTO receipt (room_id, user_id, receipt_type, thread, event_id, data)
698 VALUES (?, ?, ?, ?, ?, ?)",
699 )?
700 .execute((room_id, user_id, receipt_type, thread, event_id, data))?;
701 Ok(())
702 }
703
704 fn remove_room_receipts(&self, room_id: &[u8]) -> rusqlite::Result<()> {
705 self.prepare("DELETE FROM receipt WHERE room_id = ?")?.execute((room_id,))?;
706 Ok(())
707 }
708
709 fn set_display_name(&self, room_id: &[u8], name: &[u8], data: &[u8]) -> rusqlite::Result<()> {
710 self.prepare_cached(
711 "INSERT OR REPLACE
712 INTO display_name (room_id, name, data)
713 VALUES (?, ?, ?)",
714 )?
715 .execute((room_id, name, data))?;
716 Ok(())
717 }
718
719 fn remove_display_name(&self, room_id: &[u8], name: &[u8]) -> rusqlite::Result<()> {
720 self.prepare("DELETE FROM display_name WHERE room_id = ? AND name = ?")?
721 .execute((room_id, name))?;
722 Ok(())
723 }
724
725 fn remove_room_display_names(&self, room_id: &[u8]) -> rusqlite::Result<()> {
726 self.prepare("DELETE FROM display_name WHERE room_id = ?")?.execute((room_id,))?;
727 Ok(())
728 }
729
730 fn remove_room_send_queue(&self, room_id: &[u8]) -> rusqlite::Result<()> {
731 self.prepare("DELETE FROM send_queue_events WHERE room_id = ?")?.execute((room_id,))?;
732 Ok(())
733 }
734
735 fn remove_room_dependent_send_queue(&self, room_id: &[u8]) -> rusqlite::Result<()> {
736 self.prepare("DELETE FROM dependent_send_queue_events WHERE room_id = ?")?
737 .execute((room_id,))?;
738 Ok(())
739 }
740}
741
742#[async_trait]
743trait SqliteObjectStateStoreExt: SqliteAsyncConnExt {
744 async fn get_kv_blob(&self, key: Key) -> Result<Option<Vec<u8>>> {
745 Ok(self
746 .query_row("SELECT value FROM kv_blob WHERE key = ?", (key,), |row| row.get(0))
747 .await
748 .optional()?)
749 }
750
751 async fn get_kv_blobs(&self, keys: Vec<Key>) -> Result<Vec<Vec<u8>>> {
752 let keys_length = keys.len();
753
754 self.chunk_large_query_over(keys, Some(keys_length), |txn, keys| {
755 let sql_params = repeat_vars(keys.len());
756 let sql = format!("SELECT value FROM kv_blob WHERE key IN ({sql_params})");
757
758 let params = rusqlite::params_from_iter(keys);
759
760 Ok(txn
761 .prepare(&sql)?
762 .query(params)?
763 .mapped(|row| row.get(0))
764 .collect::<Result<_, _>>()?)
765 })
766 .await
767 }
768
769 async fn set_kv_blob(&self, key: Key, value: Vec<u8>) -> Result<()>;
770
771 async fn delete_kv_blob(&self, key: Key) -> Result<()> {
772 self.execute("DELETE FROM kv_blob WHERE key = ?", (key,)).await?;
773 Ok(())
774 }
775
776 async fn get_room_infos(&self, room_id: Option<Key>) -> Result<Vec<Vec<u8>>> {
777 Ok(match room_id {
778 None => {
779 self.prepare("SELECT data FROM room_info", move |mut stmt| {
780 stmt.query_map((), |row| row.get(0))?.collect()
781 })
782 .await?
783 }
784
785 Some(room_id) => {
786 self.prepare("SELECT data FROM room_info WHERE room_id = ?", move |mut stmt| {
787 stmt.query((room_id,))?.mapped(|row| row.get(0)).collect()
788 })
789 .await?
790 }
791 })
792 }
793
794 async fn get_maybe_stripped_state_events_for_keys(
795 &self,
796 room_id: Key,
797 event_type: Key,
798 state_keys: Vec<Key>,
799 ) -> Result<Vec<(bool, Vec<u8>)>> {
800 self.chunk_large_query_over(state_keys, None, move |txn, state_keys: Vec<Key>| {
801 let sql_params = repeat_vars(state_keys.len());
802 let sql = format!(
803 "SELECT stripped, data FROM state_event
804 WHERE room_id = ? AND event_type = ? AND state_key IN ({sql_params})"
805 );
806
807 let params = rusqlite::params_from_iter(
808 [room_id.clone(), event_type.clone()].into_iter().chain(state_keys),
809 );
810
811 Ok(txn
812 .prepare(&sql)?
813 .query(params)?
814 .mapped(|row| Ok((row.get(0)?, row.get(1)?)))
815 .collect::<Result<_, _>>()?)
816 })
817 .await
818 }
819
820 async fn get_maybe_stripped_state_events(
821 &self,
822 room_id: Key,
823 event_type: Key,
824 ) -> Result<Vec<(bool, Vec<u8>)>> {
825 Ok(self
826 .prepare(
827 "SELECT stripped, data FROM state_event
828 WHERE room_id = ? AND event_type = ?",
829 |mut stmt| {
830 stmt.query((room_id, event_type))?
831 .mapped(|row| Ok((row.get(0)?, row.get(1)?)))
832 .collect()
833 },
834 )
835 .await?)
836 }
837
838 async fn get_profiles(
839 &self,
840 room_id: Key,
841 user_ids: Vec<Key>,
842 ) -> Result<Vec<(Vec<u8>, Vec<u8>)>> {
843 let user_ids_length = user_ids.len();
844
845 self.chunk_large_query_over(user_ids, Some(user_ids_length), move |txn, user_ids| {
846 let sql_params = repeat_vars(user_ids.len());
847 let sql = format!(
848 "SELECT user_id, data FROM profile WHERE room_id = ? AND user_id IN ({sql_params})"
849 );
850
851 let params = rusqlite::params_from_iter(iter::once(room_id.clone()).chain(user_ids));
852
853 Ok(txn
854 .prepare(&sql)?
855 .query(params)?
856 .mapped(|row| Ok((row.get(0)?, row.get(1)?)))
857 .collect::<Result<_, _>>()?)
858 })
859 .await
860 }
861
862 async fn get_user_ids(&self, room_id: Key, memberships: Vec<Key>) -> Result<Vec<Vec<u8>>> {
863 let res = if memberships.is_empty() {
864 self.prepare("SELECT data FROM member WHERE room_id = ?", |mut stmt| {
865 stmt.query((room_id,))?.mapped(|row| row.get(0)).collect()
866 })
867 .await?
868 } else {
869 self.chunk_large_query_over(memberships, None, move |txn, memberships| {
870 let sql_params = repeat_vars(memberships.len());
871 let sql = format!(
872 "SELECT data FROM member WHERE room_id = ? AND membership IN ({sql_params})"
873 );
874
875 let params =
876 rusqlite::params_from_iter(iter::once(room_id.clone()).chain(memberships));
877
878 Ok(txn
879 .prepare(&sql)?
880 .query(params)?
881 .mapped(|row| row.get(0))
882 .collect::<Result<_, _>>()?)
883 })
884 .await?
885 };
886
887 Ok(res)
888 }
889
890 async fn get_global_account_data(&self, event_type: Key) -> Result<Option<Vec<u8>>> {
891 Ok(self
892 .query_row(
893 "SELECT data FROM global_account_data WHERE event_type = ?",
894 (event_type,),
895 |row| row.get(0),
896 )
897 .await
898 .optional()?)
899 }
900
901 async fn get_room_account_data(
902 &self,
903 room_id: Key,
904 event_type: Key,
905 ) -> Result<Option<Vec<u8>>> {
906 Ok(self
907 .query_row(
908 "SELECT data FROM room_account_data WHERE room_id = ? AND event_type = ?",
909 (room_id, event_type),
910 |row| row.get(0),
911 )
912 .await
913 .optional()?)
914 }
915
916 async fn get_display_names(
917 &self,
918 room_id: Key,
919 names: Vec<Key>,
920 ) -> Result<Vec<(Vec<u8>, Vec<u8>)>> {
921 let names_length = names.len();
922
923 self.chunk_large_query_over(names, Some(names_length), move |txn, names| {
924 let sql_params = repeat_vars(names.len());
925 let sql = format!(
926 "SELECT name, data FROM display_name WHERE room_id = ? AND name IN ({sql_params})"
927 );
928
929 let params = rusqlite::params_from_iter(iter::once(room_id.clone()).chain(names));
930
931 Ok(txn
932 .prepare(&sql)?
933 .query(params)?
934 .mapped(|row| Ok((row.get(0)?, row.get(1)?)))
935 .collect::<Result<_, _>>()?)
936 })
937 .await
938 }
939
940 async fn get_user_receipt(
941 &self,
942 room_id: Key,
943 receipt_type: Key,
944 thread: Key,
945 user_id: Key,
946 ) -> Result<Option<Vec<u8>>> {
947 Ok(self
948 .query_row(
949 "SELECT data FROM receipt
950 WHERE room_id = ? AND receipt_type = ? AND thread = ? and user_id = ?",
951 (room_id, receipt_type, thread, user_id),
952 |row| row.get(0),
953 )
954 .await
955 .optional()?)
956 }
957
958 async fn get_event_receipts(
959 &self,
960 room_id: Key,
961 receipt_type: Key,
962 thread: Key,
963 event_id: Key,
964 ) -> Result<Vec<Vec<u8>>> {
965 Ok(self
966 .prepare(
967 "SELECT data FROM receipt
968 WHERE room_id = ? AND receipt_type = ? AND thread = ? and event_id = ?",
969 |mut stmt| {
970 stmt.query((room_id, receipt_type, thread, event_id))?
971 .mapped(|row| row.get(0))
972 .collect()
973 },
974 )
975 .await?)
976 }
977}
978
979#[async_trait]
980impl SqliteObjectStateStoreExt for SqliteAsyncConn {
981 async fn set_kv_blob(&self, key: Key, value: Vec<u8>) -> Result<()> {
982 Ok(self.interact(move |conn| conn.set_kv_blob(&key, &value)).await.unwrap()?)
983 }
984}
985
986#[async_trait]
987impl StateStore for SqliteStateStore {
988 type Error = Error;
989
990 async fn get_kv_data(&self, key: StateStoreDataKey<'_>) -> Result<Option<StateStoreDataValue>> {
991 self.acquire()
992 .await?
993 .get_kv_blob(self.encode_state_store_data_key(key))
994 .await?
995 .map(|data| {
996 Ok(match key {
997 StateStoreDataKey::SyncToken => {
998 StateStoreDataValue::SyncToken(self.deserialize_value(&data)?)
999 }
1000 StateStoreDataKey::ServerInfo => {
1001 StateStoreDataValue::ServerInfo(self.deserialize_value(&data)?)
1002 }
1003 StateStoreDataKey::Filter(_) => {
1004 StateStoreDataValue::Filter(self.deserialize_value(&data)?)
1005 }
1006 StateStoreDataKey::UserAvatarUrl(_) => {
1007 StateStoreDataValue::UserAvatarUrl(self.deserialize_value(&data)?)
1008 }
1009 StateStoreDataKey::RecentlyVisitedRooms(_) => {
1010 StateStoreDataValue::RecentlyVisitedRooms(self.deserialize_value(&data)?)
1011 }
1012 StateStoreDataKey::UtdHookManagerData => {
1013 StateStoreDataValue::UtdHookManagerData(self.deserialize_value(&data)?)
1014 }
1015 StateStoreDataKey::ComposerDraft(_, _) => {
1016 StateStoreDataValue::ComposerDraft(self.deserialize_value(&data)?)
1017 }
1018 StateStoreDataKey::SeenKnockRequests(_) => {
1019 StateStoreDataValue::SeenKnockRequests(self.deserialize_value(&data)?)
1020 }
1021 })
1022 })
1023 .transpose()
1024 }
1025
1026 async fn set_kv_data(
1027 &self,
1028 key: StateStoreDataKey<'_>,
1029 value: StateStoreDataValue,
1030 ) -> Result<()> {
1031 let serialized_value = match key {
1032 StateStoreDataKey::SyncToken => self.serialize_value(
1033 &value.into_sync_token().expect("Session data not a sync token"),
1034 )?,
1035 StateStoreDataKey::ServerInfo => self.serialize_value(
1036 &value.into_server_info().expect("Session data not containing server info"),
1037 )?,
1038 StateStoreDataKey::Filter(_) => {
1039 self.serialize_value(&value.into_filter().expect("Session data not a filter"))?
1040 }
1041 StateStoreDataKey::UserAvatarUrl(_) => self.serialize_value(
1042 &value.into_user_avatar_url().expect("Session data not an user avatar url"),
1043 )?,
1044 StateStoreDataKey::RecentlyVisitedRooms(_) => self.serialize_value(
1045 &value.into_recently_visited_rooms().expect("Session data not breadcrumbs"),
1046 )?,
1047 StateStoreDataKey::UtdHookManagerData => self.serialize_value(
1048 &value.into_utd_hook_manager_data().expect("Session data not UtdHookManagerData"),
1049 )?,
1050 StateStoreDataKey::ComposerDraft(_, _) => self.serialize_value(
1051 &value.into_composer_draft().expect("Session data not a composer draft"),
1052 )?,
1053 StateStoreDataKey::SeenKnockRequests(_) => self.serialize_value(
1054 &value
1055 .into_seen_knock_requests()
1056 .expect("Session data is not a set of seen knock request ids"),
1057 )?,
1058 };
1059
1060 self.acquire()
1061 .await?
1062 .set_kv_blob(self.encode_state_store_data_key(key), serialized_value)
1063 .await
1064 }
1065
1066 async fn remove_kv_data(&self, key: StateStoreDataKey<'_>) -> Result<()> {
1067 self.acquire().await?.delete_kv_blob(self.encode_state_store_data_key(key)).await
1068 }
1069
1070 async fn save_changes(&self, changes: &StateChanges) -> Result<()> {
1071 let changes = changes.to_owned();
1072 let this = self.clone();
1073 self.acquire()
1074 .await?
1075 .with_transaction(move |txn| {
1076 let StateChanges {
1077 sync_token,
1078 account_data,
1079 presence,
1080 profiles,
1081 profiles_to_delete,
1082 state,
1083 room_account_data,
1084 room_infos,
1085 receipts,
1086 redactions,
1087 stripped_state,
1088 ambiguity_maps,
1089 } = changes;
1090
1091 if let Some(sync_token) = sync_token {
1092 let key = this.encode_state_store_data_key(StateStoreDataKey::SyncToken);
1093 let value = this.serialize_value(&sync_token)?;
1094 txn.set_kv_blob(&key, &value)?;
1095 }
1096
1097 for (event_type, event) in account_data {
1098 let event_type =
1099 this.encode_key(keys::GLOBAL_ACCOUNT_DATA, event_type.to_string());
1100 let data = this.serialize_json(&event)?;
1101 txn.set_global_account_data(&event_type, &data)?;
1102 }
1103
1104 for (room_id, events) in room_account_data {
1105 let room_id = this.encode_key(keys::ROOM_ACCOUNT_DATA, room_id);
1106 for (event_type, event) in events {
1107 let event_type =
1108 this.encode_key(keys::ROOM_ACCOUNT_DATA, event_type.to_string());
1109 let data = this.serialize_json(&event)?;
1110 txn.set_room_account_data(&room_id, &event_type, &data)?;
1111 }
1112 }
1113
1114 for (user_id, event) in presence {
1115 let key = this.encode_presence_key(&user_id);
1116 let value = this.serialize_json(&event)?;
1117 txn.set_kv_blob(&key, &value)?;
1118 }
1119
1120 for (room_id, room_info) in room_infos {
1121 let stripped = room_info.state() == RoomState::Invited;
1122 this.remove_maybe_stripped_room_data(txn, &room_id, !stripped)?;
1124
1125 let room_id = this.encode_key(keys::ROOM_INFO, room_id);
1126 let state = this
1127 .encode_key(keys::ROOM_INFO, serde_json::to_string(&room_info.state())?);
1128 let data = this.serialize_json(&room_info)?;
1129 txn.set_room_info(&room_id, &state, &data)?;
1130 }
1131
1132 for (room_id, user_ids) in profiles_to_delete {
1133 let room_id = this.encode_key(keys::PROFILE, room_id);
1134 for user_id in user_ids {
1135 let user_id = this.encode_key(keys::PROFILE, user_id);
1136 txn.remove_room_profile(&room_id, &user_id)?;
1137 }
1138 }
1139
1140 for (room_id, state_event_types) in state {
1141 let profiles = profiles.get(&room_id);
1142 let encoded_room_id = this.encode_key(keys::STATE_EVENT, &room_id);
1143
1144 for (event_type, state_events) in state_event_types {
1145 let encoded_event_type =
1146 this.encode_key(keys::STATE_EVENT, event_type.to_string());
1147
1148 for (state_key, raw_state_event) in state_events {
1149 let encoded_state_key = this.encode_key(keys::STATE_EVENT, &state_key);
1150 let data = this.serialize_json(&raw_state_event)?;
1151
1152 let event_id: Option<String> =
1153 raw_state_event.get_field("event_id").ok().flatten();
1154 let encoded_event_id =
1155 event_id.as_ref().map(|e| this.encode_key(keys::STATE_EVENT, e));
1156
1157 txn.set_state_event(
1158 &encoded_room_id,
1159 &encoded_event_type,
1160 &encoded_state_key,
1161 false,
1162 encoded_event_id.as_deref(),
1163 &data,
1164 )?;
1165
1166 if event_type == StateEventType::RoomMember {
1167 let member_event = match raw_state_event
1168 .deserialize_as_unchecked::<SyncRoomMemberEvent>()
1169 {
1170 Ok(ev) => ev,
1171 Err(e) => {
1172 debug!(event_id, "Failed to deserialize member event: {e}");
1173 continue;
1174 }
1175 };
1176
1177 let encoded_room_id = this.encode_key(keys::MEMBER, &room_id);
1178 let user_id = this.encode_key(keys::MEMBER, &state_key);
1179 let membership = this
1180 .encode_key(keys::MEMBER, member_event.membership().as_str());
1181 let data = this.serialize_value(&state_key)?;
1182
1183 txn.set_member(
1184 &encoded_room_id,
1185 &user_id,
1186 &membership,
1187 false,
1188 &data,
1189 )?;
1190
1191 if let Some(profile) =
1192 profiles.and_then(|p| p.get(member_event.state_key()))
1193 {
1194 let room_id = this.encode_key(keys::PROFILE, &room_id);
1195 let user_id = this.encode_key(keys::PROFILE, &state_key);
1196 let data = this.serialize_json(&profile)?;
1197 txn.set_profile(&room_id, &user_id, &data)?;
1198 }
1199 }
1200 }
1201 }
1202 }
1203
1204 for (room_id, stripped_state_event_types) in stripped_state {
1205 let encoded_room_id = this.encode_key(keys::STATE_EVENT, &room_id);
1206
1207 for (event_type, stripped_state_events) in stripped_state_event_types {
1208 let encoded_event_type =
1209 this.encode_key(keys::STATE_EVENT, event_type.to_string());
1210
1211 for (state_key, raw_stripped_state_event) in stripped_state_events {
1212 let encoded_state_key = this.encode_key(keys::STATE_EVENT, &state_key);
1213 let data = this.serialize_json(&raw_stripped_state_event)?;
1214 txn.set_state_event(
1215 &encoded_room_id,
1216 &encoded_event_type,
1217 &encoded_state_key,
1218 true,
1219 None,
1220 &data,
1221 )?;
1222
1223 if event_type == StateEventType::RoomMember {
1224 let member_event = match raw_stripped_state_event
1225 .deserialize_as_unchecked::<StrippedRoomMemberEvent>(
1226 ) {
1227 Ok(ev) => ev,
1228 Err(e) => {
1229 debug!("Failed to deserialize stripped member event: {e}");
1230 continue;
1231 }
1232 };
1233
1234 let room_id = this.encode_key(keys::MEMBER, &room_id);
1235 let user_id = this.encode_key(keys::MEMBER, &state_key);
1236 let membership = this.encode_key(
1237 keys::MEMBER,
1238 member_event.content.membership.as_str(),
1239 );
1240 let data = this.serialize_value(&state_key)?;
1241
1242 txn.set_member(&room_id, &user_id, &membership, true, &data)?;
1243 }
1244 }
1245 }
1246 }
1247
1248 for (room_id, receipt_event) in receipts {
1249 let room_id = this.encode_key(keys::RECEIPT, room_id);
1250
1251 for (event_id, receipt_types) in receipt_event {
1252 let encoded_event_id = this.encode_key(keys::RECEIPT, &event_id);
1253
1254 for (receipt_type, receipt_users) in receipt_types {
1255 let receipt_type =
1256 this.encode_key(keys::RECEIPT, receipt_type.as_str());
1257
1258 for (user_id, receipt) in receipt_users {
1259 let encoded_user_id = this.encode_key(keys::RECEIPT, &user_id);
1260 let thread = this.encode_key(
1263 keys::RECEIPT,
1264 rmp_serde::to_vec_named(&receipt.thread)?,
1265 );
1266 let data = this.serialize_json(&ReceiptData {
1267 receipt,
1268 event_id: event_id.clone(),
1269 user_id,
1270 })?;
1271
1272 txn.set_receipt(
1273 &room_id,
1274 &encoded_user_id,
1275 &receipt_type,
1276 &thread,
1277 &encoded_event_id,
1278 &data,
1279 )?;
1280 }
1281 }
1282 }
1283 }
1284
1285 for (room_id, redactions) in redactions {
1286 let make_redaction_rules = || {
1287 let encoded_room_id = this.encode_key(keys::ROOM_INFO, &room_id);
1288 txn.get_room_info(&encoded_room_id)
1289 .ok()
1290 .flatten()
1291 .and_then(|v| this.deserialize_json::<RoomInfo>(&v).ok())
1292 .map(|info| info.room_version_rules_or_default())
1293 .unwrap_or_else(|| {
1294 warn!(
1295 ?room_id,
1296 "Unable to get the room version rules, defaulting to rules for room version {ROOM_VERSION_FALLBACK}"
1297 );
1298 ROOM_VERSION_RULES_FALLBACK
1299 }).redaction
1300 };
1301
1302 let encoded_room_id = this.encode_key(keys::STATE_EVENT, &room_id);
1303 let mut redaction_rules = None;
1304
1305 for (event_id, redaction) in redactions {
1306 let event_id = this.encode_key(keys::STATE_EVENT, event_id);
1307
1308 if let Some(Ok(raw_event)) = txn
1309 .get_state_event_by_id(&encoded_room_id, &event_id)?
1310 .map(|value| this.deserialize_json::<Raw<AnySyncStateEvent>>(&value))
1311 {
1312 let event = raw_event.deserialize()?;
1313 let redacted = redact(
1314 raw_event.deserialize_as::<CanonicalJsonObject>()?,
1315 redaction_rules.get_or_insert_with(make_redaction_rules),
1316 Some(RedactedBecause::from_raw_event(&redaction)?),
1317 )
1318 .map_err(Error::Redaction)?;
1319 let data = this.serialize_json(&redacted)?;
1320
1321 let event_type =
1322 this.encode_key(keys::STATE_EVENT, event.event_type().to_string());
1323 let state_key = this.encode_key(keys::STATE_EVENT, event.state_key());
1324
1325 txn.set_state_event(
1326 &encoded_room_id,
1327 &event_type,
1328 &state_key,
1329 false,
1330 Some(&event_id),
1331 &data,
1332 )?;
1333 }
1334 }
1335 }
1336
1337 for (room_id, display_names) in ambiguity_maps {
1338 let room_id = this.encode_key(keys::DISPLAY_NAME, room_id);
1339
1340 for (name, user_ids) in display_names {
1341 let encoded_name = this.encode_key(
1342 keys::DISPLAY_NAME,
1343 name.as_normalized_str().unwrap_or_else(|| name.as_raw_str()),
1344 );
1345 let data = this.serialize_json(&user_ids)?;
1346
1347 if user_ids.is_empty() {
1348 txn.remove_display_name(&room_id, &encoded_name)?;
1349
1350 let raw_name = this.encode_key(keys::DISPLAY_NAME, name.as_raw_str());
1365 txn.remove_display_name(&room_id, &raw_name)?;
1366 } else {
1367 txn.set_display_name(&room_id, &encoded_name, &data)?;
1369 }
1370 }
1371 }
1372
1373 Ok::<_, Error>(())
1374 })
1375 .await?;
1376
1377 Ok(())
1378 }
1379
1380 async fn get_presence_event(&self, user_id: &UserId) -> Result<Option<Raw<PresenceEvent>>> {
1381 self.acquire()
1382 .await?
1383 .get_kv_blob(self.encode_presence_key(user_id))
1384 .await?
1385 .map(|data| self.deserialize_json(&data))
1386 .transpose()
1387 }
1388
1389 async fn get_presence_events(
1390 &self,
1391 user_ids: &[OwnedUserId],
1392 ) -> Result<Vec<Raw<PresenceEvent>>> {
1393 if user_ids.is_empty() {
1394 return Ok(Vec::new());
1395 }
1396
1397 let user_ids = user_ids.iter().map(|u| self.encode_presence_key(u)).collect();
1398 self.acquire()
1399 .await?
1400 .get_kv_blobs(user_ids)
1401 .await?
1402 .into_iter()
1403 .map(|data| self.deserialize_json(&data))
1404 .collect()
1405 }
1406
1407 async fn get_state_event(
1408 &self,
1409 room_id: &RoomId,
1410 event_type: StateEventType,
1411 state_key: &str,
1412 ) -> Result<Option<RawAnySyncOrStrippedState>> {
1413 Ok(self
1414 .get_state_events_for_keys(room_id, event_type, &[state_key])
1415 .await?
1416 .into_iter()
1417 .next())
1418 }
1419
1420 async fn get_state_events(
1421 &self,
1422 room_id: &RoomId,
1423 event_type: StateEventType,
1424 ) -> Result<Vec<RawAnySyncOrStrippedState>> {
1425 let room_id = self.encode_key(keys::STATE_EVENT, room_id);
1426 let event_type = self.encode_key(keys::STATE_EVENT, event_type.to_string());
1427 self.acquire()
1428 .await?
1429 .get_maybe_stripped_state_events(room_id, event_type)
1430 .await?
1431 .into_iter()
1432 .map(|(stripped, data)| {
1433 let ev = if stripped {
1434 RawAnySyncOrStrippedState::Stripped(self.deserialize_json(&data)?)
1435 } else {
1436 RawAnySyncOrStrippedState::Sync(self.deserialize_json(&data)?)
1437 };
1438
1439 Ok(ev)
1440 })
1441 .collect()
1442 }
1443
1444 async fn get_state_events_for_keys(
1445 &self,
1446 room_id: &RoomId,
1447 event_type: StateEventType,
1448 state_keys: &[&str],
1449 ) -> Result<Vec<RawAnySyncOrStrippedState>, Self::Error> {
1450 if state_keys.is_empty() {
1451 return Ok(Vec::new());
1452 }
1453
1454 let room_id = self.encode_key(keys::STATE_EVENT, room_id);
1455 let event_type = self.encode_key(keys::STATE_EVENT, event_type.to_string());
1456 let state_keys = state_keys.iter().map(|k| self.encode_key(keys::STATE_EVENT, k)).collect();
1457 self.acquire()
1458 .await?
1459 .get_maybe_stripped_state_events_for_keys(room_id, event_type, state_keys)
1460 .await?
1461 .into_iter()
1462 .map(|(stripped, data)| {
1463 let ev = if stripped {
1464 RawAnySyncOrStrippedState::Stripped(self.deserialize_json(&data)?)
1465 } else {
1466 RawAnySyncOrStrippedState::Sync(self.deserialize_json(&data)?)
1467 };
1468
1469 Ok(ev)
1470 })
1471 .collect()
1472 }
1473
1474 async fn get_profile(
1475 &self,
1476 room_id: &RoomId,
1477 user_id: &UserId,
1478 ) -> Result<Option<MinimalRoomMemberEvent>> {
1479 let room_id = self.encode_key(keys::PROFILE, room_id);
1480 let user_ids = vec![self.encode_key(keys::PROFILE, user_id)];
1481
1482 self.acquire()
1483 .await?
1484 .get_profiles(room_id, user_ids)
1485 .await?
1486 .into_iter()
1487 .next()
1488 .map(|(_, data)| self.deserialize_json(&data))
1489 .transpose()
1490 }
1491
1492 async fn get_profiles<'a>(
1493 &self,
1494 room_id: &RoomId,
1495 user_ids: &'a [OwnedUserId],
1496 ) -> Result<BTreeMap<&'a UserId, MinimalRoomMemberEvent>> {
1497 if user_ids.is_empty() {
1498 return Ok(BTreeMap::new());
1499 }
1500
1501 let room_id = self.encode_key(keys::PROFILE, room_id);
1502 let mut user_ids_map = user_ids
1503 .iter()
1504 .map(|u| (self.encode_key(keys::PROFILE, u), u.as_ref()))
1505 .collect::<BTreeMap<_, _>>();
1506 let user_ids = user_ids_map.keys().cloned().collect();
1507
1508 self.acquire()
1509 .await?
1510 .get_profiles(room_id, user_ids)
1511 .await?
1512 .into_iter()
1513 .map(|(user_id, data)| {
1514 Ok((
1515 user_ids_map
1516 .remove(user_id.as_slice())
1517 .expect("returned user IDs were requested"),
1518 self.deserialize_json(&data)?,
1519 ))
1520 })
1521 .collect()
1522 }
1523
1524 async fn get_user_ids(
1525 &self,
1526 room_id: &RoomId,
1527 membership: RoomMemberships,
1528 ) -> Result<Vec<OwnedUserId>> {
1529 let room_id = self.encode_key(keys::MEMBER, room_id);
1530 let memberships = membership
1531 .as_vec()
1532 .into_iter()
1533 .map(|m| self.encode_key(keys::MEMBER, m.as_str()))
1534 .collect();
1535 self.acquire()
1536 .await?
1537 .get_user_ids(room_id, memberships)
1538 .await?
1539 .iter()
1540 .map(|data| self.deserialize_value(data))
1541 .collect()
1542 }
1543
1544 async fn get_room_infos(&self, room_load_settings: &RoomLoadSettings) -> Result<Vec<RoomInfo>> {
1545 self.acquire()
1546 .await?
1547 .get_room_infos(match room_load_settings {
1548 RoomLoadSettings::All => None,
1549 RoomLoadSettings::One(room_id) => Some(self.encode_key(keys::ROOM_INFO, room_id)),
1550 })
1551 .await?
1552 .into_iter()
1553 .map(|data| self.deserialize_json(&data))
1554 .collect()
1555 }
1556
1557 async fn get_users_with_display_name(
1558 &self,
1559 room_id: &RoomId,
1560 display_name: &DisplayName,
1561 ) -> Result<BTreeSet<OwnedUserId>> {
1562 let room_id = self.encode_key(keys::DISPLAY_NAME, room_id);
1563 let names = vec![self.encode_key(
1564 keys::DISPLAY_NAME,
1565 display_name.as_normalized_str().unwrap_or_else(|| display_name.as_raw_str()),
1566 )];
1567
1568 Ok(self
1569 .acquire()
1570 .await?
1571 .get_display_names(room_id, names)
1572 .await?
1573 .into_iter()
1574 .next()
1575 .map(|(_, data)| self.deserialize_json(&data))
1576 .transpose()?
1577 .unwrap_or_default())
1578 }
1579
1580 async fn get_users_with_display_names<'a>(
1581 &self,
1582 room_id: &RoomId,
1583 display_names: &'a [DisplayName],
1584 ) -> Result<HashMap<&'a DisplayName, BTreeSet<OwnedUserId>>> {
1585 let mut result = HashMap::new();
1586
1587 if display_names.is_empty() {
1588 return Ok(result);
1589 }
1590
1591 let room_id = self.encode_key(keys::DISPLAY_NAME, room_id);
1592 let mut names_map = display_names
1593 .iter()
1594 .flat_map(|display_name| {
1595 let raw =
1605 (self.encode_key(keys::DISPLAY_NAME, display_name.as_raw_str()), display_name);
1606 let normalized = display_name.as_normalized_str().map(|normalized| {
1607 (self.encode_key(keys::DISPLAY_NAME, normalized), display_name)
1608 });
1609
1610 iter::once(raw).chain(normalized.into_iter())
1611 })
1612 .collect::<BTreeMap<_, _>>();
1613 let names = names_map.keys().cloned().collect();
1614
1615 for (name, data) in
1616 self.acquire().await?.get_display_names(room_id, names).await?.into_iter()
1617 {
1618 let display_name =
1619 names_map.remove(name.as_slice()).expect("returned display names were requested");
1620 let user_ids: BTreeSet<_> = self.deserialize_json(&data)?;
1621
1622 result.entry(display_name).or_insert_with(BTreeSet::new).extend(user_ids);
1623 }
1624
1625 Ok(result)
1626 }
1627
1628 async fn get_account_data_event(
1629 &self,
1630 event_type: GlobalAccountDataEventType,
1631 ) -> Result<Option<Raw<AnyGlobalAccountDataEvent>>> {
1632 let event_type = self.encode_key(keys::GLOBAL_ACCOUNT_DATA, event_type.to_string());
1633 self.acquire()
1634 .await?
1635 .get_global_account_data(event_type)
1636 .await?
1637 .map(|value| self.deserialize_json(&value))
1638 .transpose()
1639 }
1640
1641 async fn get_room_account_data_event(
1642 &self,
1643 room_id: &RoomId,
1644 event_type: RoomAccountDataEventType,
1645 ) -> Result<Option<Raw<AnyRoomAccountDataEvent>>> {
1646 let room_id = self.encode_key(keys::ROOM_ACCOUNT_DATA, room_id);
1647 let event_type = self.encode_key(keys::ROOM_ACCOUNT_DATA, event_type.to_string());
1648 self.acquire()
1649 .await?
1650 .get_room_account_data(room_id, event_type)
1651 .await?
1652 .map(|value| self.deserialize_json(&value))
1653 .transpose()
1654 }
1655
1656 async fn get_user_room_receipt_event(
1657 &self,
1658 room_id: &RoomId,
1659 receipt_type: ReceiptType,
1660 thread: ReceiptThread,
1661 user_id: &UserId,
1662 ) -> Result<Option<(OwnedEventId, Receipt)>> {
1663 let room_id = self.encode_key(keys::RECEIPT, room_id);
1664 let receipt_type = self.encode_key(keys::RECEIPT, receipt_type.to_string());
1665 let thread = self.encode_key(keys::RECEIPT, rmp_serde::to_vec_named(&thread)?);
1668 let user_id = self.encode_key(keys::RECEIPT, user_id);
1669
1670 self.acquire()
1671 .await?
1672 .get_user_receipt(room_id, receipt_type, thread, user_id)
1673 .await?
1674 .map(|value| {
1675 self.deserialize_json::<ReceiptData>(&value).map(|d| (d.event_id, d.receipt))
1676 })
1677 .transpose()
1678 }
1679
1680 async fn get_event_room_receipt_events(
1681 &self,
1682 room_id: &RoomId,
1683 receipt_type: ReceiptType,
1684 thread: ReceiptThread,
1685 event_id: &EventId,
1686 ) -> Result<Vec<(OwnedUserId, Receipt)>> {
1687 let room_id = self.encode_key(keys::RECEIPT, room_id);
1688 let receipt_type = self.encode_key(keys::RECEIPT, receipt_type.to_string());
1689 let thread = self.encode_key(keys::RECEIPT, rmp_serde::to_vec_named(&thread)?);
1692 let event_id = self.encode_key(keys::RECEIPT, event_id);
1693
1694 self.acquire()
1695 .await?
1696 .get_event_receipts(room_id, receipt_type, thread, event_id)
1697 .await?
1698 .iter()
1699 .map(|value| {
1700 self.deserialize_json::<ReceiptData>(value).map(|d| (d.user_id, d.receipt))
1701 })
1702 .collect()
1703 }
1704
1705 async fn get_custom_value(&self, key: &[u8]) -> Result<Option<Vec<u8>>> {
1706 self.acquire().await?.get_kv_blob(self.encode_custom_key(key)).await
1707 }
1708
1709 async fn set_custom_value_no_read(&self, key: &[u8], value: Vec<u8>) -> Result<()> {
1710 let conn = self.acquire().await?;
1711 let key = self.encode_custom_key(key);
1712 conn.set_kv_blob(key, value).await?;
1713 Ok(())
1714 }
1715
1716 async fn set_custom_value(&self, key: &[u8], value: Vec<u8>) -> Result<Option<Vec<u8>>> {
1717 let conn = self.acquire().await?;
1718 let key = self.encode_custom_key(key);
1719 let previous = conn.get_kv_blob(key.clone()).await?;
1720 conn.set_kv_blob(key, value).await?;
1721 Ok(previous)
1722 }
1723
1724 async fn remove_custom_value(&self, key: &[u8]) -> Result<Option<Vec<u8>>> {
1725 let conn = self.acquire().await?;
1726 let key = self.encode_custom_key(key);
1727 let previous = conn.get_kv_blob(key.clone()).await?;
1728 if previous.is_some() {
1729 conn.delete_kv_blob(key).await?;
1730 }
1731 Ok(previous)
1732 }
1733
1734 async fn remove_room(&self, room_id: &RoomId) -> Result<()> {
1735 let this = self.clone();
1736 let room_id = room_id.to_owned();
1737
1738 let conn = self.acquire().await?;
1739
1740 conn.with_transaction(move |txn| -> Result<()> {
1741 let room_info_room_id = this.encode_key(keys::ROOM_INFO, &room_id);
1742 txn.remove_room_info(&room_info_room_id)?;
1743
1744 let state_event_room_id = this.encode_key(keys::STATE_EVENT, &room_id);
1745 txn.remove_room_state_events(&state_event_room_id, None)?;
1746
1747 let member_room_id = this.encode_key(keys::MEMBER, &room_id);
1748 txn.remove_room_members(&member_room_id, None)?;
1749
1750 let profile_room_id = this.encode_key(keys::PROFILE, &room_id);
1751 txn.remove_room_profiles(&profile_room_id)?;
1752
1753 let room_account_data_room_id = this.encode_key(keys::ROOM_ACCOUNT_DATA, &room_id);
1754 txn.remove_room_account_data(&room_account_data_room_id)?;
1755
1756 let receipt_room_id = this.encode_key(keys::RECEIPT, &room_id);
1757 txn.remove_room_receipts(&receipt_room_id)?;
1758
1759 let display_name_room_id = this.encode_key(keys::DISPLAY_NAME, &room_id);
1760 txn.remove_room_display_names(&display_name_room_id)?;
1761
1762 let send_queue_room_id = this.encode_key(keys::SEND_QUEUE, &room_id);
1763 txn.remove_room_send_queue(&send_queue_room_id)?;
1764
1765 let dependent_send_queue_room_id =
1766 this.encode_key(keys::DEPENDENTS_SEND_QUEUE, &room_id);
1767 txn.remove_room_dependent_send_queue(&dependent_send_queue_room_id)?;
1768
1769 let thread_subscriptions_room_id =
1770 this.encode_key(keys::THREAD_SUBSCRIPTIONS, &room_id);
1771 txn.execute(
1772 "DELETE FROM thread_subscriptions WHERE room_id = ?",
1773 (thread_subscriptions_room_id,),
1774 )?;
1775
1776 Ok(())
1777 })
1778 .await?;
1779
1780 conn.vacuum().await
1781 }
1782
1783 async fn save_send_queue_request(
1784 &self,
1785 room_id: &RoomId,
1786 transaction_id: OwnedTransactionId,
1787 created_at: MilliSecondsSinceUnixEpoch,
1788 content: QueuedRequestKind,
1789 priority: usize,
1790 ) -> Result<(), Self::Error> {
1791 let room_id_key = self.encode_key(keys::SEND_QUEUE, room_id);
1792 let room_id_value = self.serialize_value(&room_id.to_owned())?;
1793
1794 let content = self.serialize_json(&content)?;
1795 let created_at_ts: u64 = created_at.0.into();
1801 self.acquire()
1802 .await?
1803 .with_transaction(move |txn| {
1804 txn.prepare_cached("INSERT INTO send_queue_events (room_id, room_id_val, transaction_id, content, priority, created_at) VALUES (?, ?, ?, ?, ?, ?)")?.execute((room_id_key, room_id_value, transaction_id.to_string(), content, priority, created_at_ts))?;
1805 Ok(())
1806 })
1807 .await
1808 }
1809
1810 async fn update_send_queue_request(
1811 &self,
1812 room_id: &RoomId,
1813 transaction_id: &TransactionId,
1814 content: QueuedRequestKind,
1815 ) -> Result<bool, Self::Error> {
1816 let room_id = self.encode_key(keys::SEND_QUEUE, room_id);
1817
1818 let content = self.serialize_json(&content)?;
1819 let transaction_id = transaction_id.to_string();
1822
1823 let num_updated = self.acquire()
1824 .await?
1825 .with_transaction(move |txn| {
1826 txn.prepare_cached("UPDATE send_queue_events SET wedge_reason = NULL, content = ? WHERE room_id = ? AND transaction_id = ?")?.execute((content, room_id, transaction_id))
1827 })
1828 .await?;
1829
1830 Ok(num_updated > 0)
1831 }
1832
1833 async fn remove_send_queue_request(
1834 &self,
1835 room_id: &RoomId,
1836 transaction_id: &TransactionId,
1837 ) -> Result<bool, Self::Error> {
1838 let room_id = self.encode_key(keys::SEND_QUEUE, room_id);
1839
1840 let transaction_id = transaction_id.to_string();
1842
1843 let num_deleted = self
1844 .acquire()
1845 .await?
1846 .with_transaction(move |txn| {
1847 txn.prepare_cached(
1848 "DELETE FROM send_queue_events WHERE room_id = ? AND transaction_id = ?",
1849 )?
1850 .execute((room_id, &transaction_id))
1851 })
1852 .await?;
1853
1854 Ok(num_deleted > 0)
1855 }
1856
1857 async fn load_send_queue_requests(
1858 &self,
1859 room_id: &RoomId,
1860 ) -> Result<Vec<QueuedRequest>, Self::Error> {
1861 let room_id = self.encode_key(keys::SEND_QUEUE, room_id);
1862
1863 let res: Vec<(String, Vec<u8>, Option<Vec<u8>>, usize, Option<u64>)> = self
1867 .acquire()
1868 .await?
1869 .prepare(
1870 "SELECT transaction_id, content, wedge_reason, priority, created_at FROM send_queue_events WHERE room_id = ? ORDER BY priority DESC, ROWID",
1871 |mut stmt| {
1872 stmt.query((room_id,))?
1873 .mapped(|row| Ok((row.get(0)?, row.get(1)?, row.get(2)?, row.get(3)?, row.get(4)?)))
1874 .collect()
1875 },
1876 )
1877 .await?;
1878
1879 let mut requests = Vec::with_capacity(res.len());
1880 for entry in res {
1881 let created_at = entry
1882 .4
1883 .and_then(UInt::new)
1884 .map_or_else(MilliSecondsSinceUnixEpoch::now, MilliSecondsSinceUnixEpoch);
1885 requests.push(QueuedRequest {
1886 transaction_id: entry.0.into(),
1887 kind: self.deserialize_json(&entry.1)?,
1888 error: entry.2.map(|v| self.deserialize_value(&v)).transpose()?,
1889 priority: entry.3,
1890 created_at,
1891 });
1892 }
1893
1894 Ok(requests)
1895 }
1896
1897 async fn update_send_queue_request_status(
1898 &self,
1899 room_id: &RoomId,
1900 transaction_id: &TransactionId,
1901 error: Option<QueueWedgeError>,
1902 ) -> Result<(), Self::Error> {
1903 let room_id = self.encode_key(keys::SEND_QUEUE, room_id);
1904
1905 let transaction_id = transaction_id.to_string();
1907
1908 let error_value = error.map(|e| self.serialize_value(&e)).transpose()?;
1910
1911 self.acquire()
1912 .await?
1913 .with_transaction(move |txn| {
1914 txn.prepare_cached("UPDATE send_queue_events SET wedge_reason = ? WHERE room_id = ? AND transaction_id = ?")?.execute((error_value, room_id, transaction_id))?;
1915 Ok(())
1916 })
1917 .await
1918 }
1919
1920 async fn load_rooms_with_unsent_requests(&self) -> Result<Vec<OwnedRoomId>, Self::Error> {
1921 let res: Vec<Vec<u8>> = self
1926 .acquire()
1927 .await?
1928 .prepare("SELECT room_id_val FROM send_queue_events", |mut stmt| {
1929 stmt.query(())?.mapped(|row| row.get(0)).collect()
1930 })
1931 .await?;
1932
1933 Ok(res
1936 .into_iter()
1937 .map(|entry| self.deserialize_value(&entry))
1938 .collect::<Result<BTreeSet<OwnedRoomId>, _>>()?
1939 .into_iter()
1940 .collect())
1941 }
1942
1943 async fn save_dependent_queued_request(
1944 &self,
1945 room_id: &RoomId,
1946 parent_txn_id: &TransactionId,
1947 own_txn_id: ChildTransactionId,
1948 created_at: MilliSecondsSinceUnixEpoch,
1949 content: DependentQueuedRequestKind,
1950 ) -> Result<()> {
1951 let room_id = self.encode_key(keys::DEPENDENTS_SEND_QUEUE, room_id);
1952 let content = self.serialize_json(&content)?;
1953
1954 let parent_txn_id = parent_txn_id.to_string();
1956 let own_txn_id = own_txn_id.to_string();
1957
1958 let created_at_ts: u64 = created_at.0.into();
1959 self.acquire()
1960 .await?
1961 .with_transaction(move |txn| {
1962 txn.prepare_cached(
1963 r#"INSERT INTO dependent_send_queue_events
1964 (room_id, parent_transaction_id, own_transaction_id, content, created_at)
1965 VALUES (?, ?, ?, ?, ?)"#,
1966 )?
1967 .execute((
1968 room_id,
1969 parent_txn_id,
1970 own_txn_id,
1971 content,
1972 created_at_ts,
1973 ))?;
1974 Ok(())
1975 })
1976 .await
1977 }
1978
1979 async fn update_dependent_queued_request(
1980 &self,
1981 room_id: &RoomId,
1982 own_transaction_id: &ChildTransactionId,
1983 new_content: DependentQueuedRequestKind,
1984 ) -> Result<bool> {
1985 let room_id = self.encode_key(keys::DEPENDENTS_SEND_QUEUE, room_id);
1986 let content = self.serialize_json(&new_content)?;
1987
1988 let own_txn_id = own_transaction_id.to_string();
1990
1991 let num_updated = self
1992 .acquire()
1993 .await?
1994 .with_transaction(move |txn| {
1995 txn.prepare_cached(
1996 r#"UPDATE dependent_send_queue_events
1997 SET content = ?
1998 WHERE own_transaction_id = ?
1999 AND room_id = ?"#,
2000 )?
2001 .execute((content, own_txn_id, room_id))
2002 })
2003 .await?;
2004
2005 if num_updated > 1 {
2006 return Err(Error::InconsistentUpdate);
2007 }
2008
2009 Ok(num_updated == 1)
2010 }
2011
2012 async fn mark_dependent_queued_requests_as_ready(
2013 &self,
2014 room_id: &RoomId,
2015 parent_txn_id: &TransactionId,
2016 parent_key: SentRequestKey,
2017 ) -> Result<usize> {
2018 let room_id = self.encode_key(keys::DEPENDENTS_SEND_QUEUE, room_id);
2019 let parent_key = self.serialize_value(&parent_key)?;
2020
2021 let parent_txn_id = parent_txn_id.to_string();
2023
2024 self.acquire()
2025 .await?
2026 .with_transaction(move |txn| {
2027 Ok(txn.prepare_cached(
2028 "UPDATE dependent_send_queue_events SET parent_key = ? WHERE parent_transaction_id = ? and room_id = ?",
2029 )?
2030 .execute((parent_key, parent_txn_id, room_id))?)
2031 })
2032 .await
2033 }
2034
2035 async fn remove_dependent_queued_request(
2036 &self,
2037 room_id: &RoomId,
2038 txn_id: &ChildTransactionId,
2039 ) -> Result<bool> {
2040 let room_id = self.encode_key(keys::DEPENDENTS_SEND_QUEUE, room_id);
2041
2042 let txn_id = txn_id.to_string();
2044
2045 let num_deleted = self
2046 .acquire()
2047 .await?
2048 .with_transaction(move |txn| {
2049 txn.prepare_cached(
2050 "DELETE FROM dependent_send_queue_events WHERE own_transaction_id = ? AND room_id = ?",
2051 )?
2052 .execute((txn_id, room_id))
2053 })
2054 .await?;
2055
2056 Ok(num_deleted > 0)
2057 }
2058
2059 async fn load_dependent_queued_requests(
2060 &self,
2061 room_id: &RoomId,
2062 ) -> Result<Vec<DependentQueuedRequest>> {
2063 let room_id = self.encode_key(keys::DEPENDENTS_SEND_QUEUE, room_id);
2064
2065 let res: Vec<(String, String, Option<Vec<u8>>, Vec<u8>, Option<u64>)> = self
2067 .acquire()
2068 .await?
2069 .prepare(
2070 "SELECT own_transaction_id, parent_transaction_id, parent_key, content, created_at FROM dependent_send_queue_events WHERE room_id = ? ORDER BY ROWID",
2071 |mut stmt| {
2072 stmt.query((room_id,))?
2073 .mapped(|row| Ok((row.get(0)?, row.get(1)?, row.get(2)?, row.get(3)?, row.get(4)?)))
2074 .collect()
2075 },
2076 )
2077 .await?;
2078
2079 let mut dependent_events = Vec::with_capacity(res.len());
2080 for entry in res {
2081 let created_at = entry
2082 .4
2083 .and_then(UInt::new)
2084 .map_or_else(MilliSecondsSinceUnixEpoch::now, MilliSecondsSinceUnixEpoch);
2085 dependent_events.push(DependentQueuedRequest {
2086 own_transaction_id: entry.0.into(),
2087 parent_transaction_id: entry.1.into(),
2088 parent_key: entry.2.map(|bytes| self.deserialize_value(&bytes)).transpose()?,
2089 kind: self.deserialize_json(&entry.3)?,
2090 created_at,
2091 });
2092 }
2093
2094 Ok(dependent_events)
2095 }
2096
2097 async fn upsert_thread_subscription(
2098 &self,
2099 room_id: &RoomId,
2100 thread_id: &EventId,
2101 subscription: ThreadSubscription,
2102 ) -> Result<(), Self::Error> {
2103 let room_id = self.encode_key(keys::THREAD_SUBSCRIPTIONS, room_id);
2104 let thread_id = self.encode_key(keys::THREAD_SUBSCRIPTIONS, thread_id);
2105 let status = subscription.as_str();
2106
2107 self.acquire()
2108 .await?
2109 .with_transaction(move |txn| {
2110 txn.prepare_cached(
2111 "INSERT OR REPLACE INTO thread_subscriptions (room_id, event_id, status)
2112 VALUES (?, ?, ?)",
2113 )?
2114 .execute((room_id, thread_id, status))
2115 })
2116 .await?;
2117 Ok(())
2118 }
2119
2120 async fn load_thread_subscription(
2121 &self,
2122 room_id: &RoomId,
2123 thread_id: &EventId,
2124 ) -> Result<Option<ThreadSubscription>, Self::Error> {
2125 let room_id = self.encode_key(keys::THREAD_SUBSCRIPTIONS, room_id);
2126 let thread_id = self.encode_key(keys::THREAD_SUBSCRIPTIONS, thread_id);
2127
2128 Ok(self
2129 .acquire()
2130 .await?
2131 .query_row(
2132 "SELECT status FROM thread_subscriptions WHERE room_id = ? AND event_id = ?",
2133 (room_id, thread_id),
2134 |row| row.get::<_, String>(0),
2135 )
2136 .await
2137 .optional()?
2138 .map(|data| {
2139 ThreadSubscription::from_value(&data).ok_or_else(|| Error::InvalidData {
2140 details: format!("Invalid thread status: {data}"),
2141 })
2142 })
2143 .transpose()?)
2144 }
2145
2146 async fn remove_thread_subscription(
2147 &self,
2148 room_id: &RoomId,
2149 thread_id: &EventId,
2150 ) -> Result<(), Self::Error> {
2151 let room_id = self.encode_key(keys::THREAD_SUBSCRIPTIONS, room_id);
2152 let thread_id = self.encode_key(keys::THREAD_SUBSCRIPTIONS, thread_id);
2153
2154 self.acquire()
2155 .await?
2156 .execute(
2157 "DELETE FROM thread_subscriptions WHERE room_id = ? AND event_id = ?",
2158 (room_id, thread_id),
2159 )
2160 .await?;
2161
2162 Ok(())
2163 }
2164}
2165
2166#[derive(Debug, Clone, Serialize, Deserialize)]
2167struct ReceiptData {
2168 receipt: Receipt,
2169 event_id: OwnedEventId,
2170 user_id: OwnedUserId,
2171}
2172
2173#[cfg(test)]
2174mod tests {
2175 use std::sync::atomic::{AtomicU32, Ordering::SeqCst};
2176
2177 use matrix_sdk_base::{statestore_integration_tests, StateStore, StoreError};
2178 use once_cell::sync::Lazy;
2179 use tempfile::{tempdir, TempDir};
2180
2181 use super::SqliteStateStore;
2182
2183 static TMP_DIR: Lazy<TempDir> = Lazy::new(|| tempdir().unwrap());
2184 static NUM: AtomicU32 = AtomicU32::new(0);
2185
2186 async fn get_store() -> Result<impl StateStore, StoreError> {
2187 let name = NUM.fetch_add(1, SeqCst).to_string();
2188 let tmpdir_path = TMP_DIR.path().join(name);
2189
2190 tracing::info!("using store @ {}", tmpdir_path.to_str().unwrap());
2191
2192 Ok(SqliteStateStore::open(tmpdir_path.to_str().unwrap(), None).await.unwrap())
2193 }
2194
2195 statestore_integration_tests!();
2196}
2197
2198#[cfg(test)]
2199mod encrypted_tests {
2200 use std::{
2201 path::PathBuf,
2202 sync::atomic::{AtomicU32, Ordering::SeqCst},
2203 };
2204
2205 use matrix_sdk_base::{statestore_integration_tests, StateStore, StoreError};
2206 use matrix_sdk_test::async_test;
2207 use once_cell::sync::Lazy;
2208 use tempfile::{tempdir, TempDir};
2209
2210 use super::SqliteStateStore;
2211 use crate::{utils::SqliteAsyncConnExt, SqliteStoreConfig};
2212
2213 static TMP_DIR: Lazy<TempDir> = Lazy::new(|| tempdir().unwrap());
2214 static NUM: AtomicU32 = AtomicU32::new(0);
2215
2216 fn new_state_store_workspace() -> PathBuf {
2217 let name = NUM.fetch_add(1, SeqCst).to_string();
2218 TMP_DIR.path().join(name)
2219 }
2220
2221 async fn get_store() -> Result<impl StateStore, StoreError> {
2222 let tmpdir_path = new_state_store_workspace();
2223
2224 tracing::info!("using store @ {}", tmpdir_path.to_str().unwrap());
2225
2226 Ok(SqliteStateStore::open(tmpdir_path.to_str().unwrap(), Some("default_test_password"))
2227 .await
2228 .unwrap())
2229 }
2230
2231 #[async_test]
2232 async fn test_pool_size() {
2233 let tmpdir_path = new_state_store_workspace();
2234 let store_open_config = SqliteStoreConfig::new(tmpdir_path).pool_max_size(42);
2235
2236 let store = SqliteStateStore::open_with_config(store_open_config).await.unwrap();
2237
2238 assert_eq!(store.pool.status().max_size, 42);
2239 }
2240
2241 #[async_test]
2242 async fn test_cache_size() {
2243 let tmpdir_path = new_state_store_workspace();
2244 let store_open_config = SqliteStoreConfig::new(tmpdir_path).cache_size(1500);
2245
2246 let store = SqliteStateStore::open_with_config(store_open_config).await.unwrap();
2247
2248 let conn = store.pool.get().await.unwrap();
2249 let cache_size =
2250 conn.query_row("PRAGMA cache_size", (), |row| row.get::<_, i32>(0)).await.unwrap();
2251
2252 assert_eq!(cache_size, -(1500 / 1024));
2256 }
2257
2258 #[async_test]
2259 async fn test_journal_size_limit() {
2260 let tmpdir_path = new_state_store_workspace();
2261 let store_open_config = SqliteStoreConfig::new(tmpdir_path).journal_size_limit(1500);
2262
2263 let store = SqliteStateStore::open_with_config(store_open_config).await.unwrap();
2264
2265 let conn = store.pool.get().await.unwrap();
2266 let journal_size_limit = conn
2267 .query_row("PRAGMA journal_size_limit", (), |row| row.get::<_, u32>(0))
2268 .await
2269 .unwrap();
2270
2271 assert_eq!(journal_size_limit, 1500);
2274 }
2275
2276 statestore_integration_tests!();
2277}
2278
2279#[cfg(test)]
2280mod migration_tests {
2281 use std::{
2282 path::{Path, PathBuf},
2283 sync::{
2284 atomic::{AtomicU32, Ordering::SeqCst},
2285 Arc,
2286 },
2287 };
2288
2289 use as_variant::as_variant;
2290 use deadpool_sqlite::Runtime;
2291 use matrix_sdk_base::{
2292 media::{MediaFormat, MediaRequestParameters},
2293 store::{
2294 ChildTransactionId, DependentQueuedRequestKind, RoomLoadSettings,
2295 SerializableEventContent,
2296 },
2297 sync::UnreadNotificationsCount,
2298 RoomState, StateStore,
2299 };
2300 use matrix_sdk_test::async_test;
2301 use once_cell::sync::Lazy;
2302 use ruma::{
2303 events::{
2304 room::{create::RoomCreateEventContent, message::RoomMessageEventContent, MediaSource},
2305 StateEventType,
2306 },
2307 room_id, server_name, user_id, EventId, MilliSecondsSinceUnixEpoch, OwnedTransactionId,
2308 RoomId, TransactionId, UserId,
2309 };
2310 use rusqlite::Transaction;
2311 use serde::{Deserialize, Serialize};
2312 use serde_json::json;
2313 use tempfile::{tempdir, TempDir};
2314 use tokio::fs;
2315
2316 use super::{init, keys, SqliteStateStore, DATABASE_NAME};
2317 use crate::{
2318 error::{Error, Result},
2319 utils::{EncryptableStore as _, SqliteAsyncConnExt, SqliteKeyValueStoreAsyncConnExt},
2320 OpenStoreError,
2321 };
2322
2323 static TMP_DIR: Lazy<TempDir> = Lazy::new(|| tempdir().unwrap());
2324 static NUM: AtomicU32 = AtomicU32::new(0);
2325 const SECRET: &str = "secret";
2326
2327 fn new_path() -> PathBuf {
2328 let name = NUM.fetch_add(1, SeqCst).to_string();
2329 TMP_DIR.path().join(name)
2330 }
2331
2332 async fn create_fake_db(path: &Path, version: u8) -> Result<SqliteStateStore> {
2333 fs::create_dir_all(&path).await.map_err(OpenStoreError::CreateDir).unwrap();
2334
2335 let config = deadpool_sqlite::Config::new(path.join(DATABASE_NAME));
2336 let pool = config.create_pool(Runtime::Tokio1).unwrap();
2339 let conn = pool.get().await?;
2340
2341 init(&conn).await?;
2342
2343 let store_cipher = Some(Arc::new(conn.get_or_create_store_cipher(SECRET).await.unwrap()));
2344 let this = SqliteStateStore { store_cipher, pool };
2345 this.run_migrations(&conn, 1, Some(version)).await?;
2346
2347 Ok(this)
2348 }
2349
2350 fn room_info_v1_json(
2351 room_id: &RoomId,
2352 state: RoomState,
2353 name: Option<&str>,
2354 creator: Option<&UserId>,
2355 ) -> serde_json::Value {
2356 let name_content = match name {
2358 Some(name) => json!({ "name": name }),
2359 None => json!({ "name": null }),
2360 };
2361 let create_content = match creator {
2363 Some(creator) => RoomCreateEventContent::new_v1(creator.to_owned()),
2364 None => RoomCreateEventContent::new_v11(),
2365 };
2366
2367 json!({
2368 "room_id": room_id,
2369 "room_type": state,
2370 "notification_counts": UnreadNotificationsCount::default(),
2371 "summary": {
2372 "heroes": [],
2373 "joined_member_count": 0,
2374 "invited_member_count": 0,
2375 },
2376 "members_synced": false,
2377 "base_info": {
2378 "dm_targets": [],
2379 "max_power_level": 100,
2380 "name": {
2381 "Original": {
2382 "content": name_content,
2383 },
2384 },
2385 "create": {
2386 "Original": {
2387 "content": create_content,
2388 }
2389 }
2390 },
2391 })
2392 }
2393
2394 #[async_test]
2395 pub async fn test_migrating_v1_to_v2() {
2396 let path = new_path();
2397 {
2399 let db = create_fake_db(&path, 1).await.unwrap();
2400 let conn = db.pool.get().await.unwrap();
2401
2402 let this = db.clone();
2403 conn.with_transaction(move |txn| {
2404 for i in 0..5 {
2405 let room_id = RoomId::parse(format!("!room_{i}:localhost")).unwrap();
2406 let (state, stripped) =
2407 if i < 3 { (RoomState::Joined, false) } else { (RoomState::Invited, true) };
2408 let info = room_info_v1_json(&room_id, state, None, None);
2409
2410 let room_id = this.encode_key(keys::ROOM_INFO, room_id);
2411 let data = this.serialize_json(&info)?;
2412
2413 txn.prepare_cached(
2414 "INSERT INTO room_info (room_id, stripped, data)
2415 VALUES (?, ?, ?)",
2416 )?
2417 .execute((room_id, stripped, data))?;
2418 }
2419
2420 Result::<_, Error>::Ok(())
2421 })
2422 .await
2423 .unwrap();
2424 }
2425
2426 let store = SqliteStateStore::open(path, Some(SECRET)).await.unwrap();
2428
2429 assert_eq!(store.get_room_infos(&RoomLoadSettings::default()).await.unwrap().len(), 5);
2431 }
2432
2433 fn add_room_v2(
2435 this: &SqliteStateStore,
2436 txn: &Transaction<'_>,
2437 room_id: &RoomId,
2438 name: Option<&str>,
2439 create_creator: Option<&UserId>,
2440 create_sender: Option<&UserId>,
2441 ) -> Result<(), Error> {
2442 let room_info_json = room_info_v1_json(room_id, RoomState::Joined, name, create_creator);
2443
2444 let encoded_room_id = this.encode_key(keys::ROOM_INFO, room_id);
2445 let encoded_state =
2446 this.encode_key(keys::ROOM_INFO, serde_json::to_string(&RoomState::Joined)?);
2447 let data = this.serialize_json(&room_info_json)?;
2448
2449 txn.prepare_cached(
2450 "INSERT INTO room_info (room_id, state, data)
2451 VALUES (?, ?, ?)",
2452 )?
2453 .execute((encoded_room_id, encoded_state, data))?;
2454
2455 let Some(create_sender) = create_sender else {
2457 return Ok(());
2458 };
2459
2460 let create_content = match create_creator {
2461 Some(creator) => RoomCreateEventContent::new_v1(creator.to_owned()),
2462 None => RoomCreateEventContent::new_v11(),
2463 };
2464
2465 let event_id = EventId::new(server_name!("dummy.local"));
2466 let create_event = json!({
2467 "content": create_content,
2468 "event_id": event_id,
2469 "sender": create_sender.to_owned(),
2470 "origin_server_ts": MilliSecondsSinceUnixEpoch::now(),
2471 "state_key": "",
2472 "type": "m.room.create",
2473 "unsigned": {},
2474 });
2475
2476 let encoded_room_id = this.encode_key(keys::STATE_EVENT, room_id);
2477 let encoded_event_type =
2478 this.encode_key(keys::STATE_EVENT, StateEventType::RoomCreate.to_string());
2479 let encoded_state_key = this.encode_key(keys::STATE_EVENT, "");
2480 let stripped = false;
2481 let encoded_event_id = this.encode_key(keys::STATE_EVENT, event_id);
2482 let data = this.serialize_json(&create_event)?;
2483
2484 txn.prepare_cached(
2485 "INSERT
2486 INTO state_event (room_id, event_type, state_key, stripped, event_id, data)
2487 VALUES (?, ?, ?, ?, ?, ?)",
2488 )?
2489 .execute((
2490 encoded_room_id,
2491 encoded_event_type,
2492 encoded_state_key,
2493 stripped,
2494 encoded_event_id,
2495 data,
2496 ))?;
2497
2498 Ok(())
2499 }
2500
2501 #[async_test]
2502 pub async fn test_migrating_v2_to_v3() {
2503 let path = new_path();
2504
2505 let room_a_id = room_id!("!room_a:dummy.local");
2507 let room_a_name = "Room A";
2508 let room_a_creator = user_id!("@creator:dummy.local");
2509 let room_a_create_sender = user_id!("@sender:dummy.local");
2512
2513 let room_b_id = room_id!("!room_b:dummy.local");
2515
2516 let room_c_id = room_id!("!room_c:dummy.local");
2518 let room_c_create_sender = user_id!("@creator:dummy.local");
2519
2520 {
2522 let db = create_fake_db(&path, 2).await.unwrap();
2523 let conn = db.pool.get().await.unwrap();
2524
2525 let this = db.clone();
2526 conn.with_transaction(move |txn| {
2527 add_room_v2(
2528 &this,
2529 txn,
2530 room_a_id,
2531 Some(room_a_name),
2532 Some(room_a_creator),
2533 Some(room_a_create_sender),
2534 )?;
2535 add_room_v2(&this, txn, room_b_id, None, None, None)?;
2536 add_room_v2(&this, txn, room_c_id, None, None, Some(room_c_create_sender))?;
2537
2538 Result::<_, Error>::Ok(())
2539 })
2540 .await
2541 .unwrap();
2542 }
2543
2544 let store = SqliteStateStore::open(path, Some(SECRET)).await.unwrap();
2546
2547 let room_infos = store.get_room_infos(&RoomLoadSettings::default()).await.unwrap();
2549 assert_eq!(room_infos.len(), 3);
2550
2551 let room_a = room_infos.iter().find(|r| r.room_id() == room_a_id).unwrap();
2552 assert_eq!(room_a.name(), Some(room_a_name));
2553 assert_eq!(room_a.creators(), Some(vec![room_a_create_sender.to_owned()]));
2554
2555 let room_b = room_infos.iter().find(|r| r.room_id() == room_b_id).unwrap();
2556 assert_eq!(room_b.name(), None);
2557 assert_eq!(room_b.creators(), None);
2558
2559 let room_c = room_infos.iter().find(|r| r.room_id() == room_c_id).unwrap();
2560 assert_eq!(room_c.name(), None);
2561 assert_eq!(room_c.creators(), Some(vec![room_c_create_sender.to_owned()]));
2562 }
2563
2564 #[async_test]
2565 pub async fn test_migrating_v7_to_v9() {
2566 let path = new_path();
2567
2568 let room_id = room_id!("!room_a:dummy.local");
2569 let wedged_event_transaction_id = TransactionId::new();
2570 let local_event_transaction_id = TransactionId::new();
2571
2572 {
2574 let db = create_fake_db(&path, 7).await.unwrap();
2575 let conn = db.pool.get().await.unwrap();
2576
2577 let wedge_tx = wedged_event_transaction_id.clone();
2578 let local_tx = local_event_transaction_id.clone();
2579
2580 conn.with_transaction(move |txn| {
2581 add_dependent_send_queue_event_v7(
2582 &db,
2583 txn,
2584 room_id,
2585 &local_tx,
2586 ChildTransactionId::new(),
2587 DependentQueuedRequestKind::RedactEvent,
2588 )?;
2589 add_send_queue_event_v7(&db, txn, &wedge_tx, room_id, true)?;
2590 add_send_queue_event_v7(&db, txn, &local_tx, room_id, false)?;
2591 Result::<_, Error>::Ok(())
2592 })
2593 .await
2594 .unwrap();
2595 }
2596
2597 let store = SqliteStateStore::open(path, Some(SECRET)).await.unwrap();
2600
2601 let requests = store.load_send_queue_requests(room_id).await.unwrap();
2602 assert!(requests.is_empty());
2603
2604 let dependent_requests = store.load_dependent_queued_requests(room_id).await.unwrap();
2605 assert!(dependent_requests.is_empty());
2606 }
2607
2608 fn add_send_queue_event_v7(
2609 this: &SqliteStateStore,
2610 txn: &Transaction<'_>,
2611 transaction_id: &TransactionId,
2612 room_id: &RoomId,
2613 is_wedged: bool,
2614 ) -> Result<(), Error> {
2615 let content =
2616 SerializableEventContent::new(&RoomMessageEventContent::text_plain("Hello").into())?;
2617
2618 let room_id_key = this.encode_key(keys::SEND_QUEUE, room_id);
2619 let room_id_value = this.serialize_value(&room_id.to_owned())?;
2620
2621 let content = this.serialize_json(&content)?;
2622
2623 txn.prepare_cached("INSERT INTO send_queue_events (room_id, room_id_val, transaction_id, content, wedged) VALUES (?, ?, ?, ?, ?)")?
2624 .execute((room_id_key, room_id_value, transaction_id.to_string(), content, is_wedged))?;
2625
2626 Ok(())
2627 }
2628
2629 fn add_dependent_send_queue_event_v7(
2630 this: &SqliteStateStore,
2631 txn: &Transaction<'_>,
2632 room_id: &RoomId,
2633 parent_txn_id: &TransactionId,
2634 own_txn_id: ChildTransactionId,
2635 content: DependentQueuedRequestKind,
2636 ) -> Result<(), Error> {
2637 let room_id_value = this.serialize_value(&room_id.to_owned())?;
2638
2639 let parent_txn_id = parent_txn_id.to_string();
2640 let own_txn_id = own_txn_id.to_string();
2641 let content = this.serialize_json(&content)?;
2642
2643 txn.prepare_cached(
2644 "INSERT INTO dependent_send_queue_events
2645 (room_id, parent_transaction_id, own_transaction_id, content)
2646 VALUES (?, ?, ?, ?)",
2647 )?
2648 .execute((room_id_value, parent_txn_id, own_txn_id, content))?;
2649
2650 Ok(())
2651 }
2652
2653 #[derive(Clone, Debug, Serialize, Deserialize)]
2654 pub enum LegacyDependentQueuedRequestKind {
2655 UploadFileWithThumbnail {
2656 content_type: String,
2657 cache_key: MediaRequestParameters,
2658 related_to: OwnedTransactionId,
2659 },
2660 }
2661
2662 #[async_test]
2663 pub async fn test_dependent_queued_request_variant_renaming() {
2664 let path = new_path();
2665 let db = create_fake_db(&path, 7).await.unwrap();
2666
2667 let cache_key = MediaRequestParameters {
2668 format: MediaFormat::File,
2669 source: MediaSource::Plain("https://server.local/foobar".into()),
2670 };
2671 let related_to = TransactionId::new();
2672 let request = LegacyDependentQueuedRequestKind::UploadFileWithThumbnail {
2673 content_type: "image/png".to_owned(),
2674 cache_key,
2675 related_to: related_to.clone(),
2676 };
2677
2678 let data = db
2679 .serialize_json(&request)
2680 .expect("should be able to serialize legacy dependent request");
2681 let deserialized: DependentQueuedRequestKind = db.deserialize_json(&data).expect(
2682 "should be able to deserialize dependent request from legacy dependent request",
2683 );
2684
2685 as_variant!(deserialized, DependentQueuedRequestKind::UploadFileOrThumbnail { related_to: de_related_to, .. } => {
2686 assert_eq!(de_related_to, related_to);
2687 });
2688 }
2689}