Asynchronous read/write to Realm using RxJava 2

Question:

This is my first attempt at asynchronous work with RxJava 2.

Goal:

Get JSON data from the server using the Retrofit2 library. If the request succeeds, write the data to Realm, then immediately read it back and pass it to the RecyclerView adapter.

So, this is how I implemented it:

private void fetchChatsFromNetwork(int count, AccessDataModel accessDataModel) {

    String accessToken = accessDataModel.getAccessToken();

    MyApplication.getRestApi().getChats(count, accessToken, Constants.api_version)
            .subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread())
            .subscribeWith(new DisposableSubscriber<ChatsModel>() {
                @Override
                public void onNext(ChatsModel chatsModel) {
                    if (chatsRepository.hasData()) {

                        chatsRepository.updateChatsData(chatsModel)
                                .subscribe(new DisposableObserver<ChatsModel>() {
                                    @Override
                                    public void onNext(ChatsModel localChatsModel) {
                                        Log.d(TAG, "DO, onSuccess updated!");
                                        iGetChatsCallback.onGetChatsSuccess(localChatsModel);
                                    }

                                    @Override
                                    public void onError(Throwable e) {
                                        Log.d(TAG, "DO, onError when update!");
                                        iGetChatsCallback.onGetChatsError(e.getMessage());
                                    }

                                    @Override
                                    public void onComplete() {
                                        dispose();
                                        Log.d(TAG, "DO, onComplete!");
                                    }
                                });

                    } else {
                        chatsRepository.insertChatsData(chatsModel)
                                .subscribe(new DisposableObserver<ChatsModel>() {
                                    @Override
                                    public void onNext(ChatsModel localChatsModel) {
                                        iGetChatsCallback.onGetChatsSuccess(localChatsModel);
                                        Log.d(TAG, "DO, onSuccess inserted!");
                                    }

                                    @Override
                                    public void onError(Throwable e) {
                                        iGetChatsCallback.onGetChatsError(e.getMessage());
                                        Log.d(TAG, "DO, onError when inserting!");
                                    }

                                    @Override
                                    public void onComplete() {
                                        dispose();
                                        Log.d(TAG, "DO, onComplete!");
                                    }
                                });
                    }
                }

                @Override
                public void onError(Throwable t) {
                    Log.d(TAG, "onError" + t.getMessage());
                }

                @Override
                public void onComplete() {
                    Log.d(TAG, "onComplete");
                }
            });
}

I write the data to Realm in the onNext() method of the subscriber to MyApplication.getRestApi().getChats().

Here is the code that performs the write:

public Observable<ChatsModel> updateChatsData(final ChatsModel chatsModel) {

    return Observable.create(new ObservableOnSubscribe<ChatsModel>() {
        @Override
        public void subscribe(ObservableEmitter<ChatsModel> e) throws Exception {
            if (chatsModel != null) {
                realm.executeTransactionAsync(
                        realm -> realm.copyToRealmOrUpdate(chatsModel),
                        () -> {
                            Log.d(LOG_TAG, "Data success updated!");
                            ChatsModel localChatsModel = getAllChatsData();
                            e.onNext(localChatsModel);
                            e.onComplete();
                        },
                        error -> {
                            Log.d(LOG_TAG, "Update data failed!");
                            e.onError(error);
                        });
            }

        }
    });

}

The updateChatsData() method writes asynchronously and is declared in another class.

As you can see, my fetchChatsFromNetwork() method is cumbersome, or so it seems to me.

Question:

Is this the right approach? If not, what would be the right way to do it?

Answer:

You can completely decouple writing to the database from notifying the adapter of new data.

  1. Subscribe to an Observable that emits a selection from the database and notifies the adapter about it.
  2. When making a network request, write the received data to the database.

With this approach, the Observable from step 1 notifies the adapter as soon as data is written or updated in the database.
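The two steps above can be sketched in plain Java without any library. This is only an illustration of the decoupling idea: ChatStore, observe() and save() are hypothetical names standing in for the Realm-backed repository and for whatever change notification the database layer provides, and strings stand in for chat objects.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

public class DecoupledStoreSketch {

    /** Hypothetical in-memory stand-in for the Realm-backed chats repository. */
    static class ChatStore {
        private final List<String> chats = new ArrayList<>();
        private final List<Consumer<List<String>>> listeners = new ArrayList<>();

        /** Step 1: the UI subscribes once and receives every later snapshot. */
        void observe(Consumer<List<String>> listener) {
            listeners.add(listener);
            listener.accept(new ArrayList<>(chats)); // emit the current state right away
        }

        /** Step 2: the network layer only writes; it knows nothing about the UI. */
        void save(List<String> fresh) {
            chats.clear();
            chats.addAll(fresh);
            for (Consumer<List<String>> listener : listeners) {
                listener.accept(new ArrayList<>(chats)); // notify with a copy of the new state
            }
        }
    }

    public static void main(String[] args) {
        ChatStore store = new ChatStore();
        List<List<String>> adapterUpdates = new ArrayList<>();

        store.observe(adapterUpdates::add);      // what the adapter side would do
        store.save(List.of("chat-1", "chat-2")); // what the network callback would do

        System.out.println(adapterUpdates);      // prints [[], [chat-1, chat-2]]
    }
}
```

The point of the split is that the code handling the network response never touches the adapter: it writes, and the subscription from step 1 delivers the result.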

The write to the database itself can also be simplified with flatMap, something like this:

MyApplication.getRestApi().getChats(count, accessToken, Constants.api_version)
        // getChats() is consumed with a DisposableSubscriber in the question, so it is
        // assumed to be a Flowable; drop toObservable() if it already returns an Observable
        .toObservable()
        // choose update or insert, then wait for the write to finish
        .flatMap(data -> chatsRepository.hasData()
                ? chatsRepository.updateChatsData(data)
                : chatsRepository.insertChatsData(data))
        .map(saved -> true)
        .subscribeOn(Schedulers.io())
        .observeOn(AndroidSchedulers.mainThread())
        .subscribe(
            updated -> System.out.println("data in DB updated"),
            error -> System.out.println("error: " + error.getMessage())
        );

Here you may have to experiment with how the database write is performed, most likely switching it from an asynchronous transaction to a synchronous one, because of how asynchronous transactions behave on threads without a Looper.
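The idea of a synchronous write on a background thread can be sketched in plain Java as follows. This is an illustration under stated assumptions, not Realm code: FakeDb, writeBlocking() and save() are hypothetical names, and a single-thread executor stands in for the background scheduler. In Realm terms, the blocking call would presumably be executeTransaction() rather than executeTransactionAsync().

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicReference;

public class BlockingWriteSketch {

    /** Hypothetical stand-in for a database that offers a blocking write. */
    static class FakeDb {
        private final AtomicReference<String> stored = new AtomicReference<>();

        void writeBlocking(String value) {
            stored.set(value);
        }

        String read() {
            return stored.get();
        }
    }

    /** Runs the blocking write on the worker, then completes with what was stored. */
    static CompletableFuture<String> save(FakeDb db, String value, ExecutorService worker) {
        return CompletableFuture.supplyAsync(() -> {
            db.writeBlocking(value); // no callback needed: the call returns when the write is done
            return db.read();        // read back on the same background thread
        }, worker);
    }

    public static void main(String[] args) {
        ExecutorService worker = Executors.newSingleThreadExecutor();
        try {
            String result = save(new FakeDb(), "chats-json", worker).join();
            System.out.println("stored: " + result);
        } finally {
            worker.shutdown();
        }
    }
}
```

Because the write blocks the background thread until it finishes, no completion callback has to be delivered to that thread, which is what makes a Looper unnecessary there.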
