Question:
My first implementation of asynchronous work with RXJava 2
.
Target:
Get json
data from the server using the Retrofit2
library. If successful, then write to the Realm
and immediately after the write get back the data and send it to the RecyclerView
adapter.
So, this is how I implemented it:
private void fetchChatsFromNetwork(int count, AccessDataModel accessDataModel) {
String accessToken = accessDataModel.getAccessToken();
MyApplication.getRestApi().getChats(count, accessToken, Constants.api_version)
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribeWith(new DisposableSubscriber<ChatsModel>() {
@Override
public void onNext(ChatsModel chatsModel) {
if (chatsRepository.hasData()) {
chatsRepository.updateChatsData(chatsModel)
.subscribe(new DisposableObserver<ChatsModel>() {
@Override
public void onNext(ChatsModel localChatsModel) {
Log.d(TAG, "DO, onSuccess updated!");
iGetChatsCallback.onGetChatsSuccess(localChatsModel);
}
@Override
public void onError(Throwable e) {
Log.d(TAG, "DO, onError when update!");
iGetChatsCallback.onGetChatsError(e.getMessage());
}
@Override
public void onComplete() {
dispose();
Log.d(TAG, "DO, onComplete!");
}
});
} else {
chatsRepository.insertChatsData(chatsModel)
.subscribe(new DisposableObserver<ChatsModel>() {
@Override
public void onNext(ChatsModel localChatsModel) {
iGetChatsCallback.onGetChatsSuccess(localChatsModel);
Log.d(TAG, "DO, onSuccess inserted!");
}
@Override
public void onError(Throwable e) {
iGetChatsCallback.onGetChatsError(e.getMessage());
Log.d(TAG, "DO, onError when inserting!");
}
@Override
public void onComplete() {
dispose();
Log.d(TAG, "DO, onComplete!");
}
});
}
}
@Override
public void onError(Throwable t) {
Log.d(TAG, "onError" + t.getMessage());
}
@Override
public void onComplete() {
Log.d(TAG, "onComplete");
}
});
}
I am writing data to the Realm
in the subscriber's onNext() method onNext()
MyApplication.getRestApi().getChats()
.
Here is the entry code:
public Observable<ChatsModel> updateChatsData(final ChatsModel chatsModel) {
return Observable.create(new ObservableOnSubscribe<ChatsModel>() {
@Override
public void subscribe(ObservableEmitter<ChatsModel> e) throws Exception {
if (chatsModel != null) {
realm.executeTransactionAsync(
realm -> realm.copyToRealmOrUpdate(chatsModel),
() -> {
Log.d(LOG_TAG, "Data success updated!");
ChatsModel localChatsModel = getAllChatsData();
e.onNext(localChatsModel);
e.onComplete();
},
error -> {
Log.d(LOG_TAG, "Update data failed!");
e.onError(error);
});
}
}
});
}
The updateChatsData()
method writes asynchronously and is declared in another class.
As you can see, my fetchChatsFromNetwork()
method is cumbersome, or so it seems to me.
Question:
Am I doing the right thing or not, if not, what would be the right thing to do?
Answer:
You can completely decouple writing to the database from notifying the adapter of new data.
- Subscribe to an
Observable
that emits a selection from the database and notifies the adapter about it. - When making a network request, write the received data to the database.
With this method, the Observable
from the first paragraph will notify the adapter immediately after writing / updating data in the database.
The entry itself in the database can also be done more easily through flatMap
something like this:
MyApplication.getRestApi().getChats(count, accessToken, Constants.api_version)
.flatMap(data -> (chatsRepository.hasData() ? chatsRepository.updateChatsData(data) : chatsRepository.insertChatsData(data)).flatMap(data -> Observable.just(true)))
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribeWith(
aBoolean -> System.out.println("data in DB updated"),
error -> System.out.println("error: " + e.getMessage())
);
Here, you may have to play with the replacement of transaction records in the database from synchronous to asynchronous (rather vice versa) due to how asynchronous work in threads without Looper
.