matrix_sdk_sqlite/
event_cache_store.rs

1// Copyright 2024 The Matrix.org Foundation C.I.C.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! An SQLite-based backend for the [`EventCacheStore`].
16
17use std::{collections::HashMap, fmt, iter::once, path::Path, sync::Arc};
18
19use async_trait::async_trait;
20use deadpool_sqlite::{Object as SqliteAsyncConn, Pool as SqlitePool, Runtime};
21use matrix_sdk_base::{
22    deserialized_responses::TimelineEvent,
23    event_cache::{
24        store::{
25            compute_filters_string, extract_event_relation,
26            media::{
27                EventCacheStoreMedia, IgnoreMediaRetentionPolicy, MediaRetentionPolicy,
28                MediaService,
29            },
30            EventCacheStore,
31        },
32        Event, Gap,
33    },
34    linked_chunk::{
35        ChunkContent, ChunkIdentifier, ChunkIdentifierGenerator, ChunkMetadata, LinkedChunkId,
36        Position, RawChunk, Update,
37    },
38    media::{MediaRequestParameters, UniqueKey},
39    timer,
40};
41use matrix_sdk_store_encryption::StoreCipher;
42use ruma::{
43    events::relation::RelationType, time::SystemTime, EventId, MilliSecondsSinceUnixEpoch, MxcUri,
44    OwnedEventId, RoomId,
45};
46use rusqlite::{params_from_iter, OptionalExtension, ToSql, Transaction, TransactionBehavior};
47use tokio::{
48    fs,
49    sync::{Mutex, OwnedMutexGuard},
50};
51use tracing::{debug, error, instrument, trace};
52
53use crate::{
54    error::{Error, Result},
55    utils::{
56        repeat_vars, time_to_timestamp, EncryptableStore, Key, SqliteAsyncConnExt,
57        SqliteKeyValueStoreAsyncConnExt, SqliteKeyValueStoreConnExt, SqliteTransactionExt,
58    },
59    OpenStoreError, SqliteStoreConfig,
60};
61
/// Names of entries in the key-value store and of tables, used when building
/// database keys.
mod keys {
    // Entries in the key-value store.
    pub const MEDIA_RETENTION_POLICY: &str = "media_retention_policy";
    pub const LAST_MEDIA_CLEANUP_TIME: &str = "last_media_cleanup_time";

    // Tables.
    pub const LINKED_CHUNKS: &str = "linked_chunks";
    pub const MEDIA: &str = "media";
}
71
/// The database name, created inside the directory given to [`SqliteEventCacheStore::open`].
const DATABASE_NAME: &str = "matrix-sdk-event-cache.sqlite3";

/// Identifier of the latest database version.
///
/// This is used to figure whether the SQLite database requires a migration.
/// Every new SQL migration should imply a bump of this number, and changes in
/// the [`run_migrations`] function.
const DATABASE_VERSION: u8 = 8;

/// The string used to identify a chunk of type events, in the `type` field in
/// the database.
const CHUNK_TYPE_EVENT_TYPE_STRING: &str = "E";
/// The string used to identify a chunk of type gap, in the `type` field in the
/// database.
const CHUNK_TYPE_GAP_TYPE_STRING: &str = "G";
88
/// An SQLite-based event cache store.
#[derive(Clone)]
pub struct SqliteEventCacheStore {
    /// The store cipher used to encrypt private data, or `None` when the
    /// store was opened without a passphrase.
    store_cipher: Option<Arc<StoreCipher>>,

    /// The pool of connections.
    pool: SqlitePool,

    /// We make the difference between connections for read operations, and for
    /// write operations. We keep a single connection apart from write
    /// operations. All other connections are used for read operations. The
    /// lock is used to ensure there is one owner at a time.
    write_connection: Arc<Mutex<SqliteAsyncConn>>,

    /// Service managing the media retention policy and the last media cleanup
    /// time (both restored from the key-value store at open time).
    media_service: MediaService,
}
105
106#[cfg(not(tarpaulin_include))]
107impl fmt::Debug for SqliteEventCacheStore {
108    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
109        f.debug_struct("SqliteEventCacheStore").finish_non_exhaustive()
110    }
111}
112
113impl EncryptableStore for SqliteEventCacheStore {
114    fn get_cypher(&self) -> Option<&StoreCipher> {
115        self.store_cipher.as_deref()
116    }
117}
118
impl SqliteEventCacheStore {
    /// Open the SQLite-based event cache store at the given path using the
    /// given passphrase to encrypt private data.
    pub async fn open(
        path: impl AsRef<Path>,
        passphrase: Option<&str>,
    ) -> Result<Self, OpenStoreError> {
        Self::open_with_config(SqliteStoreConfig::new(path).passphrase(passphrase)).await
    }

    /// Open the SQLite-based event cache store with the given store
    /// configuration.
    #[instrument(skip(config), fields(path = ?config.path))]
    pub async fn open_with_config(config: SqliteStoreConfig) -> Result<Self, OpenStoreError> {
        debug!(?config);

        let _timer = timer!("open_with_config");

        let SqliteStoreConfig { path, passphrase, pool_config, runtime_config } = config;

        // `path` is a directory; the database file is created inside it.
        fs::create_dir_all(&path).await.map_err(OpenStoreError::CreateDir)?;

        let mut config = deadpool_sqlite::Config::new(path.join(DATABASE_NAME));
        config.pool = Some(pool_config);

        let pool = config.create_pool(Runtime::Tokio1)?;

        let this = Self::open_with_pool(pool, passphrase.as_deref()).await?;
        // Apply the runtime configuration on the dedicated write connection.
        this.write().await?.apply_runtime_config(runtime_config).await?;

        Ok(this)
    }

    /// Open an SQLite-based event cache store using the given SQLite database
    /// pool. The given passphrase will be used to encrypt private data.
    async fn open_with_pool(
        pool: SqlitePool,
        passphrase: Option<&str>,
    ) -> Result<Self, OpenStoreError> {
        let conn = pool.get().await?;

        // Bring the database schema up to date, if needed.
        let version = conn.db_version().await?;
        run_migrations(&conn, version).await?;

        // A passphrase enables encryption of private data via a store cipher.
        let store_cipher = match passphrase {
            Some(p) => Some(Arc::new(conn.get_or_create_store_cipher(p).await?)),
            None => None,
        };

        // Restore the media service state persisted in the key-value store.
        let media_service = MediaService::new();
        let media_retention_policy = conn.get_serialized_kv(keys::MEDIA_RETENTION_POLICY).await?;
        let last_media_cleanup_time = conn.get_serialized_kv(keys::LAST_MEDIA_CLEANUP_TIME).await?;
        media_service.restore(media_retention_policy, last_media_cleanup_time);

        Ok(Self {
            store_cipher,
            pool,
            // Use `conn` as our selected write connection.
            write_connection: Arc::new(Mutex::new(conn)),
            media_service,
        })
    }

    // Acquire a connection for executing read operations.
    #[instrument(skip_all)]
    async fn read(&self) -> Result<SqliteAsyncConn> {
        trace!("Taking a `read` connection");
        let _timer = timer!("connection");

        let connection = self.pool.get().await?;

        // Per https://www.sqlite.org/foreignkeys.html#fk_enable, foreign key
        // support must be enabled on a per-connection basis. Execute it every
        // time we try to get a connection, since we can't guarantee a previous
        // connection did enable it before.
        connection.execute_batch("PRAGMA foreign_keys = ON;").await?;

        Ok(connection)
    }

    // Acquire a connection for executing write operations.
    //
    // There is a single write connection (see `write_connection`), so this
    // waits until any previous writer has released the lock.
    #[instrument(skip_all)]
    async fn write(&self) -> Result<OwnedMutexGuard<SqliteAsyncConn>> {
        trace!("Taking a `write` connection");
        let _timer = timer!("connection");

        let connection = self.write_connection.clone().lock_owned().await;

        // Per https://www.sqlite.org/foreignkeys.html#fk_enable, foreign key
        // support must be enabled on a per-connection basis. Execute it every
        // time we try to get a connection, since we can't guarantee a previous
        // connection did enable it before.
        connection.execute_batch("PRAGMA foreign_keys = ON;").await?;

        Ok(connection)
    }

    // Map an SQL row to its `(id, previous, next, chunk_type)` tuple.
    //
    // NOTE(review): column order presumably matches the chunk-loading
    // `SELECT`s elsewhere in this file — confirm at the call sites.
    fn map_row_to_chunk(
        row: &rusqlite::Row<'_>,
    ) -> Result<(u64, Option<u64>, Option<u64>, String), rusqlite::Error> {
        Ok((
            row.get::<_, u64>(0)?,
            row.get::<_, Option<u64>>(1)?,
            row.get::<_, Option<u64>>(2)?,
            row.get::<_, String>(3)?,
        ))
    }

    // Serialize a `TimelineEvent` for storage: the JSON content is (possibly)
    // encrypted, while the relation metadata is extracted beforehand so it can
    // be stored in clear, queryable columns.
    fn encode_event(&self, event: &TimelineEvent) -> Result<EncodedEvent> {
        let serialized = serde_json::to_vec(event)?;

        // Extract the relationship info here.
        let raw_event = event.raw();
        let (relates_to, rel_type) = extract_event_relation(raw_event).unzip();

        // The content may be encrypted.
        let content = self.encode_value(serialized)?;

        Ok(EncodedEvent {
            content,
            rel_type,
            relates_to: relates_to.map(|relates_to| relates_to.to_string()),
        })
    }
}
243
/// An event ready to be stored in the database: encoded (possibly encrypted)
/// content, plus the relation metadata extracted before encoding.
struct EncodedEvent {
    /// The (possibly encrypted) JSON-serialized event content.
    content: Vec<u8>,
    /// The relation type of the event, if any.
    rel_type: Option<String>,
    /// The ID of the event this event relates to, if any.
    relates_to: Option<String>,
}
249
/// Extension methods over an SQLite [`Transaction`] to rebuild linked-chunk
/// data ([`RawChunk`]) from its database representation.
trait TransactionExtForLinkedChunks {
    /// Rebuild a single [`RawChunk`] from its raw row data, loading its
    /// content (gap or events) according to `chunk_type`.
    fn rebuild_chunk(
        &self,
        store: &SqliteEventCacheStore,
        linked_chunk_id: &Key,
        previous: Option<u64>,
        index: u64,
        next: Option<u64>,
        chunk_type: &str,
    ) -> Result<RawChunk<Event, Gap>>;

    /// Load the previous-batch token of the gap chunk identified by
    /// `chunk_id` in the given linked chunk.
    fn load_gap_content(
        &self,
        store: &SqliteEventCacheStore,
        linked_chunk_id: &Key,
        chunk_id: ChunkIdentifier,
    ) -> Result<Gap>;

    /// Load the events of the events chunk identified by `chunk_id` in the
    /// given linked chunk, in position order.
    fn load_events_content(
        &self,
        store: &SqliteEventCacheStore,
        linked_chunk_id: &Key,
        chunk_id: ChunkIdentifier,
    ) -> Result<Vec<Event>>;
}
275
impl TransactionExtForLinkedChunks for Transaction<'_> {
    fn rebuild_chunk(
        &self,
        store: &SqliteEventCacheStore,
        linked_chunk_id: &Key,
        previous: Option<u64>,
        id: u64,
        next: Option<u64>,
        chunk_type: &str,
    ) -> Result<RawChunk<Event, Gap>> {
        let previous = previous.map(ChunkIdentifier::new);
        let next = next.map(ChunkIdentifier::new);
        let id = ChunkIdentifier::new(id);

        // The chunk type discriminates how the content is stored, and thus how
        // it must be loaded back.
        match chunk_type {
            CHUNK_TYPE_GAP_TYPE_STRING => {
                // It's a gap!
                let gap = self.load_gap_content(store, linked_chunk_id, id)?;
                Ok(RawChunk { content: ChunkContent::Gap(gap), previous, identifier: id, next })
            }

            CHUNK_TYPE_EVENT_TYPE_STRING => {
                // It's events!
                let events = self.load_events_content(store, linked_chunk_id, id)?;
                Ok(RawChunk {
                    content: ChunkContent::Items(events),
                    previous,
                    identifier: id,
                    next,
                })
            }

            other => {
                // It's an error!
                Err(Error::InvalidData {
                    details: format!("a linked chunk has an unknown type {other}"),
                })
            }
        }
    }

    fn load_gap_content(
        &self,
        store: &SqliteEventCacheStore,
        linked_chunk_id: &Key,
        chunk_id: ChunkIdentifier,
    ) -> Result<Gap> {
        // There's at most one row for it in the database, so a call to `query_row` is
        // sufficient.
        let encoded_prev_token: Vec<u8> = self.query_row(
            "SELECT prev_token FROM gap_chunks WHERE chunk_id = ? AND linked_chunk_id = ?",
            (chunk_id.index(), &linked_chunk_id),
            |row| row.get(0),
        )?;
        // The token was (possibly) encrypted when written; decode it back to
        // its JSON form before deserializing.
        let prev_token_bytes = store.decode_value(&encoded_prev_token)?;
        let prev_token = serde_json::from_slice(&prev_token_bytes)?;
        Ok(Gap { prev_token })
    }

    fn load_events_content(
        &self,
        store: &SqliteEventCacheStore,
        linked_chunk_id: &Key,
        chunk_id: ChunkIdentifier,
    ) -> Result<Vec<Event>> {
        // Retrieve all the events from the database, in their in-chunk order
        // (the `position` column).
        let mut events = Vec::new();

        for event_data in self
            .prepare(
                r#"
                    SELECT events.content
                    FROM event_chunks ec, events
                    WHERE events.event_id = ec.event_id AND ec.chunk_id = ? AND ec.linked_chunk_id = ?
                    ORDER BY ec.position ASC
                "#,
            )?
            .query_map((chunk_id.index(), &linked_chunk_id), |row| row.get::<_, Vec<u8>>(0))?
        {
            // Decode (and possibly decrypt) the stored bytes, then deserialize
            // the JSON into an `Event`.
            let encoded_content = event_data?;
            let serialized_content = store.decode_value(&encoded_content)?;
            let event = serde_json::from_slice(&serialized_content)?;

            events.push(event);
        }

        Ok(events)
    }
}
365
366/// Run migrations for the given version of the database.
367async fn run_migrations(conn: &SqliteAsyncConn, version: u8) -> Result<()> {
368    if version == 0 {
369        debug!("Creating database");
370    } else if version < DATABASE_VERSION {
371        debug!(version, new_version = DATABASE_VERSION, "Upgrading database");
372    } else {
373        return Ok(());
374    }
375
376    // Always enable foreign keys for the current connection.
377    conn.execute_batch("PRAGMA foreign_keys = ON;").await?;
378
379    if version < 1 {
380        // First turn on WAL mode, this can't be done in the transaction, it fails with
381        // the error message: "cannot change into wal mode from within a transaction".
382        conn.execute_batch("PRAGMA journal_mode = wal;").await?;
383        conn.with_transaction(|txn| {
384            txn.execute_batch(include_str!("../migrations/event_cache_store/001_init.sql"))?;
385            txn.set_db_version(1)
386        })
387        .await?;
388    }
389
390    if version < 2 {
391        conn.with_transaction(|txn| {
392            txn.execute_batch(include_str!("../migrations/event_cache_store/002_lease_locks.sql"))?;
393            txn.set_db_version(2)
394        })
395        .await?;
396    }
397
398    if version < 3 {
399        conn.with_transaction(|txn| {
400            txn.execute_batch(include_str!("../migrations/event_cache_store/003_events.sql"))?;
401            txn.set_db_version(3)
402        })
403        .await?;
404    }
405
406    if version < 4 {
407        conn.with_transaction(|txn| {
408            txn.execute_batch(include_str!(
409                "../migrations/event_cache_store/004_ignore_policy.sql"
410            ))?;
411            txn.set_db_version(4)
412        })
413        .await?;
414    }
415
416    if version < 5 {
417        conn.with_transaction(|txn| {
418            txn.execute_batch(include_str!(
419                "../migrations/event_cache_store/005_events_index_on_event_id.sql"
420            ))?;
421            txn.set_db_version(5)
422        })
423        .await?;
424    }
425
426    if version < 6 {
427        conn.with_transaction(|txn| {
428            txn.execute_batch(include_str!("../migrations/event_cache_store/006_events.sql"))?;
429            txn.set_db_version(6)
430        })
431        .await?;
432    }
433
434    if version < 7 {
435        conn.with_transaction(|txn| {
436            txn.execute_batch(include_str!(
437                "../migrations/event_cache_store/007_event_chunks.sql"
438            ))?;
439            txn.set_db_version(7)
440        })
441        .await?;
442    }
443
444    if version < 8 {
445        conn.with_transaction(|txn| {
446            txn.execute_batch(include_str!(
447                "../migrations/event_cache_store/008_linked_chunk_id.sql"
448            ))?;
449            txn.set_db_version(8)
450        })
451        .await?;
452    }
453
454    Ok(())
455}
456
457#[async_trait]
458impl EventCacheStore for SqliteEventCacheStore {
459    type Error = Error;
460
461    #[instrument(skip(self))]
462    async fn try_take_leased_lock(
463        &self,
464        lease_duration_ms: u32,
465        key: &str,
466        holder: &str,
467    ) -> Result<bool> {
468        let _timer = timer!("method");
469
470        let key = key.to_owned();
471        let holder = holder.to_owned();
472
473        let now: u64 = MilliSecondsSinceUnixEpoch::now().get().into();
474        let expiration = now + lease_duration_ms as u64;
475
476        let num_touched = self
477            .write()
478            .await?
479            .with_transaction(move |txn| {
480                txn.execute(
481                    "INSERT INTO lease_locks (key, holder, expiration)
482                    VALUES (?1, ?2, ?3)
483                    ON CONFLICT (key)
484                    DO
485                        UPDATE SET holder = ?2, expiration = ?3
486                        WHERE holder = ?2
487                        OR expiration < ?4
488                ",
489                    (key, holder, expiration, now),
490                )
491            })
492            .await?;
493
494        Ok(num_touched == 1)
495    }
496
497    #[instrument(skip(self, updates))]
498    async fn handle_linked_chunk_updates(
499        &self,
500        linked_chunk_id: LinkedChunkId<'_>,
501        updates: Vec<Update<Event, Gap>>,
502    ) -> Result<(), Self::Error> {
503        let _timer = timer!("method");
504
505        // Use a single transaction throughout this function, so that either all updates
506        // work, or none is taken into account.
507        let hashed_linked_chunk_id =
508            self.encode_key(keys::LINKED_CHUNKS, linked_chunk_id.storage_key());
509        let linked_chunk_id = linked_chunk_id.to_owned();
510        let this = self.clone();
511
512        with_immediate_transaction(self, move |txn| {
513            for up in updates {
514                match up {
515                    Update::NewItemsChunk { previous, new, next } => {
516                        let previous = previous.as_ref().map(ChunkIdentifier::index);
517                        let new = new.index();
518                        let next = next.as_ref().map(ChunkIdentifier::index);
519
520                        trace!(
521                            %linked_chunk_id,
522                            "new events chunk (prev={previous:?}, i={new}, next={next:?})",
523                        );
524
525                        insert_chunk(
526                            txn,
527                            &hashed_linked_chunk_id,
528                            previous,
529                            new,
530                            next,
531                            CHUNK_TYPE_EVENT_TYPE_STRING,
532                        )?;
533                    }
534
535                    Update::NewGapChunk { previous, new, next, gap } => {
536                        let serialized = serde_json::to_vec(&gap.prev_token)?;
537                        let prev_token = this.encode_value(serialized)?;
538
539                        let previous = previous.as_ref().map(ChunkIdentifier::index);
540                        let new = new.index();
541                        let next = next.as_ref().map(ChunkIdentifier::index);
542
543                        trace!(
544                            %linked_chunk_id,
545                            "new gap chunk (prev={previous:?}, i={new}, next={next:?})",
546                        );
547
548                        // Insert the chunk as a gap.
549                        insert_chunk(
550                            txn,
551                            &hashed_linked_chunk_id,
552                            previous,
553                            new,
554                            next,
555                            CHUNK_TYPE_GAP_TYPE_STRING,
556                        )?;
557
558                        // Insert the gap's value.
559                        txn.execute(
560                            r#"
561                            INSERT INTO gap_chunks(chunk_id, linked_chunk_id, prev_token)
562                            VALUES (?, ?, ?)
563                        "#,
564                            (new, &hashed_linked_chunk_id, prev_token),
565                        )?;
566                    }
567
568                    Update::RemoveChunk(chunk_identifier) => {
569                        let chunk_id = chunk_identifier.index();
570
571                        trace!(%linked_chunk_id, "removing chunk @ {chunk_id}");
572
573                        // Find chunk to delete.
574                        let (previous, next): (Option<usize>, Option<usize>) = txn.query_row(
575                            "SELECT previous, next FROM linked_chunks WHERE id = ? AND linked_chunk_id = ?",
576                            (chunk_id, &hashed_linked_chunk_id),
577                            |row| Ok((row.get(0)?, row.get(1)?))
578                        )?;
579
580                        // Replace its previous' next to its own next.
581                        if let Some(previous) = previous {
582                            txn.execute("UPDATE linked_chunks SET next = ? WHERE id = ? AND linked_chunk_id = ?", (next, previous, &hashed_linked_chunk_id))?;
583                        }
584
585                        // Replace its next' previous to its own previous.
586                        if let Some(next) = next {
587                            txn.execute("UPDATE linked_chunks SET previous = ? WHERE id = ? AND linked_chunk_id = ?", (previous, next, &hashed_linked_chunk_id))?;
588                        }
589
590                        // Now delete it, and let cascading delete corresponding entries in the
591                        // other data tables.
592                        txn.execute("DELETE FROM linked_chunks WHERE id = ? AND linked_chunk_id = ?", (chunk_id, &hashed_linked_chunk_id))?;
593                    }
594
595                    Update::PushItems { at, items } => {
596                        if items.is_empty() {
                            // Should never happen, but better be safe.
598                            continue;
599                        }
600
601                        let chunk_id = at.chunk_identifier().index();
602
603                        trace!(%linked_chunk_id, "pushing {} items @ {chunk_id}", items.len());
604
605                        let mut chunk_statement = txn.prepare(
606                            "INSERT INTO event_chunks(chunk_id, linked_chunk_id, event_id, position) VALUES (?, ?, ?, ?)"
607                        )?;
608
609                        // Note: we use `OR REPLACE` here, because the event might have been
610                        // already inserted in the database. This is the case when an event is
611                        // deduplicated and moved to another position; or because it was inserted
612                        // outside the context of a linked chunk (e.g. pinned event).
613                        let mut content_statement = txn.prepare(
614                            "INSERT OR REPLACE INTO events(room_id, event_id, content, relates_to, rel_type) VALUES (?, ?, ?, ?, ?)"
615                        )?;
616
617                        let invalid_event = |event: TimelineEvent| {
618                            let Some(event_id) = event.event_id() else {
619                                error!(%linked_chunk_id, "Trying to push an event with no ID");
620                                return None;
621                            };
622
623                            Some((event_id.to_string(), event))
624                        };
625
626                        let room_id = linked_chunk_id.room_id();
627                        let hashed_room_id = this.encode_key(keys::LINKED_CHUNKS, room_id);
628
629                        for (i, (event_id, event)) in items.into_iter().filter_map(invalid_event).enumerate() {
630                            // Insert the location information into the database.
631                            let index = at.index() + i;
632                            chunk_statement.execute((chunk_id, &hashed_linked_chunk_id, &event_id, index))?;
633
634                            // Now, insert the event content into the database.
635                            let encoded_event = this.encode_event(&event)?;
636                            content_statement.execute((&hashed_room_id, event_id, encoded_event.content, encoded_event.relates_to, encoded_event.rel_type))?;
637                        }
638                    }
639
640                    Update::ReplaceItem { at, item: event } => {
641                        let chunk_id = at.chunk_identifier().index();
642
643                        let index = at.index();
644
645                        trace!(%linked_chunk_id, "replacing item @ {chunk_id}:{index}");
646
647                        // The event id should be the same, but just in case it changed…
648                        let Some(event_id) = event.event_id().map(|event_id| event_id.to_string()) else {
649                            error!(%linked_chunk_id, "Trying to replace an event with a new one that has no ID");
650                            continue;
651                        };
652
653                        // Replace the event's content. Really we'd like to update, but in case the
654                        // event id changed, we are a bit lenient here and will allow an insertion
655                        // of the new event.
656                        let encoded_event = this.encode_event(&event)?;
657                        let room_id = linked_chunk_id.room_id();
658                        let hashed_room_id = this.encode_key(keys::LINKED_CHUNKS, room_id);
659                        txn.execute(
660                            "INSERT OR REPLACE INTO events(room_id, event_id, content, relates_to, rel_type) VALUES (?, ?, ?, ?, ?)"
661                        , (&hashed_room_id, &event_id, encoded_event.content, encoded_event.relates_to, encoded_event.rel_type))?;
662
663                        // Replace the event id in the linked chunk, in case it changed.
664                        txn.execute(
665                            r#"UPDATE event_chunks SET event_id = ? WHERE linked_chunk_id = ? AND chunk_id = ? AND position = ?"#,
666                            (event_id, &hashed_linked_chunk_id, chunk_id, index)
667                        )?;
668                    }
669
670                    Update::RemoveItem { at } => {
671                        let chunk_id = at.chunk_identifier().index();
672                        let index = at.index();
673
674                        trace!(%linked_chunk_id, "removing item @ {chunk_id}:{index}");
675
676                        // Remove the entry in the chunk table.
677                        txn.execute("DELETE FROM event_chunks WHERE linked_chunk_id = ? AND chunk_id = ? AND position = ?", (&hashed_linked_chunk_id, chunk_id, index))?;
678
679                        // Decrement the index of each item after the one we are
680                        // going to remove.
681                        //
682                        // Imagine we have the following events:
683                        //
684                        // | event_id | linked_chunk_id | chunk_id | position |
685                        // |----------|-----------------|----------|----------|
686                        // | $ev0     | !r0             | 42       | 0        |
687                        // | $ev1     | !r0             | 42       | 1        |
688                        // | $ev2     | !r0             | 42       | 2        |
689                        // | $ev3     | !r0             | 42       | 3        |
690                        // | $ev4     | !r0             | 42       | 4        |
691                        // 
692                        // `$ev2` has been removed, then we end up in this
693                        // state:
694                        //
695                        // | event_id | linked_chunk_id    | chunk_id | position |
696                        // |----------|--------------------|----------|----------|
697                        // | $ev0     | !r0                | 42       | 0        |
698                        // | $ev1     | !r0                | 42       | 1        |
699                        // |          |                    |          |          | <- no more `$ev2`
700                        // | $ev3     | !r0                | 42       | 3        |
701                        // | $ev4     | !r0                | 42       | 4        |
702                        //
703                        // We need to shift the `position` of `$ev3` and `$ev4`
704                        // to `position - 1`, like so:
705                        // 
706                        // | event_id | linked_chunk_id | chunk_id | position |
707                        // |----------|-----------------|----------|----------|
708                        // | $ev0     | !r0             | 42       | 0        |
709                        // | $ev1     | !r0             | 42       | 1        |
710                        // | $ev3     | !r0             | 42       | 2        |
711                        // | $ev4     | !r0             | 42       | 3        |
712                        //
713                        // Usually, it boils down to run the following query:
714                        //
715                        // ```sql
716                        // UPDATE event_chunks
717                        // SET position = position - 1
718                        // WHERE position > 2 AND …
719                        // ```
720                        //
721                        // Okay. But `UPDATE` runs on rows in no particular
722                        // order. It means that it can update `$ev4` before
723                        // `$ev3` for example. What happens in this particular
724                        // case? The `position` of `$ev4` becomes `3`, however
725                        // `$ev3` already has `position = 3`. Because there
726                        // is a `UNIQUE` constraint on `(linked_chunk_id, chunk_id,
727                        // position)`, it will result in a constraint violation.
728                        //
729                        // There is **no way** to control the execution order of
730                        // `UPDATE` in SQLite. To persuade yourself, try:
731                        //
732                        // ```sql
733                        // UPDATE event_chunks
734                        // SET position = position - 1
735                        // FROM (
736                        //     SELECT event_id
737                        //     FROM event_chunks
738                        //     WHERE position > 2 AND …
739                        //     ORDER BY position ASC
740                        // ) as ordered
741                        // WHERE event_chunks.event_id = ordered.event_id
742                        // ```
743                        //
744                        // It will fail the same way.
745                        //
746                        // Thus, we have 2 solutions:
747                        //
748                        // 1. Remove the `UNIQUE` constraint,
749                        // 2. Be creative.
750                        //
751                        // The `UNIQUE` constraint is a safe belt. Normally, we
752                        // have `event_cache::Deduplicator` that is responsible
753                        // to ensure there is no duplicated event. However,
754                        // relying on this is “fragile” in the sense it can
755                        // contain bugs. Relying on the `UNIQUE` constraint from
756                        // SQLite is more robust. It's “braces and belt” as we
757                        // say here.
758                        //
759                        // So. We need to be creative.
760                        //
761                        // Many solutions exist. Amongst the most popular, we
762                        // see _dropping and re-creating the index_, which is
763                        // no-go for us, it's too expensive. I (@hywan) have
764                        // adopted the following one:
765                        // 
766                        // - Do `position = position - 1` but in the negative
767                        //   space, so `position = -(position - 1)`. A position
768                        //   cannot be negative; we are sure it is unique!
769                        // - Once all candidate rows are updated, do `position =
770                        //   -position` to move back to the positive space.
771                        //
772                        // 'told you it's gonna be creative.
773                        //
774                        // This solution is a hack, **but** it is a small
775                        // number of operations, and we can keep the `UNIQUE`
776                        // constraint in place.
777                        txn.execute(
778                            r#"
779                                UPDATE event_chunks
780                                SET position = -(position - 1)
781                                WHERE linked_chunk_id = ? AND chunk_id = ? AND position > ?
782                            "#,
783                            (&hashed_linked_chunk_id, chunk_id, index)
784                        )?;
785                        txn.execute(
786                            r#"
787                                UPDATE event_chunks
788                                SET position = -position
789                                WHERE position < 0 AND linked_chunk_id = ? AND chunk_id = ?
790                            "#,
791                            (&hashed_linked_chunk_id, chunk_id)
792                        )?;
793                    }
794
795                    Update::DetachLastItems { at } => {
796                        let chunk_id = at.chunk_identifier().index();
797                        let index = at.index();
798
799                        trace!(%linked_chunk_id, "truncating items >= {chunk_id}:{index}");
800
801                        // Remove these entries.
802                        txn.execute("DELETE FROM event_chunks WHERE linked_chunk_id = ? AND chunk_id = ? AND position >= ?", (&hashed_linked_chunk_id, chunk_id, index))?;
803                    }
804
805                    Update::Clear => {
806                        trace!(%linked_chunk_id, "clearing items");
807
808                        // Remove chunks, and let cascading do its job.
809                        txn.execute(
810                            "DELETE FROM linked_chunks WHERE linked_chunk_id = ?",
811                            (&hashed_linked_chunk_id,),
812                        )?;
813                    }
814
815                    Update::StartReattachItems | Update::EndReattachItems => {
816                        // Nothing.
817                    }
818                }
819            }
820
821            Ok(())
822        })
823        .await?;
824
825        Ok(())
826    }
827
828    #[instrument(skip(self))]
829    async fn load_all_chunks(
830        &self,
831        linked_chunk_id: LinkedChunkId<'_>,
832    ) -> Result<Vec<RawChunk<Event, Gap>>, Self::Error> {
833        let _timer = timer!("method");
834
835        let hashed_linked_chunk_id =
836            self.encode_key(keys::LINKED_CHUNKS, linked_chunk_id.storage_key());
837
838        let this = self.clone();
839
840        let result = self
841            .read()
842            .await?
843            .with_transaction(move |txn| -> Result<_> {
844                let mut items = Vec::new();
845
846                // Use `ORDER BY id` to get a deterministic ordering for testing purposes.
847                for data in txn
848                    .prepare(
849                        "SELECT id, previous, next, type FROM linked_chunks WHERE linked_chunk_id = ? ORDER BY id",
850                    )?
851                    .query_map((&hashed_linked_chunk_id,), Self::map_row_to_chunk)?
852                {
853                    let (id, previous, next, chunk_type) = data?;
854                    let new = txn.rebuild_chunk(
855                        &this,
856                        &hashed_linked_chunk_id,
857                        previous,
858                        id,
859                        next,
860                        chunk_type.as_str(),
861                    )?;
862                    items.push(new);
863                }
864
865                Ok(items)
866            })
867            .await?;
868
869        Ok(result)
870    }
871
    #[instrument(skip(self))]
    async fn load_all_chunks_metadata(
        &self,
        linked_chunk_id: LinkedChunkId<'_>,
    ) -> Result<Vec<ChunkMetadata>, Self::Error> {
        let _timer = timer!("method");

        let hashed_linked_chunk_id =
            self.encode_key(keys::LINKED_CHUNKS, linked_chunk_id.storage_key());

        self.read()
            .await?
            .with_transaction(move |txn| -> Result<_> {
                // We want to collect the metadata about each chunk (id, next, previous), and
                // for event chunks, the number of events in it. For gaps, the
                // number of events is 0, by convention.
                //
                // We've tried different strategies over time:
                // - use a `LEFT JOIN` + `COUNT`, which was extremely inefficient because it
                //   caused a full table traversal for each chunk, including for gaps which
                //   don't have any events. This happened in
                //   https://github.com/matrix-org/matrix-rust-sdk/pull/5225.
                // - use a `CASE` statement on the chunk's type: if it's an event chunk, run an
                //   additional `SELECT` query. It was an immense improvement, but still caused
                //   one select query per event chunk. This happened in
                //   https://github.com/matrix-org/matrix-rust-sdk/pull/5411.
                //
                // The current solution is to run two queries:
                // - one to get each chunk and its number of events, by doing a single `SELECT`
                //   query over the `event_chunks` table, grouping by chunk ids. This gives us a
                //   list of `(chunk_id, num_events)` pairs, which can be transformed into a
                //   hashmap.
                // - one to get each chunk's metadata (id, previous, next, type) from the
                //   database with a `SELECT`, and then use the hashmap to get the number of
                //   events.
                //
                // This strategy minimizes the number of queries to the database, and keeps them
                // super simple, while doing a bit more processing here, which is much faster.

                // First query: build the `chunk_id => number of events` mapping.
                let num_events_by_chunk_ids = txn
                    .prepare(
                        r#"
                            SELECT ec.chunk_id, COUNT(ec.event_id)
                            FROM event_chunks as ec
                            WHERE ec.linked_chunk_id = ?
                            GROUP BY ec.chunk_id
                        "#,
                    )?
                    .query_map((&hashed_linked_chunk_id,), |row| {
                        Ok((row.get::<_, u64>(0)?, row.get::<_, usize>(1)?))
                    })?
                    .collect::<Result<HashMap<_, _>, _>>()?;

                // Second query: read each chunk's metadata, and pair it with the number of
                // events computed above.
                txn.prepare(
                    r#"
                        SELECT
                            lc.id,
                            lc.previous,
                            lc.next,
                            lc.type
                        FROM linked_chunks as lc
                        WHERE lc.linked_chunk_id = ?
                        ORDER BY lc.id"#,
                )?
                .query_map((&hashed_linked_chunk_id,), |row| {
                    Ok((
                        row.get::<_, u64>(0)?,
                        row.get::<_, Option<u64>>(1)?,
                        row.get::<_, Option<u64>>(2)?,
                        row.get::<_, String>(3)?,
                    ))
                })?
                .map(|data| -> Result<_> {
                    let (id, previous, next, chunk_type) = data?;

                    // Note: since a gap has 0 events, an alternative could be to *not* retrieve
                    // the chunk type, and just let the hashmap lookup fail for gaps. However,
                    // benchmarking shows that this is slightly slower than matching the chunk
                    // type (around 1%, so in the realm of noise), so we keep the explicit
                    // check instead.
                    let num_items = if chunk_type == CHUNK_TYPE_GAP_TYPE_STRING {
                        0
                    } else {
                        num_events_by_chunk_ids.get(&id).copied().unwrap_or(0)
                    };

                    Ok(ChunkMetadata {
                        identifier: ChunkIdentifier::new(id),
                        previous: previous.map(ChunkIdentifier::new),
                        next: next.map(ChunkIdentifier::new),
                        num_items,
                    })
                })
                .collect::<Result<Vec<_>, _>>()
            })
            .await
    }
969
970    #[instrument(skip(self))]
971    async fn load_last_chunk(
972        &self,
973        linked_chunk_id: LinkedChunkId<'_>,
974    ) -> Result<(Option<RawChunk<Event, Gap>>, ChunkIdentifierGenerator), Self::Error> {
975        let _timer = timer!("method");
976
977        let hashed_linked_chunk_id =
978            self.encode_key(keys::LINKED_CHUNKS, linked_chunk_id.storage_key());
979
980        let this = self.clone();
981
982        self
983            .read()
984            .await?
985            .with_transaction(move |txn| -> Result<_> {
986                // Find the latest chunk identifier to generate a `ChunkIdentifierGenerator`, and count the number of chunks.
987                let (chunk_identifier_generator, number_of_chunks) = txn
988                    .prepare(
989                        "SELECT MAX(id), COUNT(*) FROM linked_chunks WHERE linked_chunk_id = ?"
990                    )?
991                    .query_row(
992                        (&hashed_linked_chunk_id,),
993                        |row| {
994                            Ok((
995                                // Read the `MAX(id)` as an `Option<u64>` instead
996                                // of `u64` in case the `SELECT` returns nothing.
997                                // Indeed, if it returns no line, the `MAX(id)` is
998                                // set to `Null`.
999                                row.get::<_, Option<u64>>(0)?,
1000                                row.get::<_, u64>(1)?,
1001                            ))
1002                        }
1003                    )?;
1004
1005                let chunk_identifier_generator = match chunk_identifier_generator {
1006                    Some(last_chunk_identifier) => {
1007                        ChunkIdentifierGenerator::new_from_previous_chunk_identifier(
1008                            ChunkIdentifier::new(last_chunk_identifier)
1009                        )
1010                    },
1011                    None => ChunkIdentifierGenerator::new_from_scratch(),
1012                };
1013
1014                // Find the last chunk.
1015                let Some((chunk_identifier, previous_chunk, chunk_type)) = txn
1016                    .prepare(
1017                        "SELECT id, previous, type FROM linked_chunks WHERE linked_chunk_id = ? AND next IS NULL"
1018                    )?
1019                    .query_row(
1020                        (&hashed_linked_chunk_id,),
1021                        |row| {
1022                            Ok((
1023                                row.get::<_, u64>(0)?,
1024                                row.get::<_, Option<u64>>(1)?,
1025                                row.get::<_, String>(2)?,
1026                            ))
1027                        }
1028                    )
1029                    .optional()?
1030                else {
1031                    // Chunk is not found and there are zero chunks for this room, this is consistent, all
1032                    // good.
1033                    if number_of_chunks == 0 {
1034                        return Ok((None, chunk_identifier_generator));
1035                    }
1036                    // Chunk is not found **but** there are chunks for this room, this is inconsistent. The
1037                    // linked chunk is malformed.
1038                    //
1039                    // Returning `Ok((None, _))` would be invalid here: we must return an error.
1040                    else {
1041                        return Err(Error::InvalidData {
1042                            details:
1043                                "last chunk is not found but chunks exist: the linked chunk contains a cycle"
1044                                    .to_owned()
1045                            }
1046                        )
1047                    }
1048                };
1049
1050                // Build the chunk.
1051                let last_chunk = txn.rebuild_chunk(
1052                    &this,
1053                    &hashed_linked_chunk_id,
1054                    previous_chunk,
1055                    chunk_identifier,
1056                    None,
1057                    &chunk_type
1058                )?;
1059
1060                Ok((Some(last_chunk), chunk_identifier_generator))
1061            })
1062            .await
1063    }
1064
1065    #[instrument(skip(self))]
1066    async fn load_previous_chunk(
1067        &self,
1068        linked_chunk_id: LinkedChunkId<'_>,
1069        before_chunk_identifier: ChunkIdentifier,
1070    ) -> Result<Option<RawChunk<Event, Gap>>, Self::Error> {
1071        let _timer = timer!("method");
1072
1073        let hashed_linked_chunk_id =
1074            self.encode_key(keys::LINKED_CHUNKS, linked_chunk_id.storage_key());
1075
1076        let this = self.clone();
1077
1078        self
1079            .read()
1080            .await?
1081            .with_transaction(move |txn| -> Result<_> {
1082                // Find the chunk before the chunk identified by `before_chunk_identifier`.
1083                let Some((chunk_identifier, previous_chunk, next_chunk, chunk_type)) = txn
1084                    .prepare(
1085                        "SELECT id, previous, next, type FROM linked_chunks WHERE linked_chunk_id = ? AND next = ?"
1086                    )?
1087                    .query_row(
1088                        (&hashed_linked_chunk_id, before_chunk_identifier.index()),
1089                        |row| {
1090                            Ok((
1091                                row.get::<_, u64>(0)?,
1092                                row.get::<_, Option<u64>>(1)?,
1093                                row.get::<_, Option<u64>>(2)?,
1094                                row.get::<_, String>(3)?,
1095                            ))
1096                        }
1097                    )
1098                    .optional()?
1099                else {
1100                    // Chunk is not found.
1101                    return Ok(None);
1102                };
1103
1104                // Build the chunk.
1105                let last_chunk = txn.rebuild_chunk(
1106                    &this,
1107                    &hashed_linked_chunk_id,
1108                    previous_chunk,
1109                    chunk_identifier,
1110                    next_chunk,
1111                    &chunk_type
1112                )?;
1113
1114                Ok(Some(last_chunk))
1115            })
1116            .await
1117    }
1118
1119    #[instrument(skip(self))]
1120    async fn clear_all_linked_chunks(&self) -> Result<(), Self::Error> {
1121        let _timer = timer!("method");
1122
1123        self.write()
1124            .await?
1125            .with_transaction(move |txn| {
1126                // Remove all the chunks, and let cascading do its job.
1127                txn.execute("DELETE FROM linked_chunks", ())?;
1128                // Also clear all the events' contents.
1129                txn.execute("DELETE FROM events", ())
1130            })
1131            .await?;
1132
1133        Ok(())
1134    }
1135
1136    #[instrument(skip(self, events))]
1137    async fn filter_duplicated_events(
1138        &self,
1139        linked_chunk_id: LinkedChunkId<'_>,
1140        events: Vec<OwnedEventId>,
1141    ) -> Result<Vec<(OwnedEventId, Position)>, Self::Error> {
1142        let _timer = timer!("method");
1143
1144        // If there's no events for which we want to check duplicates, we can return
1145        // early. It's not only an optimization to do so: it's required, otherwise the
1146        // `repeat_vars` call below will panic.
1147        if events.is_empty() {
1148            return Ok(Vec::new());
1149        }
1150
1151        // Select all events that exist in the store, i.e. the duplicates.
1152        let hashed_linked_chunk_id =
1153            self.encode_key(keys::LINKED_CHUNKS, linked_chunk_id.storage_key());
1154        let linked_chunk_id = linked_chunk_id.to_owned();
1155
1156        self.read()
1157            .await?
1158            .with_transaction(move |txn| -> Result<_> {
1159                txn.chunk_large_query_over(events, None, move |txn, events| {
1160                    let query = format!(
1161                        r#"
1162                            SELECT event_id, chunk_id, position
1163                            FROM event_chunks
1164                            WHERE linked_chunk_id = ? AND event_id IN ({})
1165                            ORDER BY chunk_id ASC, position ASC
1166                        "#,
1167                        repeat_vars(events.len()),
1168                    );
1169
1170                    let parameters = params_from_iter(
1171                        // parameter for `linked_chunk_id = ?`
1172                        once(
1173                            hashed_linked_chunk_id
1174                                .to_sql()
1175                                // SAFETY: it cannot fail since `Key::to_sql` never fails
1176                                .unwrap(),
1177                        )
1178                        // parameters for `event_id IN (…)`
1179                        .chain(events.iter().map(|event| {
1180                            event
1181                                .as_str()
1182                                .to_sql()
1183                                // SAFETY: it cannot fail since `str::to_sql` never fails
1184                                .unwrap()
1185                        })),
1186                    );
1187
1188                    let mut duplicated_events = Vec::new();
1189
1190                    for duplicated_event in txn.prepare(&query)?.query_map(parameters, |row| {
1191                        Ok((
1192                            row.get::<_, String>(0)?,
1193                            row.get::<_, u64>(1)?,
1194                            row.get::<_, usize>(2)?,
1195                        ))
1196                    })? {
1197                        let (duplicated_event, chunk_identifier, index) = duplicated_event?;
1198
1199                        let Ok(duplicated_event) = EventId::parse(duplicated_event.clone()) else {
1200                            // Normally unreachable, but the event ID has been stored even if it is
1201                            // malformed, let's skip it.
1202                            error!(%duplicated_event, %linked_chunk_id, "Reading an malformed event ID");
1203                            continue;
1204                        };
1205
1206                        duplicated_events.push((
1207                            duplicated_event,
1208                            Position::new(ChunkIdentifier::new(chunk_identifier), index),
1209                        ));
1210                    }
1211
1212                    Ok(duplicated_events)
1213                })
1214            })
1215            .await
1216    }
1217
1218    #[instrument(skip(self, event_id))]
1219    async fn find_event(
1220        &self,
1221        room_id: &RoomId,
1222        event_id: &EventId,
1223    ) -> Result<Option<Event>, Self::Error> {
1224        let _timer = timer!("method");
1225
1226        let event_id = event_id.to_owned();
1227        let this = self.clone();
1228
1229        let hashed_room_id = self.encode_key(keys::LINKED_CHUNKS, room_id);
1230
1231        self.read()
1232            .await?
1233            .with_transaction(move |txn| -> Result<_> {
1234                let Some(event) = txn
1235                    .prepare("SELECT content FROM events WHERE event_id = ? AND room_id = ?")?
1236                    .query_row((event_id.as_str(), hashed_room_id), |row| row.get::<_, Vec<u8>>(0))
1237                    .optional()?
1238                else {
1239                    // Event is not found.
1240                    return Ok(None);
1241                };
1242
1243                let event = serde_json::from_slice(&this.decode_value(&event)?)?;
1244
1245                Ok(Some(event))
1246            })
1247            .await
1248    }
1249
1250    #[instrument(skip(self, event_id, filters))]
1251    async fn find_event_relations(
1252        &self,
1253        room_id: &RoomId,
1254        event_id: &EventId,
1255        filters: Option<&[RelationType]>,
1256    ) -> Result<Vec<(Event, Option<Position>)>, Self::Error> {
1257        let _timer = timer!("method");
1258
1259        let hashed_room_id = self.encode_key(keys::LINKED_CHUNKS, room_id);
1260
1261        let hashed_linked_chunk_id =
1262            self.encode_key(keys::LINKED_CHUNKS, LinkedChunkId::Room(room_id).storage_key());
1263
1264        let event_id = event_id.to_owned();
1265        let filters = filters.map(ToOwned::to_owned);
1266        let store = self.clone();
1267
1268        self.read()
1269            .await?
1270            .with_transaction(move |txn| -> Result<_> {
1271                find_event_relations_transaction(
1272                    store,
1273                    hashed_room_id,
1274                    hashed_linked_chunk_id,
1275                    event_id,
1276                    filters,
1277                    txn,
1278                )
1279            })
1280            .await
1281    }
1282
1283    #[instrument(skip(self, event))]
1284    async fn save_event(&self, room_id: &RoomId, event: Event) -> Result<(), Self::Error> {
1285        let _timer = timer!("method");
1286
1287        let Some(event_id) = event.event_id() else {
1288            error!(%room_id, "Trying to save an event with no ID");
1289            return Ok(());
1290        };
1291
1292        let hashed_room_id = self.encode_key(keys::LINKED_CHUNKS, room_id);
1293        let event_id = event_id.to_string();
1294        let encoded_event = self.encode_event(&event)?;
1295
1296        self.write()
1297            .await?
1298            .with_transaction(move |txn| -> Result<_> {
1299                txn.execute(
1300                    "INSERT OR REPLACE INTO events(room_id, event_id, content, relates_to, rel_type) VALUES (?, ?, ?, ?, ?)"
1301                    , (&hashed_room_id, &event_id, encoded_event.content, encoded_event.relates_to, encoded_event.rel_type))?;
1302
1303                Ok(())
1304            })
1305            .await
1306    }
1307
    #[instrument(skip_all)]
    async fn add_media_content(
        &self,
        request: &MediaRequestParameters,
        content: Vec<u8>,
        ignore_policy: IgnoreMediaRetentionPolicy,
    ) -> Result<()> {
        let _timer = timer!("method");

        // Delegate to the `MediaService`, which drives this store's
        // `EventCacheStoreMedia` implementation (see `add_media_content_inner`).
        self.media_service.add_media_content(self, request, content, ignore_policy).await
    }
1319
1320    #[instrument(skip_all)]
1321    async fn replace_media_key(
1322        &self,
1323        from: &MediaRequestParameters,
1324        to: &MediaRequestParameters,
1325    ) -> Result<(), Self::Error> {
1326        let _timer = timer!("method");
1327
1328        let prev_uri = self.encode_key(keys::MEDIA, from.source.unique_key());
1329        let prev_format = self.encode_key(keys::MEDIA, from.format.unique_key());
1330
1331        let new_uri = self.encode_key(keys::MEDIA, to.source.unique_key());
1332        let new_format = self.encode_key(keys::MEDIA, to.format.unique_key());
1333
1334        let conn = self.write().await?;
1335        conn.execute(
1336            r#"UPDATE media SET uri = ?, format = ? WHERE uri = ? AND format = ?"#,
1337            (new_uri, new_format, prev_uri, prev_format),
1338        )
1339        .await?;
1340
1341        Ok(())
1342    }
1343
    #[instrument(skip_all)]
    async fn get_media_content(&self, request: &MediaRequestParameters) -> Result<Option<Vec<u8>>> {
        let _timer = timer!("method");

        // Delegate to the `MediaService`, which drives this store's
        // `EventCacheStoreMedia` implementation (see `get_media_content_inner`).
        self.media_service.get_media_content(self, request).await
    }
1350
1351    #[instrument(skip_all)]
1352    async fn remove_media_content(&self, request: &MediaRequestParameters) -> Result<()> {
1353        let _timer = timer!("method");
1354
1355        let uri = self.encode_key(keys::MEDIA, request.source.unique_key());
1356        let format = self.encode_key(keys::MEDIA, request.format.unique_key());
1357
1358        let conn = self.write().await?;
1359        conn.execute("DELETE FROM media WHERE uri = ? AND format = ?", (uri, format)).await?;
1360
1361        Ok(())
1362    }
1363
    #[instrument(skip(self))]
    async fn get_media_content_for_uri(
        &self,
        uri: &MxcUri,
    ) -> Result<Option<Vec<u8>>, Self::Error> {
        let _timer = timer!("method");

        // Delegate to the `MediaService`, which drives this store's
        // `EventCacheStoreMedia` implementation (see `get_media_content_for_uri_inner`).
        self.media_service.get_media_content_for_uri(self, uri).await
    }
1373
1374    #[instrument(skip(self))]
1375    async fn remove_media_content_for_uri(&self, uri: &MxcUri) -> Result<()> {
1376        let _timer = timer!("method");
1377
1378        let uri = self.encode_key(keys::MEDIA, uri);
1379
1380        let conn = self.write().await?;
1381        conn.execute("DELETE FROM media WHERE uri = ?", (uri,)).await?;
1382
1383        Ok(())
1384    }
1385
    #[instrument(skip_all)]
    async fn set_media_retention_policy(
        &self,
        policy: MediaRetentionPolicy,
    ) -> Result<(), Self::Error> {
        let _timer = timer!("method");

        // Delegate to the `MediaService`, which drives this store's
        // `EventCacheStoreMedia` implementation (see `set_media_retention_policy_inner`).
        self.media_service.set_media_retention_policy(self, policy).await
    }
1395
    #[instrument(skip_all)]
    fn media_retention_policy(&self) -> MediaRetentionPolicy {
        let _timer = timer!("method");

        // Synchronous read delegated to the `MediaService`.
        self.media_service.media_retention_policy()
    }
1402
    #[instrument(skip_all)]
    async fn set_ignore_media_retention_policy(
        &self,
        request: &MediaRequestParameters,
        ignore_policy: IgnoreMediaRetentionPolicy,
    ) -> Result<(), Self::Error> {
        let _timer = timer!("method");

        // Delegate to the `MediaService`, which drives this store's
        // `EventCacheStoreMedia` implementation (see
        // `set_ignore_media_retention_policy_inner`).
        self.media_service.set_ignore_media_retention_policy(self, request, ignore_policy).await
    }
1413
    #[instrument(skip_all)]
    async fn clean_up_media_cache(&self) -> Result<(), Self::Error> {
        let _timer = timer!("method");

        // Delegate to the `MediaService`, which drives this store's
        // `EventCacheStoreMedia` implementation (see `clean_up_media_cache_inner`).
        self.media_service.clean_up_media_cache(self).await
    }
1420}
1421
1422#[cfg_attr(target_family = "wasm", async_trait(?Send))]
1423#[cfg_attr(not(target_family = "wasm"), async_trait)]
1424impl EventCacheStoreMedia for SqliteEventCacheStore {
1425    type Error = Error;
1426
1427    async fn media_retention_policy_inner(
1428        &self,
1429    ) -> Result<Option<MediaRetentionPolicy>, Self::Error> {
1430        let conn = self.read().await?;
1431        conn.get_serialized_kv(keys::MEDIA_RETENTION_POLICY).await
1432    }
1433
1434    async fn set_media_retention_policy_inner(
1435        &self,
1436        policy: MediaRetentionPolicy,
1437    ) -> Result<(), Self::Error> {
1438        let conn = self.write().await?;
1439        conn.set_serialized_kv(keys::MEDIA_RETENTION_POLICY, policy).await?;
1440        Ok(())
1441    }
1442
1443    async fn add_media_content_inner(
1444        &self,
1445        request: &MediaRequestParameters,
1446        data: Vec<u8>,
1447        last_access: SystemTime,
1448        policy: MediaRetentionPolicy,
1449        ignore_policy: IgnoreMediaRetentionPolicy,
1450    ) -> Result<(), Self::Error> {
1451        let ignore_policy = ignore_policy.is_yes();
1452        let data = self.encode_value(data)?;
1453
1454        if !ignore_policy && policy.exceeds_max_file_size(data.len() as u64) {
1455            return Ok(());
1456        }
1457
1458        let uri = self.encode_key(keys::MEDIA, request.source.unique_key());
1459        let format = self.encode_key(keys::MEDIA, request.format.unique_key());
1460        let timestamp = time_to_timestamp(last_access);
1461
1462        let conn = self.write().await?;
1463        conn.execute(
1464            "INSERT OR REPLACE INTO media (uri, format, data, last_access, ignore_policy) VALUES (?, ?, ?, ?, ?)",
1465            (uri, format, data, timestamp, ignore_policy),
1466        )
1467        .await?;
1468
1469        Ok(())
1470    }
1471
1472    async fn set_ignore_media_retention_policy_inner(
1473        &self,
1474        request: &MediaRequestParameters,
1475        ignore_policy: IgnoreMediaRetentionPolicy,
1476    ) -> Result<(), Self::Error> {
1477        let uri = self.encode_key(keys::MEDIA, request.source.unique_key());
1478        let format = self.encode_key(keys::MEDIA, request.format.unique_key());
1479        let ignore_policy = ignore_policy.is_yes();
1480
1481        let conn = self.write().await?;
1482        conn.execute(
1483            r#"UPDATE media SET ignore_policy = ? WHERE uri = ? AND format = ?"#,
1484            (ignore_policy, uri, format),
1485        )
1486        .await?;
1487
1488        Ok(())
1489    }
1490
1491    async fn get_media_content_inner(
1492        &self,
1493        request: &MediaRequestParameters,
1494        current_time: SystemTime,
1495    ) -> Result<Option<Vec<u8>>, Self::Error> {
1496        let uri = self.encode_key(keys::MEDIA, request.source.unique_key());
1497        let format = self.encode_key(keys::MEDIA, request.format.unique_key());
1498        let timestamp = time_to_timestamp(current_time);
1499
1500        let conn = self.write().await?;
1501        let data = conn
1502            .with_transaction::<_, rusqlite::Error, _>(move |txn| {
1503                // Update the last access.
1504                // We need to do this first so the transaction is in write mode right away.
1505                // See: https://sqlite.org/lang_transaction.html#read_transactions_versus_write_transactions
1506                txn.execute(
1507                    "UPDATE media SET last_access = ? WHERE uri = ? AND format = ?",
1508                    (timestamp, &uri, &format),
1509                )?;
1510
1511                txn.query_row::<Vec<u8>, _, _>(
1512                    "SELECT data FROM media WHERE uri = ? AND format = ?",
1513                    (&uri, &format),
1514                    |row| row.get(0),
1515                )
1516                .optional()
1517            })
1518            .await?;
1519
1520        data.map(|v| self.decode_value(&v).map(Into::into)).transpose()
1521    }
1522
1523    async fn get_media_content_for_uri_inner(
1524        &self,
1525        uri: &MxcUri,
1526        current_time: SystemTime,
1527    ) -> Result<Option<Vec<u8>>, Self::Error> {
1528        let uri = self.encode_key(keys::MEDIA, uri);
1529        let timestamp = time_to_timestamp(current_time);
1530
1531        let conn = self.write().await?;
1532        let data = conn
1533            .with_transaction::<_, rusqlite::Error, _>(move |txn| {
1534                // Update the last access.
1535                // We need to do this first so the transaction is in write mode right away.
1536                // See: https://sqlite.org/lang_transaction.html#read_transactions_versus_write_transactions
1537                txn.execute("UPDATE media SET last_access = ? WHERE uri = ?", (timestamp, &uri))?;
1538
1539                txn.query_row::<Vec<u8>, _, _>(
1540                    "SELECT data FROM media WHERE uri = ?",
1541                    (&uri,),
1542                    |row| row.get(0),
1543                )
1544                .optional()
1545            })
1546            .await?;
1547
1548        data.map(|v| self.decode_value(&v).map(Into::into)).transpose()
1549    }
1550
    /// Apply the given [`MediaRetentionPolicy`] to the media cache: delete
    /// over-sized and expired content, shrink the cache down to its maximum
    /// size, and record the time of this cleanup.
    ///
    /// Rows with `ignore_policy` set are never touched.
    async fn clean_up_media_cache_inner(
        &self,
        policy: MediaRetentionPolicy,
        current_time: SystemTime,
    ) -> Result<(), Self::Error> {
        if !policy.has_limitations() {
            // We can safely skip all the checks.
            return Ok(());
        }

        let conn = self.write().await?;
        let removed = conn
            .with_transaction::<_, Error, _>(move |txn| {
                // Whether any row was deleted; used after the transaction to
                // decide whether to vacuum the database.
                let mut removed = false;

                // First, check media content that exceed the max filesize.
                if let Some(max_file_size) = policy.computed_max_file_size() {
                    let count = txn.execute(
                        "DELETE FROM media WHERE ignore_policy IS FALSE AND length(data) > ?",
                        (max_file_size,),
                    )?;

                    if count > 0 {
                        removed = true;
                    }
                }

                // Then, clean up expired media content.
                if let Some(last_access_expiry) = policy.last_access_expiry {
                    let current_timestamp = time_to_timestamp(current_time);
                    let expiry_secs = last_access_expiry.as_secs();
                    let count = txn.execute(
                        "DELETE FROM media WHERE ignore_policy IS FALSE AND (? - last_access) >= ?",
                        (current_timestamp, expiry_secs),
                    )?;

                    if count > 0 {
                        removed = true;
                    }
                }

                // Finally, if the cache size is too big, remove old items until it fits.
                if let Some(max_cache_size) = policy.max_cache_size {
                    // SQLite's `sum()` produces a 64-bit integer; read it as
                    // `u64` (blob lengths are never negative) to avoid
                    // platform-dependent `usize` conversions of the result.
                    let cache_size = txn
                        .query_row(
                            "SELECT sum(length(data)) FROM media WHERE ignore_policy IS FALSE",
                            (),
                            |row| {
                                // `sum()` returns `NULL` if there are no rows.
                                row.get::<_, Option<u64>>(0)
                            },
                        )?
                        .unwrap_or_default();

                    // If the cache size is bigger than the max cache size, clean up.
                    if cache_size > max_cache_size {
                        // Get the sizes of the media contents ordered by last access.
                        let mut cached_stmt = txn.prepare_cached(
                            "SELECT rowid, length(data) FROM media \
                             WHERE ignore_policy IS FALSE ORDER BY last_access DESC",
                        )?;
                        let content_sizes = cached_stmt
                            .query(())?
                            .mapped(|row| Ok((row.get::<_, i64>(0)?, row.get::<_, u64>(1)?)));

                        // Walk from the most recently accessed item to the
                        // oldest, keeping items while the running total fits
                        // under `max_cache_size`; everything past the limit is
                        // scheduled for removal.
                        let mut accumulated_items_size = 0u64;
                        let mut limit_reached = false;
                        let mut rows_to_remove = Vec::new();

                        for result in content_sizes {
                            let (row_id, size) = match result {
                                Ok(content_size) => content_size,
                                Err(error) => {
                                    return Err(error.into());
                                }
                            };

                            if limit_reached {
                                rows_to_remove.push(row_id);
                                continue;
                            }

                            match accumulated_items_size.checked_add(size) {
                                Some(acc) if acc > max_cache_size => {
                                    // We can stop accumulating.
                                    limit_reached = true;
                                    rows_to_remove.push(row_id);
                                }
                                Some(acc) => accumulated_items_size = acc,
                                None => {
                                    // The accumulated size is overflowing but the setting cannot be
                                    // bigger than usize::MAX, we can stop accumulating.
                                    limit_reached = true;
                                    rows_to_remove.push(row_id);
                                }
                            }
                        }

                        if !rows_to_remove.is_empty() {
                            removed = true;
                        }

                        // Delete in batches to stay under SQLite's bound
                        // parameter limit.
                        txn.chunk_large_query_over(rows_to_remove, None, |txn, row_ids| {
                            let sql_params = repeat_vars(row_ids.len());
                            let query = format!("DELETE FROM media WHERE rowid IN ({sql_params})");
                            txn.prepare(&query)?.execute(params_from_iter(row_ids))?;
                            Ok(Vec::<()>::new())
                        })?;
                    }
                }

                // Record the time of this cleanup, even if nothing was removed.
                txn.set_serialized_kv(keys::LAST_MEDIA_CLEANUP_TIME, current_time)?;

                Ok(removed)
            })
            .await?;

        // If we removed media, defragment the database and free space on the
        // filesystem.
        if removed {
            conn.vacuum().await?;
        }

        Ok(())
    }
1678
1679    async fn last_media_cleanup_time_inner(&self) -> Result<Option<SystemTime>, Self::Error> {
1680        let conn = self.read().await?;
1681        conn.get_serialized_kv(keys::LAST_MEDIA_CLEANUP_TIME).await
1682    }
1683}
1684
/// Fetch all events related to `event_id` in the given room, along with their
/// position in the linked chunk identified by `hashed_linked_chunk_id` (if
/// they belong to it), optionally restricted to the given relation types.
///
/// Runs entirely within the provided SQLite transaction.
fn find_event_relations_transaction(
    store: SqliteEventCacheStore,
    hashed_room_id: Key,
    hashed_linked_chunk_id: Key,
    event_id: OwnedEventId,
    filters: Option<Vec<RelationType>>,
    txn: &Transaction<'_>,
) -> Result<Vec<(Event, Option<Position>)>> {
    // Map a result row to (raw event blob, chunk id, index within the chunk).
    // The last two are NULL when the event isn't in the linked chunk.
    let get_rows = |row: &rusqlite::Row<'_>| {
        Ok((
            row.get::<_, Vec<u8>>(0)?,
            row.get::<_, Option<u64>>(1)?,
            row.get::<_, Option<usize>>(2)?,
        ))
    };

    // Collect related events.
    let collect_results = |transaction| {
        let mut related = Vec::new();

        for result in transaction {
            let (event_blob, chunk_id, index): (Vec<u8>, Option<u64>, _) = result?;

            // Stored blobs may be encrypted; decode before deserializing.
            let event: Event = serde_json::from_slice(&store.decode_value(&event_blob)?)?;

            // Only build the position if both the chunk_id and position were present; in
            // theory, they should either be present at the same time, or not at all.
            let pos = chunk_id
                .zip(index)
                .map(|(chunk_id, index)| Position::new(ChunkIdentifier::new(chunk_id), index));

            related.push((event, pos));
        }

        Ok(related)
    };

    let related = if let Some(filters) = compute_filters_string(filters.as_deref()) {
        // One SQL placeholder per relation-type filter.
        let question_marks = repeat_vars(filters.len());
        let query = format!(
            "SELECT events.content, event_chunks.chunk_id, event_chunks.position
            FROM events
            LEFT JOIN event_chunks ON events.event_id = event_chunks.event_id AND event_chunks.linked_chunk_id = ?
            WHERE relates_to = ? AND room_id = ? AND rel_type IN ({question_marks})"
        );

        let filters: Vec<_> = filters.iter().map(|f| f.to_sql().unwrap()).collect();
        let parameters = params_from_iter(
            [
                hashed_linked_chunk_id.to_sql().expect(
                    "We should be able to convert a hashed linked chunk ID to a SQLite value",
                ),
                event_id
                    .as_str()
                    .to_sql()
                    .expect("We should be able to convert an event ID to a SQLite value"),
                hashed_room_id
                    .to_sql()
                    .expect("We should be able to convert a room ID to a SQLite value"),
            ]
            .into_iter()
            .chain(filters),
        );

        let mut transaction = txn.prepare(&query)?;
        let transaction = transaction.query_map(parameters, get_rows)?;

        collect_results(transaction)
    } else {
        // No filters: fetch every relation targeting the event.
        let query =
            "SELECT events.content, event_chunks.chunk_id, event_chunks.position
            FROM events
            LEFT JOIN event_chunks ON events.event_id = event_chunks.event_id AND event_chunks.linked_chunk_id = ?
            WHERE relates_to = ? AND room_id = ?";
        let parameters = (hashed_linked_chunk_id, event_id.as_str(), hashed_room_id);

        let mut transaction = txn.prepare(query)?;
        let transaction = transaction.query_map(parameters, get_rows)?;

        collect_results(transaction)
    };

    related
}
1769
1770/// Like `deadpool::managed::Object::with_transaction`, but starts the
1771/// transaction in immediate (write) mode from the beginning, precluding errors
1772/// of the kind SQLITE_BUSY from happening, for transactions that may involve
1773/// both reads and writes, and start with a write.
/// Like `deadpool::managed::Object::with_transaction`, but starts the
/// transaction in immediate (write) mode from the beginning, precluding errors
/// of the kind SQLITE_BUSY from happening, for transactions that may involve
/// both reads and writes, and start with a write.
async fn with_immediate_transaction<
    T: Send + 'static,
    F: FnOnce(&Transaction<'_>) -> Result<T, Error> + Send + 'static,
>(
    this: &SqliteEventCacheStore,
    f: F,
) -> Result<T, Error> {
    this.write()
        .await?
        .interact(move |conn| -> Result<T, Error> {
            // Start the transaction in IMMEDIATE mode since all updates may cause writes,
            // to avoid read transactions upgrading to write mode and causing
            // SQLITE_BUSY errors. See also: https://www.sqlite.org/lang_transaction.html#deferred_immediate_and_exclusive_transactions
            conn.set_transaction_behavior(TransactionBehavior::Immediate);

            // Run the caller's closure inside a transaction, committing only
            // on success.
            let code = || -> Result<T, Error> {
                let txn = conn.transaction()?;
                let res = f(&txn)?;
                txn.commit()?;
                Ok(res)
            };

            let res = code();

            // Reset the transaction behavior to use Deferred, after this transaction has
            // been run, whether it was successful or not.
            conn.set_transaction_behavior(TransactionBehavior::Deferred);

            res
        })
        .await
        // SAFETY: same logic as in [`deadpool::managed::Object::with_transaction`].
        .unwrap()
}
1808
1809fn insert_chunk(
1810    txn: &Transaction<'_>,
1811    linked_chunk_id: &Key,
1812    previous: Option<u64>,
1813    new: u64,
1814    next: Option<u64>,
1815    type_str: &str,
1816) -> rusqlite::Result<()> {
1817    // First, insert the new chunk.
1818    txn.execute(
1819        r#"
1820            INSERT INTO linked_chunks(id, linked_chunk_id, previous, next, type)
1821            VALUES (?, ?, ?, ?, ?)
1822        "#,
1823        (new, linked_chunk_id, previous, next, type_str),
1824    )?;
1825
1826    // If this chunk has a previous one, update its `next` field.
1827    if let Some(previous) = previous {
1828        txn.execute(
1829            r#"
1830                UPDATE linked_chunks
1831                SET next = ?
1832                WHERE id = ? AND linked_chunk_id = ?
1833            "#,
1834            (new, previous, linked_chunk_id),
1835        )?;
1836    }
1837
1838    // If this chunk has a next one, update its `previous` field.
1839    if let Some(next) = next {
1840        txn.execute(
1841            r#"
1842                UPDATE linked_chunks
1843                SET previous = ?
1844                WHERE id = ? AND linked_chunk_id = ?
1845            "#,
1846            (new, next, linked_chunk_id),
1847        )?;
1848    }
1849
1850    Ok(())
1851}
1852
1853#[cfg(test)]
1854mod tests {
1855    use std::{
1856        path::PathBuf,
1857        sync::atomic::{AtomicU32, Ordering::SeqCst},
1858        time::Duration,
1859    };
1860
1861    use assert_matches::assert_matches;
1862    use matrix_sdk_base::{
1863        event_cache::{
1864            store::{
1865                integration_tests::{
1866                    check_test_event, make_test_event, make_test_event_with_event_id,
1867                },
1868                media::IgnoreMediaRetentionPolicy,
1869                EventCacheStore, EventCacheStoreError,
1870            },
1871            Gap,
1872        },
1873        event_cache_store_integration_tests, event_cache_store_integration_tests_time,
1874        event_cache_store_media_integration_tests,
1875        linked_chunk::{ChunkContent, ChunkIdentifier, LinkedChunkId, Position, Update},
1876        media::{MediaFormat, MediaRequestParameters, MediaThumbnailSettings},
1877    };
1878    use matrix_sdk_test::{async_test, DEFAULT_TEST_ROOM_ID};
1879    use once_cell::sync::Lazy;
1880    use ruma::{event_id, events::room::MediaSource, media::Method, mxc_uri, room_id, uint};
1881    use tempfile::{tempdir, TempDir};
1882
1883    use super::SqliteEventCacheStore;
1884    use crate::{
1885        event_cache_store::keys,
1886        utils::{EncryptableStore as _, SqliteAsyncConnExt},
1887        SqliteStoreConfig,
1888    };
1889
    /// Shared temporary directory hosting the workspaces of all stores
    /// created by this test module; cleaned up when the process exits.
    static TMP_DIR: Lazy<TempDir> = Lazy::new(|| tempdir().unwrap());
    /// Monotonic counter used to give each test store a unique workspace.
    static NUM: AtomicU32 = AtomicU32::new(0);
1892
1893    fn new_event_cache_store_workspace() -> PathBuf {
1894        let name = NUM.fetch_add(1, SeqCst).to_string();
1895        TMP_DIR.path().join(name)
1896    }
1897
1898    async fn get_event_cache_store() -> Result<SqliteEventCacheStore, EventCacheStoreError> {
1899        let tmpdir_path = new_event_cache_store_workspace();
1900
1901        tracing::info!("using event cache store @ {}", tmpdir_path.to_str().unwrap());
1902
1903        Ok(SqliteEventCacheStore::open(tmpdir_path.to_str().unwrap(), None).await.unwrap())
1904    }
1905
    // Instantiate the generic event cache store test suites against this
    // SQLite implementation.
    event_cache_store_integration_tests!();
    event_cache_store_integration_tests_time!();
    event_cache_store_media_integration_tests!(with_media_size_tests);
1909
1910    async fn get_event_cache_store_content_sorted_by_last_access(
1911        event_cache_store: &SqliteEventCacheStore,
1912    ) -> Vec<Vec<u8>> {
1913        let sqlite_db = event_cache_store.read().await.expect("accessing sqlite db failed");
1914        sqlite_db
1915            .prepare("SELECT data FROM media ORDER BY last_access DESC", |mut stmt| {
1916                stmt.query(())?.mapped(|row| row.get(0)).collect()
1917            })
1918            .await
1919            .expect("querying media cache content by last access failed")
1920    }
1921
1922    #[async_test]
1923    async fn test_pool_size() {
1924        let tmpdir_path = new_event_cache_store_workspace();
1925        let store_open_config = SqliteStoreConfig::new(tmpdir_path).pool_max_size(42);
1926
1927        let store = SqliteEventCacheStore::open_with_config(store_open_config).await.unwrap();
1928
1929        assert_eq!(store.pool.status().max_size, 42);
1930    }
1931
    #[async_test]
    async fn test_last_access() {
        // Check that reading a media content bumps its last-access timestamp,
        // changing the eviction order relative to other cached contents.
        let event_cache_store = get_event_cache_store().await.expect("creating media cache failed");
        let uri = mxc_uri!("mxc://localhost/media");
        let file_request = MediaRequestParameters {
            source: MediaSource::Plain(uri.to_owned()),
            format: MediaFormat::File,
        };
        // Same URI, different format: stored as a separate media row.
        let thumbnail_request = MediaRequestParameters {
            source: MediaSource::Plain(uri.to_owned()),
            format: MediaFormat::Thumbnail(MediaThumbnailSettings::with_method(
                Method::Crop,
                uint!(100),
                uint!(100),
            )),
        };

        let content: Vec<u8> = "hello world".into();
        let thumbnail_content: Vec<u8> = "hello…".into();

        // Add the media.
        event_cache_store
            .add_media_content(&file_request, content.clone(), IgnoreMediaRetentionPolicy::No)
            .await
            .expect("adding file failed");

        // Since the precision of the timestamp is in seconds, wait so the timestamps
        // differ.
        tokio::time::sleep(Duration::from_secs(3)).await;

        event_cache_store
            .add_media_content(
                &thumbnail_request,
                thumbnail_content.clone(),
                IgnoreMediaRetentionPolicy::No,
            )
            .await
            .expect("adding thumbnail failed");

        // File's last access is older than thumbnail.
        let contents =
            get_event_cache_store_content_sorted_by_last_access(&event_cache_store).await;

        assert_eq!(contents.len(), 2, "media cache contents length is wrong");
        assert_eq!(contents[0], thumbnail_content, "thumbnail is not last access");
        assert_eq!(contents[1], content, "file is not second-to-last access");

        // Since the precision of the timestamp is in seconds, wait so the timestamps
        // differ.
        tokio::time::sleep(Duration::from_secs(3)).await;

        // Access the file so its last access is more recent.
        let _ = event_cache_store
            .get_media_content(&file_request)
            .await
            .expect("getting file failed")
            .expect("file is missing");

        // File's last access is more recent than thumbnail.
        let contents =
            get_event_cache_store_content_sorted_by_last_access(&event_cache_store).await;

        assert_eq!(contents.len(), 2, "media cache contents length is wrong");
        assert_eq!(contents[0], content, "file is not last access");
        assert_eq!(contents[1], thumbnail_content, "thumbnail is not second-to-last access");
    }
1998
    #[async_test]
    async fn test_linked_chunk_new_items_chunk() {
        // Check that `NewItemsChunk` updates create empty item chunks and that
        // the store maintains the previous/next links between them.
        let store = get_event_cache_store().await.expect("creating cache store failed");

        let room_id = &DEFAULT_TEST_ROOM_ID;
        let linked_chunk_id = LinkedChunkId::Room(room_id);

        store
            .handle_linked_chunk_updates(
                linked_chunk_id,
                vec![
                    Update::NewItemsChunk {
                        previous: None,
                        new: ChunkIdentifier::new(42),
                        next: None, // Note: the store must link the next entry itself.
                    },
                    Update::NewItemsChunk {
                        previous: Some(ChunkIdentifier::new(42)),
                        new: ChunkIdentifier::new(13),
                        next: Some(ChunkIdentifier::new(37)), /* But it's fine to explicitly pass
                                                               * the next link ahead of time. */
                    },
                    Update::NewItemsChunk {
                        previous: Some(ChunkIdentifier::new(13)),
                        new: ChunkIdentifier::new(37),
                        next: None,
                    },
                ],
            )
            .await
            .unwrap();

        let mut chunks = store.load_all_chunks(linked_chunk_id).await.unwrap();

        assert_eq!(chunks.len(), 3);

        {
            // Chunks are ordered from smaller to bigger IDs.
            let c = chunks.remove(0);
            assert_eq!(c.identifier, ChunkIdentifier::new(13));
            assert_eq!(c.previous, Some(ChunkIdentifier::new(42)));
            assert_eq!(c.next, Some(ChunkIdentifier::new(37)));
            assert_matches!(c.content, ChunkContent::Items(events) => {
                assert!(events.is_empty());
            });

            let c = chunks.remove(0);
            assert_eq!(c.identifier, ChunkIdentifier::new(37));
            assert_eq!(c.previous, Some(ChunkIdentifier::new(13)));
            assert_eq!(c.next, None);
            assert_matches!(c.content, ChunkContent::Items(events) => {
                assert!(events.is_empty());
            });

            let c = chunks.remove(0);
            assert_eq!(c.identifier, ChunkIdentifier::new(42));
            assert_eq!(c.previous, None);
            assert_eq!(c.next, Some(ChunkIdentifier::new(13)));
            assert_matches!(c.content, ChunkContent::Items(events) => {
                assert!(events.is_empty());
            });
        }
    }
2062
2063    #[async_test]
2064    async fn test_linked_chunk_new_gap_chunk() {
2065        let store = get_event_cache_store().await.expect("creating cache store failed");
2066
2067        let room_id = &DEFAULT_TEST_ROOM_ID;
2068        let linked_chunk_id = LinkedChunkId::Room(room_id);
2069
2070        store
2071            .handle_linked_chunk_updates(
2072                linked_chunk_id,
2073                vec![Update::NewGapChunk {
2074                    previous: None,
2075                    new: ChunkIdentifier::new(42),
2076                    next: None,
2077                    gap: Gap { prev_token: "raclette".to_owned() },
2078                }],
2079            )
2080            .await
2081            .unwrap();
2082
2083        let mut chunks = store.load_all_chunks(linked_chunk_id).await.unwrap();
2084
2085        assert_eq!(chunks.len(), 1);
2086
2087        // Chunks are ordered from smaller to bigger IDs.
2088        let c = chunks.remove(0);
2089        assert_eq!(c.identifier, ChunkIdentifier::new(42));
2090        assert_eq!(c.previous, None);
2091        assert_eq!(c.next, None);
2092        assert_matches!(c.content, ChunkContent::Gap(gap) => {
2093            assert_eq!(gap.prev_token, "raclette");
2094        });
2095    }
2096
    #[async_test]
    async fn test_linked_chunk_replace_item() {
        // Check that a `ReplaceItem` update overwrites the event at the given
        // position while leaving its neighbours untouched.
        let store = get_event_cache_store().await.expect("creating cache store failed");

        let room_id = &DEFAULT_TEST_ROOM_ID;
        let linked_chunk_id = LinkedChunkId::Room(room_id);
        let event_id = event_id!("$world");

        store
            .handle_linked_chunk_updates(
                linked_chunk_id,
                vec![
                    Update::NewItemsChunk {
                        previous: None,
                        new: ChunkIdentifier::new(42),
                        next: None,
                    },
                    Update::PushItems {
                        at: Position::new(ChunkIdentifier::new(42), 0),
                        items: vec![
                            make_test_event(room_id, "hello"),
                            make_test_event_with_event_id(room_id, "world", Some(event_id)),
                        ],
                    },
                    // Replace the second event ("world") with a new one
                    // reusing the same event ID.
                    Update::ReplaceItem {
                        at: Position::new(ChunkIdentifier::new(42), 1),
                        item: make_test_event_with_event_id(room_id, "yolo", Some(event_id)),
                    },
                ],
            )
            .await
            .unwrap();

        let mut chunks = store.load_all_chunks(linked_chunk_id).await.unwrap();

        assert_eq!(chunks.len(), 1);

        let c = chunks.remove(0);
        assert_eq!(c.identifier, ChunkIdentifier::new(42));
        assert_eq!(c.previous, None);
        assert_eq!(c.next, None);
        assert_matches!(c.content, ChunkContent::Items(events) => {
            assert_eq!(events.len(), 2);
            check_test_event(&events[0], "hello");
            check_test_event(&events[1], "yolo");
        });
    }
2144
    #[async_test]
    async fn test_linked_chunk_remove_chunk() {
        // Check that removing a middle chunk relinks its neighbours, and that
        // the associated gap row is deleted (via foreign-key cascade).
        let store = get_event_cache_store().await.expect("creating cache store failed");

        let room_id = &DEFAULT_TEST_ROOM_ID;
        let linked_chunk_id = LinkedChunkId::Room(room_id);

        store
            .handle_linked_chunk_updates(
                linked_chunk_id,
                vec![
                    Update::NewGapChunk {
                        previous: None,
                        new: ChunkIdentifier::new(42),
                        next: None,
                        gap: Gap { prev_token: "raclette".to_owned() },
                    },
                    Update::NewGapChunk {
                        previous: Some(ChunkIdentifier::new(42)),
                        new: ChunkIdentifier::new(43),
                        next: None,
                        gap: Gap { prev_token: "fondue".to_owned() },
                    },
                    Update::NewGapChunk {
                        previous: Some(ChunkIdentifier::new(43)),
                        new: ChunkIdentifier::new(44),
                        next: None,
                        gap: Gap { prev_token: "tartiflette".to_owned() },
                    },
                    // Remove the middle chunk (43).
                    Update::RemoveChunk(ChunkIdentifier::new(43)),
                ],
            )
            .await
            .unwrap();

        let mut chunks = store.load_all_chunks(linked_chunk_id).await.unwrap();

        assert_eq!(chunks.len(), 2);

        // Chunks are ordered from smaller to bigger IDs.
        let c = chunks.remove(0);
        assert_eq!(c.identifier, ChunkIdentifier::new(42));
        assert_eq!(c.previous, None);
        assert_eq!(c.next, Some(ChunkIdentifier::new(44)));
        assert_matches!(c.content, ChunkContent::Gap(gap) => {
            assert_eq!(gap.prev_token, "raclette");
        });

        let c = chunks.remove(0);
        assert_eq!(c.identifier, ChunkIdentifier::new(44));
        assert_eq!(c.previous, Some(ChunkIdentifier::new(42)));
        assert_eq!(c.next, None);
        assert_matches!(c.content, ChunkContent::Gap(gap) => {
            assert_eq!(gap.prev_token, "tartiflette");
        });

        // Check that cascading worked. Yes, SQLite, I doubt you.
        let gaps = store
            .read()
            .await
            .unwrap()
            .with_transaction(|txn| -> rusqlite::Result<_> {
                let mut gaps = Vec::new();
                for data in txn
                    .prepare("SELECT chunk_id FROM gap_chunks ORDER BY chunk_id")?
                    .query_map((), |row| row.get::<_, u64>(0))?
                {
                    gaps.push(data?);
                }
                Ok(gaps)
            })
            .await
            .unwrap();

        assert_eq!(gaps, vec![42, 44]);
    }
2221
    #[async_test]
    async fn test_linked_chunk_push_items() {
        // Check that successive `PushItems` updates append events to the same
        // chunk in order.
        let store = get_event_cache_store().await.expect("creating cache store failed");

        let room_id = &DEFAULT_TEST_ROOM_ID;
        let linked_chunk_id = LinkedChunkId::Room(room_id);

        store
            .handle_linked_chunk_updates(
                linked_chunk_id,
                vec![
                    Update::NewItemsChunk {
                        previous: None,
                        new: ChunkIdentifier::new(42),
                        next: None,
                    },
                    Update::PushItems {
                        at: Position::new(ChunkIdentifier::new(42), 0),
                        items: vec![
                            make_test_event(room_id, "hello"),
                            make_test_event(room_id, "world"),
                        ],
                    },
                    // Second push starts at index 2, after the first two.
                    Update::PushItems {
                        at: Position::new(ChunkIdentifier::new(42), 2),
                        items: vec![make_test_event(room_id, "who?")],
                    },
                ],
            )
            .await
            .unwrap();

        let mut chunks = store.load_all_chunks(linked_chunk_id).await.unwrap();

        assert_eq!(chunks.len(), 1);

        let c = chunks.remove(0);
        assert_eq!(c.identifier, ChunkIdentifier::new(42));
        assert_eq!(c.previous, None);
        assert_eq!(c.next, None);
        assert_matches!(c.content, ChunkContent::Items(events) => {
            assert_eq!(events.len(), 3);

            check_test_event(&events[0], "hello");
            check_test_event(&events[1], "world");
            check_test_event(&events[2], "who?");
        });
    }
2270
    #[async_test]
    async fn test_linked_chunk_remove_item() {
        // Removing an item from a chunk must delete the event *and* shift the
        // positions of all the events that come after it, so that positions
        // stay contiguous in the `event_chunks` table.
        let store = get_event_cache_store().await.expect("creating cache store failed");

        let room_id = *DEFAULT_TEST_ROOM_ID;
        let linked_chunk_id = LinkedChunkId::Room(room_id);

        // Fill a single chunk with six events, then remove the third one.
        store
            .handle_linked_chunk_updates(
                linked_chunk_id,
                vec![
                    Update::NewItemsChunk {
                        previous: None,
                        new: ChunkIdentifier::new(42),
                        next: None,
                    },
                    Update::PushItems {
                        at: Position::new(ChunkIdentifier::new(42), 0),
                        items: vec![
                            make_test_event(room_id, "one"),
                            make_test_event(room_id, "two"),
                            make_test_event(room_id, "three"),
                            make_test_event(room_id, "four"),
                            make_test_event(room_id, "five"),
                            make_test_event(room_id, "six"),
                        ],
                    },
                    Update::RemoveItem {
                        at: Position::new(ChunkIdentifier::new(42), 2), /* "three" */
                    },
                ],
            )
            .await
            .unwrap();

        let mut chunks = store.load_all_chunks(linked_chunk_id).await.unwrap();

        assert_eq!(chunks.len(), 1);

        let c = chunks.remove(0);
        assert_eq!(c.identifier, ChunkIdentifier::new(42));
        assert_eq!(c.previous, None);
        assert_eq!(c.next, None);
        assert_matches!(c.content, ChunkContent::Items(events) => {
            // "three" is gone; the remaining five events are still in order.
            assert_eq!(events.len(), 5);
            check_test_event(&events[0], "one");
            check_test_event(&events[1], "two");
            check_test_event(&events[2], "four");
            check_test_event(&events[3], "five");
            check_test_event(&events[4], "six");
        });

        // Make sure the positions have been updated for the remaining events:
        // the events previously at positions 3, 4 and 5 must now occupy
        // positions 2, 3 and 4.
        let num_rows: u64 = store
            .read()
            .await
            .unwrap()
            .with_transaction(move |txn| {
                txn.query_row(
                    "SELECT COUNT(*) FROM event_chunks WHERE chunk_id = 42 AND linked_chunk_id = ? AND position IN (2, 3, 4)",
                    (store.encode_key(keys::LINKED_CHUNKS, linked_chunk_id.storage_key()),),
                    |row| row.get(0),
                )
            })
            .await
            .unwrap();
        assert_eq!(num_rows, 3);
    }
2339
2340    #[async_test]
2341    async fn test_linked_chunk_detach_last_items() {
2342        let store = get_event_cache_store().await.expect("creating cache store failed");
2343
2344        let room_id = *DEFAULT_TEST_ROOM_ID;
2345        let linked_chunk_id = LinkedChunkId::Room(room_id);
2346
2347        store
2348            .handle_linked_chunk_updates(
2349                linked_chunk_id,
2350                vec![
2351                    Update::NewItemsChunk {
2352                        previous: None,
2353                        new: ChunkIdentifier::new(42),
2354                        next: None,
2355                    },
2356                    Update::PushItems {
2357                        at: Position::new(ChunkIdentifier::new(42), 0),
2358                        items: vec![
2359                            make_test_event(room_id, "hello"),
2360                            make_test_event(room_id, "world"),
2361                            make_test_event(room_id, "howdy"),
2362                        ],
2363                    },
2364                    Update::DetachLastItems { at: Position::new(ChunkIdentifier::new(42), 1) },
2365                ],
2366            )
2367            .await
2368            .unwrap();
2369
2370        let mut chunks = store.load_all_chunks(linked_chunk_id).await.unwrap();
2371
2372        assert_eq!(chunks.len(), 1);
2373
2374        let c = chunks.remove(0);
2375        assert_eq!(c.identifier, ChunkIdentifier::new(42));
2376        assert_eq!(c.previous, None);
2377        assert_eq!(c.next, None);
2378        assert_matches!(c.content, ChunkContent::Items(events) => {
2379            assert_eq!(events.len(), 1);
2380            check_test_event(&events[0], "hello");
2381        });
2382    }
2383
2384    #[async_test]
2385    async fn test_linked_chunk_start_end_reattach_items() {
2386        let store = get_event_cache_store().await.expect("creating cache store failed");
2387
2388        let room_id = *DEFAULT_TEST_ROOM_ID;
2389        let linked_chunk_id = LinkedChunkId::Room(room_id);
2390
2391        // Same updates and checks as test_linked_chunk_push_items, but with extra
2392        // `StartReattachItems` and `EndReattachItems` updates, which must have no
2393        // effects.
2394        store
2395            .handle_linked_chunk_updates(
2396                linked_chunk_id,
2397                vec![
2398                    Update::NewItemsChunk {
2399                        previous: None,
2400                        new: ChunkIdentifier::new(42),
2401                        next: None,
2402                    },
2403                    Update::PushItems {
2404                        at: Position::new(ChunkIdentifier::new(42), 0),
2405                        items: vec![
2406                            make_test_event(room_id, "hello"),
2407                            make_test_event(room_id, "world"),
2408                            make_test_event(room_id, "howdy"),
2409                        ],
2410                    },
2411                    Update::StartReattachItems,
2412                    Update::EndReattachItems,
2413                ],
2414            )
2415            .await
2416            .unwrap();
2417
2418        let mut chunks = store.load_all_chunks(linked_chunk_id).await.unwrap();
2419
2420        assert_eq!(chunks.len(), 1);
2421
2422        let c = chunks.remove(0);
2423        assert_eq!(c.identifier, ChunkIdentifier::new(42));
2424        assert_eq!(c.previous, None);
2425        assert_eq!(c.next, None);
2426        assert_matches!(c.content, ChunkContent::Items(events) => {
2427            assert_eq!(events.len(), 3);
2428            check_test_event(&events[0], "hello");
2429            check_test_event(&events[1], "world");
2430            check_test_event(&events[2], "howdy");
2431        });
2432    }
2433
    #[async_test]
    async fn test_linked_chunk_clear() {
        // `Update::Clear` must wipe all the chunks of the linked chunk, and
        // the deletion must cascade to the gap and event rows that reference
        // them. A previously-seen event must be insertable again afterwards.
        let store = get_event_cache_store().await.expect("creating cache store failed");

        let room_id = *DEFAULT_TEST_ROOM_ID;
        let linked_chunk_id = LinkedChunkId::Room(room_id);
        let event_0 = make_test_event(room_id, "hello");
        let event_1 = make_test_event(room_id, "world");
        let event_2 = make_test_event(room_id, "howdy");

        // Create an items chunk and a gap chunk, fill the former, then clear
        // everything within the same batch of updates.
        store
            .handle_linked_chunk_updates(
                linked_chunk_id,
                vec![
                    Update::NewItemsChunk {
                        previous: None,
                        new: ChunkIdentifier::new(42),
                        next: None,
                    },
                    Update::NewGapChunk {
                        previous: Some(ChunkIdentifier::new(42)),
                        new: ChunkIdentifier::new(54),
                        next: None,
                        gap: Gap { prev_token: "fondue".to_owned() },
                    },
                    Update::PushItems {
                        at: Position::new(ChunkIdentifier::new(42), 0),
                        items: vec![event_0.clone(), event_1, event_2],
                    },
                    Update::Clear,
                ],
            )
            .await
            .unwrap();

        // No chunk survives the clear.
        let chunks = store.load_all_chunks(linked_chunk_id).await.unwrap();
        assert!(chunks.is_empty());

        // Check that cascading worked. Yes, SQLite, I doubt you.
        store
            .read()
            .await
            .unwrap()
            .with_transaction(|txn| -> rusqlite::Result<_> {
                let num_gaps = txn
                    .prepare("SELECT COUNT(chunk_id) FROM gap_chunks ORDER BY chunk_id")?
                    .query_row((), |row| row.get::<_, u64>(0))?;
                assert_eq!(num_gaps, 0);

                let num_events = txn
                    .prepare("SELECT COUNT(event_id) FROM event_chunks ORDER BY chunk_id")?
                    .query_row((), |row| row.get::<_, u64>(0))?;
                assert_eq!(num_events, 0);

                Ok(())
            })
            .await
            .unwrap();

        // It's okay to re-insert a past event.
        store
            .handle_linked_chunk_updates(
                linked_chunk_id,
                vec![
                    Update::NewItemsChunk {
                        previous: None,
                        new: ChunkIdentifier::new(42),
                        next: None,
                    },
                    Update::PushItems {
                        at: Position::new(ChunkIdentifier::new(42), 0),
                        items: vec![event_0],
                    },
                ],
            )
            .await
            .unwrap();
    }
2512
2513    #[async_test]
2514    async fn test_linked_chunk_multiple_rooms() {
2515        let store = get_event_cache_store().await.expect("creating cache store failed");
2516
2517        let room1 = room_id!("!realcheeselovers:raclette.fr");
2518        let linked_chunk_id1 = LinkedChunkId::Room(room1);
2519        let room2 = room_id!("!realcheeselovers:fondue.ch");
2520        let linked_chunk_id2 = LinkedChunkId::Room(room2);
2521
2522        // Check that applying updates to one room doesn't affect the others.
2523        // Use the same chunk identifier in both rooms to battle-test search.
2524
2525        store
2526            .handle_linked_chunk_updates(
2527                linked_chunk_id1,
2528                vec![
2529                    Update::NewItemsChunk {
2530                        previous: None,
2531                        new: ChunkIdentifier::new(42),
2532                        next: None,
2533                    },
2534                    Update::PushItems {
2535                        at: Position::new(ChunkIdentifier::new(42), 0),
2536                        items: vec![
2537                            make_test_event(room1, "best cheese is raclette"),
2538                            make_test_event(room1, "obviously"),
2539                        ],
2540                    },
2541                ],
2542            )
2543            .await
2544            .unwrap();
2545
2546        store
2547            .handle_linked_chunk_updates(
2548                linked_chunk_id2,
2549                vec![
2550                    Update::NewItemsChunk {
2551                        previous: None,
2552                        new: ChunkIdentifier::new(42),
2553                        next: None,
2554                    },
2555                    Update::PushItems {
2556                        at: Position::new(ChunkIdentifier::new(42), 0),
2557                        items: vec![make_test_event(room1, "beaufort is the best")],
2558                    },
2559                ],
2560            )
2561            .await
2562            .unwrap();
2563
2564        // Check chunks from room 1.
2565        let mut chunks_room1 = store.load_all_chunks(linked_chunk_id1).await.unwrap();
2566        assert_eq!(chunks_room1.len(), 1);
2567
2568        let c = chunks_room1.remove(0);
2569        assert_matches!(c.content, ChunkContent::Items(events) => {
2570            assert_eq!(events.len(), 2);
2571            check_test_event(&events[0], "best cheese is raclette");
2572            check_test_event(&events[1], "obviously");
2573        });
2574
2575        // Check chunks from room 2.
2576        let mut chunks_room2 = store.load_all_chunks(linked_chunk_id2).await.unwrap();
2577        assert_eq!(chunks_room2.len(), 1);
2578
2579        let c = chunks_room2.remove(0);
2580        assert_matches!(c.content, ChunkContent::Items(events) => {
2581            assert_eq!(events.len(), 1);
2582            check_test_event(&events[0], "beaufort is the best");
2583        });
2584    }
2585
2586    #[async_test]
2587    async fn test_linked_chunk_update_is_a_transaction() {
2588        let store = get_event_cache_store().await.expect("creating cache store failed");
2589
2590        let room_id = *DEFAULT_TEST_ROOM_ID;
2591        let linked_chunk_id = LinkedChunkId::Room(room_id);
2592
2593        // Trigger a violation of the unique constraint on the (room id, chunk id)
2594        // couple.
2595        let err = store
2596            .handle_linked_chunk_updates(
2597                linked_chunk_id,
2598                vec![
2599                    Update::NewItemsChunk {
2600                        previous: None,
2601                        new: ChunkIdentifier::new(42),
2602                        next: None,
2603                    },
2604                    Update::NewItemsChunk {
2605                        previous: None,
2606                        new: ChunkIdentifier::new(42),
2607                        next: None,
2608                    },
2609                ],
2610            )
2611            .await
2612            .unwrap_err();
2613
2614        // The operation fails with a constraint violation error.
2615        assert_matches!(err, crate::error::Error::Sqlite(err) => {
2616            assert_matches!(err.sqlite_error_code(), Some(rusqlite::ErrorCode::ConstraintViolation));
2617        });
2618
2619        // If the updates have been handled transactionally, then no new chunks should
2620        // have been added; failure of the second update leads to the first one being
2621        // rolled back.
2622        let chunks = store.load_all_chunks(linked_chunk_id).await.unwrap();
2623        assert!(chunks.is_empty());
2624    }
2625
2626    #[async_test]
2627    async fn test_filter_duplicate_events_no_events() {
2628        let store = get_event_cache_store().await.expect("creating cache store failed");
2629
2630        let room_id = *DEFAULT_TEST_ROOM_ID;
2631        let linked_chunk_id = LinkedChunkId::Room(room_id);
2632        let duplicates = store.filter_duplicated_events(linked_chunk_id, Vec::new()).await.unwrap();
2633        assert!(duplicates.is_empty());
2634    }
2635
    #[async_test]
    async fn test_load_last_chunk() {
        // `load_last_chunk` returns both the last chunk of the linked chunk
        // (if any) and the chunk-identifier generator. The three cases below
        // build on each other: the store state accumulates across them.
        let room_id = room_id!("!r0:matrix.org");
        let linked_chunk_id = LinkedChunkId::Room(room_id);
        let event = |msg: &str| make_test_event(room_id, msg);
        let store = get_event_cache_store().await.expect("creating cache store failed");

        // Case #1: no last chunk.
        {
            let (last_chunk, chunk_identifier_generator) =
                store.load_last_chunk(linked_chunk_id).await.unwrap();

            // Nothing stored yet; the generator starts at 0.
            assert!(last_chunk.is_none());
            assert_eq!(chunk_identifier_generator.current(), 0);
        }

        // Case #2: only one chunk is present.
        {
            store
                .handle_linked_chunk_updates(
                    linked_chunk_id,
                    vec![
                        Update::NewItemsChunk {
                            previous: None,
                            new: ChunkIdentifier::new(42),
                            next: None,
                        },
                        Update::PushItems {
                            at: Position::new(ChunkIdentifier::new(42), 0),
                            items: vec![event("saucisse de morteau"), event("comté")],
                        },
                    ],
                )
                .await
                .unwrap();

            let (last_chunk, chunk_identifier_generator) =
                store.load_last_chunk(linked_chunk_id).await.unwrap();

            // The single chunk is the last chunk, with no neighbours.
            assert_matches!(last_chunk, Some(last_chunk) => {
                assert_eq!(last_chunk.identifier, 42);
                assert!(last_chunk.previous.is_none());
                assert!(last_chunk.next.is_none());
                assert_matches!(last_chunk.content, ChunkContent::Items(items) => {
                    assert_eq!(items.len(), 2);
                    check_test_event(&items[0], "saucisse de morteau");
                    check_test_event(&items[1], "comté");
                });
            });
            assert_eq!(chunk_identifier_generator.current(), 42);
        }

        // Case #3: more chunks are present.
        {
            // Append a second chunk (#7) after chunk #42.
            store
                .handle_linked_chunk_updates(
                    linked_chunk_id,
                    vec![
                        Update::NewItemsChunk {
                            previous: Some(ChunkIdentifier::new(42)),
                            new: ChunkIdentifier::new(7),
                            next: None,
                        },
                        Update::PushItems {
                            at: Position::new(ChunkIdentifier::new(7), 0),
                            items: vec![event("fondue"), event("gruyère"), event("mont d'or")],
                        },
                    ],
                )
                .await
                .unwrap();

            let (last_chunk, chunk_identifier_generator) =
                store.load_last_chunk(linked_chunk_id).await.unwrap();

            // The last chunk is now #7, linked back to #42.
            assert_matches!(last_chunk, Some(last_chunk) => {
                assert_eq!(last_chunk.identifier, 7);
                assert_matches!(last_chunk.previous, Some(previous) => {
                    assert_eq!(previous, 42);
                });
                assert!(last_chunk.next.is_none());
                assert_matches!(last_chunk.content, ChunkContent::Items(items) => {
                    assert_eq!(items.len(), 3);
                    check_test_event(&items[0], "fondue");
                    check_test_event(&items[1], "gruyère");
                    check_test_event(&items[2], "mont d'or");
                });
            });
            // The generator still reports 42 even though the last chunk is #7.
            assert_eq!(chunk_identifier_generator.current(), 42);
        }
    }
2727
2728    #[async_test]
2729    async fn test_load_last_chunk_with_a_cycle() {
2730        let room_id = room_id!("!r0:matrix.org");
2731        let linked_chunk_id = LinkedChunkId::Room(room_id);
2732        let store = get_event_cache_store().await.expect("creating cache store failed");
2733
2734        store
2735            .handle_linked_chunk_updates(
2736                linked_chunk_id,
2737                vec![
2738                    Update::NewItemsChunk {
2739                        previous: None,
2740                        new: ChunkIdentifier::new(0),
2741                        next: None,
2742                    },
2743                    Update::NewItemsChunk {
2744                        // Because `previous` connects to chunk #0, it will create a cycle.
2745                        // Chunk #0 will have a `next` set to chunk #1! Consequently, the last chunk
2746                        // **does not exist**. We have to detect this cycle.
2747                        previous: Some(ChunkIdentifier::new(0)),
2748                        new: ChunkIdentifier::new(1),
2749                        next: Some(ChunkIdentifier::new(0)),
2750                    },
2751                ],
2752            )
2753            .await
2754            .unwrap();
2755
2756        store.load_last_chunk(linked_chunk_id).await.unwrap_err();
2757    }
2758
    #[async_test]
    async fn test_load_previous_chunk() {
        // `load_previous_chunk` returns the chunk that precedes a given chunk
        // identifier, or `None` when there is no such chunk. The three cases
        // below build on each other: the store state accumulates across them.
        let room_id = room_id!("!r0:matrix.org");
        let linked_chunk_id = LinkedChunkId::Room(room_id);
        let event = |msg: &str| make_test_event(room_id, msg);
        let store = get_event_cache_store().await.expect("creating cache store failed");

        // Case #1: no chunk at all, equivalent to having a nonexistent
        // `before_chunk_identifier`.
        {
            let previous_chunk = store
                .load_previous_chunk(linked_chunk_id, ChunkIdentifier::new(153))
                .await
                .unwrap();

            assert!(previous_chunk.is_none());
        }

        // Case #2: there is one chunk only: we request the previous on this
        // one, it doesn't exist.
        {
            store
                .handle_linked_chunk_updates(
                    linked_chunk_id,
                    vec![Update::NewItemsChunk {
                        previous: None,
                        new: ChunkIdentifier::new(42),
                        next: None,
                    }],
                )
                .await
                .unwrap();

            let previous_chunk =
                store.load_previous_chunk(linked_chunk_id, ChunkIdentifier::new(42)).await.unwrap();

            assert!(previous_chunk.is_none());
        }

        // Case #3: there are two chunks.
        {
            store
                .handle_linked_chunk_updates(
                    linked_chunk_id,
                    vec![
                        // new chunk before the one that exists.
                        Update::NewItemsChunk {
                            previous: None,
                            new: ChunkIdentifier::new(7),
                            next: Some(ChunkIdentifier::new(42)),
                        },
                        Update::PushItems {
                            at: Position::new(ChunkIdentifier::new(7), 0),
                            items: vec![event("brigand du jorat"), event("morbier")],
                        },
                    ],
                )
                .await
                .unwrap();

            let previous_chunk =
                store.load_previous_chunk(linked_chunk_id, ChunkIdentifier::new(42)).await.unwrap();

            // Chunk #7 is returned, correctly linked forward to chunk #42.
            assert_matches!(previous_chunk, Some(previous_chunk) => {
                assert_eq!(previous_chunk.identifier, 7);
                assert!(previous_chunk.previous.is_none());
                assert_matches!(previous_chunk.next, Some(next) => {
                    assert_eq!(next, 42);
                });
                assert_matches!(previous_chunk.content, ChunkContent::Items(items) => {
                    assert_eq!(items.len(), 2);
                    check_test_event(&items[0], "brigand du jorat");
                    check_test_event(&items[1], "morbier");
                });
            });
        }
    }
2836}
2837
#[cfg(test)]
mod encrypted_tests {
    //! Runs the store integration test-suites against a store opened with a
    //! passphrase, so that the data is encrypted on its way to SQLite.

    use std::sync::atomic::{AtomicU32, Ordering::SeqCst};

    use matrix_sdk_base::{
        event_cache::store::{EventCacheStore, EventCacheStoreError},
        event_cache_store_integration_tests, event_cache_store_integration_tests_time,
        event_cache_store_media_integration_tests,
    };
    use matrix_sdk_test::{async_test, event_factory::EventFactory};
    use once_cell::sync::Lazy;
    use ruma::{
        event_id,
        events::{relation::RelationType, room::message::RoomMessageEventContentWithoutRelation},
        room_id, user_id,
    };
    use tempfile::{tempdir, TempDir};

    use super::SqliteEventCacheStore;

    // Shared temporary directory; each store lives in its own sub-directory.
    static TMP_DIR: Lazy<TempDir> = Lazy::new(|| tempdir().unwrap());
    // Monotonic counter used to give every store a unique sub-directory name.
    static NUM: AtomicU32 = AtomicU32::new(0);

    /// Opens a fresh, passphrase-protected `SqliteEventCacheStore` in a new
    /// sub-directory of `TMP_DIR`.
    async fn get_event_cache_store() -> Result<SqliteEventCacheStore, EventCacheStoreError> {
        let name = NUM.fetch_add(1, SeqCst).to_string();
        let tmpdir_path = TMP_DIR.path().join(name);

        tracing::info!("using event cache store @ {}", tmpdir_path.to_str().unwrap());

        Ok(SqliteEventCacheStore::open(
            tmpdir_path.to_str().unwrap(),
            Some("default_test_password"),
        )
        .await
        .unwrap())
    }

    event_cache_store_integration_tests!();
    event_cache_store_integration_tests_time!();
    event_cache_store_media_integration_tests!();

    #[async_test]
    async fn test_no_sqlite_injection_in_find_event_relations() {
        // Ensures a crafted relation-type filter value cannot inject SQL into
        // the `find_event_relations` query and widen the result set beyond
        // the requested room and event.
        let room_id = room_id!("!test:localhost");
        let another_room_id = room_id!("!r1:matrix.org");
        let sender = user_id!("@alice:localhost");

        let store = get_event_cache_store()
            .await
            .expect("We should be able to create a new, empty, event cache store");

        let f = EventFactory::new().room(room_id).sender(sender);

        // Create an event for the first room.
        let event_id = event_id!("$DO_NOT_FIND_ME:matrix.org");
        let event = f.text_msg("DO NOT FIND").event_id(event_id).into_event();

        // Create a related event.
        let edit_id = event_id!("$find_me:matrix.org");
        let edit = f
            .text_msg("Find me")
            .event_id(edit_id)
            .edit(event_id, RoomMessageEventContentWithoutRelation::text_plain("jebote"))
            .into_event();

        // Create an event for the second room.
        let f = f.room(another_room_id);

        let another_event_id = event_id!("$DO_NOT_FIND_ME_EITHER:matrix.org");
        let another_event =
            f.text_msg("DO NOT FIND ME EITHER").event_id(another_event_id).into_event();

        // Save the events in the DB.
        store.save_event(room_id, event).await.unwrap();
        store.save_event(room_id, edit).await.unwrap();
        store.save_event(another_room_id, another_event).await.unwrap();

        // Craft a `RelationType` that will inject some SQL to be executed. The
        // `OR 1=1` ensures that all the previous parameters, the room
        // ID and event ID are ignored.
        let filter = Some(vec![RelationType::Replacement, "x\") OR 1=1; --".into()]);

        // Attempt to find events in the first room.
        let results = store
            .find_event_relations(room_id, event_id, filter.as_deref())
            .await
            .expect("We should be able to attempt to find event relations");

        // Ensure that we only got the single related event the first room contains.
        similar_asserts::assert_eq!(
            results.len(),
            1,
            "We should only have loaded events for the first room {results:#?}"
        );

        // The event needs to be the edit event, otherwise something is wrong.
        let (found_event, _) = &results[0];
        assert_eq!(
            found_event.event_id().as_deref(),
            Some(edit_id),
            "The single event we found should be the edit event"
        );
    }
}