Use Kotlin coroutines for Firebase real-time database streams

There are a lot of articles about coroutines and Firebase Firestore, but I found none about the Realtime Database. I am aware that the Realtime Database (RTD) evolved into a more mature product, Firestore, but a lot of projects still run on RTD. This article is for them.

The difference

There are more differences, of course. To learn more, please visit the official docs.

Coroutines and Task API

implementation 'org.jetbrains.kotlinx:kotlinx-coroutines-play-services:1.1.1'

Now, the only thing we need to do is add .await() to a function call that returns a Task. This inspired me to write some Kotlin extensions that simplify the usage of the Firebase Realtime Database.
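As a small illustration of the Task integration, here is a sketch that awaits a write. The helper name setValueAndAwait is hypothetical; setValue() returning a Task and the await() extension from kotlinx-coroutines-play-services are the real APIs.

```kotlin
import com.google.firebase.database.DatabaseReference
import kotlinx.coroutines.tasks.await

// Hypothetical helper: writes a value and suspends until the Task completes.
// If the write fails, await() throws the Task's exception.
suspend fun DatabaseReference.setValueAndAwait(value: Any?) {
    setValue(value).await()
}
```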

Kotlin Extensions for single get

public interface ValueEventListener {
    public void onDataChange(@NonNull DataSnapshot snapshot);
    public void onCancelled(@NonNull DatabaseError error);
}

The two methods in the above interface are pretty self-explanatory, so let's jump into the coroutines.

We want to suspend the async operation and resume when a listener method is called, either onCancelled or onDataChange. For this purpose, I will use the suspendCancellableCoroutine() function, which wraps a callback in a suspending function and lets us handle cancellation.

Let’s discuss it step by step:

#1: start the suspend block with suspendCancellableCoroutine; the continuation it provides represents the rest of the coroutine after the suspension point

#2: create the listener object and override the two methods mentioned before

#3: onCancelled, we can check the error type here, log it, and resume the suspend function with an exception

#4: onDataChange, this is where we receive the data and resume our coroutine with the data snapshot

#5: invokeOnCancellation lets us react to coroutine cancellation, in this case by removing the listener

#6: last, but not least, we add the listener to the DatabaseReference
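The steps above can be sketched as follows. This is a minimal reconstruction under assumptions, not necessarily the original code: the name awaitSingleValue is made up for illustration, and turning the error into an exception with toException() is one possible choice. The numbered comments match the steps.

```kotlin
import com.google.firebase.database.DataSnapshot
import com.google.firebase.database.DatabaseError
import com.google.firebase.database.DatabaseReference
import com.google.firebase.database.ValueEventListener
import kotlin.coroutines.resume
import kotlin.coroutines.resumeWithException
import kotlinx.coroutines.suspendCancellableCoroutine

// Hypothetical extension name; reads the value at this reference once.
suspend fun DatabaseReference.awaitSingleValue(): DataSnapshot =
    suspendCancellableCoroutine { continuation ->                    // #1
        val listener = object : ValueEventListener {                 // #2
            override fun onCancelled(error: DatabaseError) {         // #3
                continuation.resumeWithException(error.toException())
            }

            override fun onDataChange(snapshot: DataSnapshot) {      // #4
                continuation.resume(snapshot)
            }
        }
        // #5: detach the listener if the coroutine is cancelled while waiting
        continuation.invokeOnCancellation { removeEventListener(listener) }
        addListenerForSingleValueEvent(listener)                     // #6
    }
```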

The function prepared this way returns a DataSnapshot, a Firebase class that holds the data read from the given location. We can convert this object to a custom class using getValue(valueType: Class<T>). Here is an example:
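A minimal sketch of the conversion, assuming a hypothetical User model. The class and the toUser() helper are illustrative names only; getValue(Class) is the real DataSnapshot method.

```kotlin
import com.google.firebase.database.DataSnapshot

// Hypothetical model. The default values give the class a no-argument
// constructor, which Firebase needs in order to instantiate it.
data class User(var name: String = "", var age: Int = 0)

// Returns null when there is no data at the snapshot's location.
fun DataSnapshot.toUser(): User? = getValue(User::class.java)
```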

Since the PATH can be wrong, or the conversion can fail, it is good practice to run our await extension in a try/catch block.
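One way to wrap the read, sketched under assumptions: readOrNull is a hypothetical helper, and its read parameter stands in for the await extension described above. It catches DatabaseException, which getValue() throws when the conversion fails, and which a listener error produces if it is reported via toException().

```kotlin
import com.google.firebase.database.DataSnapshot
import com.google.firebase.database.DatabaseException

// Hypothetical helper: runs the suspending read and converts the result,
// returning null instead of throwing when the read or the conversion fails.
suspend fun <T : Any> readOrNull(
    type: Class<T>,
    read: suspend () -> DataSnapshot
): T? =
    try {
        read().getValue(type)
    } catch (e: DatabaseException) {
        // Only database errors are handled here, so coroutine cancellation
        // still propagates normally.
        null
    }
```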

Observe value change

The callbackFlow builder ensures thread-safety and context preservation, thus the provided ProducerScope can be used from any context, e.g. from a callback-based API.

This property of callbackFlow makes it a perfect fit for Firebase callbacks.

As you can see, this extension function is very similar to the one presented earlier for a single await. The key differences:

  • the return type: we are producing a stream of values, so the return type is now Flow<T>
  • close() closes the channel (stream); after that, send() fails with ClosedSendChannelException, or with the cause passed to close()
  • offer() (trySend() in kotlinx.coroutines 1.5 and later) replaces continuation.resume() and immediately adds the element to the channel if there is room
  • awaitClose { } suspends until the channel is closed or cancelled and then runs its block, which is where we remove the listener
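The differences listed above can be sketched like this. The name observeValue is hypothetical, and the sketch uses trySend(), which replaced offer() in kotlinx.coroutines 1.5; on older versions, offer() plays the same role.

```kotlin
import com.google.firebase.database.DataSnapshot
import com.google.firebase.database.DatabaseError
import com.google.firebase.database.DatabaseReference
import com.google.firebase.database.ValueEventListener
import kotlinx.coroutines.channels.awaitClose
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.callbackFlow

// Hypothetical extension name; emits a snapshot on every value change.
fun DatabaseReference.observeValue(): Flow<DataSnapshot> = callbackFlow {
    val listener = object : ValueEventListener {
        override fun onCancelled(error: DatabaseError) {
            // Closing with a cause makes the collector fail with that exception.
            close(error.toException())
        }

        override fun onDataChange(snapshot: DataSnapshot) {
            // Non-suspending send; the result is ignored in this sketch.
            trySend(snapshot)
        }
    }
    addValueEventListener(listener)
    // Keeps the flow open until the collector stops, then detaches the listener.
    awaitClose { removeEventListener(listener) }
}
```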

Example of usage:
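A possible way to consume such a stream, as a sketch. The function observeStatus and its parameters are hypothetical; snapshots stands in for the Flow produced by the extension described above, and the node is assumed to hold a String.

```kotlin
import com.google.firebase.database.DataSnapshot
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Job
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.catch
import kotlinx.coroutines.flow.launchIn
import kotlinx.coroutines.flow.mapNotNull
import kotlinx.coroutines.flow.onEach

// Hypothetical consumer: renders every new String value until the scope is cancelled.
fun observeStatus(
    scope: CoroutineScope,
    snapshots: Flow<DataSnapshot>,
    render: (String) -> Unit
): Job =
    snapshots
        .mapNotNull { it.getValue(String::class.java) } // skip empty snapshots
        .onEach { render(it) }
        .catch { /* e.g. permission denied: log it or show an error */ }
        .launchIn(scope)
```

Cancelling the returned Job (or the scope) stops the collection, which in turn triggers the awaitClose block and removes the Firebase listener.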

Observe child changes

A short explanation of the overrides:

  • onChildMoved is called when one of the child nodes changes its position in the ordered list
  • onChildChanged is called whenever any child at the given path changes, not only primitive values but also a single field of a POJO
  • onChildAdded is called once for each existing child and then whenever a new node appears under the path
  • onChildRemoved is called when one of the children is removed
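The overrides above could be combined into a single flow along these lines. This is a sketch under assumptions: the ChildEvent type and the name observeChildEvents are hypothetical, and trySend() is used in place of offer(), which it replaced in kotlinx.coroutines 1.5.

```kotlin
import com.google.firebase.database.ChildEventListener
import com.google.firebase.database.DataSnapshot
import com.google.firebase.database.DatabaseError
import com.google.firebase.database.DatabaseReference
import kotlinx.coroutines.channels.awaitClose
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.callbackFlow

// Hypothetical event type, one subclass per ChildEventListener callback.
sealed class ChildEvent {
    data class Added(val snapshot: DataSnapshot, val previousChildName: String?) : ChildEvent()
    data class Changed(val snapshot: DataSnapshot, val previousChildName: String?) : ChildEvent()
    data class Moved(val snapshot: DataSnapshot, val previousChildName: String?) : ChildEvent()
    data class Removed(val snapshot: DataSnapshot) : ChildEvent()
}

fun DatabaseReference.observeChildEvents(): Flow<ChildEvent> = callbackFlow {
    val listener = object : ChildEventListener {
        override fun onChildAdded(snapshot: DataSnapshot, previousChildName: String?) {
            trySend(ChildEvent.Added(snapshot, previousChildName))
        }

        override fun onChildChanged(snapshot: DataSnapshot, previousChildName: String?) {
            trySend(ChildEvent.Changed(snapshot, previousChildName))
        }

        override fun onChildMoved(snapshot: DataSnapshot, previousChildName: String?) {
            trySend(ChildEvent.Moved(snapshot, previousChildName))
        }

        override fun onChildRemoved(snapshot: DataSnapshot) {
            trySend(ChildEvent.Removed(snapshot))
        }

        override fun onCancelled(error: DatabaseError) {
            // Terminates the flow with the database error as the cause.
            close(error.toException())
        }
    }
    addChildEventListener(listener)
    // Detach the listener when the collector stops or the flow is closed.
    awaitClose { removeEventListener(listener) }
}
```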

Example of usage:
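As an illustration for a chat-like case, here is a sketch that turns added children into messages. The Message model and the newMessages function are hypothetical; addedChildren stands in for a flow that emits the snapshot passed to onChildAdded.

```kotlin
import com.google.firebase.database.DataSnapshot
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.mapNotNull

// Hypothetical chat message model; the default values provide the
// no-argument constructor Firebase needs for deserialization.
data class Message(var author: String = "", var text: String = "")

// Maps each added child snapshot to a Message, skipping empty snapshots.
fun newMessages(addedChildren: Flow<DataSnapshot>): Flow<Message> =
    addedChildren.mapNotNull { snapshot -> snapshot.getValue(Message::class.java) }
```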

The example above could easily be adapted for observing the deletion of messages or their changes. There is also a tutorial on how to build a chat app with Firebase, where you can learn more.

Know the difference between the Firebase listeners and use them appropriately with Kotlin coroutines for better code readability and simplicity.

Android Developer @Schibsted. Enthusiast of Kotlin and clean architecture.
