Reactive programming with RxSwift

As a platform, iOS offers numerous APIs for writing asynchronous code. This choice often becomes hard to manage, and a single code base ends up mixing several asynchronous APIs: closures for small async tasks, delegation for background tasks, NotificationCenter for event-based tasks, and so on.

RxSwift offers more control over asynchronous code in your iOS environment.

To implement the paradigm, reactive programming uses basic terms:

  • Observables are items that can be observed. When they change, those changes are visible to components that subscribe to the observable in question.
  • Observers observe observables.

Observable sequences can emit zero or more events over their lifetimes. In RxSwift an Event is just an Enumeration Type with 3 possible states:

  • .next(value: T). When a value or collection of values is added to an observable sequence, the sequence sends a next event to its subscribers. The associated value contains the actual value from the sequence.
  • .error(error: Error). If an Error is encountered, a sequence will emit an error event. This will also terminate the sequence.
  • .completed. If a sequence ends normally it sends a completed event to its subscribers.
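
The three event cases can be inspected directly by subscribing with a closure that receives the raw Event. The following is a minimal sketch, assuming RxSwift is installed; the log array is a name introduced only for this illustration:

```swift
import RxSwift

// Collects a textual trace of every event the sequence delivers.
var log: [String] = []

// Observable.of emits its elements and then completes on its own.
_ = Observable.of(1, 2).subscribe { event in
    switch event {
    case .next(let value):
        log.append("next(\(value))")
    case .error(let error):
        log.append("error(\(error))")
    case .completed:
        log.append("completed")
    }
}

print(log) // ["next(1)", "next(2)", "completed"]
```

A finite sequence like this one delivers its next events first and a single terminating event last.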

Subject is a special type in RxSwift that can act as both of these:

  • An Observable sequence, which means it can be subscribed to
  • An Observer that enables adding new elements onto a subject that will then be emitted to the subject subscribers

There are four subject types in RxSwift, each with unique characteristics that you may find useful in different scenarios. They are as listed:

  • PublishSubject
  • BehaviorSubject
  • ReplaySubject
  • Variable (deprecated in later RxSwift releases in favor of BehaviorRelay)

Schedulers. An observable and the operators you use on it will execute and notify observers on the same thread on which the subscribe operator is being used.

Schedulers are an abstraction that let you specify distinct queues on which to perform work. It is always a good idea to move all the intensive work off the main thread to keep it as responsive as possible for users, and schedulers make it easy to do this. They can be Serial or Concurrent.

You will use a serial scheduler if you want to ensure that work is carried out on that queue in the order it was added to the queue, otherwise you will typically just use a concurrent queue.

There are built-in scheduler types that work with Grand Central Dispatch (GCD) queues and NSOperationQueues.

You can also create your own custom schedulers by conforming to the ImmediateSchedulerType protocol.

Install

Open Terminal and navigate to the project folder. Create a Podfile inside your Xcode project's folder by executing the pod init command.

Open the created Podfile, add the following line inside your target block, and then run pod install:

pod 'RxSwift'

Let's create a helper function called executeProcedure(for description: String, procedure: () -> Void). The function takes two parameters: a string description and a procedure closure. It prints the description and then executes the procedure. We will use this function to encapsulate each procedure and make it easier to see what each one prints to the console:

import UIKit
import RxSwift
public func executeProcedure(for description:String, procedure: () -> Void){
    print("Procedure executed for:", description)
    procedure()
}

Now we will look at the subscribe operator to subscribe to events being emitted by an Observable and print each event as it is emitted:

executeProcedure(for: "just"){
    let observable = Observable.just("Example of Just Operator!")

    //let fibonacciSequence = Observable.from([0,1,1,2,3,5,8])
    //let dictSequence = Observable.from([1:"Hello",2:"World"])

    observable.subscribe({ (event: Event<String>) in
        print(event)
    })
}

subscribe takes a closure that receives the emitted event and will execute this closure each time an event is emitted. We manually define the event argument that is passed to subscribe's closure as being an event of the String type, and then we print each event as it is emitted.

Let's create an observable sequence from an array using the from operator:

executeProcedure(for: "from"){
    Observable.from([10, 20,30])
        .subscribe { print($0) }
}

This time we have not declared a variable to hold the returned Observable, as we did in the previous example. Instead, we chained the subscription directly onto the Observable sequence and wrapped it onto the next line to improve readability.

subscribe returns a subscription object that is used to unsubscribe from the observable sequence. This object is of a type that conforms to the Disposable protocol, which represents any disposable resource.

Disposing of a subscription cancels it and releases the resources it holds; the subscriber receives no further events, and no completed event is emitted as a result. In these basic examples, we have explicitly defined all the Observable sequence elements ahead of time, and the sequences automatically emit a completed event after emitting the last next event. More often than not, though, an Observable sequence can continue to emit events, and its subscriptions must be disposed of manually or else you will leak that memory.
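
Manual disposal can be sketched as follows. This is a minimal illustration, assuming RxSwift is installed; a PublishSubject stands in for a sequence that never ends on its own, and the received array is a name introduced only for this sketch:

```swift
import RxSwift

// A subject that keeps emitting until someone terminates it.
let subject = PublishSubject<Int>()
var received: [String] = []

// Keep the Disposable returned by subscribe so we can cancel later.
let subscription = subject.subscribe(
    onNext: { received.append("next \($0)") },
    onCompleted: { received.append("completed") }
)

subject.onNext(1)       // delivered to the subscriber
subscription.dispose()  // cancel the subscription manually
subject.onNext(2)       // not delivered, the subscription is gone

print(received) // ["next 1"]
```

Calling dispose() by hand works, but it is easy to forget in larger code, which is why a container that disposes automatically is usually preferred.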

The more conventional approach is to add subscriptions to a container that holds disposables and will call dispose() on its contents on its deinit. That container is called a DisposeBag. We will create a DisposeBag instance before the subscription:

executeProcedure(for: "from") {
    let disposeBag = DisposeBag()

    let subscribed = Observable.from([10, 20, 30])
        .subscribe(onNext: {
            print($0)
        })
    subscribed.disposed(by: disposeBag)
}

Then, we add the subscription to disposeBag using the disposed(by:) method.

The create operator is another way to specify all events that an observable will emit to subscribers.

let disposeBag = DisposeBag()

Observable<String>.create { observer in
    observer.onNext("1")
    observer.onCompleted()
    // The sequence has already terminated, so this element is never delivered
    observer.onNext("?")
    return Disposables.create()
}.subscribe(
    onNext: { print($0) },
    onError: { print($0) },
    onCompleted: { print("Completed") },
    onDisposed: { print("Disposed") }
)
.disposed(by: disposeBag)

The create operator takes a single parameter named subscribe. Its job is to provide the implementation of calling subscribe on the observable. In other words, it defines all the events that will be emitted to subscribers.
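
A common use of create is wrapping an existing callback-based API in an Observable. The following is a minimal sketch, assuming RxSwift is installed; loadGreeting(completion:) and greeting() are hypothetical functions invented for this illustration, not part of any library:

```swift
import RxSwift

// Hypothetical callback-based API, invented for this sketch.
func loadGreeting(completion: (String) -> Void) {
    completion("Hello")
}

// Wraps the callback API in an Observable using create.
func greeting() -> Observable<String> {
    return Observable.create { observer in
        loadGreeting { value in
            observer.onNext(value)   // forward the result as a next event
            observer.onCompleted()   // nothing more will follow
        }
        // Nothing to clean up here, so return an empty disposable.
        return Disposables.create()
    }
}

var events: [String] = []
_ = greeting().subscribe(
    onNext: { events.append($0) },
    onCompleted: { events.append("completed") }
)

print(events) // ["Hello", "completed"]
```

The closure passed to create runs once per subscription, so each subscriber triggers its own call to the wrapped API.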

PublishSubject. A PublishSubject emits only new items to its subscriber, that is, it does not replay events. We will create a PublishSubject instance named pubSubject:

let pubSubject = PublishSubject<String>()

Note that the PublishSubject initializer takes no arguments, so we need to declare the element type explicitly, which is String in our case. Next, we will subscribe to it and print each emitted event. The subject is empty at this point, so the subscription will not yield anything yet:

pubSubject.subscribe {
    print($0)
}

We will now use the on operator to add a new next event, an element, onto pubSubject:

pubSubject.on(.next("First Event"))

The on operator notifies all subscribers about the new event. Using the on operator and passing a next event causes the subject to emit that event to its subscribers. The on operator has convenience methods to simplify the syntax. For example, instead of writing on and passing a next event to it, we can just write onNext and pass the value we want to emit to observers:

pubSubject.onNext("Second Event")

The overall code till now is this:

executeProcedure(for: "PublishSubject") {
    let pubSubject = PublishSubject<String>()

    pubSubject.subscribe {
        print($0)
    }

    pubSubject.on(.next("First Event"))
    pubSubject.onNext("Second Event")
}
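
The "no replay" behavior becomes visible once a second subscriber joins late. This is a minimal sketch, assuming RxSwift is installed; the early and late arrays are names introduced only for this illustration:

```swift
import RxSwift

let disposeBag = DisposeBag()
let subject = PublishSubject<String>()
var early: [String] = []
var late: [String] = []

// The first subscriber is attached before anything is emitted.
subject.subscribe(onNext: { early.append($0) })
    .disposed(by: disposeBag)

subject.onNext("A")

// The second subscriber joins after "A" and will not see it.
subject.subscribe(onNext: { late.append($0) })
    .disposed(by: disposeBag)

subject.onNext("B")

print(early) // ["A", "B"]
print(late)  // ["B"]
```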

BehaviorSubject. A BehaviorSubject will replay the latest or initial value in a next event to new subscribers. We will create a BehaviorSubject instance with an initial value of Test.

executeProcedure(for: "BehaviorSubject") {
    let disposeBag = DisposeBag()

    let behSubject = BehaviorSubject(value: "Test")
    // Receives the initial value "Test" and then "Second Event"
    let initialSubscription = behSubject.subscribe(onNext: {
        print("Line number is \(#line) and value is", $0)
    })
    behSubject.onNext("Second Event")

    // Receives only the latest value, "Second Event"
    let subsequentSubscription = behSubject.subscribe(onNext: {
        print("Line number is \(#line) and value is", $0)
    })

    initialSubscription.disposed(by: disposeBag)
    subsequentSubscription.disposed(by: disposeBag)
}

ReplaySubject. A ReplaySubject maintains a buffer of the size you specify and replays the latest next events to new subscribers in the order they were emitted. Because the initializer does not take an initial value, the type cannot be inferred and you need to declare it explicitly. Use the create(bufferSize:) static convenience method and pass the number of elements you want replayed to new subscribers as the bufferSize parameter.

executeProcedure(for: "ReplaySubject"){
    let disposeBag = DisposeBag()    
    let repSubject = ReplaySubject<String>.create(bufferSize: 3)

    repSubject.onNext("First")
    repSubject.onNext("Second")
    repSubject.onNext("Third") 
    repSubject.onNext("Fourth")

    repSubject.subscribe(onNext: {
        print($0)
    })
    .disposed(by: disposeBag)

    repSubject.onNext("Fifth")
    repSubject.subscribe(onNext: {
        print("New Subscription: ", $0)
    })
    .disposed(by: disposeBag)
}

Traits

Traits are intended to make the intention of code clear. We write code once, but it may be read many times, both by us during development and by others who maintain it. So we should make as much effort as possible to reduce the cognitive load of understanding what a block of code does, and to keep the overall code base easy to maintain, adapt, and extend. This is why we should use traits in our code. Fundamentally, traits are just structures that wrap Observable sequences. To access a trait's Observable sequence, we call asObservable() on it.

There are three types of trait in RxSwift and they are as listed:

  • Single trait
  • Completable trait
  • Maybe trait

A single trait will only emit either a single next event containing an element, or it will emit an error event containing an error. It will not replay its single emitted error or next event to new subscribers. We can convert a raw Observable sequence to single by calling asSingle() on it. There are a number of places where you might use a single, such as retrieving data from local disk or from a networking operation. They will either deliver the data that is expected or fail with an error.

A completable trait will either only emit a completed event, or it will emit an error event containing an error. It cannot emit next events. It also does not replay its completed or error events. We can convert a raw Observable sequence to Completable by calling asCompletable() on it. We will use a Completable when we only care if some operation has completed, such as a file read or some other complex operation that can fail.

The Maybe trait is a mix of Single and Completable. It can emit a single next event, a completed event, or an error event. In other words, it can either emit an element or an error and be done, or emit a completed event without emitting anything else. Like Single and Completable, it does not replay, and we can convert a raw Observable sequence to a Maybe by calling asMaybe() on it. A Maybe can be used like a Single in places where the operation might not produce an element or an error and could just complete.
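
The three traits can be compared side by side. The following is a minimal sketch, assuming RxSwift is installed; the results array is a name introduced only for this illustration, and Completable.empty() is used here as the simplest way to obtain a Completable:

```swift
import RxSwift

let disposeBag = DisposeBag()
var results: [String] = []

// Single: exactly one element (or an error).
Observable.just(42).asSingle()
    .subscribe(onSuccess: { results.append("single \($0)") })
    .disposed(by: disposeBag)

// Maybe: an element, an error, or just completion. The empty source
// has no element, so only the completion callback runs.
Observable<Int>.empty().asMaybe()
    .subscribe(
        onSuccess: { results.append("maybe \($0)") },
        onCompleted: { results.append("maybe completed") }
    )
    .disposed(by: disposeBag)

// Completable: completion or an error, never an element.
Completable.empty()
    .subscribe(onCompleted: { results.append("completable completed") })
    .disposed(by: disposeBag)

print(results) // ["single 42", "maybe completed", "completable completed"]
```

Each trait exposes only the callbacks that make sense for it, which is what makes the intent visible at the call site.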

Transforming operators

Transforming operators are used when you want to model the data coming from a sequence to suit your requirements; in other words, prepare the data to match the requirements of a subscriber. For example, to transform elements emitted from an Observable sequence, you use the map operator; map will transform each element as it is emitted from the source Observable sequence. This is similar to the map(_:) method in the standard Swift library, except that this map works with Observable sequence elements.

executeProcedure(for: "map") { 
    Observable.of(10, 20, 30)
        .map { $0 * $0 }
        .subscribe(onNext:{
            print($0)
        })
        .dispose()
}

Imagine an Observable sequence whose elements are themselves Observables, and you want to create a new sequence from those. This is where flatMap comes into play. flatMap merges the emissions of these inner Observables and emits the merged results as its own sequence.

let sequence1  = Observable<Int>.of(1,2)
let sequence2  = Observable<Int>.of(1,2)
let sequenceOfSequences = Observable.of(sequence1,sequence2)
sequenceOfSequences.flatMap{ return $0 }.subscribe(onNext:{
    print($0)
})

scan starts with an initial seed value and aggregates values much like reduce in Swift, except that it emits every intermediate result rather than only the final one.

Observable.of(1,2,3,4,5).scan(0) { seed, value in
    return seed + value
}.subscribe(onNext:{
    print($0)
})
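
The difference between scan and RxSwift's own reduce operator is easiest to see when both run over the same input. This is a minimal sketch, assuming RxSwift is installed; the running and total arrays are names introduced only for this illustration:

```swift
import RxSwift

let disposeBag = DisposeBag()
var running: [Int] = []
var total: [Int] = []

// scan emits the accumulated value after every element.
Observable.of(1, 2, 3, 4, 5)
    .scan(0) { sum, value in sum + value }
    .subscribe(onNext: { running.append($0) })
    .disposed(by: disposeBag)

// reduce emits only once, when the source completes.
Observable.of(1, 2, 3, 4, 5)
    .reduce(0) { sum, value in sum + value }
    .subscribe(onNext: { total.append($0) })
    .disposed(by: disposeBag)

print(running) // [1, 3, 6, 10, 15]
print(total)   // [15]
```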

Filtering operators

Filtering operators can be divided into separate categories depending on the type of filtering they provide. For instance, operators that ignore certain events, operators that skip events based on certain criteria, or the opposite of skipping, operators that allow you to take elements. Then come operators that allow us to work with distinct elements.

The filter operator applies a predicate to each emitted element and only allows them through if they pass.

Observable.of(2, 30, 22, 5, 60, 1)
    .filter { $0 > 10 }
    .subscribe(onNext: {
        print($0)
    })

Like filter, takeWhile also applies a predicate to elements emitted from observable sequences. However, takeWhile terminates the sequence the first time the specified condition is false, and all the remaining elements are ignored. Think of takeWhile as a gate; once the gate is closed, nothing else gets through.

executeProcedure(for: "takeWhile") {
    let disposeBag = DisposeBag()

    let integers = Observable.of(10, 20, 30, 40, 30, 20, 10)
    integers.takeWhile({ $0 < 40 })
        .subscribe(onNext: {
            print($0)
        })
        .disposed(by: disposeBag)
}

If you want to emit a next event only when the value differs from the previous one, use distinctUntilChanged.

Observable.of(1,2,2,1,3).distinctUntilChanged().subscribe(onNext:{
    print($0)
})
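
The skipping and taking categories mentioned at the start of this section can be sketched with the skip and take operators. This is a minimal illustration, assuming RxSwift is installed; the skipped and taken arrays are names introduced only for this sketch:

```swift
import RxSwift

let disposeBag = DisposeBag()
var skipped: [Int] = []
var taken: [Int] = []

// skip(2) ignores the first two elements and lets the rest through.
Observable.of(1, 2, 3, 4, 5)
    .skip(2)
    .subscribe(onNext: { skipped.append($0) })
    .disposed(by: disposeBag)

// take(2) lets the first two elements through and then completes.
Observable.of(1, 2, 3, 4, 5)
    .take(2)
    .subscribe(onNext: { taken.append($0) })
    .disposed(by: disposeBag)

print(skipped) // [3, 4, 5]
print(taken)   // [1, 2]
```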

Combining and merging

Sometimes we want to work with multiple Observable sequences, reacting to new elements emitted from one or more of those sequences. There are a handful of operators that combine Observable sequences in a variety of ways.

When working with Observables, one of the most common requirements is to provide an initial or starting value to an observer. This comes up with location services and network connectivity, for example, where it is essential to give the observer an initial state. startWith is an operator that lets us prefix a sequence with one or more initial values.

executeProcedure(for: "startWith") {
    let disposeBag = DisposeBag()

    Observable.of("String 2", "String 3", "String 4")
        .startWith("String 0", "String 1")
        .subscribe(onNext: {
            print($0)
        })
        .disposed(by: disposeBag)
}

Merging can be defined as combining two or more sequences in the simplest way possible. Merge will combine emissions by multiple Observable sequences into a single new Observable sequence and emit each event in the order it is emitted by each source sequence.

executeProcedure(for: "merge") {
    let disposeBag = DisposeBag()

    let pubSubject1 = PublishSubject<String>()
    let pubSubject2 = PublishSubject<String>()
    let pubSubject3 = PublishSubject<String>()

    Observable.of(pubSubject1, pubSubject2, pubSubject3)
        .merge()
        .subscribe(onNext:{
            print($0)
        })
        .disposed(by: disposeBag)

    pubSubject1.onNext("First Element from Subject 1")
    pubSubject2.onNext("First Element from Subject 2")
    pubSubject3.onNext("First Element from Subject 3")
    pubSubject1.onNext("Second Element from Subject 1")
    pubSubject3.onNext("Second Element from Subject 3")
    pubSubject2.onNext("Second Element from Subject 2")
}

The merge operator cannot combine sequences whose elements are of different types. The zip operator lets you combine sequences of different types and apply a transformation to zip them together. It combines up to eight Observable sequences into a single sequence, and it waits until every source sequence has emitted an element at a given index before emitting the combined element for that index.

executeProcedure(for: "zip") { 
    let disposeBag = DisposeBag()

    let a = Observable.of(1,2,3,4,5)
    let b = Observable.of("a","b","c","d")
    Observable.zip(a,b){ return ($0,$1) }.subscribe {
        print($0)
    }.disposed(by: disposeBag)
}

Side Effects

If you want to register callbacks that will be executed when certain events take place on an observable sequence, use the do operator. It does not modify the emitted elements but just passes them through.

You can use

  • do(onNext:) - if you want to do something only when a next event happens,
  • do(onError:) - if you want to react to an emitted error, and
  • do(onCompleted:) - if you want to react when the sequence finishes successfully.

Observable.of(1,2,3,4,5).do(onNext: {
    _ = $0 * 10 // The result is discarded; this has no effect on the actual subscription
}).subscribe(onNext:{
    print($0)
})

RxCocoa

RxCocoa is RxSwift’s companion library holding all classes that specifically aid development for UIKit and Cocoa. Besides featuring some advanced classes, RxCocoa adds reactive extensions to many UI components so that you can subscribe to various UI events out of the box.

For example, it’s very easy to use RxCocoa to subscribe to the state changes of a UISwitch, like so:

toggleSwitch.rx.isOn.subscribe(onNext: { isOn in
    print(isOn ? "It's ON" : "It's OFF")
})

So, the RxCocoa library makes view components reactive. There are three kinds of trait in RxCocoa, and they are as follows:

  • Driver
  • ControlProperty
  • ControlEvent

A Driver's intended use is to reactively bind an Observable sequence to a UI element. It also replays its latest element to a new subscriber, if there is one. We can convert an Observable sequence to a Driver by calling asDriver() on it. Because a Driver cannot fail, if the underlying Observable can fail, we need to use one of the asDriver() variants, such as asDriver(onErrorJustReturn:), to provide a fallback value in case the underlying Observable sequence emits an error. For example, in the case of an Observable sequence of arrays, we can just return an empty array.

Example

demoTextView.rx.text.orEmpty.asDriver()
    .map {
        "Char count: \($0.count)"
    }
    .drive(demoTextFieldLabel.rx.text)
    .disposed(by: disposeBag)
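
The empty-array fallback described for asDriver(onErrorJustReturn:) can be sketched as follows. This is a minimal illustration, assuming RxSwift and RxCocoa are installed and that the code runs on the main thread, which is what a Driver expects; LoadError and the received array are names introduced only for this sketch:

```swift
import RxSwift
import RxCocoa

// Hypothetical error type, invented for this sketch.
enum LoadError: Error {
    case offline
}

let disposeBag = DisposeBag()
var received: [[String]] = []

// A source sequence that fails immediately.
let failing = Observable<[String]>.error(LoadError.offline)

// The Driver never forwards the error; it substitutes the fallback
// value (an empty array here) and then completes.
failing
    .asDriver(onErrorJustReturn: [])
    .drive(onNext: { received.append($0) })
    .disposed(by: disposeBag)
```

Because the error is replaced before it reaches the UI binding, the bound view never has to handle a failure path.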

ControlProperty is a reactive wrapper for a property of a UI element, such as UITextField. ControlProperties are defined in the rx namespace. This helps separate and distinguish them from the standard library properties and methods. Generally, but not always, they use the same name as the standard library implementations that they wrap. Like Driver, a ControlProperty also replays its latest element to new subscribers, if there is one, and it automatically emits a completed event when its control or view is about to be deallocated.

Examples

// TextField
demoTextField.rx.text
    .bind(to: demoTextFieldLabel.rx.text)
    .disposed(by: disposeBag)

We have used bind(to:) to bind next events directly to the corresponding label's rx.text value.

// SegmentedControl
segmentedControl.rx.value
    .skip(1)
    .bind(onNext: { [unowned self] in
        self.segmentedControlLabel.text = "Selected Segment: \($0)"
    })
    .disposed(by:disposeBag)

// Slider
slider.rx.value
    .bind(to: progressView.rx.progress)
    .disposed(by: disposeBag)

// DatePicker
datePicker.rx.date
    .map{ [unowned self] in
        "Date picked: " + self.dateFormatter.string(from: $0)
    }
    .bind(to: datePickerLabel.rx.text)
    .disposed(by: disposeBag)            

Similar to ControlProperty, ControlEvent is a reactive wrapper for a UI element event. For example, UIButton's rx.tap ControlEvent wraps its touchUpInside event. It is also defined in the rx namespace to differentiate it from the standard events. Unlike a ControlProperty, a ControlEvent does not replay its latest element to new subscribers; it only emits events to the current subscribers as they occur. For example, you would not want to replay an old tap event to a new subscriber. A ControlEvent also emits a completed event when its control or view is about to be deallocated.

Basic table view

Basic support for UITableView and UICollectionView is present in the RxCocoa framework.

@IBOutlet var tableView: UITableView!

func bindTableView() {
    let cities = Observable.of(["Lisbon", "Copenhagen", "London", "Madrid",
        "Vienna"])
    cities
        .bind(to: tableView.rx.items) {
            (tableView: UITableView, index: Int, element: String) in
            let cell = UITableViewCell(style: .default, reuseIdentifier: "cell")
            cell.textLabel?.text = element
            return cell
        }
        .disposed(by: disposeBag)
}

That’s all. You don’t even need to set your UIViewController as a UITableViewDataSource.

Here is an overview of what’s going on:

  • tableView.rx.items is a binder method operating on observable sequences of elements (like Observable<[String]>).
  • The binding creates an invisible ObserverType object which subscribes to your sequence, and sets itself as the dataSource and delegate of the table view.
  • When a new array of elements is delivered on the observable, the binding reloads the table view.
  • To obtain the cell for each item, RxCocoa calls your closure with details (and data) for the row being reloaded.

This is straightforward to use. But what if you want to capture the user selection? Again, RxCocoa is here to help:

tableView.rx.modelSelected(String.self)
    .subscribe(onNext: { model in
        print("\(model) was selected")
    })
    .disposed(by: disposeBag)

Implementing the ReactiveX Search Bar.

class ViewController: UIViewController, UITableViewDataSource {

    var itemsToShow = [String]()
    let items = ["Saranac River", "Fort Moreau", "Oval", "Fort Brown",
        "Water Pollution Control Plant", "Fort Scott"]
    let disposeBag = DisposeBag()
    @IBOutlet weak var searchBar: UISearchBar!
    @IBOutlet weak var tableView: UITableView!

    func tableView (_ tableView: UITableView, numberOfRowsInSection section: Int) -> Int {
        return itemsToShow.count 
    }

    func tableView(_ tableView: UITableView, cellForRowAt indexPath: IndexPath) ->
        UITableViewCell {
        let cell = tableView.dequeueReusableCell(withIdentifier: "prototypeCell",
            for: indexPath)
        cell.textLabel?.text = itemsToShow[indexPath.row]
        return cell
    }

    override func viewDidLoad() {
        super.viewDidLoad()

        searchBar.rx.text.orEmpty
            .subscribe(onNext: { [unowned self] query in
                // Keep only the items that start with the typed text
                self.itemsToShow = self.items.filter { $0.hasPrefix(query) }
                self.tableView.reloadData()
            })
            .disposed(by: disposeBag)
    }
}

Schedulers

An Observable and the operators that we use on it execute and notify the observer on the same thread on which the subscribe operator is used. So far, we have done a lot of work on the main thread. Schedulers are an abstraction that lets us specify distinct queues on which to perform the work. It is always a good idea to move intensive work off the main thread to keep the app responsive for users. Schedulers make it easy to do this.

Schedulers can be serial or concurrent. We use serial schedulers if we want to ensure that the work is carried out on that queue in the order in which it was added to the queue. Otherwise, we can just use a concurrent queue.

There are built-in scheduler types that work with Grand Central Dispatch (GCD) queues and operation queues, including SerialDispatchQueueScheduler, which dispatches work on a specified serial dispatch queue; ConcurrentDispatchQueueScheduler, which dispatches on a concurrent dispatch queue; and OperationQueueScheduler, which performs the work on a specified OperationQueue (NSOperationQueue in Objective-C).

We can also specify our own custom schedulers by conforming to the ImmediateSchedulerType protocol.

There are a couple of singleton schedulers that we can use for some common needs, such as the following:

  • CurrentThreadScheduler represents the current thread and is the default scheduler.
  • MainScheduler represents the main thread and is typically used for all UI-related work.

Example

let imageData = PublishSubject<Data>()
let concurrentScheduler = ConcurrentDispatchQueueScheduler(qos: .background)

imageData
    .observeOn(concurrentScheduler)
    .map { UIImage(data: $0) }
    .observeOn(MainScheduler.instance)
    .subscribe(onNext: {
        imageView.image = $0
    })
    .disposed(by: disposeBag)

imageData.onNext(swiftImageData)
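
While observeOn changes where downstream events are delivered, subscribeOn changes where the subscription itself, and therefore the emission of a cold sequence, takes place. The following is a minimal sketch, assuming RxSwift is installed and the Swift 5 language mode; it uses the subscribeOn spelling that matches the observeOn calls above (newer RxSwift releases spell these subscribe(on:) and observe(on:)), and the semaphore exists only so that this script can wait for the background work:

```swift
import Foundation
import RxSwift

let background = ConcurrentDispatchQueueScheduler(qos: .background)
let done = DispatchSemaphore(value: 0)
var doubled: [Int] = []
var sawMainThread = false

let subscription = Observable.of(1, 2, 3)
    .subscribeOn(background)      // subscribe and emit on a background queue
    .map { $0 * 2 }               // runs on the same background thread
    .subscribe(
        onNext: { value in
            doubled.append(value)
            if Thread.isMainThread { sawMainThread = true }
        },
        onCompleted: { done.signal() }
    )

done.wait()                       // block until the sequence has completed
subscription.dispose()

print(doubled)                    // [2, 4, 6]
print(sawMainThread)              // false
```

In an app you would normally follow this with observeOn(MainScheduler.instance) before touching any UI, rather than blocking with a semaphore.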

RxAlamofire

RxAlamofire adds an idiomatic Rx layer to Alamofire, making it straightforward to integrate into your observable workflow. Most of the RxAlamofire API revolves around extending SessionManager.

It’s straightforward to perform requests using the default SessionManager session. If you don’t need to reuse a customized session, this can be your go-to request mechanism to retrieve a request result as raw text:

string(.get, stringURL).subscribe(onNext: { print($0) }).disposed(by: disposeBag)

Most of the time you’ll want to deal with and decode JSON, as simply as this:

json(.get, stringURL).subscribe(onNext: { print($0) }).disposed(by: disposeBag)

The resulting observable emits the result as a decoded JSON object.

You can also obtain raw Data:

data(.get, stringURL).subscribe(onNext: { print($0) }).disposed(by: disposeBag)

The examples above didn’t modify the default values for customized parameters, URL encoding and HTTP headers. But that’s easy as well:

json(.get, "http://api.openweathermap.org/data/2.5/weather",
    parameters: ["q": "London", "APPID": "{APIKEY}"]) 
    .subscribe(onNext: { print($0) })
    .disposed(by: disposeBag)