Use Kotlin coroutines for Firebase real-time database streams

Piotr Prus
Apr 19, 2021

There are a lot of articles about coroutines and Firebase Firestore, but I found none about the Realtime Database. I am aware that RTD evolved into a more mature version, Firestore, but a lot of projects still run on RTD. This article is for them.

The difference

The key difference between these two databases, in my opinion, is in querying the data. Both databases can sort and filter the data, but RTD can either sort or filter in a single query, never both. Cloud Firestore, on the other hand, supports indexed queries with compound sorting and filtering: filters can be chained, and filtering and sorting on a property can be combined in a single query.
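To make the contrast concrete, here is a rough sketch of the same "messages in a room" lookup in both databases. The node and field names ("messages", "room", "sentAt") are made up for illustration, and the Firestore query would typically need a composite index. Note that the Realtime Database query is tied to a single ordering key, so it cannot filter by room and sort by a different field at the same time.

```kotlin
import com.google.firebase.database.FirebaseDatabase
import com.google.firebase.firestore.FirebaseFirestore
import com.google.firebase.database.Query as RealtimeQuery
import com.google.firebase.firestore.Query as FirestoreQuery

// Realtime Database: the query is built around one ordering key ("room" here),
// and equalTo() applies to that same key. Sorting by "sentAt" would have to
// happen on the client.
fun realtimeMessagesInRoom(room: String): RealtimeQuery =
    FirebaseDatabase.getInstance()
        .getReference("messages") // hypothetical node name
        .orderByChild("room")
        .equalTo(room)

// Cloud Firestore: filter on one field and sort on another in a single query.
fun firestoreMessagesInRoom(room: String): FirestoreQuery =
    FirebaseFirestore.getInstance()
        .collection("messages") // hypothetical collection name
        .whereEqualTo("room", room)
        .orderBy("sentAt", FirestoreQuery.Direction.DESCENDING)
        .limit(20)
```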

There are more differences, of course; to learn more, please visit the official docs.

Coroutines and Task API

Task is an API that represents asynchronous method calls, similar to PendingResult in previous versions of Google Play Services. A number of functions return Task<T>; Firebase Auth is one example. To use Task from coroutines, we need to add the following dependency:

implementation 'org.jetbrains.kotlinx:kotlinx-coroutines-play-services:1.1.1'

Now, the only thing we need to do is add .await() to the function call. This inspired me to write some Kotlin extensions that simplify the usage of the Firebase Realtime Database.
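As a small illustration of .await() on a Task, here is a sketch using Firebase Auth. The wrapper function name is my own; signInAnonymously() and the await() extension come from the Firebase Auth SDK and kotlinx-coroutines-play-services respectively.

```kotlin
import com.google.firebase.auth.FirebaseAuth
import com.google.firebase.auth.FirebaseUser
import kotlinx.coroutines.tasks.await

// Hypothetical helper: suspends until the Task<AuthResult> completes.
// await() rethrows the Task's exception if the sign-in fails.
suspend fun signInAnonymouslyOrThrow(
    auth: FirebaseAuth = FirebaseAuth.getInstance()
): FirebaseUser? {
    val result = auth.signInAnonymously().await()
    return result.user
}
```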

Kotlin Extensions for single get

First, let’s make an extension for the single get function. For that, we can use ValueEventListener.

public interface ValueEventListener {
    public void onDataChange(@NonNull DataSnapshot snapshot);
    public void onCancelled(@NonNull DatabaseError error);
}

The two methods in the interface above are pretty self-explanatory, so let's jump into the coroutines.

We want to suspend the async operation and resume on a listener method call, either onCancelled or onDataChange. For this purpose, I will use the suspendCancellableCoroutine() builder function, which wraps a callback in a suspending function and lets us handle cancellation.

Let’s discuss it step by step:

#1: start the suspend block with suspendCancellableCoroutine; continuation is an interface that represents the continuation after a suspension point

#2: create the listener object and override the two methods mentioned before

#3: onCancelled: we can check the error type here, log it, and resume the suspend function with an exception

#4: onDataChange: this is where we receive the data and resume our coroutine with the data snapshot

#5: invokeOnCancellation lets us react to coroutine cancellation; in this case, we remove the listener

#6: last but not least, we add the listener to the DatabaseReference
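The six steps above can be sketched roughly as follows. This is my reconstruction, not necessarily the exact original listing: the function name awaitSingleValue is hypothetical, and I am assuming addListenerForSingleValueEvent so that the listener fires only once.

```kotlin
import com.google.firebase.database.DataSnapshot
import com.google.firebase.database.DatabaseError
import com.google.firebase.database.DatabaseReference
import com.google.firebase.database.ValueEventListener
import kotlin.coroutines.resume
import kotlin.coroutines.resumeWithException
import kotlinx.coroutines.suspendCancellableCoroutine

// Hypothetical extension name, used for illustration only.
suspend fun DatabaseReference.awaitSingleValue(): DataSnapshot =
    suspendCancellableCoroutine { continuation ->                 // #1
        val listener = object : ValueEventListener {              // #2
            override fun onCancelled(error: DatabaseError) {      // #3
                // Resume the caller with the Firebase error as an exception.
                continuation.resumeWithException(error.toException())
            }

            override fun onDataChange(snapshot: DataSnapshot) {   // #4
                continuation.resume(snapshot)
            }
        }
        // #5: if the coroutine is cancelled first, detach the listener.
        continuation.invokeOnCancellation { removeEventListener(listener) }
        // #6: register the listener; it is delivered at most one value.
        addListenerForSingleValueEvent(listener)
    }
```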

The function prepared this way returns a DataSnapshot, the Firebase class that holds the data read from the database. We can convert this object to a custom class using getValue(Class<T>). Here is an example:
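A minimal sketch of that conversion, assuming a hypothetical User model. The default values give the class a no-argument constructor, which Firebase needs for deserialization.

```kotlin
import com.google.firebase.database.DataSnapshot

// Hypothetical model class; the field names must match the keys stored in the database.
data class User(val username: String? = null, val email: String? = null)

// Returns null when there is no data at the snapshot's location.
// Throws DatabaseException if the stored data cannot be mapped to User.
fun DataSnapshot.toUser(): User? = getValue(User::class.java)
```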

Since the PATH can be wrong, or casting can fail, it is good practice to wrap the call to our await extension in try/catch.
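One possible shape for that try/catch is sketched below. The helper readOrNull and its fetch parameter are hypothetical; fetch stands in for whatever suspending call returns the DataSnapshot, such as the single-value await extension.

```kotlin
import com.google.firebase.database.DataSnapshot
import kotlinx.coroutines.CancellationException

// Hypothetical helper: runs the fetch, converts the result, and maps failures to null.
suspend fun <T : Any> readOrNull(
    type: Class<T>,
    fetch: suspend () -> DataSnapshot
): T? = try {
    fetch().getValue(type)
} catch (e: CancellationException) {
    throw e // do not swallow coroutine cancellation
} catch (e: Exception) {
    // Wrong path, permission error, or failed conversion: log here as needed.
    null
}
```

With a model class and an await extension in place, a call might look like readOrNull(User::class.java) { reference.awaitSingleValue() }, where both names are placeholders.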

Observe value change

The whole idea behind the Realtime Database is to react to database changes in real time. Observing a stream of data makes much more sense than reading a single value. For this purpose, we will use the callbackFlow API. Quoting the docs:

This builder ensures thread-safety and context preservation, thus the provided ProducerScope can be used from any context, e.g. from a callback-based API.

This property of callbackFlow makes it a perfect fit for Firebase callbacks.

As you can see, this extension function is very similar to the one presented earlier for the single await. The key differences:

  • return type: we are producing a stream of values, so the return type is now Flow<T>
  • close() closes the channel (stream); any later send() fails with ClosedSendChannelException
  • offer() instead of continuation.resume(): immediately adds the element to the channel if possible (offer() has been deprecated since kotlinx.coroutines 1.5 in favor of trySend())
  • awaitClose { } suspends until the channel is closed or cancelled, then runs its block; this is the place to remove the listener
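A sketch of such an extension, consistent with the points above. The name observeValue is hypothetical, the flow emits raw DataSnapshot objects rather than a mapped T, and I use trySend(), which assumes kotlinx.coroutines 1.5 or newer (with older versions, offer() plays the same role).

```kotlin
import com.google.firebase.database.DataSnapshot
import com.google.firebase.database.DatabaseError
import com.google.firebase.database.DatabaseReference
import com.google.firebase.database.ValueEventListener
import kotlinx.coroutines.channels.awaitClose
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.callbackFlow

// Hypothetical extension name, used for illustration only.
fun DatabaseReference.observeValue(): Flow<DataSnapshot> = callbackFlow {
    val listener = object : ValueEventListener {
        override fun onCancelled(error: DatabaseError) {
            // Close the flow with the Firebase error; collectors see it as an exception.
            close(error.toException())
        }

        override fun onDataChange(snapshot: DataSnapshot) {
            // Non-suspending send, safe to call from the Firebase callback thread.
            trySend(snapshot)
        }
    }
    addValueEventListener(listener)
    // Keeps the flow open until the collector stops, then detaches the listener.
    awaitClose { removeEventListener(listener) }
}
```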

Example of usage:
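A sketch of how such a flow might be collected. The function and parameter names are hypothetical, and the node is assumed to hold a plain String; the snapshots parameter stands in for the flow produced by the value-observing extension.

```kotlin
import com.google.firebase.database.DataSnapshot
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Job
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.catch
import kotlinx.coroutines.flow.launchIn
import kotlinx.coroutines.flow.mapNotNull
import kotlinx.coroutines.flow.onEach

// Collects in the receiver scope; cancelling the returned Job (or the scope)
// stops collection, which in turn lets the flow remove its Firebase listener.
fun CoroutineScope.observeStatus(
    snapshots: Flow<DataSnapshot>,
    onStatus: (String) -> Unit,
    onError: (Throwable) -> Unit
): Job =
    snapshots
        .mapNotNull { it.getValue(String::class.java) } // skip empty snapshots
        .onEach { onStatus(it) }
        .catch { onError(it) } // database errors and conversion failures end up here
        .launchIn(this)
```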

Observe child changes

In RTD we can observe all the changes (events) that happen to the children of a selected node in the database: adding, changing, removing, or even moving. ChildEventListener makes this observation possible, and here is the implementation for coroutines.

A short explanation of the overrides:

  • onChildMoved is called when one of the child nodes changes its position in the ordering
  • onChildChanged is called whenever any child at the given path changes; this covers not only primitives but also a single field of a POJO
  • onChildAdded is called when a new node appears under the path, and once for each existing child when the listener is attached
  • onChildRemoved is called when one of the children is removed
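The overrides above could be exposed as a flow along these lines. This is a sketch under my own assumptions: the ChildEvent type and the observeChildEvents name are hypothetical, and trySend() requires kotlinx.coroutines 1.5 or newer.

```kotlin
import com.google.firebase.database.ChildEventListener
import com.google.firebase.database.DataSnapshot
import com.google.firebase.database.DatabaseError
import com.google.firebase.database.DatabaseReference
import kotlinx.coroutines.channels.awaitClose
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.callbackFlow

// Hypothetical event type that mirrors the four ChildEventListener callbacks.
sealed class ChildEvent {
    data class Added(val snapshot: DataSnapshot, val previousChildName: String?) : ChildEvent()
    data class Changed(val snapshot: DataSnapshot, val previousChildName: String?) : ChildEvent()
    data class Removed(val snapshot: DataSnapshot) : ChildEvent()
    data class Moved(val snapshot: DataSnapshot, val previousChildName: String?) : ChildEvent()
}

fun DatabaseReference.observeChildEvents(): Flow<ChildEvent> = callbackFlow {
    val listener = object : ChildEventListener {
        override fun onChildAdded(snapshot: DataSnapshot, previousChildName: String?) {
            trySend(ChildEvent.Added(snapshot, previousChildName))
        }

        override fun onChildChanged(snapshot: DataSnapshot, previousChildName: String?) {
            trySend(ChildEvent.Changed(snapshot, previousChildName))
        }

        override fun onChildRemoved(snapshot: DataSnapshot) {
            trySend(ChildEvent.Removed(snapshot))
        }

        override fun onChildMoved(snapshot: DataSnapshot, previousChildName: String?) {
            trySend(ChildEvent.Moved(snapshot, previousChildName))
        }

        override fun onCancelled(error: DatabaseError) {
            close(error.toException())
        }
    }
    addChildEventListener(listener)
    // Detach the listener once the collector is gone.
    awaitClose { removeEventListener(listener) }
}
```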

Example of usage:
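As a sketch of the chat scenario, the snippet below builds a growing message list from newly added children. The Message model and the function name are hypothetical; addedChildren stands in for a flow of the snapshots delivered by onChildAdded.

```kotlin
import com.google.firebase.database.DataSnapshot
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.mapNotNull
import kotlinx.coroutines.flow.scan

// Hypothetical model; default values provide the no-argument constructor Firebase needs.
data class Message(val author: String? = null, val text: String? = null)

// Emits an empty list first, then a longer list each time a message is added.
fun chatHistory(addedChildren: Flow<DataSnapshot>): Flow<List<Message>> =
    addedChildren
        .mapNotNull { it.getValue(Message::class.java) }
        .scan(emptyList<Message>()) { messages, message -> messages + message }
```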

The example above could be easily adapted for observing deletion of messages or their changes. There is a tutorial on how to build a chat app with Firebase. You can learn more HERE

Know the difference between Firebase listeners and use them appropriately with Kotlin coroutines for better code readability and simplicity.
