Source added

This commit is contained in:
Fr4nz D13trich 2025-11-20 09:26:33 +01:00
parent b2864b500e
commit ba28ca859e
8352 changed files with 1487182 additions and 1 deletions

View file

@ -0,0 +1,11 @@
plugins {
id("signal-library")
}
android {
namespace = "org.signal.paging"
}
dependencies {
implementation(project(":core-util"))
}

View file

@ -0,0 +1,5 @@
<?xml version="1.0" encoding="utf-8"?>
<manifest
xmlns:android="http://schemas.android.com/apk/res/android">
</manifest>

View file

@ -0,0 +1,83 @@
package org.signal.paging;
import androidx.annotation.NonNull;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
/**
* We have a bit of a threading problem -- we want our controller to have a fixed size so that it
* can keep track of which ranges of requests are in flight, but it needs to make a blocking call
* to find out the size of the dataset first!
*
* So what this controller does is use a serial executor so that it can buffer calls to a secondary
* controller. The first task on the executor creates the first controller, so all future calls to
* {@link #onDataNeededAroundIndex(int)} are guaranteed to have an active controller.
*
* It's also worth noting that this controller has lifecycle that matches the {@link PagedData} that
* contains it. When invalidations come in, this class will just swap out the active controller with
* a new one.
*/
class BufferedPagingController<Key, Data> implements PagingController<Key> {
private final PagedDataSource<Key, Data> dataSource;
private final PagingConfig config;
private final DataStream<Data> dataStream;
private final Executor serializationExecutor;
private PagingController<Key> activeController;
private int lastRequestedIndex;
BufferedPagingController(@NonNull PagedDataSource<Key, Data> dataSource,
@NonNull PagingConfig config,
@NonNull DataStream<Data> dataStream)
{
this.dataSource = dataSource;
this.config = config;
this.dataStream = dataStream;
this.serializationExecutor = Executors.newSingleThreadExecutor();
this.activeController = null;
this.lastRequestedIndex = config.startIndex();
onDataInvalidated();
}
@Override
public void onDataNeededAroundIndex(int aroundIndex) {
serializationExecutor.execute(() -> {
lastRequestedIndex = aroundIndex;
activeController.onDataNeededAroundIndex(aroundIndex);
});
}
@Override
public void onDataInvalidated() {
serializationExecutor.execute(() -> {
if (activeController != null) {
activeController.onDataInvalidated();
}
activeController = new FixedSizePagingController<>(dataSource, config, dataStream, dataSource.size());
activeController.onDataNeededAroundIndex(lastRequestedIndex);
});
}
@Override
public void onDataItemChanged(Key key) {
serializationExecutor.execute(() -> {
if (activeController != null) {
activeController.onDataItemChanged(key);
}
});
}
@Override
public void onDataItemInserted(Key key, int position) {
serializationExecutor.execute(() -> {
if (activeController != null) {
activeController.onDataItemInserted(key, position);
}
});
}
}

View file

@ -0,0 +1,48 @@
package org.signal.paging;
import androidx.annotation.NonNull;
import java.util.AbstractList;
import java.util.ArrayList;
import java.util.List;
/**
* A placeholder class for efficiently storing lists that are mostly empty space.
* TODO [greyson][paging]
*/
public class CompressedList<E> extends AbstractList<E> {
private final List<E> wrapped;
public CompressedList(@NonNull List<E> source) {
this.wrapped = new ArrayList<>(source);
}
public CompressedList(int totalSize) {
this.wrapped = new ArrayList<>(totalSize);
for (int i = 0; i < totalSize; i++) {
wrapped.add(null);
}
}
@Override
public int size() {
return wrapped.size();
}
@Override
public E get(int index) {
return wrapped.get(index);
}
@Override
public E set(int globalIndex, E element) {
return wrapped.set(globalIndex, element);
}
@Override
public void add(int index, E element) {
wrapped.add(index, element);
}
}

View file

@ -0,0 +1,86 @@
package org.signal.paging;
import androidx.annotation.NonNull;
import androidx.core.util.Pools;
import java.util.BitSet;
/**
* Keeps track of what data is empty vs filled with an emphasis on doing so in a space-efficient way.
*/
class DataStatus {
private static final Pools.Pool<BitSet> POOL = new Pools.SynchronizedPool<>(1);
private final BitSet state;
private int size;
public static DataStatus obtain(int size) {
BitSet bitset = POOL.acquire();
if (bitset == null) {
bitset = new BitSet(size);
} else {
bitset.clear();
}
return new DataStatus(size, bitset);
}
private DataStatus(int size, @NonNull BitSet bitset) {
this.size = size;
this.state = bitset;
}
void mark(int position) {
state.set(position, true);
}
void markRange(int startInclusive, int endExclusive) {
state.set(startInclusive, endExclusive, true);
}
int getEarliestUnmarkedIndexInRange(int startInclusive, int endExclusive) {
for (int i = startInclusive; i < endExclusive; i++) {
if (!state.get(i)) {
return i;
}
}
return -1;
}
int getLatestUnmarkedIndexInRange(int startInclusive, int endExclusive) {
for (int i = endExclusive - 1; i >= startInclusive; i--) {
if (!state.get(i)) {
return i;
}
}
return -1;
}
boolean get(int position) {
return state.get(position);
}
void insertState(int position, boolean value) {
if (position < 0 || position > size + 1) {
throw new IndexOutOfBoundsException();
}
for (int i = size; i > position; i--) {
state.set(i, state.get(i - 1));
}
state.set(position, value);
this.size = size + 1;
}
int size() {
return size;
}
void recycle() {
POOL.release(state);
}
}

View file

@ -0,0 +1,10 @@
package org.signal.paging;
import java.util.List;
/**
* An abstraction over different types of ways the paging lib can provide data, e.g. Observables vs LiveData.
*/
interface DataStream<Data> {
void next(List<Data> data);
}

View file

@ -0,0 +1,257 @@
package org.signal.paging;
import androidx.annotation.NonNull;
import org.signal.core.util.ThreadUtil;
import org.signal.core.util.concurrent.SignalExecutors;
import org.signal.core.util.logging.Log;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Executor;
/**
* The workhorse of managing page requests.
*
* A controller whose life focuses around one invalidation cycle of a data set, and therefore has
* a fixed size throughout. It assumes that all interface methods are called on a single thread,
* which allows it to keep track of pending requests in a thread-safe way, while spinning off
* tasks to fetch data on its own executor.
*/
class FixedSizePagingController<Key, Data> implements PagingController<Key> {
private static final String TAG = Log.tag(FixedSizePagingController.class);
private static final Executor FETCH_EXECUTOR = SignalExecutors.newCachedSingleThreadExecutor("signal-FixedSizePagingController", ThreadUtil.PRIORITY_UI_BLOCKING_THREAD);
private static final boolean DEBUG = false;
private final PagedDataSource<Key, Data> dataSource;
private final PagingConfig config;
private final DataStream<Data> dataStream;
private final DataStatus loadState;
private final Map<Key, Integer> keyToPosition;
private List<Data> data;
private volatile boolean invalidated;
FixedSizePagingController(@NonNull PagedDataSource<Key, Data> dataSource,
@NonNull PagingConfig config,
@NonNull DataStream<Data> dataStream,
int size)
{
this.dataSource = dataSource;
this.config = config;
this.dataStream = dataStream;
this.loadState = DataStatus.obtain(size);
this.data = new CompressedList<>(loadState.size());
this.keyToPosition = new HashMap<>();
if (DEBUG) Log.d(TAG, "[Constructor] Creating with size " + size + " (loadState.size() = " + loadState.size() + ")");
}
/**
* We assume this method is always called on the same thread, so we can read our
* {@code loadState} and construct the parameters of a fetch request. That fetch request can
* then be performed on separate single-thread executor.
*/
@Override
public void onDataNeededAroundIndex(int aroundIndex) {
if (invalidated) {
Log.w(TAG, buildDataNeededLog(aroundIndex, "Invalidated! At very beginning."));
return;
}
final int loadStart;
final int loadEnd;
final int totalSize;
synchronized (loadState) {
if (loadState.size() == 0) {
dataStream.next(Collections.emptyList());
return;
}
int leftPageBoundary = (aroundIndex / config.pageSize()) * config.pageSize();
int rightPageBoundary = leftPageBoundary + config.pageSize();
int buffer = config.bufferPages() * config.pageSize();
int leftLoadBoundary = Math.max(0, leftPageBoundary - buffer);
int rightLoadBoundary = Math.min(loadState.size(), rightPageBoundary + buffer);
loadStart = loadState.getEarliestUnmarkedIndexInRange(leftLoadBoundary, rightLoadBoundary);
if (loadStart < 0) {
if (DEBUG) Log.i(TAG, buildDataNeededLog(aroundIndex, "loadStart < 0"));
return;
}
loadEnd = loadState.getLatestUnmarkedIndexInRange(Math.max(leftLoadBoundary, loadStart), rightLoadBoundary) + 1;
if (loadEnd <= loadStart) {
if (DEBUG) Log.i(TAG, buildDataNeededLog(aroundIndex, "loadEnd <= loadStart, loadEnd: " + loadEnd + ", loadStart: " + loadStart));
return;
}
totalSize = loadState.size();
loadState.markRange(loadStart, loadEnd);
if (DEBUG) Log.i(TAG, buildDataNeededLog(aroundIndex, "start: " + loadStart + ", end: " + loadEnd + ", totalSize: " + totalSize));
}
FETCH_EXECUTOR.execute(() -> {
if (invalidated) {
Log.w(TAG, buildDataNeededLog(aroundIndex, "Invalidated! At beginning of load task."));
return;
}
List<Data> loaded = dataSource.load(loadStart, loadEnd - loadStart, totalSize, () -> invalidated);
if (invalidated) {
Log.w(TAG, buildDataNeededLog(aroundIndex, "Invalidated! Just after data was loaded."));
return;
}
List<Data> updated = new CompressedList<>(data);
for (int i = 0, len = Math.min(loaded.size(), data.size() - loadStart); i < len; i++) {
int position = loadStart + i;
Data item = loaded.get(i);
updated.set(position, item);
keyToPosition.put(dataSource.getKey(item), position);
}
data = updated;
dataStream.next(updated);
});
}
@Override
public void onDataInvalidated() {
if (invalidated) {
return;
}
invalidated = true;
loadState.recycle();
}
@Override
public void onDataItemChanged(Key key) {
if (DEBUG) Log.d(TAG, buildItemChangedLog(key, ""));
FETCH_EXECUTOR.execute(() -> {
Integer position = keyToPosition.get(key);
if (position == null) {
Log.i(TAG, "Notified of key " + key + " but it wasn't in the cache!");
return;
}
if (invalidated) {
Log.w(TAG, "Invalidated! Just before individual change was loaded for position " + position);
return;
}
synchronized (loadState) {
loadState.mark(position);
}
Data item = dataSource.load(key);
if (item == null) {
Log.w(TAG, "Notified of key " + key + " but the loaded item was null!");
return;
}
if (invalidated) {
Log.w(TAG, "Invalidated! Just after individual change was loaded for position " + position);
return;
}
List<Data> updatedList = new CompressedList<>(data);
updatedList.set(position, item);
data = updatedList;
dataStream.next(updatedList);
if (DEBUG) Log.d(TAG, buildItemChangedLog(key, "Published updated data"));
});
}
@Override
public void onDataItemInserted(Key key, int inputPosition) {
if (DEBUG) Log.d(TAG, buildItemInsertedLog(key, inputPosition, ""));
FETCH_EXECUTOR.execute(() -> {
int position = inputPosition;
if (position == POSITION_END) {
position = data.size();
}
if (keyToPosition.containsKey(key)) {
Log.w(TAG, "Notified of key " + key + " being inserted at " + position + ", but the item already exists!");
return;
}
if (invalidated) {
Log.w(TAG, "Invalidated! Just before individual insert was loaded for position " + position);
return;
}
synchronized (loadState) {
loadState.insertState(position, true);
if (DEBUG) Log.d(TAG, buildItemInsertedLog(key, position, "Size of loadState updated to " + loadState.size()));
}
Data item = dataSource.load(key);
if (item == null) {
Log.w(TAG, "Notified of key " + key + " being inserted at " + position + ", but the loaded item was null!");
return;
}
if (invalidated) {
Log.w(TAG, "Invalidated! Just after individual insert was loaded for position " + position);
return;
}
List<Data> updatedList = new CompressedList<>(data);
updatedList.add(position, item);
rebuildKeyToPositionMap(keyToPosition, updatedList, dataSource);
data = updatedList;
dataStream.next(updatedList);
if (DEBUG) Log.d(TAG, buildItemInsertedLog(key, position, "Published updated data"));
});
}
private void rebuildKeyToPositionMap(@NonNull Map<Key, Integer> map, @NonNull List<Data> dataList, @NonNull PagedDataSource<Key, Data> dataSource) {
map.clear();
for (int i = 0, len = dataList.size(); i < len; i++) {
Data item = dataList.get(i);
if (item != null) {
map.put(dataSource.getKey(item), i);
}
}
}
private String buildDataNeededLog(int aroundIndex, String message) {
return "[onDataNeededAroundIndex(" + aroundIndex + "), size: " + loadState.size() + "] " + message;
}
private String buildItemInsertedLog(Key key, int position, String message) {
return "[onDataItemInserted(" + key + ", " + position + "), size: " + loadState.size() + "] " + message;
}
private String buildItemChangedLog(Key key, String message) {
return "[onDataItemChanged(" + key + "), size: " + loadState.size() + "] " + message;
}
}

View file

@ -0,0 +1,25 @@
package org.signal.paging;
import androidx.annotation.AnyThread;
import androidx.annotation.NonNull;
import androidx.lifecycle.LiveData;
import java.util.List;
/**
* An implementation of {@link PagedData} that will provide data as a {@link LiveData}.
*/
public class LivePagedData<Key, Data> extends PagedData<Key> {
private final LiveData<List<Data>> data;
LivePagedData(@NonNull LiveData<List<Data>> data, @NonNull PagingController<Key> controller) {
super(controller);
this.data = data;
}
@AnyThread
public @NonNull LiveData<List<Data>> getData() {
return data;
}
}

View file

@ -0,0 +1,27 @@
package org.signal.paging;
import androidx.annotation.AnyThread;
import androidx.annotation.NonNull;
import java.util.List;
import io.reactivex.rxjava3.core.Observable;
import io.reactivex.rxjava3.subjects.Subject;
/**
* An implementation of {@link PagedData} that will provide data as an {@link Observable}.
*/
public class ObservablePagedData<Key, Data> extends PagedData<Key> {
private final Observable<List<Data>> data;
ObservablePagedData(@NonNull Observable<List<Data>> data, @NonNull PagingController<Key> controller) {
super(controller);
this.data = data;
}
@AnyThread
public @NonNull Observable<List<Data>> getData() {
return data;
}
}

View file

@ -0,0 +1,42 @@
package org.signal.paging;
import androidx.annotation.AnyThread;
import androidx.annotation.NonNull;
import androidx.lifecycle.MutableLiveData;
import java.util.List;
import io.reactivex.rxjava3.subjects.BehaviorSubject;
import io.reactivex.rxjava3.subjects.Subject;
/**
* The primary entry point for creating paged data.
*/
public class PagedData<Key> {
private final PagingController<Key> controller;
protected PagedData(PagingController<Key> controller) {
this.controller = controller;
}
@AnyThread
public static <Key, Data> LivePagedData<Key, Data> createForLiveData(@NonNull PagedDataSource<Key, Data> dataSource, @NonNull PagingConfig config) {
MutableLiveData<List<Data>> liveData = new MutableLiveData<>();
PagingController<Key> controller = new BufferedPagingController<>(dataSource, config, liveData::postValue);
return new LivePagedData<>(liveData, controller);
}
@AnyThread
public static <Key, Data> ObservablePagedData<Key, Data> createForObservable(@NonNull PagedDataSource<Key, Data> dataSource, @NonNull PagingConfig config) {
Subject<List<Data>> subject = BehaviorSubject.create();
PagingController<Key> controller = new BufferedPagingController<>(dataSource, config, subject::onNext);
return new ObservablePagedData<>(subject, controller);
}
public PagingController<Key> getController() {
return controller;
}
}

View file

@ -0,0 +1,42 @@
package org.signal.paging;
import androidx.annotation.NonNull;
import androidx.annotation.Nullable;
import androidx.annotation.WorkerThread;
import java.util.List;
/**
* Represents a source of data that can be queried.
*/
public interface PagedDataSource<Key, Data> {
/**
* @return The total size of the data set.
*/
@WorkerThread
int size();
/**
* @param start The index of the first item that should be included in your results.
* @param length The total number of items you should return.
* @param totalSize The total number of items in the data source
* @param cancellationSignal An object that you can check to see if the load operation was canceled.
* @return A list of length {@code length} that represents the data starting at {@code start}.
* If you don't have the full range, just populate what you can.
*/
@WorkerThread
@NonNull List<Data> load(int start, int length, int totalSize, @NonNull CancellationSignal cancellationSignal);
@WorkerThread
@Nullable Data load(Key key);
@WorkerThread
@NonNull Key getKey(@NonNull Data data);
interface CancellationSignal {
/**
* @return True if the operation has been canceled, otherwise false.
*/
boolean isCanceled();
}
}

View file

@ -0,0 +1,81 @@
package org.signal.paging;
import androidx.annotation.NonNull;
import java.util.concurrent.Executor;
/**
* Describes various properties of how you'd like paging to be handled.
*/
public final class PagingConfig {
private final int bufferPages;
private final int startIndex;
private final int pageSize;
private PagingConfig(@NonNull Builder builder) {
this.bufferPages = builder.bufferPages;
this.startIndex = builder.startIndex;
this.pageSize = builder.pageSize;
}
/**
* @return How many pages of 'buffer' you want ahead of and behind the active position. i.e. if
* the {@code pageSize()} is 10 and you specify 2 buffer pages, then there will always be
* at least 20 items ahead of and behind the current position.
*/
int bufferPages() {
return bufferPages;
}
/**
* @return How much data to load at a time when paging data.
*/
int pageSize() {
return pageSize;
}
/**
* @return What position to start loading at
*/
int startIndex() {
return startIndex;
}
public static class Builder {
private int bufferPages = 1;
private int startIndex = 0;
private int pageSize = 50;
public @NonNull Builder setBufferPages(int bufferPages) {
if (bufferPages < 1) {
throw new IllegalArgumentException("You must have at least one buffer page! Requested: " + bufferPages);
}
this.bufferPages = bufferPages;
return this;
}
public @NonNull Builder setPageSize(int pageSize) {
if (pageSize < 1) {
throw new IllegalArgumentException("You must have a page size of at least one! Requested: " + pageSize);
}
this.pageSize = pageSize;
return this;
}
public @NonNull Builder setStartIndex(int startIndex) {
if (startIndex < 0) {
throw new IndexOutOfBoundsException("Requested: " + startIndex);
}
this.startIndex = startIndex;
return this;
}
public @NonNull PagingConfig build() {
return new PagingConfig(this);
}
}
}

View file

@ -0,0 +1,11 @@
package org.signal.paging;
public interface PagingController<Key> {
int POSITION_END = -1;
void onDataNeededAroundIndex(int aroundIndex);
void onDataInvalidated();
void onDataItemChanged(Key key);
void onDataItemInserted(Key key, int position);
}

View file

@ -0,0 +1,48 @@
package org.signal.paging;
import androidx.annotation.Nullable;
/**
* A controller that forwards calls to a secondary, proxied controller. This is useful when you want
* to keep a single, static controller, even when the true controller may be changing due to data
* source changes.
*/
public class ProxyPagingController<Key> implements PagingController<Key> {
private PagingController<Key> proxied;
@Override
public synchronized void onDataNeededAroundIndex(int aroundIndex) {
if (proxied != null) {
proxied.onDataNeededAroundIndex(aroundIndex);
}
}
@Override
public synchronized void onDataInvalidated() {
if (proxied != null) {
proxied.onDataInvalidated();
}
}
@Override
public void onDataItemChanged(Key key) {
if (proxied != null) {
proxied.onDataItemChanged(key);
}
}
@Override
public void onDataItemInserted(Key key, int position) {
if (proxied != null) {
proxied.onDataItemInserted(key, position);
}
}
/**
* Updates the underlying controller to the one specified.
*/
public synchronized void set(@Nullable PagingController<Key> bound) {
this.proxied = bound;
}
}

View file

@ -0,0 +1,56 @@
package org.signal.paging
import org.junit.Assert.assertEquals
import org.junit.Assert.assertFalse
import org.junit.Assert.assertTrue
import org.junit.Test
class DataStatusTest {
@Test
fun insertState_initiallyEmpty_InsertAtZero() {
val subject = DataStatus.obtain(0)
subject.insertState(0, true)
assertEquals(1, subject.size())
assertTrue(subject[0])
}
@Test
fun insertState_someData_InsertAtZero() {
val subject = DataStatus.obtain(2)
subject.mark(1)
subject.insertState(0, true)
assertEquals(3, subject.size())
assertTrue(subject[0])
assertFalse(subject[1])
assertTrue(subject[2])
}
@Test
fun insertState_someData_InsertAtOne() {
val subject = DataStatus.obtain(3)
subject.mark(1)
subject.insertState(1, true)
assertEquals(4, subject.size())
assertFalse(subject[0])
assertTrue(subject[1])
assertTrue(subject[2])
assertFalse(subject[3])
}
@Test(expected = IndexOutOfBoundsException::class)
fun insertState_negativeThrows() {
val subject = DataStatus.obtain(0)
subject.insertState(-1, true)
}
@Test(expected = IndexOutOfBoundsException::class)
fun insertState_largerThanSizePlusOneThrows() {
val subject = DataStatus.obtain(0)
subject.insertState(2, true)
}
}