1use std::{collections::HashMap, fmt, iter::once, path::Path, sync::Arc};
18
19use async_trait::async_trait;
20use deadpool_sqlite::{Object as SqliteAsyncConn, Pool as SqlitePool, Runtime};
21use matrix_sdk_base::{
22 deserialized_responses::TimelineEvent,
23 event_cache::{
24 store::{
25 compute_filters_string, extract_event_relation,
26 media::{
27 EventCacheStoreMedia, IgnoreMediaRetentionPolicy, MediaRetentionPolicy,
28 MediaService,
29 },
30 EventCacheStore,
31 },
32 Event, Gap,
33 },
34 linked_chunk::{
35 ChunkContent, ChunkIdentifier, ChunkIdentifierGenerator, ChunkMetadata, LinkedChunkId,
36 Position, RawChunk, Update,
37 },
38 media::{MediaRequestParameters, UniqueKey},
39 timer,
40};
41use matrix_sdk_store_encryption::StoreCipher;
42use ruma::{
43 events::relation::RelationType, time::SystemTime, EventId, MilliSecondsSinceUnixEpoch, MxcUri,
44 OwnedEventId, RoomId,
45};
46use rusqlite::{params_from_iter, OptionalExtension, ToSql, Transaction, TransactionBehavior};
47use tokio::{
48 fs,
49 sync::{Mutex, OwnedMutexGuard},
50};
51use tracing::{debug, error, instrument, trace};
52
53use crate::{
54 error::{Error, Result},
55 utils::{
56 repeat_vars, time_to_timestamp, EncryptableStore, Key, SqliteAsyncConnExt,
57 SqliteKeyValueStoreAsyncConnExt, SqliteKeyValueStoreConnExt, SqliteTransactionExt,
58 },
59 OpenStoreError, SqliteStoreConfig,
60};
61
/// String constants naming the key/value entries and key namespaces used by this store.
mod keys {
    // Entries read and written through the serialized key/value helpers
    // (`get_serialized_kv` / `set_serialized_kv`).
    pub const MEDIA_RETENTION_POLICY: &str = "media_retention_policy";
    pub const LAST_MEDIA_CLEANUP_TIME: &str = "last_media_cleanup_time";

    // Names passed as the first argument of `encode_key`.
    pub const LINKED_CHUNKS: &str = "linked_chunks";
    pub const MEDIA: &str = "media";
}
71
/// File name of the database; it is joined onto the directory given in the store
/// configuration.
const DATABASE_NAME: &str = "matrix-sdk-event-cache.sqlite3";

/// Schema version reached once every step of `run_migrations` has been applied.
const DATABASE_VERSION: u8 = 8;

/// Chunk type string identifying a chunk that holds events.
const CHUNK_TYPE_EVENT_TYPE_STRING: &str = "E";
/// Chunk type string identifying a chunk that holds a gap.
const CHUNK_TYPE_GAP_TYPE_STRING: &str = "G";
88
/// An SQLite-backed event cache store.
#[derive(Clone)]
pub struct SqliteEventCacheStore {
    /// Optional cipher, exposed through `EncryptableStore::get_cypher`. It is only set when
    /// a passphrase was supplied while opening the store.
    store_cipher: Option<Arc<StoreCipher>>,

    /// Connection pool; `read()` obtains its connections from it.
    pool: SqlitePool,

    /// The single connection handed out by `write()`. It sits behind an async mutex, so
    /// callers of `write()` take turns using it.
    write_connection: Arc<Mutex<SqliteAsyncConn>>,

    /// Service the media-related store methods delegate to.
    media_service: MediaService,
}
105
106#[cfg(not(tarpaulin_include))]
107impl fmt::Debug for SqliteEventCacheStore {
108 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
109 f.debug_struct("SqliteEventCacheStore").finish_non_exhaustive()
110 }
111}
112
113impl EncryptableStore for SqliteEventCacheStore {
114 fn get_cypher(&self) -> Option<&StoreCipher> {
115 self.store_cipher.as_deref()
116 }
117}
118
119impl SqliteEventCacheStore {
120 pub async fn open(
123 path: impl AsRef<Path>,
124 passphrase: Option<&str>,
125 ) -> Result<Self, OpenStoreError> {
126 Self::open_with_config(SqliteStoreConfig::new(path).passphrase(passphrase)).await
127 }
128
129 #[instrument(skip(config), fields(path = ?config.path))]
131 pub async fn open_with_config(config: SqliteStoreConfig) -> Result<Self, OpenStoreError> {
132 debug!(?config);
133
134 let _timer = timer!("open_with_config");
135
136 let SqliteStoreConfig { path, passphrase, pool_config, runtime_config } = config;
137
138 fs::create_dir_all(&path).await.map_err(OpenStoreError::CreateDir)?;
139
140 let mut config = deadpool_sqlite::Config::new(path.join(DATABASE_NAME));
141 config.pool = Some(pool_config);
142
143 let pool = config.create_pool(Runtime::Tokio1)?;
144
145 let this = Self::open_with_pool(pool, passphrase.as_deref()).await?;
146 this.write().await?.apply_runtime_config(runtime_config).await?;
147
148 Ok(this)
149 }
150
151 async fn open_with_pool(
154 pool: SqlitePool,
155 passphrase: Option<&str>,
156 ) -> Result<Self, OpenStoreError> {
157 let conn = pool.get().await?;
158
159 let version = conn.db_version().await?;
160 run_migrations(&conn, version).await?;
161
162 let store_cipher = match passphrase {
163 Some(p) => Some(Arc::new(conn.get_or_create_store_cipher(p).await?)),
164 None => None,
165 };
166
167 let media_service = MediaService::new();
168 let media_retention_policy = conn.get_serialized_kv(keys::MEDIA_RETENTION_POLICY).await?;
169 let last_media_cleanup_time = conn.get_serialized_kv(keys::LAST_MEDIA_CLEANUP_TIME).await?;
170 media_service.restore(media_retention_policy, last_media_cleanup_time);
171
172 Ok(Self {
173 store_cipher,
174 pool,
175 write_connection: Arc::new(Mutex::new(conn)),
177 media_service,
178 })
179 }
180
181 #[instrument(skip_all)]
183 async fn read(&self) -> Result<SqliteAsyncConn> {
184 trace!("Taking a `read` connection");
185 let _timer = timer!("connection");
186
187 let connection = self.pool.get().await?;
188
189 connection.execute_batch("PRAGMA foreign_keys = ON;").await?;
194
195 Ok(connection)
196 }
197
198 #[instrument(skip_all)]
200 async fn write(&self) -> Result<OwnedMutexGuard<SqliteAsyncConn>> {
201 trace!("Taking a `write` connection");
202 let _timer = timer!("connection");
203
204 let connection = self.write_connection.clone().lock_owned().await;
205
206 connection.execute_batch("PRAGMA foreign_keys = ON;").await?;
211
212 Ok(connection)
213 }
214
215 fn map_row_to_chunk(
216 row: &rusqlite::Row<'_>,
217 ) -> Result<(u64, Option<u64>, Option<u64>, String), rusqlite::Error> {
218 Ok((
219 row.get::<_, u64>(0)?,
220 row.get::<_, Option<u64>>(1)?,
221 row.get::<_, Option<u64>>(2)?,
222 row.get::<_, String>(3)?,
223 ))
224 }
225
226 fn encode_event(&self, event: &TimelineEvent) -> Result<EncodedEvent> {
227 let serialized = serde_json::to_vec(event)?;
228
229 let raw_event = event.raw();
231 let (relates_to, rel_type) = extract_event_relation(raw_event).unzip();
232
233 let content = self.encode_value(serialized)?;
235
236 Ok(EncodedEvent {
237 content,
238 rel_type,
239 relates_to: relates_to.map(|relates_to| relates_to.to_string()),
240 })
241 }
242}
243
/// An event in the form in which it is written to the `events` table.
struct EncodedEvent {
    /// JSON-serialized event, after going through `encode_value`.
    content: Vec<u8>,
    /// Relation type of the event, when it relates to another event.
    rel_type: Option<String>,
    /// Stringified id of the event this event relates to, if any.
    relates_to: Option<String>,
}
249
/// Extension trait adding linked-chunk loading helpers to a database transaction.
trait TransactionExtForLinkedChunks {
    /// Rebuilds a full [`RawChunk`] (links plus content) from the fields of a chunk row.
    ///
    /// `chunk_type` selects which content is loaded; an unknown type is an error.
    fn rebuild_chunk(
        &self,
        store: &SqliteEventCacheStore,
        linked_chunk_id: &Key,
        previous: Option<u64>,
        index: u64,
        next: Option<u64>,
        chunk_type: &str,
    ) -> Result<RawChunk<Event, Gap>>;

    /// Loads and decodes the gap stored for chunk `chunk_id` of the given linked chunk.
    fn load_gap_content(
        &self,
        store: &SqliteEventCacheStore,
        linked_chunk_id: &Key,
        chunk_id: ChunkIdentifier,
    ) -> Result<Gap>;

    /// Loads and decodes the events stored for chunk `chunk_id` of the given linked chunk.
    fn load_events_content(
        &self,
        store: &SqliteEventCacheStore,
        linked_chunk_id: &Key,
        chunk_id: ChunkIdentifier,
    ) -> Result<Vec<Event>>;
}
275
276impl TransactionExtForLinkedChunks for Transaction<'_> {
277 fn rebuild_chunk(
278 &self,
279 store: &SqliteEventCacheStore,
280 linked_chunk_id: &Key,
281 previous: Option<u64>,
282 id: u64,
283 next: Option<u64>,
284 chunk_type: &str,
285 ) -> Result<RawChunk<Event, Gap>> {
286 let previous = previous.map(ChunkIdentifier::new);
287 let next = next.map(ChunkIdentifier::new);
288 let id = ChunkIdentifier::new(id);
289
290 match chunk_type {
291 CHUNK_TYPE_GAP_TYPE_STRING => {
292 let gap = self.load_gap_content(store, linked_chunk_id, id)?;
294 Ok(RawChunk { content: ChunkContent::Gap(gap), previous, identifier: id, next })
295 }
296
297 CHUNK_TYPE_EVENT_TYPE_STRING => {
298 let events = self.load_events_content(store, linked_chunk_id, id)?;
300 Ok(RawChunk {
301 content: ChunkContent::Items(events),
302 previous,
303 identifier: id,
304 next,
305 })
306 }
307
308 other => {
309 Err(Error::InvalidData {
311 details: format!("a linked chunk has an unknown type {other}"),
312 })
313 }
314 }
315 }
316
317 fn load_gap_content(
318 &self,
319 store: &SqliteEventCacheStore,
320 linked_chunk_id: &Key,
321 chunk_id: ChunkIdentifier,
322 ) -> Result<Gap> {
323 let encoded_prev_token: Vec<u8> = self.query_row(
326 "SELECT prev_token FROM gap_chunks WHERE chunk_id = ? AND linked_chunk_id = ?",
327 (chunk_id.index(), &linked_chunk_id),
328 |row| row.get(0),
329 )?;
330 let prev_token_bytes = store.decode_value(&encoded_prev_token)?;
331 let prev_token = serde_json::from_slice(&prev_token_bytes)?;
332 Ok(Gap { prev_token })
333 }
334
335 fn load_events_content(
336 &self,
337 store: &SqliteEventCacheStore,
338 linked_chunk_id: &Key,
339 chunk_id: ChunkIdentifier,
340 ) -> Result<Vec<Event>> {
341 let mut events = Vec::new();
343
344 for event_data in self
345 .prepare(
346 r#"
347 SELECT events.content
348 FROM event_chunks ec, events
349 WHERE events.event_id = ec.event_id AND ec.chunk_id = ? AND ec.linked_chunk_id = ?
350 ORDER BY ec.position ASC
351 "#,
352 )?
353 .query_map((chunk_id.index(), &linked_chunk_id), |row| row.get::<_, Vec<u8>>(0))?
354 {
355 let encoded_content = event_data?;
356 let serialized_content = store.decode_value(&encoded_content)?;
357 let event = serde_json::from_slice(&serialized_content)?;
358
359 events.push(event);
360 }
361
362 Ok(events)
363 }
364}
365
/// Brings the database schema from `version` up to [`DATABASE_VERSION`].
///
/// Every migration step whose number is greater than `version` is applied, in increasing
/// order. Each step goes through `with_transaction`, which runs the step's SQL script and
/// then records the new version.
async fn run_migrations(conn: &SqliteAsyncConn, version: u8) -> Result<()> {
    if version == 0 {
        debug!("Creating database");
    } else if version < DATABASE_VERSION {
        debug!(version, new_version = DATABASE_VERSION, "Upgrading database");
    } else {
        // The stored version is not lower than the one this code knows: nothing to apply.
        return Ok(());
    }

    // Enable foreign keys on this connection before running any migration.
    conn.execute_batch("PRAGMA foreign_keys = ON;").await?;

    if version < 1 {
        // The journal mode is switched to WAL outside of the migration transaction, and only
        // when the database is created.
        conn.execute_batch("PRAGMA journal_mode = wal;").await?;
        conn.with_transaction(|txn| {
            txn.execute_batch(include_str!("../migrations/event_cache_store/001_init.sql"))?;
            txn.set_db_version(1)
        })
        .await?;
    }

    if version < 2 {
        conn.with_transaction(|txn| {
            txn.execute_batch(include_str!("../migrations/event_cache_store/002_lease_locks.sql"))?;
            txn.set_db_version(2)
        })
        .await?;
    }

    if version < 3 {
        conn.with_transaction(|txn| {
            txn.execute_batch(include_str!("../migrations/event_cache_store/003_events.sql"))?;
            txn.set_db_version(3)
        })
        .await?;
    }

    if version < 4 {
        conn.with_transaction(|txn| {
            txn.execute_batch(include_str!(
                "../migrations/event_cache_store/004_ignore_policy.sql"
            ))?;
            txn.set_db_version(4)
        })
        .await?;
    }

    if version < 5 {
        conn.with_transaction(|txn| {
            txn.execute_batch(include_str!(
                "../migrations/event_cache_store/005_events_index_on_event_id.sql"
            ))?;
            txn.set_db_version(5)
        })
        .await?;
    }

    if version < 6 {
        conn.with_transaction(|txn| {
            txn.execute_batch(include_str!("../migrations/event_cache_store/006_events.sql"))?;
            txn.set_db_version(6)
        })
        .await?;
    }

    if version < 7 {
        conn.with_transaction(|txn| {
            txn.execute_batch(include_str!(
                "../migrations/event_cache_store/007_event_chunks.sql"
            ))?;
            txn.set_db_version(7)
        })
        .await?;
    }

    if version < 8 {
        conn.with_transaction(|txn| {
            txn.execute_batch(include_str!(
                "../migrations/event_cache_store/008_linked_chunk_id.sql"
            ))?;
            txn.set_db_version(8)
        })
        .await?;
    }

    Ok(())
}
456
457#[async_trait]
458impl EventCacheStore for SqliteEventCacheStore {
459 type Error = Error;
460
461 #[instrument(skip(self))]
462 async fn try_take_leased_lock(
463 &self,
464 lease_duration_ms: u32,
465 key: &str,
466 holder: &str,
467 ) -> Result<bool> {
468 let _timer = timer!("method");
469
470 let key = key.to_owned();
471 let holder = holder.to_owned();
472
473 let now: u64 = MilliSecondsSinceUnixEpoch::now().get().into();
474 let expiration = now + lease_duration_ms as u64;
475
476 let num_touched = self
477 .write()
478 .await?
479 .with_transaction(move |txn| {
480 txn.execute(
481 "INSERT INTO lease_locks (key, holder, expiration)
482 VALUES (?1, ?2, ?3)
483 ON CONFLICT (key)
484 DO
485 UPDATE SET holder = ?2, expiration = ?3
486 WHERE holder = ?2
487 OR expiration < ?4
488 ",
489 (key, holder, expiration, now),
490 )
491 })
492 .await?;
493
494 Ok(num_touched == 1)
495 }
496
497 #[instrument(skip(self, updates))]
498 async fn handle_linked_chunk_updates(
499 &self,
500 linked_chunk_id: LinkedChunkId<'_>,
501 updates: Vec<Update<Event, Gap>>,
502 ) -> Result<(), Self::Error> {
503 let _timer = timer!("method");
504
505 let hashed_linked_chunk_id =
508 self.encode_key(keys::LINKED_CHUNKS, linked_chunk_id.storage_key());
509 let linked_chunk_id = linked_chunk_id.to_owned();
510 let this = self.clone();
511
512 with_immediate_transaction(self, move |txn| {
513 for up in updates {
514 match up {
515 Update::NewItemsChunk { previous, new, next } => {
516 let previous = previous.as_ref().map(ChunkIdentifier::index);
517 let new = new.index();
518 let next = next.as_ref().map(ChunkIdentifier::index);
519
520 trace!(
521 %linked_chunk_id,
522 "new events chunk (prev={previous:?}, i={new}, next={next:?})",
523 );
524
525 insert_chunk(
526 txn,
527 &hashed_linked_chunk_id,
528 previous,
529 new,
530 next,
531 CHUNK_TYPE_EVENT_TYPE_STRING,
532 )?;
533 }
534
535 Update::NewGapChunk { previous, new, next, gap } => {
536 let serialized = serde_json::to_vec(&gap.prev_token)?;
537 let prev_token = this.encode_value(serialized)?;
538
539 let previous = previous.as_ref().map(ChunkIdentifier::index);
540 let new = new.index();
541 let next = next.as_ref().map(ChunkIdentifier::index);
542
543 trace!(
544 %linked_chunk_id,
545 "new gap chunk (prev={previous:?}, i={new}, next={next:?})",
546 );
547
548 insert_chunk(
550 txn,
551 &hashed_linked_chunk_id,
552 previous,
553 new,
554 next,
555 CHUNK_TYPE_GAP_TYPE_STRING,
556 )?;
557
558 txn.execute(
560 r#"
561 INSERT INTO gap_chunks(chunk_id, linked_chunk_id, prev_token)
562 VALUES (?, ?, ?)
563 "#,
564 (new, &hashed_linked_chunk_id, prev_token),
565 )?;
566 }
567
568 Update::RemoveChunk(chunk_identifier) => {
569 let chunk_id = chunk_identifier.index();
570
571 trace!(%linked_chunk_id, "removing chunk @ {chunk_id}");
572
573 let (previous, next): (Option<usize>, Option<usize>) = txn.query_row(
575 "SELECT previous, next FROM linked_chunks WHERE id = ? AND linked_chunk_id = ?",
576 (chunk_id, &hashed_linked_chunk_id),
577 |row| Ok((row.get(0)?, row.get(1)?))
578 )?;
579
580 if let Some(previous) = previous {
582 txn.execute("UPDATE linked_chunks SET next = ? WHERE id = ? AND linked_chunk_id = ?", (next, previous, &hashed_linked_chunk_id))?;
583 }
584
585 if let Some(next) = next {
587 txn.execute("UPDATE linked_chunks SET previous = ? WHERE id = ? AND linked_chunk_id = ?", (previous, next, &hashed_linked_chunk_id))?;
588 }
589
590 txn.execute("DELETE FROM linked_chunks WHERE id = ? AND linked_chunk_id = ?", (chunk_id, &hashed_linked_chunk_id))?;
593 }
594
595 Update::PushItems { at, items } => {
596 if items.is_empty() {
597 continue;
599 }
600
601 let chunk_id = at.chunk_identifier().index();
602
603 trace!(%linked_chunk_id, "pushing {} items @ {chunk_id}", items.len());
604
605 let mut chunk_statement = txn.prepare(
606 "INSERT INTO event_chunks(chunk_id, linked_chunk_id, event_id, position) VALUES (?, ?, ?, ?)"
607 )?;
608
609 let mut content_statement = txn.prepare(
614 "INSERT OR REPLACE INTO events(room_id, event_id, content, relates_to, rel_type) VALUES (?, ?, ?, ?, ?)"
615 )?;
616
617 let invalid_event = |event: TimelineEvent| {
618 let Some(event_id) = event.event_id() else {
619 error!(%linked_chunk_id, "Trying to push an event with no ID");
620 return None;
621 };
622
623 Some((event_id.to_string(), event))
624 };
625
626 let room_id = linked_chunk_id.room_id();
627 let hashed_room_id = this.encode_key(keys::LINKED_CHUNKS, room_id);
628
629 for (i, (event_id, event)) in items.into_iter().filter_map(invalid_event).enumerate() {
630 let index = at.index() + i;
632 chunk_statement.execute((chunk_id, &hashed_linked_chunk_id, &event_id, index))?;
633
634 let encoded_event = this.encode_event(&event)?;
636 content_statement.execute((&hashed_room_id, event_id, encoded_event.content, encoded_event.relates_to, encoded_event.rel_type))?;
637 }
638 }
639
640 Update::ReplaceItem { at, item: event } => {
641 let chunk_id = at.chunk_identifier().index();
642
643 let index = at.index();
644
645 trace!(%linked_chunk_id, "replacing item @ {chunk_id}:{index}");
646
647 let Some(event_id) = event.event_id().map(|event_id| event_id.to_string()) else {
649 error!(%linked_chunk_id, "Trying to replace an event with a new one that has no ID");
650 continue;
651 };
652
653 let encoded_event = this.encode_event(&event)?;
657 let room_id = linked_chunk_id.room_id();
658 let hashed_room_id = this.encode_key(keys::LINKED_CHUNKS, room_id);
659 txn.execute(
660 "INSERT OR REPLACE INTO events(room_id, event_id, content, relates_to, rel_type) VALUES (?, ?, ?, ?, ?)"
661 , (&hashed_room_id, &event_id, encoded_event.content, encoded_event.relates_to, encoded_event.rel_type))?;
662
663 txn.execute(
665 r#"UPDATE event_chunks SET event_id = ? WHERE linked_chunk_id = ? AND chunk_id = ? AND position = ?"#,
666 (event_id, &hashed_linked_chunk_id, chunk_id, index)
667 )?;
668 }
669
670 Update::RemoveItem { at } => {
671 let chunk_id = at.chunk_identifier().index();
672 let index = at.index();
673
674 trace!(%linked_chunk_id, "removing item @ {chunk_id}:{index}");
675
676 txn.execute("DELETE FROM event_chunks WHERE linked_chunk_id = ? AND chunk_id = ? AND position = ?", (&hashed_linked_chunk_id, chunk_id, index))?;
678
679 txn.execute(
778 r#"
779 UPDATE event_chunks
780 SET position = -(position - 1)
781 WHERE linked_chunk_id = ? AND chunk_id = ? AND position > ?
782 "#,
783 (&hashed_linked_chunk_id, chunk_id, index)
784 )?;
785 txn.execute(
786 r#"
787 UPDATE event_chunks
788 SET position = -position
789 WHERE position < 0 AND linked_chunk_id = ? AND chunk_id = ?
790 "#,
791 (&hashed_linked_chunk_id, chunk_id)
792 )?;
793 }
794
795 Update::DetachLastItems { at } => {
796 let chunk_id = at.chunk_identifier().index();
797 let index = at.index();
798
799 trace!(%linked_chunk_id, "truncating items >= {chunk_id}:{index}");
800
801 txn.execute("DELETE FROM event_chunks WHERE linked_chunk_id = ? AND chunk_id = ? AND position >= ?", (&hashed_linked_chunk_id, chunk_id, index))?;
803 }
804
805 Update::Clear => {
806 trace!(%linked_chunk_id, "clearing items");
807
808 txn.execute(
810 "DELETE FROM linked_chunks WHERE linked_chunk_id = ?",
811 (&hashed_linked_chunk_id,),
812 )?;
813 }
814
815 Update::StartReattachItems | Update::EndReattachItems => {
816 }
818 }
819 }
820
821 Ok(())
822 })
823 .await?;
824
825 Ok(())
826 }
827
828 #[instrument(skip(self))]
829 async fn load_all_chunks(
830 &self,
831 linked_chunk_id: LinkedChunkId<'_>,
832 ) -> Result<Vec<RawChunk<Event, Gap>>, Self::Error> {
833 let _timer = timer!("method");
834
835 let hashed_linked_chunk_id =
836 self.encode_key(keys::LINKED_CHUNKS, linked_chunk_id.storage_key());
837
838 let this = self.clone();
839
840 let result = self
841 .read()
842 .await?
843 .with_transaction(move |txn| -> Result<_> {
844 let mut items = Vec::new();
845
846 for data in txn
848 .prepare(
849 "SELECT id, previous, next, type FROM linked_chunks WHERE linked_chunk_id = ? ORDER BY id",
850 )?
851 .query_map((&hashed_linked_chunk_id,), Self::map_row_to_chunk)?
852 {
853 let (id, previous, next, chunk_type) = data?;
854 let new = txn.rebuild_chunk(
855 &this,
856 &hashed_linked_chunk_id,
857 previous,
858 id,
859 next,
860 chunk_type.as_str(),
861 )?;
862 items.push(new);
863 }
864
865 Ok(items)
866 })
867 .await?;
868
869 Ok(result)
870 }
871
872 #[instrument(skip(self))]
873 async fn load_all_chunks_metadata(
874 &self,
875 linked_chunk_id: LinkedChunkId<'_>,
876 ) -> Result<Vec<ChunkMetadata>, Self::Error> {
877 let _timer = timer!("method");
878
879 let hashed_linked_chunk_id =
880 self.encode_key(keys::LINKED_CHUNKS, linked_chunk_id.storage_key());
881
882 self.read()
883 .await?
884 .with_transaction(move |txn| -> Result<_> {
885 let num_events_by_chunk_ids = txn
912 .prepare(
913 r#"
914 SELECT ec.chunk_id, COUNT(ec.event_id)
915 FROM event_chunks as ec
916 WHERE ec.linked_chunk_id = ?
917 GROUP BY ec.chunk_id
918 "#,
919 )?
920 .query_map((&hashed_linked_chunk_id,), |row| {
921 Ok((row.get::<_, u64>(0)?, row.get::<_, usize>(1)?))
922 })?
923 .collect::<Result<HashMap<_, _>, _>>()?;
924
925 txn.prepare(
926 r#"
927 SELECT
928 lc.id,
929 lc.previous,
930 lc.next,
931 lc.type
932 FROM linked_chunks as lc
933 WHERE lc.linked_chunk_id = ?
934 ORDER BY lc.id"#,
935 )?
936 .query_map((&hashed_linked_chunk_id,), |row| {
937 Ok((
938 row.get::<_, u64>(0)?,
939 row.get::<_, Option<u64>>(1)?,
940 row.get::<_, Option<u64>>(2)?,
941 row.get::<_, String>(3)?,
942 ))
943 })?
944 .map(|data| -> Result<_> {
945 let (id, previous, next, chunk_type) = data?;
946
947 let num_items = if chunk_type == CHUNK_TYPE_GAP_TYPE_STRING {
953 0
954 } else {
955 num_events_by_chunk_ids.get(&id).copied().unwrap_or(0)
956 };
957
958 Ok(ChunkMetadata {
959 identifier: ChunkIdentifier::new(id),
960 previous: previous.map(ChunkIdentifier::new),
961 next: next.map(ChunkIdentifier::new),
962 num_items,
963 })
964 })
965 .collect::<Result<Vec<_>, _>>()
966 })
967 .await
968 }
969
970 #[instrument(skip(self))]
971 async fn load_last_chunk(
972 &self,
973 linked_chunk_id: LinkedChunkId<'_>,
974 ) -> Result<(Option<RawChunk<Event, Gap>>, ChunkIdentifierGenerator), Self::Error> {
975 let _timer = timer!("method");
976
977 let hashed_linked_chunk_id =
978 self.encode_key(keys::LINKED_CHUNKS, linked_chunk_id.storage_key());
979
980 let this = self.clone();
981
982 self
983 .read()
984 .await?
985 .with_transaction(move |txn| -> Result<_> {
986 let (chunk_identifier_generator, number_of_chunks) = txn
988 .prepare(
989 "SELECT MAX(id), COUNT(*) FROM linked_chunks WHERE linked_chunk_id = ?"
990 )?
991 .query_row(
992 (&hashed_linked_chunk_id,),
993 |row| {
994 Ok((
995 row.get::<_, Option<u64>>(0)?,
1000 row.get::<_, u64>(1)?,
1001 ))
1002 }
1003 )?;
1004
1005 let chunk_identifier_generator = match chunk_identifier_generator {
1006 Some(last_chunk_identifier) => {
1007 ChunkIdentifierGenerator::new_from_previous_chunk_identifier(
1008 ChunkIdentifier::new(last_chunk_identifier)
1009 )
1010 },
1011 None => ChunkIdentifierGenerator::new_from_scratch(),
1012 };
1013
1014 let Some((chunk_identifier, previous_chunk, chunk_type)) = txn
1016 .prepare(
1017 "SELECT id, previous, type FROM linked_chunks WHERE linked_chunk_id = ? AND next IS NULL"
1018 )?
1019 .query_row(
1020 (&hashed_linked_chunk_id,),
1021 |row| {
1022 Ok((
1023 row.get::<_, u64>(0)?,
1024 row.get::<_, Option<u64>>(1)?,
1025 row.get::<_, String>(2)?,
1026 ))
1027 }
1028 )
1029 .optional()?
1030 else {
1031 if number_of_chunks == 0 {
1034 return Ok((None, chunk_identifier_generator));
1035 }
1036 else {
1041 return Err(Error::InvalidData {
1042 details:
1043 "last chunk is not found but chunks exist: the linked chunk contains a cycle"
1044 .to_owned()
1045 }
1046 )
1047 }
1048 };
1049
1050 let last_chunk = txn.rebuild_chunk(
1052 &this,
1053 &hashed_linked_chunk_id,
1054 previous_chunk,
1055 chunk_identifier,
1056 None,
1057 &chunk_type
1058 )?;
1059
1060 Ok((Some(last_chunk), chunk_identifier_generator))
1061 })
1062 .await
1063 }
1064
1065 #[instrument(skip(self))]
1066 async fn load_previous_chunk(
1067 &self,
1068 linked_chunk_id: LinkedChunkId<'_>,
1069 before_chunk_identifier: ChunkIdentifier,
1070 ) -> Result<Option<RawChunk<Event, Gap>>, Self::Error> {
1071 let _timer = timer!("method");
1072
1073 let hashed_linked_chunk_id =
1074 self.encode_key(keys::LINKED_CHUNKS, linked_chunk_id.storage_key());
1075
1076 let this = self.clone();
1077
1078 self
1079 .read()
1080 .await?
1081 .with_transaction(move |txn| -> Result<_> {
1082 let Some((chunk_identifier, previous_chunk, next_chunk, chunk_type)) = txn
1084 .prepare(
1085 "SELECT id, previous, next, type FROM linked_chunks WHERE linked_chunk_id = ? AND next = ?"
1086 )?
1087 .query_row(
1088 (&hashed_linked_chunk_id, before_chunk_identifier.index()),
1089 |row| {
1090 Ok((
1091 row.get::<_, u64>(0)?,
1092 row.get::<_, Option<u64>>(1)?,
1093 row.get::<_, Option<u64>>(2)?,
1094 row.get::<_, String>(3)?,
1095 ))
1096 }
1097 )
1098 .optional()?
1099 else {
1100 return Ok(None);
1102 };
1103
1104 let last_chunk = txn.rebuild_chunk(
1106 &this,
1107 &hashed_linked_chunk_id,
1108 previous_chunk,
1109 chunk_identifier,
1110 next_chunk,
1111 &chunk_type
1112 )?;
1113
1114 Ok(Some(last_chunk))
1115 })
1116 .await
1117 }
1118
1119 #[instrument(skip(self))]
1120 async fn clear_all_linked_chunks(&self) -> Result<(), Self::Error> {
1121 let _timer = timer!("method");
1122
1123 self.write()
1124 .await?
1125 .with_transaction(move |txn| {
1126 txn.execute("DELETE FROM linked_chunks", ())?;
1128 txn.execute("DELETE FROM events", ())
1130 })
1131 .await?;
1132
1133 Ok(())
1134 }
1135
    /// Returns, among `events`, those already stored in the given linked chunk, each with
    /// the position it is stored at.
    ///
    /// The lookup goes through `chunk_large_query_over`, which is given the whole list of
    /// ids and a closure querying one batch of them. Stored ids that fail to parse as
    /// event ids are logged and skipped.
    #[instrument(skip(self, events))]
    async fn filter_duplicated_events(
        &self,
        linked_chunk_id: LinkedChunkId<'_>,
        events: Vec<OwnedEventId>,
    ) -> Result<Vec<(OwnedEventId, Position)>, Self::Error> {
        let _timer = timer!("method");

        // Nothing to look up: return early without taking a connection.
        if events.is_empty() {
            return Ok(Vec::new());
        }

        let hashed_linked_chunk_id =
            self.encode_key(keys::LINKED_CHUNKS, linked_chunk_id.storage_key());
        let linked_chunk_id = linked_chunk_id.to_owned();

        self.read()
            .await?
            .with_transaction(move |txn| -> Result<_> {
                txn.chunk_large_query_over(events, None, move |txn, events| {
                    // One `?` placeholder for the linked chunk id, plus the placeholders
                    // produced by `repeat_vars` for the event ids of this batch.
                    let query = format!(
                        r#"
                            SELECT event_id, chunk_id, position
                            FROM event_chunks
                            WHERE linked_chunk_id = ? AND event_id IN ({})
                            ORDER BY chunk_id ASC, position ASC
                        "#,
                        repeat_vars(events.len()),
                    );

                    // NOTE(review): both `to_sql()` calls are unwrapped; presumably the
                    // conversion cannot fail for these values — confirm.
                    let parameters = params_from_iter(
                        once(
                            hashed_linked_chunk_id
                                .to_sql()
                                .unwrap(),
                        )
                        .chain(events.iter().map(|event| {
                            event
                                .as_str()
                                .to_sql()
                                .unwrap()
                        })),
                    );

                    let mut duplicated_events = Vec::new();

                    for duplicated_event in txn.prepare(&query)?.query_map(parameters, |row| {
                        Ok((
                            row.get::<_, String>(0)?,
                            row.get::<_, u64>(1)?,
                            row.get::<_, usize>(2)?,
                        ))
                    })? {
                        let (duplicated_event, chunk_identifier, index) = duplicated_event?;

                        // The string is cloned because it is still needed for the error
                        // log below when parsing fails.
                        let Ok(duplicated_event) = EventId::parse(duplicated_event.clone()) else {
                            error!(%duplicated_event, %linked_chunk_id, "Reading an malformed event ID");
                            continue;
                        };

                        duplicated_events.push((
                            duplicated_event,
                            Position::new(ChunkIdentifier::new(chunk_identifier), index),
                        ));
                    }

                    Ok(duplicated_events)
                })
            })
            .await
    }
1217
1218 #[instrument(skip(self, event_id))]
1219 async fn find_event(
1220 &self,
1221 room_id: &RoomId,
1222 event_id: &EventId,
1223 ) -> Result<Option<Event>, Self::Error> {
1224 let _timer = timer!("method");
1225
1226 let event_id = event_id.to_owned();
1227 let this = self.clone();
1228
1229 let hashed_room_id = self.encode_key(keys::LINKED_CHUNKS, room_id);
1230
1231 self.read()
1232 .await?
1233 .with_transaction(move |txn| -> Result<_> {
1234 let Some(event) = txn
1235 .prepare("SELECT content FROM events WHERE event_id = ? AND room_id = ?")?
1236 .query_row((event_id.as_str(), hashed_room_id), |row| row.get::<_, Vec<u8>>(0))
1237 .optional()?
1238 else {
1239 return Ok(None);
1241 };
1242
1243 let event = serde_json::from_slice(&this.decode_value(&event)?)?;
1244
1245 Ok(Some(event))
1246 })
1247 .await
1248 }
1249
1250 #[instrument(skip(self, event_id, filters))]
1251 async fn find_event_relations(
1252 &self,
1253 room_id: &RoomId,
1254 event_id: &EventId,
1255 filters: Option<&[RelationType]>,
1256 ) -> Result<Vec<(Event, Option<Position>)>, Self::Error> {
1257 let _timer = timer!("method");
1258
1259 let hashed_room_id = self.encode_key(keys::LINKED_CHUNKS, room_id);
1260
1261 let hashed_linked_chunk_id =
1262 self.encode_key(keys::LINKED_CHUNKS, LinkedChunkId::Room(room_id).storage_key());
1263
1264 let event_id = event_id.to_owned();
1265 let filters = filters.map(ToOwned::to_owned);
1266 let store = self.clone();
1267
1268 self.read()
1269 .await?
1270 .with_transaction(move |txn| -> Result<_> {
1271 find_event_relations_transaction(
1272 store,
1273 hashed_room_id,
1274 hashed_linked_chunk_id,
1275 event_id,
1276 filters,
1277 txn,
1278 )
1279 })
1280 .await
1281 }
1282
1283 #[instrument(skip(self, event))]
1284 async fn save_event(&self, room_id: &RoomId, event: Event) -> Result<(), Self::Error> {
1285 let _timer = timer!("method");
1286
1287 let Some(event_id) = event.event_id() else {
1288 error!(%room_id, "Trying to save an event with no ID");
1289 return Ok(());
1290 };
1291
1292 let hashed_room_id = self.encode_key(keys::LINKED_CHUNKS, room_id);
1293 let event_id = event_id.to_string();
1294 let encoded_event = self.encode_event(&event)?;
1295
1296 self.write()
1297 .await?
1298 .with_transaction(move |txn| -> Result<_> {
1299 txn.execute(
1300 "INSERT OR REPLACE INTO events(room_id, event_id, content, relates_to, rel_type) VALUES (?, ?, ?, ?, ?)"
1301 , (&hashed_room_id, &event_id, encoded_event.content, encoded_event.relates_to, encoded_event.rel_type))?;
1302
1303 Ok(())
1304 })
1305 .await
1306 }
1307
1308 #[instrument(skip_all)]
1309 async fn add_media_content(
1310 &self,
1311 request: &MediaRequestParameters,
1312 content: Vec<u8>,
1313 ignore_policy: IgnoreMediaRetentionPolicy,
1314 ) -> Result<()> {
1315 let _timer = timer!("method");
1316
1317 self.media_service.add_media_content(self, request, content, ignore_policy).await
1318 }
1319
1320 #[instrument(skip_all)]
1321 async fn replace_media_key(
1322 &self,
1323 from: &MediaRequestParameters,
1324 to: &MediaRequestParameters,
1325 ) -> Result<(), Self::Error> {
1326 let _timer = timer!("method");
1327
1328 let prev_uri = self.encode_key(keys::MEDIA, from.source.unique_key());
1329 let prev_format = self.encode_key(keys::MEDIA, from.format.unique_key());
1330
1331 let new_uri = self.encode_key(keys::MEDIA, to.source.unique_key());
1332 let new_format = self.encode_key(keys::MEDIA, to.format.unique_key());
1333
1334 let conn = self.write().await?;
1335 conn.execute(
1336 r#"UPDATE media SET uri = ?, format = ? WHERE uri = ? AND format = ?"#,
1337 (new_uri, new_format, prev_uri, prev_format),
1338 )
1339 .await?;
1340
1341 Ok(())
1342 }
1343
1344 #[instrument(skip_all)]
1345 async fn get_media_content(&self, request: &MediaRequestParameters) -> Result<Option<Vec<u8>>> {
1346 let _timer = timer!("method");
1347
1348 self.media_service.get_media_content(self, request).await
1349 }
1350
1351 #[instrument(skip_all)]
1352 async fn remove_media_content(&self, request: &MediaRequestParameters) -> Result<()> {
1353 let _timer = timer!("method");
1354
1355 let uri = self.encode_key(keys::MEDIA, request.source.unique_key());
1356 let format = self.encode_key(keys::MEDIA, request.format.unique_key());
1357
1358 let conn = self.write().await?;
1359 conn.execute("DELETE FROM media WHERE uri = ? AND format = ?", (uri, format)).await?;
1360
1361 Ok(())
1362 }
1363
1364 #[instrument(skip(self))]
1365 async fn get_media_content_for_uri(
1366 &self,
1367 uri: &MxcUri,
1368 ) -> Result<Option<Vec<u8>>, Self::Error> {
1369 let _timer = timer!("method");
1370
1371 self.media_service.get_media_content_for_uri(self, uri).await
1372 }
1373
1374 #[instrument(skip(self))]
1375 async fn remove_media_content_for_uri(&self, uri: &MxcUri) -> Result<()> {
1376 let _timer = timer!("method");
1377
1378 let uri = self.encode_key(keys::MEDIA, uri);
1379
1380 let conn = self.write().await?;
1381 conn.execute("DELETE FROM media WHERE uri = ?", (uri,)).await?;
1382
1383 Ok(())
1384 }
1385
1386 #[instrument(skip_all)]
1387 async fn set_media_retention_policy(
1388 &self,
1389 policy: MediaRetentionPolicy,
1390 ) -> Result<(), Self::Error> {
1391 let _timer = timer!("method");
1392
1393 self.media_service.set_media_retention_policy(self, policy).await
1394 }
1395
1396 #[instrument(skip_all)]
1397 fn media_retention_policy(&self) -> MediaRetentionPolicy {
1398 let _timer = timer!("method");
1399
1400 self.media_service.media_retention_policy()
1401 }
1402
1403 #[instrument(skip_all)]
1404 async fn set_ignore_media_retention_policy(
1405 &self,
1406 request: &MediaRequestParameters,
1407 ignore_policy: IgnoreMediaRetentionPolicy,
1408 ) -> Result<(), Self::Error> {
1409 let _timer = timer!("method");
1410
1411 self.media_service.set_ignore_media_retention_policy(self, request, ignore_policy).await
1412 }
1413
1414 #[instrument(skip_all)]
1415 async fn clean_up_media_cache(&self) -> Result<(), Self::Error> {
1416 let _timer = timer!("method");
1417
1418 self.media_service.clean_up_media_cache(self).await
1419 }
1420}
1421
1422#[cfg_attr(target_family = "wasm", async_trait(?Send))]
1423#[cfg_attr(not(target_family = "wasm"), async_trait)]
1424impl EventCacheStoreMedia for SqliteEventCacheStore {
1425 type Error = Error;
1426
1427 async fn media_retention_policy_inner(
1428 &self,
1429 ) -> Result<Option<MediaRetentionPolicy>, Self::Error> {
1430 let conn = self.read().await?;
1431 conn.get_serialized_kv(keys::MEDIA_RETENTION_POLICY).await
1432 }
1433
1434 async fn set_media_retention_policy_inner(
1435 &self,
1436 policy: MediaRetentionPolicy,
1437 ) -> Result<(), Self::Error> {
1438 let conn = self.write().await?;
1439 conn.set_serialized_kv(keys::MEDIA_RETENTION_POLICY, policy).await?;
1440 Ok(())
1441 }
1442
1443 async fn add_media_content_inner(
1444 &self,
1445 request: &MediaRequestParameters,
1446 data: Vec<u8>,
1447 last_access: SystemTime,
1448 policy: MediaRetentionPolicy,
1449 ignore_policy: IgnoreMediaRetentionPolicy,
1450 ) -> Result<(), Self::Error> {
1451 let ignore_policy = ignore_policy.is_yes();
1452 let data = self.encode_value(data)?;
1453
1454 if !ignore_policy && policy.exceeds_max_file_size(data.len() as u64) {
1455 return Ok(());
1456 }
1457
1458 let uri = self.encode_key(keys::MEDIA, request.source.unique_key());
1459 let format = self.encode_key(keys::MEDIA, request.format.unique_key());
1460 let timestamp = time_to_timestamp(last_access);
1461
1462 let conn = self.write().await?;
1463 conn.execute(
1464 "INSERT OR REPLACE INTO media (uri, format, data, last_access, ignore_policy) VALUES (?, ?, ?, ?, ?)",
1465 (uri, format, data, timestamp, ignore_policy),
1466 )
1467 .await?;
1468
1469 Ok(())
1470 }
1471
1472 async fn set_ignore_media_retention_policy_inner(
1473 &self,
1474 request: &MediaRequestParameters,
1475 ignore_policy: IgnoreMediaRetentionPolicy,
1476 ) -> Result<(), Self::Error> {
1477 let uri = self.encode_key(keys::MEDIA, request.source.unique_key());
1478 let format = self.encode_key(keys::MEDIA, request.format.unique_key());
1479 let ignore_policy = ignore_policy.is_yes();
1480
1481 let conn = self.write().await?;
1482 conn.execute(
1483 r#"UPDATE media SET ignore_policy = ? WHERE uri = ? AND format = ?"#,
1484 (ignore_policy, uri, format),
1485 )
1486 .await?;
1487
1488 Ok(())
1489 }
1490
1491 async fn get_media_content_inner(
1492 &self,
1493 request: &MediaRequestParameters,
1494 current_time: SystemTime,
1495 ) -> Result<Option<Vec<u8>>, Self::Error> {
1496 let uri = self.encode_key(keys::MEDIA, request.source.unique_key());
1497 let format = self.encode_key(keys::MEDIA, request.format.unique_key());
1498 let timestamp = time_to_timestamp(current_time);
1499
1500 let conn = self.write().await?;
1501 let data = conn
1502 .with_transaction::<_, rusqlite::Error, _>(move |txn| {
1503 txn.execute(
1507 "UPDATE media SET last_access = ? WHERE uri = ? AND format = ?",
1508 (timestamp, &uri, &format),
1509 )?;
1510
1511 txn.query_row::<Vec<u8>, _, _>(
1512 "SELECT data FROM media WHERE uri = ? AND format = ?",
1513 (&uri, &format),
1514 |row| row.get(0),
1515 )
1516 .optional()
1517 })
1518 .await?;
1519
1520 data.map(|v| self.decode_value(&v).map(Into::into)).transpose()
1521 }
1522
1523 async fn get_media_content_for_uri_inner(
1524 &self,
1525 uri: &MxcUri,
1526 current_time: SystemTime,
1527 ) -> Result<Option<Vec<u8>>, Self::Error> {
1528 let uri = self.encode_key(keys::MEDIA, uri);
1529 let timestamp = time_to_timestamp(current_time);
1530
1531 let conn = self.write().await?;
1532 let data = conn
1533 .with_transaction::<_, rusqlite::Error, _>(move |txn| {
1534 txn.execute("UPDATE media SET last_access = ? WHERE uri = ?", (timestamp, &uri))?;
1538
1539 txn.query_row::<Vec<u8>, _, _>(
1540 "SELECT data FROM media WHERE uri = ?",
1541 (&uri,),
1542 |row| row.get(0),
1543 )
1544 .optional()
1545 })
1546 .await?;
1547
1548 data.map(|v| self.decode_value(&v).map(Into::into)).transpose()
1549 }
1550
1551 async fn clean_up_media_cache_inner(
1552 &self,
1553 policy: MediaRetentionPolicy,
1554 current_time: SystemTime,
1555 ) -> Result<(), Self::Error> {
1556 if !policy.has_limitations() {
1557 return Ok(());
1559 }
1560
1561 let conn = self.write().await?;
1562 let removed = conn
1563 .with_transaction::<_, Error, _>(move |txn| {
1564 let mut removed = false;
1565
1566 if let Some(max_file_size) = policy.computed_max_file_size() {
1568 let count = txn.execute(
1569 "DELETE FROM media WHERE ignore_policy IS FALSE AND length(data) > ?",
1570 (max_file_size,),
1571 )?;
1572
1573 if count > 0 {
1574 removed = true;
1575 }
1576 }
1577
1578 if let Some(last_access_expiry) = policy.last_access_expiry {
1580 let current_timestamp = time_to_timestamp(current_time);
1581 let expiry_secs = last_access_expiry.as_secs();
1582 let count = txn.execute(
1583 "DELETE FROM media WHERE ignore_policy IS FALSE AND (? - last_access) >= ?",
1584 (current_timestamp, expiry_secs),
1585 )?;
1586
1587 if count > 0 {
1588 removed = true;
1589 }
1590 }
1591
1592 if let Some(max_cache_size) = policy.max_cache_size {
1594 let cache_size = txn
1597 .query_row(
1598 "SELECT sum(length(data)) FROM media WHERE ignore_policy IS FALSE",
1599 (),
1600 |row| {
1601 row.get::<_, Option<u64>>(0)
1603 },
1604 )?
1605 .unwrap_or_default();
1606
1607 if cache_size > max_cache_size {
1609 let mut cached_stmt = txn.prepare_cached(
1611 "SELECT rowid, length(data) FROM media \
1612 WHERE ignore_policy IS FALSE ORDER BY last_access DESC",
1613 )?;
1614 let content_sizes = cached_stmt
1615 .query(())?
1616 .mapped(|row| Ok((row.get::<_, i64>(0)?, row.get::<_, u64>(1)?)));
1617
1618 let mut accumulated_items_size = 0u64;
1619 let mut limit_reached = false;
1620 let mut rows_to_remove = Vec::new();
1621
1622 for result in content_sizes {
1623 let (row_id, size) = match result {
1624 Ok(content_size) => content_size,
1625 Err(error) => {
1626 return Err(error.into());
1627 }
1628 };
1629
1630 if limit_reached {
1631 rows_to_remove.push(row_id);
1632 continue;
1633 }
1634
1635 match accumulated_items_size.checked_add(size) {
1636 Some(acc) if acc > max_cache_size => {
1637 limit_reached = true;
1639 rows_to_remove.push(row_id);
1640 }
1641 Some(acc) => accumulated_items_size = acc,
1642 None => {
1643 limit_reached = true;
1646 rows_to_remove.push(row_id);
1647 }
1648 }
1649 }
1650
1651 if !rows_to_remove.is_empty() {
1652 removed = true;
1653 }
1654
1655 txn.chunk_large_query_over(rows_to_remove, None, |txn, row_ids| {
1656 let sql_params = repeat_vars(row_ids.len());
1657 let query = format!("DELETE FROM media WHERE rowid IN ({sql_params})");
1658 txn.prepare(&query)?.execute(params_from_iter(row_ids))?;
1659 Ok(Vec::<()>::new())
1660 })?;
1661 }
1662 }
1663
1664 txn.set_serialized_kv(keys::LAST_MEDIA_CLEANUP_TIME, current_time)?;
1665
1666 Ok(removed)
1667 })
1668 .await?;
1669
1670 if removed {
1673 conn.vacuum().await?;
1674 }
1675
1676 Ok(())
1677 }
1678
1679 async fn last_media_cleanup_time_inner(&self) -> Result<Option<SystemTime>, Self::Error> {
1680 let conn = self.read().await?;
1681 conn.get_serialized_kv(keys::LAST_MEDIA_CLEANUP_TIME).await
1682 }
1683}
1684
1685fn find_event_relations_transaction(
1686 store: SqliteEventCacheStore,
1687 hashed_room_id: Key,
1688 hashed_linked_chunk_id: Key,
1689 event_id: OwnedEventId,
1690 filters: Option<Vec<RelationType>>,
1691 txn: &Transaction<'_>,
1692) -> Result<Vec<(Event, Option<Position>)>> {
1693 let get_rows = |row: &rusqlite::Row<'_>| {
1694 Ok((
1695 row.get::<_, Vec<u8>>(0)?,
1696 row.get::<_, Option<u64>>(1)?,
1697 row.get::<_, Option<usize>>(2)?,
1698 ))
1699 };
1700
1701 let collect_results = |transaction| {
1703 let mut related = Vec::new();
1704
1705 for result in transaction {
1706 let (event_blob, chunk_id, index): (Vec<u8>, Option<u64>, _) = result?;
1707
1708 let event: Event = serde_json::from_slice(&store.decode_value(&event_blob)?)?;
1709
1710 let pos = chunk_id
1713 .zip(index)
1714 .map(|(chunk_id, index)| Position::new(ChunkIdentifier::new(chunk_id), index));
1715
1716 related.push((event, pos));
1717 }
1718
1719 Ok(related)
1720 };
1721
1722 let related = if let Some(filters) = compute_filters_string(filters.as_deref()) {
1723 let question_marks = repeat_vars(filters.len());
1724 let query = format!(
1725 "SELECT events.content, event_chunks.chunk_id, event_chunks.position
1726 FROM events
1727 LEFT JOIN event_chunks ON events.event_id = event_chunks.event_id AND event_chunks.linked_chunk_id = ?
1728 WHERE relates_to = ? AND room_id = ? AND rel_type IN ({question_marks})"
1729 );
1730
1731 let filters: Vec<_> = filters.iter().map(|f| f.to_sql().unwrap()).collect();
1732 let parameters = params_from_iter(
1733 [
1734 hashed_linked_chunk_id.to_sql().expect(
1735 "We should be able to convert a hashed linked chunk ID to a SQLite value",
1736 ),
1737 event_id
1738 .as_str()
1739 .to_sql()
1740 .expect("We should be able to convert an event ID to a SQLite value"),
1741 hashed_room_id
1742 .to_sql()
1743 .expect("We should be able to convert a room ID to a SQLite value"),
1744 ]
1745 .into_iter()
1746 .chain(filters),
1747 );
1748
1749 let mut transaction = txn.prepare(&query)?;
1750 let transaction = transaction.query_map(parameters, get_rows)?;
1751
1752 collect_results(transaction)
1753 } else {
1754 let query =
1755 "SELECT events.content, event_chunks.chunk_id, event_chunks.position
1756 FROM events
1757 LEFT JOIN event_chunks ON events.event_id = event_chunks.event_id AND event_chunks.linked_chunk_id = ?
1758 WHERE relates_to = ? AND room_id = ?";
1759 let parameters = (hashed_linked_chunk_id, event_id.as_str(), hashed_room_id);
1760
1761 let mut transaction = txn.prepare(query)?;
1762 let transaction = transaction.query_map(parameters, get_rows)?;
1763
1764 collect_results(transaction)
1765 };
1766
1767 related
1768}
1769
1770async fn with_immediate_transaction<
1775 T: Send + 'static,
1776 F: FnOnce(&Transaction<'_>) -> Result<T, Error> + Send + 'static,
1777>(
1778 this: &SqliteEventCacheStore,
1779 f: F,
1780) -> Result<T, Error> {
1781 this.write()
1782 .await?
1783 .interact(move |conn| -> Result<T, Error> {
1784 conn.set_transaction_behavior(TransactionBehavior::Immediate);
1788
1789 let code = || -> Result<T, Error> {
1790 let txn = conn.transaction()?;
1791 let res = f(&txn)?;
1792 txn.commit()?;
1793 Ok(res)
1794 };
1795
1796 let res = code();
1797
1798 conn.set_transaction_behavior(TransactionBehavior::Deferred);
1801
1802 res
1803 })
1804 .await
1805 .unwrap()
1807}
1808
1809fn insert_chunk(
1810 txn: &Transaction<'_>,
1811 linked_chunk_id: &Key,
1812 previous: Option<u64>,
1813 new: u64,
1814 next: Option<u64>,
1815 type_str: &str,
1816) -> rusqlite::Result<()> {
1817 txn.execute(
1819 r#"
1820 INSERT INTO linked_chunks(id, linked_chunk_id, previous, next, type)
1821 VALUES (?, ?, ?, ?, ?)
1822 "#,
1823 (new, linked_chunk_id, previous, next, type_str),
1824 )?;
1825
1826 if let Some(previous) = previous {
1828 txn.execute(
1829 r#"
1830 UPDATE linked_chunks
1831 SET next = ?
1832 WHERE id = ? AND linked_chunk_id = ?
1833 "#,
1834 (new, previous, linked_chunk_id),
1835 )?;
1836 }
1837
1838 if let Some(next) = next {
1840 txn.execute(
1841 r#"
1842 UPDATE linked_chunks
1843 SET previous = ?
1844 WHERE id = ? AND linked_chunk_id = ?
1845 "#,
1846 (new, next, linked_chunk_id),
1847 )?;
1848 }
1849
1850 Ok(())
1851}
1852
1853#[cfg(test)]
1854mod tests {
1855 use std::{
1856 path::PathBuf,
1857 sync::atomic::{AtomicU32, Ordering::SeqCst},
1858 time::Duration,
1859 };
1860
1861 use assert_matches::assert_matches;
1862 use matrix_sdk_base::{
1863 event_cache::{
1864 store::{
1865 integration_tests::{
1866 check_test_event, make_test_event, make_test_event_with_event_id,
1867 },
1868 media::IgnoreMediaRetentionPolicy,
1869 EventCacheStore, EventCacheStoreError,
1870 },
1871 Gap,
1872 },
1873 event_cache_store_integration_tests, event_cache_store_integration_tests_time,
1874 event_cache_store_media_integration_tests,
1875 linked_chunk::{ChunkContent, ChunkIdentifier, LinkedChunkId, Position, Update},
1876 media::{MediaFormat, MediaRequestParameters, MediaThumbnailSettings},
1877 };
1878 use matrix_sdk_test::{async_test, DEFAULT_TEST_ROOM_ID};
1879 use once_cell::sync::Lazy;
1880 use ruma::{event_id, events::room::MediaSource, media::Method, mxc_uri, room_id, uint};
1881 use tempfile::{tempdir, TempDir};
1882
1883 use super::SqliteEventCacheStore;
1884 use crate::{
1885 event_cache_store::keys,
1886 utils::{EncryptableStore as _, SqliteAsyncConnExt},
1887 SqliteStoreConfig,
1888 };
1889
/// Temporary directory created lazily on first use; each store workspace is
/// a sub-path of it.
static TMP_DIR: Lazy<TempDir> = Lazy::new(|| tempdir().unwrap());
/// Counter used to give every store workspace a distinct name.
static NUM: AtomicU32 = AtomicU32::new(0);
1892
1893 fn new_event_cache_store_workspace() -> PathBuf {
1894 let name = NUM.fetch_add(1, SeqCst).to_string();
1895 TMP_DIR.path().join(name)
1896 }
1897
1898 async fn get_event_cache_store() -> Result<SqliteEventCacheStore, EventCacheStoreError> {
1899 let tmpdir_path = new_event_cache_store_workspace();
1900
1901 tracing::info!("using event cache store @ {}", tmpdir_path.to_str().unwrap());
1902
1903 Ok(SqliteEventCacheStore::open(tmpdir_path.to_str().unwrap(), None).await.unwrap())
1904 }
1905
// Instantiate the shared integration test suites for this store.
// NOTE(review): the macros presumably obtain the store through
// `get_event_cache_store()` defined in this module; confirm against their
// definitions in `matrix_sdk_base`.
event_cache_store_integration_tests!();
event_cache_store_integration_tests_time!();
event_cache_store_media_integration_tests!(with_media_size_tests);
1909
1910 async fn get_event_cache_store_content_sorted_by_last_access(
1911 event_cache_store: &SqliteEventCacheStore,
1912 ) -> Vec<Vec<u8>> {
1913 let sqlite_db = event_cache_store.read().await.expect("accessing sqlite db failed");
1914 sqlite_db
1915 .prepare("SELECT data FROM media ORDER BY last_access DESC", |mut stmt| {
1916 stmt.query(())?.mapped(|row| row.get(0)).collect()
1917 })
1918 .await
1919 .expect("querying media cache content by last access failed")
1920 }
1921
1922 #[async_test]
1923 async fn test_pool_size() {
1924 let tmpdir_path = new_event_cache_store_workspace();
1925 let store_open_config = SqliteStoreConfig::new(tmpdir_path).pool_max_size(42);
1926
1927 let store = SqliteEventCacheStore::open_with_config(store_open_config).await.unwrap();
1928
1929 assert_eq!(store.pool.status().max_size, 42);
1930 }
1931
1932 #[async_test]
1933 async fn test_last_access() {
1934 let event_cache_store = get_event_cache_store().await.expect("creating media cache failed");
1935 let uri = mxc_uri!("mxc://localhost/media");
1936 let file_request = MediaRequestParameters {
1937 source: MediaSource::Plain(uri.to_owned()),
1938 format: MediaFormat::File,
1939 };
1940 let thumbnail_request = MediaRequestParameters {
1941 source: MediaSource::Plain(uri.to_owned()),
1942 format: MediaFormat::Thumbnail(MediaThumbnailSettings::with_method(
1943 Method::Crop,
1944 uint!(100),
1945 uint!(100),
1946 )),
1947 };
1948
1949 let content: Vec<u8> = "hello world".into();
1950 let thumbnail_content: Vec<u8> = "hello…".into();
1951
1952 event_cache_store
1954 .add_media_content(&file_request, content.clone(), IgnoreMediaRetentionPolicy::No)
1955 .await
1956 .expect("adding file failed");
1957
1958 tokio::time::sleep(Duration::from_secs(3)).await;
1961
1962 event_cache_store
1963 .add_media_content(
1964 &thumbnail_request,
1965 thumbnail_content.clone(),
1966 IgnoreMediaRetentionPolicy::No,
1967 )
1968 .await
1969 .expect("adding thumbnail failed");
1970
1971 let contents =
1973 get_event_cache_store_content_sorted_by_last_access(&event_cache_store).await;
1974
1975 assert_eq!(contents.len(), 2, "media cache contents length is wrong");
1976 assert_eq!(contents[0], thumbnail_content, "thumbnail is not last access");
1977 assert_eq!(contents[1], content, "file is not second-to-last access");
1978
1979 tokio::time::sleep(Duration::from_secs(3)).await;
1982
1983 let _ = event_cache_store
1985 .get_media_content(&file_request)
1986 .await
1987 .expect("getting file failed")
1988 .expect("file is missing");
1989
1990 let contents =
1992 get_event_cache_store_content_sorted_by_last_access(&event_cache_store).await;
1993
1994 assert_eq!(contents.len(), 2, "media cache contents length is wrong");
1995 assert_eq!(contents[0], content, "file is not last access");
1996 assert_eq!(contents[1], thumbnail_content, "thumbnail is not second-to-last access");
1997 }
1998
1999 #[async_test]
2000 async fn test_linked_chunk_new_items_chunk() {
2001 let store = get_event_cache_store().await.expect("creating cache store failed");
2002
2003 let room_id = &DEFAULT_TEST_ROOM_ID;
2004 let linked_chunk_id = LinkedChunkId::Room(room_id);
2005
2006 store
2007 .handle_linked_chunk_updates(
2008 linked_chunk_id,
2009 vec![
2010 Update::NewItemsChunk {
2011 previous: None,
2012 new: ChunkIdentifier::new(42),
2013 next: None, },
2015 Update::NewItemsChunk {
2016 previous: Some(ChunkIdentifier::new(42)),
2017 new: ChunkIdentifier::new(13),
2018 next: Some(ChunkIdentifier::new(37)), },
2021 Update::NewItemsChunk {
2022 previous: Some(ChunkIdentifier::new(13)),
2023 new: ChunkIdentifier::new(37),
2024 next: None,
2025 },
2026 ],
2027 )
2028 .await
2029 .unwrap();
2030
2031 let mut chunks = store.load_all_chunks(linked_chunk_id).await.unwrap();
2032
2033 assert_eq!(chunks.len(), 3);
2034
2035 {
2036 let c = chunks.remove(0);
2038 assert_eq!(c.identifier, ChunkIdentifier::new(13));
2039 assert_eq!(c.previous, Some(ChunkIdentifier::new(42)));
2040 assert_eq!(c.next, Some(ChunkIdentifier::new(37)));
2041 assert_matches!(c.content, ChunkContent::Items(events) => {
2042 assert!(events.is_empty());
2043 });
2044
2045 let c = chunks.remove(0);
2046 assert_eq!(c.identifier, ChunkIdentifier::new(37));
2047 assert_eq!(c.previous, Some(ChunkIdentifier::new(13)));
2048 assert_eq!(c.next, None);
2049 assert_matches!(c.content, ChunkContent::Items(events) => {
2050 assert!(events.is_empty());
2051 });
2052
2053 let c = chunks.remove(0);
2054 assert_eq!(c.identifier, ChunkIdentifier::new(42));
2055 assert_eq!(c.previous, None);
2056 assert_eq!(c.next, Some(ChunkIdentifier::new(13)));
2057 assert_matches!(c.content, ChunkContent::Items(events) => {
2058 assert!(events.is_empty());
2059 });
2060 }
2061 }
2062
2063 #[async_test]
2064 async fn test_linked_chunk_new_gap_chunk() {
2065 let store = get_event_cache_store().await.expect("creating cache store failed");
2066
2067 let room_id = &DEFAULT_TEST_ROOM_ID;
2068 let linked_chunk_id = LinkedChunkId::Room(room_id);
2069
2070 store
2071 .handle_linked_chunk_updates(
2072 linked_chunk_id,
2073 vec![Update::NewGapChunk {
2074 previous: None,
2075 new: ChunkIdentifier::new(42),
2076 next: None,
2077 gap: Gap { prev_token: "raclette".to_owned() },
2078 }],
2079 )
2080 .await
2081 .unwrap();
2082
2083 let mut chunks = store.load_all_chunks(linked_chunk_id).await.unwrap();
2084
2085 assert_eq!(chunks.len(), 1);
2086
2087 let c = chunks.remove(0);
2089 assert_eq!(c.identifier, ChunkIdentifier::new(42));
2090 assert_eq!(c.previous, None);
2091 assert_eq!(c.next, None);
2092 assert_matches!(c.content, ChunkContent::Gap(gap) => {
2093 assert_eq!(gap.prev_token, "raclette");
2094 });
2095 }
2096
2097 #[async_test]
2098 async fn test_linked_chunk_replace_item() {
2099 let store = get_event_cache_store().await.expect("creating cache store failed");
2100
2101 let room_id = &DEFAULT_TEST_ROOM_ID;
2102 let linked_chunk_id = LinkedChunkId::Room(room_id);
2103 let event_id = event_id!("$world");
2104
2105 store
2106 .handle_linked_chunk_updates(
2107 linked_chunk_id,
2108 vec![
2109 Update::NewItemsChunk {
2110 previous: None,
2111 new: ChunkIdentifier::new(42),
2112 next: None,
2113 },
2114 Update::PushItems {
2115 at: Position::new(ChunkIdentifier::new(42), 0),
2116 items: vec![
2117 make_test_event(room_id, "hello"),
2118 make_test_event_with_event_id(room_id, "world", Some(event_id)),
2119 ],
2120 },
2121 Update::ReplaceItem {
2122 at: Position::new(ChunkIdentifier::new(42), 1),
2123 item: make_test_event_with_event_id(room_id, "yolo", Some(event_id)),
2124 },
2125 ],
2126 )
2127 .await
2128 .unwrap();
2129
2130 let mut chunks = store.load_all_chunks(linked_chunk_id).await.unwrap();
2131
2132 assert_eq!(chunks.len(), 1);
2133
2134 let c = chunks.remove(0);
2135 assert_eq!(c.identifier, ChunkIdentifier::new(42));
2136 assert_eq!(c.previous, None);
2137 assert_eq!(c.next, None);
2138 assert_matches!(c.content, ChunkContent::Items(events) => {
2139 assert_eq!(events.len(), 2);
2140 check_test_event(&events[0], "hello");
2141 check_test_event(&events[1], "yolo");
2142 });
2143 }
2144
2145 #[async_test]
2146 async fn test_linked_chunk_remove_chunk() {
2147 let store = get_event_cache_store().await.expect("creating cache store failed");
2148
2149 let room_id = &DEFAULT_TEST_ROOM_ID;
2150 let linked_chunk_id = LinkedChunkId::Room(room_id);
2151
2152 store
2153 .handle_linked_chunk_updates(
2154 linked_chunk_id,
2155 vec![
2156 Update::NewGapChunk {
2157 previous: None,
2158 new: ChunkIdentifier::new(42),
2159 next: None,
2160 gap: Gap { prev_token: "raclette".to_owned() },
2161 },
2162 Update::NewGapChunk {
2163 previous: Some(ChunkIdentifier::new(42)),
2164 new: ChunkIdentifier::new(43),
2165 next: None,
2166 gap: Gap { prev_token: "fondue".to_owned() },
2167 },
2168 Update::NewGapChunk {
2169 previous: Some(ChunkIdentifier::new(43)),
2170 new: ChunkIdentifier::new(44),
2171 next: None,
2172 gap: Gap { prev_token: "tartiflette".to_owned() },
2173 },
2174 Update::RemoveChunk(ChunkIdentifier::new(43)),
2175 ],
2176 )
2177 .await
2178 .unwrap();
2179
2180 let mut chunks = store.load_all_chunks(linked_chunk_id).await.unwrap();
2181
2182 assert_eq!(chunks.len(), 2);
2183
2184 let c = chunks.remove(0);
2186 assert_eq!(c.identifier, ChunkIdentifier::new(42));
2187 assert_eq!(c.previous, None);
2188 assert_eq!(c.next, Some(ChunkIdentifier::new(44)));
2189 assert_matches!(c.content, ChunkContent::Gap(gap) => {
2190 assert_eq!(gap.prev_token, "raclette");
2191 });
2192
2193 let c = chunks.remove(0);
2194 assert_eq!(c.identifier, ChunkIdentifier::new(44));
2195 assert_eq!(c.previous, Some(ChunkIdentifier::new(42)));
2196 assert_eq!(c.next, None);
2197 assert_matches!(c.content, ChunkContent::Gap(gap) => {
2198 assert_eq!(gap.prev_token, "tartiflette");
2199 });
2200
2201 let gaps = store
2203 .read()
2204 .await
2205 .unwrap()
2206 .with_transaction(|txn| -> rusqlite::Result<_> {
2207 let mut gaps = Vec::new();
2208 for data in txn
2209 .prepare("SELECT chunk_id FROM gap_chunks ORDER BY chunk_id")?
2210 .query_map((), |row| row.get::<_, u64>(0))?
2211 {
2212 gaps.push(data?);
2213 }
2214 Ok(gaps)
2215 })
2216 .await
2217 .unwrap();
2218
2219 assert_eq!(gaps, vec![42, 44]);
2220 }
2221
2222 #[async_test]
2223 async fn test_linked_chunk_push_items() {
2224 let store = get_event_cache_store().await.expect("creating cache store failed");
2225
2226 let room_id = &DEFAULT_TEST_ROOM_ID;
2227 let linked_chunk_id = LinkedChunkId::Room(room_id);
2228
2229 store
2230 .handle_linked_chunk_updates(
2231 linked_chunk_id,
2232 vec![
2233 Update::NewItemsChunk {
2234 previous: None,
2235 new: ChunkIdentifier::new(42),
2236 next: None,
2237 },
2238 Update::PushItems {
2239 at: Position::new(ChunkIdentifier::new(42), 0),
2240 items: vec![
2241 make_test_event(room_id, "hello"),
2242 make_test_event(room_id, "world"),
2243 ],
2244 },
2245 Update::PushItems {
2246 at: Position::new(ChunkIdentifier::new(42), 2),
2247 items: vec![make_test_event(room_id, "who?")],
2248 },
2249 ],
2250 )
2251 .await
2252 .unwrap();
2253
2254 let mut chunks = store.load_all_chunks(linked_chunk_id).await.unwrap();
2255
2256 assert_eq!(chunks.len(), 1);
2257
2258 let c = chunks.remove(0);
2259 assert_eq!(c.identifier, ChunkIdentifier::new(42));
2260 assert_eq!(c.previous, None);
2261 assert_eq!(c.next, None);
2262 assert_matches!(c.content, ChunkContent::Items(events) => {
2263 assert_eq!(events.len(), 3);
2264
2265 check_test_event(&events[0], "hello");
2266 check_test_event(&events[1], "world");
2267 check_test_event(&events[2], "who?");
2268 });
2269 }
2270
2271 #[async_test]
2272 async fn test_linked_chunk_remove_item() {
2273 let store = get_event_cache_store().await.expect("creating cache store failed");
2274
2275 let room_id = *DEFAULT_TEST_ROOM_ID;
2276 let linked_chunk_id = LinkedChunkId::Room(room_id);
2277
2278 store
2279 .handle_linked_chunk_updates(
2280 linked_chunk_id,
2281 vec![
2282 Update::NewItemsChunk {
2283 previous: None,
2284 new: ChunkIdentifier::new(42),
2285 next: None,
2286 },
2287 Update::PushItems {
2288 at: Position::new(ChunkIdentifier::new(42), 0),
2289 items: vec![
2290 make_test_event(room_id, "one"),
2291 make_test_event(room_id, "two"),
2292 make_test_event(room_id, "three"),
2293 make_test_event(room_id, "four"),
2294 make_test_event(room_id, "five"),
2295 make_test_event(room_id, "six"),
2296 ],
2297 },
2298 Update::RemoveItem {
2299 at: Position::new(ChunkIdentifier::new(42), 2), },
2301 ],
2302 )
2303 .await
2304 .unwrap();
2305
2306 let mut chunks = store.load_all_chunks(linked_chunk_id).await.unwrap();
2307
2308 assert_eq!(chunks.len(), 1);
2309
2310 let c = chunks.remove(0);
2311 assert_eq!(c.identifier, ChunkIdentifier::new(42));
2312 assert_eq!(c.previous, None);
2313 assert_eq!(c.next, None);
2314 assert_matches!(c.content, ChunkContent::Items(events) => {
2315 assert_eq!(events.len(), 5);
2316 check_test_event(&events[0], "one");
2317 check_test_event(&events[1], "two");
2318 check_test_event(&events[2], "four");
2319 check_test_event(&events[3], "five");
2320 check_test_event(&events[4], "six");
2321 });
2322
2323 let num_rows: u64 = store
2325 .read()
2326 .await
2327 .unwrap()
2328 .with_transaction(move |txn| {
2329 txn.query_row(
2330 "SELECT COUNT(*) FROM event_chunks WHERE chunk_id = 42 AND linked_chunk_id = ? AND position IN (2, 3, 4)",
2331 (store.encode_key(keys::LINKED_CHUNKS, linked_chunk_id.storage_key()),),
2332 |row| row.get(0),
2333 )
2334 })
2335 .await
2336 .unwrap();
2337 assert_eq!(num_rows, 3);
2338 }
2339
2340 #[async_test]
2341 async fn test_linked_chunk_detach_last_items() {
2342 let store = get_event_cache_store().await.expect("creating cache store failed");
2343
2344 let room_id = *DEFAULT_TEST_ROOM_ID;
2345 let linked_chunk_id = LinkedChunkId::Room(room_id);
2346
2347 store
2348 .handle_linked_chunk_updates(
2349 linked_chunk_id,
2350 vec![
2351 Update::NewItemsChunk {
2352 previous: None,
2353 new: ChunkIdentifier::new(42),
2354 next: None,
2355 },
2356 Update::PushItems {
2357 at: Position::new(ChunkIdentifier::new(42), 0),
2358 items: vec![
2359 make_test_event(room_id, "hello"),
2360 make_test_event(room_id, "world"),
2361 make_test_event(room_id, "howdy"),
2362 ],
2363 },
2364 Update::DetachLastItems { at: Position::new(ChunkIdentifier::new(42), 1) },
2365 ],
2366 )
2367 .await
2368 .unwrap();
2369
2370 let mut chunks = store.load_all_chunks(linked_chunk_id).await.unwrap();
2371
2372 assert_eq!(chunks.len(), 1);
2373
2374 let c = chunks.remove(0);
2375 assert_eq!(c.identifier, ChunkIdentifier::new(42));
2376 assert_eq!(c.previous, None);
2377 assert_eq!(c.next, None);
2378 assert_matches!(c.content, ChunkContent::Items(events) => {
2379 assert_eq!(events.len(), 1);
2380 check_test_event(&events[0], "hello");
2381 });
2382 }
2383
2384 #[async_test]
2385 async fn test_linked_chunk_start_end_reattach_items() {
2386 let store = get_event_cache_store().await.expect("creating cache store failed");
2387
2388 let room_id = *DEFAULT_TEST_ROOM_ID;
2389 let linked_chunk_id = LinkedChunkId::Room(room_id);
2390
2391 store
2395 .handle_linked_chunk_updates(
2396 linked_chunk_id,
2397 vec![
2398 Update::NewItemsChunk {
2399 previous: None,
2400 new: ChunkIdentifier::new(42),
2401 next: None,
2402 },
2403 Update::PushItems {
2404 at: Position::new(ChunkIdentifier::new(42), 0),
2405 items: vec![
2406 make_test_event(room_id, "hello"),
2407 make_test_event(room_id, "world"),
2408 make_test_event(room_id, "howdy"),
2409 ],
2410 },
2411 Update::StartReattachItems,
2412 Update::EndReattachItems,
2413 ],
2414 )
2415 .await
2416 .unwrap();
2417
2418 let mut chunks = store.load_all_chunks(linked_chunk_id).await.unwrap();
2419
2420 assert_eq!(chunks.len(), 1);
2421
2422 let c = chunks.remove(0);
2423 assert_eq!(c.identifier, ChunkIdentifier::new(42));
2424 assert_eq!(c.previous, None);
2425 assert_eq!(c.next, None);
2426 assert_matches!(c.content, ChunkContent::Items(events) => {
2427 assert_eq!(events.len(), 3);
2428 check_test_event(&events[0], "hello");
2429 check_test_event(&events[1], "world");
2430 check_test_event(&events[2], "howdy");
2431 });
2432 }
2433
2434 #[async_test]
2435 async fn test_linked_chunk_clear() {
2436 let store = get_event_cache_store().await.expect("creating cache store failed");
2437
2438 let room_id = *DEFAULT_TEST_ROOM_ID;
2439 let linked_chunk_id = LinkedChunkId::Room(room_id);
2440 let event_0 = make_test_event(room_id, "hello");
2441 let event_1 = make_test_event(room_id, "world");
2442 let event_2 = make_test_event(room_id, "howdy");
2443
2444 store
2445 .handle_linked_chunk_updates(
2446 linked_chunk_id,
2447 vec![
2448 Update::NewItemsChunk {
2449 previous: None,
2450 new: ChunkIdentifier::new(42),
2451 next: None,
2452 },
2453 Update::NewGapChunk {
2454 previous: Some(ChunkIdentifier::new(42)),
2455 new: ChunkIdentifier::new(54),
2456 next: None,
2457 gap: Gap { prev_token: "fondue".to_owned() },
2458 },
2459 Update::PushItems {
2460 at: Position::new(ChunkIdentifier::new(42), 0),
2461 items: vec![event_0.clone(), event_1, event_2],
2462 },
2463 Update::Clear,
2464 ],
2465 )
2466 .await
2467 .unwrap();
2468
2469 let chunks = store.load_all_chunks(linked_chunk_id).await.unwrap();
2470 assert!(chunks.is_empty());
2471
2472 store
2474 .read()
2475 .await
2476 .unwrap()
2477 .with_transaction(|txn| -> rusqlite::Result<_> {
2478 let num_gaps = txn
2479 .prepare("SELECT COUNT(chunk_id) FROM gap_chunks ORDER BY chunk_id")?
2480 .query_row((), |row| row.get::<_, u64>(0))?;
2481 assert_eq!(num_gaps, 0);
2482
2483 let num_events = txn
2484 .prepare("SELECT COUNT(event_id) FROM event_chunks ORDER BY chunk_id")?
2485 .query_row((), |row| row.get::<_, u64>(0))?;
2486 assert_eq!(num_events, 0);
2487
2488 Ok(())
2489 })
2490 .await
2491 .unwrap();
2492
2493 store
2495 .handle_linked_chunk_updates(
2496 linked_chunk_id,
2497 vec![
2498 Update::NewItemsChunk {
2499 previous: None,
2500 new: ChunkIdentifier::new(42),
2501 next: None,
2502 },
2503 Update::PushItems {
2504 at: Position::new(ChunkIdentifier::new(42), 0),
2505 items: vec![event_0],
2506 },
2507 ],
2508 )
2509 .await
2510 .unwrap();
2511 }
2512
2513 #[async_test]
2514 async fn test_linked_chunk_multiple_rooms() {
2515 let store = get_event_cache_store().await.expect("creating cache store failed");
2516
2517 let room1 = room_id!("!realcheeselovers:raclette.fr");
2518 let linked_chunk_id1 = LinkedChunkId::Room(room1);
2519 let room2 = room_id!("!realcheeselovers:fondue.ch");
2520 let linked_chunk_id2 = LinkedChunkId::Room(room2);
2521
2522 store
2526 .handle_linked_chunk_updates(
2527 linked_chunk_id1,
2528 vec![
2529 Update::NewItemsChunk {
2530 previous: None,
2531 new: ChunkIdentifier::new(42),
2532 next: None,
2533 },
2534 Update::PushItems {
2535 at: Position::new(ChunkIdentifier::new(42), 0),
2536 items: vec![
2537 make_test_event(room1, "best cheese is raclette"),
2538 make_test_event(room1, "obviously"),
2539 ],
2540 },
2541 ],
2542 )
2543 .await
2544 .unwrap();
2545
2546 store
2547 .handle_linked_chunk_updates(
2548 linked_chunk_id2,
2549 vec![
2550 Update::NewItemsChunk {
2551 previous: None,
2552 new: ChunkIdentifier::new(42),
2553 next: None,
2554 },
2555 Update::PushItems {
2556 at: Position::new(ChunkIdentifier::new(42), 0),
2557 items: vec![make_test_event(room1, "beaufort is the best")],
2558 },
2559 ],
2560 )
2561 .await
2562 .unwrap();
2563
2564 let mut chunks_room1 = store.load_all_chunks(linked_chunk_id1).await.unwrap();
2566 assert_eq!(chunks_room1.len(), 1);
2567
2568 let c = chunks_room1.remove(0);
2569 assert_matches!(c.content, ChunkContent::Items(events) => {
2570 assert_eq!(events.len(), 2);
2571 check_test_event(&events[0], "best cheese is raclette");
2572 check_test_event(&events[1], "obviously");
2573 });
2574
2575 let mut chunks_room2 = store.load_all_chunks(linked_chunk_id2).await.unwrap();
2577 assert_eq!(chunks_room2.len(), 1);
2578
2579 let c = chunks_room2.remove(0);
2580 assert_matches!(c.content, ChunkContent::Items(events) => {
2581 assert_eq!(events.len(), 1);
2582 check_test_event(&events[0], "beaufort is the best");
2583 });
2584 }
2585
2586 #[async_test]
2587 async fn test_linked_chunk_update_is_a_transaction() {
2588 let store = get_event_cache_store().await.expect("creating cache store failed");
2589
2590 let room_id = *DEFAULT_TEST_ROOM_ID;
2591 let linked_chunk_id = LinkedChunkId::Room(room_id);
2592
2593 let err = store
2596 .handle_linked_chunk_updates(
2597 linked_chunk_id,
2598 vec![
2599 Update::NewItemsChunk {
2600 previous: None,
2601 new: ChunkIdentifier::new(42),
2602 next: None,
2603 },
2604 Update::NewItemsChunk {
2605 previous: None,
2606 new: ChunkIdentifier::new(42),
2607 next: None,
2608 },
2609 ],
2610 )
2611 .await
2612 .unwrap_err();
2613
2614 assert_matches!(err, crate::error::Error::Sqlite(err) => {
2616 assert_matches!(err.sqlite_error_code(), Some(rusqlite::ErrorCode::ConstraintViolation));
2617 });
2618
2619 let chunks = store.load_all_chunks(linked_chunk_id).await.unwrap();
2623 assert!(chunks.is_empty());
2624 }
2625
2626 #[async_test]
2627 async fn test_filter_duplicate_events_no_events() {
2628 let store = get_event_cache_store().await.expect("creating cache store failed");
2629
2630 let room_id = *DEFAULT_TEST_ROOM_ID;
2631 let linked_chunk_id = LinkedChunkId::Room(room_id);
2632 let duplicates = store.filter_duplicated_events(linked_chunk_id, Vec::new()).await.unwrap();
2633 assert!(duplicates.is_empty());
2634 }
2635
2636 #[async_test]
2637 async fn test_load_last_chunk() {
2638 let room_id = room_id!("!r0:matrix.org");
2639 let linked_chunk_id = LinkedChunkId::Room(room_id);
2640 let event = |msg: &str| make_test_event(room_id, msg);
2641 let store = get_event_cache_store().await.expect("creating cache store failed");
2642
2643 {
2645 let (last_chunk, chunk_identifier_generator) =
2646 store.load_last_chunk(linked_chunk_id).await.unwrap();
2647
2648 assert!(last_chunk.is_none());
2649 assert_eq!(chunk_identifier_generator.current(), 0);
2650 }
2651
2652 {
2654 store
2655 .handle_linked_chunk_updates(
2656 linked_chunk_id,
2657 vec![
2658 Update::NewItemsChunk {
2659 previous: None,
2660 new: ChunkIdentifier::new(42),
2661 next: None,
2662 },
2663 Update::PushItems {
2664 at: Position::new(ChunkIdentifier::new(42), 0),
2665 items: vec![event("saucisse de morteau"), event("comté")],
2666 },
2667 ],
2668 )
2669 .await
2670 .unwrap();
2671
2672 let (last_chunk, chunk_identifier_generator) =
2673 store.load_last_chunk(linked_chunk_id).await.unwrap();
2674
2675 assert_matches!(last_chunk, Some(last_chunk) => {
2676 assert_eq!(last_chunk.identifier, 42);
2677 assert!(last_chunk.previous.is_none());
2678 assert!(last_chunk.next.is_none());
2679 assert_matches!(last_chunk.content, ChunkContent::Items(items) => {
2680 assert_eq!(items.len(), 2);
2681 check_test_event(&items[0], "saucisse de morteau");
2682 check_test_event(&items[1], "comté");
2683 });
2684 });
2685 assert_eq!(chunk_identifier_generator.current(), 42);
2686 }
2687
2688 {
2690 store
2691 .handle_linked_chunk_updates(
2692 linked_chunk_id,
2693 vec![
2694 Update::NewItemsChunk {
2695 previous: Some(ChunkIdentifier::new(42)),
2696 new: ChunkIdentifier::new(7),
2697 next: None,
2698 },
2699 Update::PushItems {
2700 at: Position::new(ChunkIdentifier::new(7), 0),
2701 items: vec![event("fondue"), event("gruyère"), event("mont d'or")],
2702 },
2703 ],
2704 )
2705 .await
2706 .unwrap();
2707
2708 let (last_chunk, chunk_identifier_generator) =
2709 store.load_last_chunk(linked_chunk_id).await.unwrap();
2710
2711 assert_matches!(last_chunk, Some(last_chunk) => {
2712 assert_eq!(last_chunk.identifier, 7);
2713 assert_matches!(last_chunk.previous, Some(previous) => {
2714 assert_eq!(previous, 42);
2715 });
2716 assert!(last_chunk.next.is_none());
2717 assert_matches!(last_chunk.content, ChunkContent::Items(items) => {
2718 assert_eq!(items.len(), 3);
2719 check_test_event(&items[0], "fondue");
2720 check_test_event(&items[1], "gruyère");
2721 check_test_event(&items[2], "mont d'or");
2722 });
2723 });
2724 assert_eq!(chunk_identifier_generator.current(), 42);
2725 }
2726 }
2727
2728 #[async_test]
2729 async fn test_load_last_chunk_with_a_cycle() {
2730 let room_id = room_id!("!r0:matrix.org");
2731 let linked_chunk_id = LinkedChunkId::Room(room_id);
2732 let store = get_event_cache_store().await.expect("creating cache store failed");
2733
2734 store
2735 .handle_linked_chunk_updates(
2736 linked_chunk_id,
2737 vec![
2738 Update::NewItemsChunk {
2739 previous: None,
2740 new: ChunkIdentifier::new(0),
2741 next: None,
2742 },
2743 Update::NewItemsChunk {
2744 previous: Some(ChunkIdentifier::new(0)),
2748 new: ChunkIdentifier::new(1),
2749 next: Some(ChunkIdentifier::new(0)),
2750 },
2751 ],
2752 )
2753 .await
2754 .unwrap();
2755
2756 store.load_last_chunk(linked_chunk_id).await.unwrap_err();
2757 }
2758
2759 #[async_test]
2760 async fn test_load_previous_chunk() {
2761 let room_id = room_id!("!r0:matrix.org");
2762 let linked_chunk_id = LinkedChunkId::Room(room_id);
2763 let event = |msg: &str| make_test_event(room_id, msg);
2764 let store = get_event_cache_store().await.expect("creating cache store failed");
2765
2766 {
2769 let previous_chunk = store
2770 .load_previous_chunk(linked_chunk_id, ChunkIdentifier::new(153))
2771 .await
2772 .unwrap();
2773
2774 assert!(previous_chunk.is_none());
2775 }
2776
2777 {
2780 store
2781 .handle_linked_chunk_updates(
2782 linked_chunk_id,
2783 vec![Update::NewItemsChunk {
2784 previous: None,
2785 new: ChunkIdentifier::new(42),
2786 next: None,
2787 }],
2788 )
2789 .await
2790 .unwrap();
2791
2792 let previous_chunk =
2793 store.load_previous_chunk(linked_chunk_id, ChunkIdentifier::new(42)).await.unwrap();
2794
2795 assert!(previous_chunk.is_none());
2796 }
2797
2798 {
2800 store
2801 .handle_linked_chunk_updates(
2802 linked_chunk_id,
2803 vec![
2804 Update::NewItemsChunk {
2806 previous: None,
2807 new: ChunkIdentifier::new(7),
2808 next: Some(ChunkIdentifier::new(42)),
2809 },
2810 Update::PushItems {
2811 at: Position::new(ChunkIdentifier::new(7), 0),
2812 items: vec![event("brigand du jorat"), event("morbier")],
2813 },
2814 ],
2815 )
2816 .await
2817 .unwrap();
2818
2819 let previous_chunk =
2820 store.load_previous_chunk(linked_chunk_id, ChunkIdentifier::new(42)).await.unwrap();
2821
2822 assert_matches!(previous_chunk, Some(previous_chunk) => {
2823 assert_eq!(previous_chunk.identifier, 7);
2824 assert!(previous_chunk.previous.is_none());
2825 assert_matches!(previous_chunk.next, Some(next) => {
2826 assert_eq!(next, 42);
2827 });
2828 assert_matches!(previous_chunk.content, ChunkContent::Items(items) => {
2829 assert_eq!(items.len(), 2);
2830 check_test_event(&items[0], "brigand du jorat");
2831 check_test_event(&items[1], "morbier");
2832 });
2833 });
2834 }
2835 }
2836}
2837
// Runs the event cache store test suites against a store that is opened with
// a passphrase.
#[cfg(test)]
mod encrypted_tests {
    use std::sync::atomic::{AtomicU32, Ordering::SeqCst};

    use matrix_sdk_base::{
        event_cache::store::{EventCacheStore, EventCacheStoreError},
        event_cache_store_integration_tests, event_cache_store_integration_tests_time,
        event_cache_store_media_integration_tests,
    };
    use matrix_sdk_test::{async_test, event_factory::EventFactory};
    use once_cell::sync::Lazy;
    use ruma::{
        event_id,
        events::{relation::RelationType, room::message::RoomMessageEventContentWithoutRelation},
        room_id, user_id,
    };
    use tempfile::{tempdir, TempDir};

    use super::SqliteEventCacheStore;

    // Temporary directory shared by every store created in this module.
    static TMP_DIR: Lazy<TempDir> = Lazy::new(|| tempdir().unwrap());
    // Counter used to give each store its own sub-directory of `TMP_DIR`.
    static NUM: AtomicU32 = AtomicU32::new(0);

    /// Opens a fresh store, protected by a passphrase, in its own numbered
    /// sub-directory of `TMP_DIR`.
    ///
    /// Panics if the path is not valid UTF-8 or if the store cannot be opened.
    async fn get_event_cache_store() -> Result<SqliteEventCacheStore, EventCacheStoreError> {
        let store_index = NUM.fetch_add(1, SeqCst);
        let store_dir = TMP_DIR.path().join(store_index.to_string());
        let store_dir = store_dir.to_str().unwrap();

        tracing::info!("using event cache store @ {}", store_dir);

        let store = SqliteEventCacheStore::open(store_dir, Some("default_test_password"))
            .await
            .unwrap();

        Ok(store)
    }

    event_cache_store_integration_tests!();
    event_cache_store_integration_tests_time!();
    event_cache_store_media_integration_tests!();

    /// Checks that a relation type containing SQL syntax, passed as a filter
    /// to `find_event_relations`, does not widen the set of returned events.
    #[async_test]
    async fn test_no_sqlite_injection_in_find_event_relations() {
        let room_id = room_id!("!test:localhost");
        let another_room_id = room_id!("!r1:matrix.org");
        let sender = user_id!("@alice:localhost");

        let store = get_event_cache_store()
            .await
            .expect("We should be able to create a new, empty, event cache store");

        let factory = EventFactory::new().room(room_id).sender(sender);

        // The event the relations are looked up for; it must not be returned.
        let event_id = event_id!("$DO_NOT_FIND_ME:matrix.org");
        let event = factory.text_msg("DO NOT FIND").event_id(event_id).into_event();

        // An edit of that event: the only event the lookup should return.
        let edit_id = event_id!("$find_me:matrix.org");
        let replacement = RoomMessageEventContentWithoutRelation::text_plain("jebote");
        let edit =
            factory.text_msg("Find me").event_id(edit_id).edit(event_id, replacement).into_event();

        // An unrelated event, living in another room.
        let other_factory = factory.room(another_room_id);

        let another_event_id = event_id!("$DO_NOT_FIND_ME_EITHER:matrix.org");
        let another_event = other_factory
            .text_msg("DO NOT FIND ME EITHER")
            .event_id(another_event_id)
            .into_event();

        store.save_event(room_id, event).await.unwrap();
        store.save_event(room_id, edit).await.unwrap();
        store.save_event(another_room_id, another_event).await.unwrap();

        // A legitimate relation type, followed by one that tries to close the
        // SQL expression and match every row.
        let malicious_relation: RelationType = "x\") OR 1=1; --".into();
        let filter = Some(vec![RelationType::Replacement, malicious_relation]);

        let results = store
            .find_event_relations(room_id, event_id, filter.as_deref())
            .await
            .expect("We should be able to attempt to find event relations");

        similar_asserts::assert_eq!(
            results.len(),
            1,
            "We should only have loaded events for the first room {results:#?}"
        );

        let (found_event, _) = &results[0];
        assert_eq!(
            found_event.event_id().as_deref(),
            Some(edit_id),
            "The single event we found should be the edit event"
        );
    }
}