Implemented basic sync queue for assets which require execution on GL sync thread

Created AssetManager.SyncTuple tuple to cache asset loading state
Renamed Dcc variables in DccLoader to dcc since parent and child dcc are the same
Changed AssetContainer future from a Future to a Promise and renamed it
Asset loading changed to use promises due to loading be a promise that the asset will load at a future time
Created AssetManager#getDepNow temp method to return non-retained reference to asset
Updated AssetManagerTest mpq test case
This commit is contained in:
Collin Smith
2021-11-07 00:47:14 -07:00
parent feee2f1a9e
commit 6ef6c26f7e
4 changed files with 97 additions and 30 deletions

View File

@ -3,32 +3,33 @@ package com.riiablo.asset;
import io.netty.util.AbstractReferenceCounted;
import io.netty.util.ReferenceCounted;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.Promise;
import org.apache.commons.lang3.builder.ToStringBuilder;
public class AssetContainer extends AbstractReferenceCounted {
public static AssetContainer wrap(AssetDesc asset, Future<?> future) {
if (future == null) throw new IllegalArgumentException("future cannot be null");
return new AssetContainer(asset, future);
public static AssetContainer wrap(AssetDesc asset, Promise<?> promise) {
if (promise == null) throw new IllegalArgumentException("promise cannot be null");
return new AssetContainer(asset, promise);
}
final AssetDesc asset; // for context of which asset this contains
final Future<?> future;
final Promise<?> promise;
AssetContainer(AssetDesc asset, Future<?> future) {
AssetContainer(AssetDesc asset, Promise<?> promise) {
this.asset = asset;
this.future = future;
this.promise = promise;
}
@SuppressWarnings("unchecked")
public <T> Future<T> get(Class<T> type) {
return (Future<T>) future;
return (Future<T>) promise;
}
@Override
protected void deallocate() {
// dispose if completed, else ?
future.cancel(false);
if (future.isDone()) AssetUtils.dispose(future.getNow());
promise.cancel(false);
if (promise.isDone()) AssetUtils.dispose(promise.getNow());
}
@Override
@ -39,7 +40,7 @@ public class AssetContainer extends AbstractReferenceCounted {
@Override
public String toString() {
return new ToStringBuilder(this)
.append("future", future)
.append("future", promise)
.append("refCnt", refCnt())
.toString();
}

View File

@ -6,6 +6,8 @@ import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.FutureListener;
import io.netty.util.concurrent.ImmediateEventExecutor;
import io.netty.util.concurrent.Promise;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import com.badlogic.gdx.files.FileHandle;
import com.badlogic.gdx.utils.Array;
@ -23,6 +25,7 @@ public final class AssetManager implements Disposable {
final ObjectMap<Class, Adapter> adapters = new ObjectMap<>();
final Array<PriorityContainer<FileHandleResolver>> resolvers = new Array<>();
final ObjectMap<Class, Class<? extends AssetParams>> defaultParams = new ObjectMap<>();
final BlockingQueue<SyncTuple> syncQueue = new LinkedBlockingQueue<>();
final EventExecutor io;
final EventExecutor sync;
@ -121,15 +124,24 @@ public final class AssetManager implements Disposable {
}
}
// TODO: something more elegant for dependencies
public <T> T getDepNow(final AssetDesc<T> asset) {
final AssetContainer container0 = loadedAssets.get(asset);
final T object = container0 != null ? container0.get(asset.type).getNow() : null;
if (object == null) throw new RuntimeException("dependency not loaded: " + asset);
return object;
}
public <T> Future<T> load(final AssetDesc<T> asset) {
final AssetContainer container = loadedAssets.get(asset);
if (container != null) {
container.retain();
return container.get(asset.type);
final AssetContainer container0 = loadedAssets.get(asset);
if (container0 != null) {
container0.retain();
return container0.get(asset.type);
}
final Promise<T> promise = sync.newPromise();
loadedAssets.put(asset, AssetContainer.wrap(asset, promise));
final AssetContainer container = AssetContainer.wrap(asset, promise);
loadedAssets.put(asset, container);
final AssetLoader loader = findLoader(asset.type);
final FileHandle handle = resolve(asset); // TODO: refactor AssetLoader#resolver?
final Adapter adapter = findAdapter(handle);
@ -142,17 +154,18 @@ public final class AssetManager implements Disposable {
.addListener(new FutureListener() {
@Override
public void operationComplete(Future future) {
log.debug("Asset IO complete: " + asset);
log.debug("Asset IO complete: {}", asset);
@SuppressWarnings("unchecked") // guaranteed by loader contract
T object = (T) loader.loadAsync(AssetManager.this, asset, handle, future.getNow());
promise.setSuccess(object);
log.debug("Asset Async complete: {}", asset);
boolean inserted = syncQueue.offer(SyncTuple.wrap(container, promise, loader, object));
if (!inserted) log.error("Failed to enqueue {}", asset);
log.debug("Queue added {} {}", inserted, syncQueue.size());
}
});
}
});
// io promise -> loadAsync promise -> loadSync promise
// check loadedAssets
// check preload queue
// check task list
@ -164,6 +177,54 @@ public final class AssetManager implements Disposable {
final AssetContainer container = loadedAssets.get(asset);
if (container == null) return;
boolean release = container.release();
log.debug("container released? " + release);
log.debug("container released? {}", release);
}
// TODO: intended as a "process until something is synced" for testing
public boolean update() {
SyncTuple sync;
while ((sync = syncQueue.poll()) != null) {
log.debug("Asset Sync: {}", sync.container.asset);
sync.loadSync(this);
return true;
}
return false;
}
static final class SyncTuple<T> {
static <T> SyncTuple<T> wrap(
AssetContainer container,
Promise<T> promise,
AssetLoader loader,
T object
) {
return new SyncTuple<>(container, promise, loader, object);
}
final AssetContainer container;
final Promise<T> promise;
final AssetLoader loader;
final T object;
SyncTuple(
AssetContainer container,
Promise<T> promise,
AssetLoader loader,
T object
) {
assert container.promise == promise : "container.promise != promise";
this.container = container;
this.promise = promise;
this.loader = loader;
this.object = object;
}
@SuppressWarnings("unchecked") // guaranteed by loader contract
Future<?> loadSync(AssetManager assets) {
loader.loadSync(assets, container.asset, object);
promise.setSuccess(object);
log.debug("Asset Loaded: {}", container.asset);
return promise;
}
}
}

View File

@ -59,9 +59,9 @@ public class DccLoader extends AssetLoader<Dcc> {
log.traceEntry("ioAsync(executor: {}, asset: {}, handle: {}, adapter: {})", executor, asset, handle, adapter);
DcParams params = asset.params(DcParams.class);
if (params.direction >= 0) {
Dcc parent = assets.load(AssetDesc.of(asset, PARENT_DC)).getNow();
int offset = parent.dirOffset(params.direction);
int nextOffset = parent.dirOffset(params.direction + 1);
Dcc dcc = assets.getDepNow(AssetDesc.of(asset, PARENT_DC));
int offset = dcc.dirOffset(params.direction);
int nextOffset = dcc.dirOffset(params.direction + 1);
return adapter.buffer(executor, handle, offset, nextOffset - offset);
} else {
return adapter.stream(executor, handle, adapter.defaultBufferSize(handle));
@ -78,18 +78,17 @@ public class DccLoader extends AssetLoader<Dcc> {
log.traceEntry("loadAsync(assets: {}, asset: {}, handle: {}, data: {})", assets, asset, handle, data);
DcParams params = asset.params(DcParams.class);
if (params.direction >= 0) {
Dcc parent = assets.load(AssetDesc.of(asset, PARENT_DC)).getNow();
assert parent != null : "parent dcc should not be null";
Dcc dcc = assets.getDepNow(AssetDesc.of(asset, PARENT_DC));
assert data instanceof ByteBuf;
ByteBuf buffer = (ByteBuf) data; // borrowed, don't release
Dcc child = parent.read(buffer, params.direction);
dcc.read(buffer, params.direction);
DccDecoder decoder = decoders.obtain();
try {
decoder.decode(child, params.direction);
decoder.decode(dcc, params.direction);
} finally {
decoders.release(decoder);
}
return parent;
return dcc;
} else {
assert data instanceof InputStream;
InputStream stream = (InputStream) data;

View File

@ -139,7 +139,8 @@ public class AssetManagerTest extends RiiabloTest {
@ParameterizedTest
@ValueSource(strings = {
"data\\global\\CHARS\\BA\\LG\\BALGLITTNHTH.DCC",
"data\\global\\chars\\ba\\hd\\bahdbhma11hs.dcc",
// "data\\global\\CHARS\\BA\\LG\\BALGLITTNHTH.DCC",
})
void load_mpq(String path) {
AssetDesc<Dcc> asset = AssetDesc.of(path, Dcc.class, DcParams.of(-1));
@ -147,17 +148,22 @@ public class AssetManagerTest extends RiiabloTest {
Future<Dcc> handle = assets.load(asset);
try {
assertNotNull(handle);
while (!assets.update());
assertTimeout(Duration.ofMillis(100), () -> {
Dcc object = handle.get();
assertNotNull(object);
});
handle.syncUninterruptibly();
assets.loadedAssets.put(asset, AssetContainer.wrap(asset, handle));
// assets.loadedAssets.put(asset, AssetContainer.wrap(asset, (Promise<?>) handle));
assertEquals(1, assets.loadedAssets.size);
Future<Dcc> handle0 = assets.load(asset0);
while (!assets.update()); // throws NPE because headless LibGDX -> no Gdx.gl object
assertTimeout(Duration.ofMillis(100), () -> {
Dcc object = handle0.get();
assertNotNull(object);
});
handle0.syncUninterruptibly();
/** TODO: sync of direction throws NPE in {@link Dcc#uploadTextures(int)} */
} finally {
assets.unload(asset);
assets.unload(asset0);