As a platform, iOS offers numerous APIs to write asynchronous code and, many a times, this choice becomes hard to manage and a single code base ends up comprising multiple Asynchronous API usages, for example, closures for small Async tasks, delegation for background tasks, notification center for event-based tasks, and such.
RxSwift offers more control over asynchronous code in your iOS environment.
To implement the paradigm, reactive programming uses basic terms:
Observable sequences can emit zero or more events over their lifetimes. In RxSwift an Event
is just an Enumeration Type with 3 possible states:
.next(value: T)
. When a value or collection of values is added to an observable sequence it will send the next event to its subscribers. The associated value will contain the actual value from the sequence..error(error: Error)
. If an Error is encountered, a sequence will emit an error event. This will also terminate the sequence..completed
. If a sequence ends normally it sends a completed event to its subscribers.Subject is a special type in RxSwift that can act as both of these:
There are four subject types in RxSwift, each with unique characteristics that you may find useful in different scenarios. They are as listed:
Schedulers. An observable and the operators you use on it will execute and notify observers on the same thread on which the subscribe operator is being used.
Schedulers are an abstraction that let you specify distinct queues on which to perform work. It is always a good idea to move all the intensive work off the main thread to keep it as responsive as possible for users, and schedulers make it easy to do this. They can be Serial or Concurrent.
You will use a serial scheduler if you want to ensure that work is carried out on that queue in the order it was added to the queue, otherwise you will typically just use a concurrent queue.
There are built-in scheduler types that work with Grand Central Dispatch (GCD) queues and NSOperationQueues
.
You can also create your own custom schedulers by conforming to the ImmediateScheduler
protocol.
Install
Open Terminal and navigate to the project folder. Create a pod file inside your Xcode projects folder by executing the pod init
command.
Open the created pod file and paste the pod file's code:
pod 'RxSwift'
Let's create a helper function called executeProcedure(with description:String, procedure: () -> Void)
. The function takes two params: a string description and a procedure closure. It will print the description and will execute the procedure. We will use this function to encapsulate each procedure and make it easier to determine what each procedure is printing to the console:
import UIKit import RxSwift public func executeProcedure(for description:String, procedure: () -> Void){ print("Procedure executed for:", description) procedure() }
Now we will look at the subscribe
operator to subscribe to events being emitted by an Observable
and print each event as it is emitted:
executeProcedure(for: "just"){ let observable = Observable.just("Example of Just Operator!") //let fibonacciSequence = Observable.from([0,1,1,2,3,5,8]) //let dictSequence = Observable.from([1:"Hello",2:"World"]) observable.subscribe({ (event: Event<String>) in print(event) }) }
subscribe
takes a closure that receives the emitted event and will execute this closure each time an event is emitted. We manually define the event argument that is passed to subscribe's closure as being an event of the String type, and then we print each event as it is emitted.
Let's create an observable sequence from an array using the from operator:
executeProcedure(for: "from"){ Observable.from([10, 20,30]) .subscribe { print($0) } }
As you can see, we have not declared a variable to assign the returned value as we did in the previous examples. We then chained the subscription onto the Observable sequence returned and wrapped the subscription onto the next line to improve readability.
As you can see, subscribe returns Subscription
object used to unsubscribe from the observable sequence. This subscription object is of the type that confirms to the Disposable
protocol that represents any disposable resource.
Disposing of a subscription will cause the underlying Observable sequence to emit a completed event and terminate. In these basic examples, we have explicitly defined all the Observable sequence elements ahead of time, and they will automatically emit a completed event after emitting the last next event. More often than not though, an Observable sequence can continue to emit events, and they must be manually terminated or else you will leak that memory.
The more conventional approach is to add subscriptions to a container that holds disposables and will call dispose()
on its contents on its deinit
. That container is called a DisposeBag
. We will create a DisposeBag
instance before the subscription:
executeProcedure(for: "from") { let disposeBag = DisposeBag() let subscribed = Observable.from([10, 20,30]) .subscribe(onNext:{ print($0) }) subscribed.disposed(by: disposeBag) }
Then, we add the subscription to disposeBag
using the disposed(by:)
method.
The create
operator is another way to specify all events that an observable will emit to subscribers.
Observable<String>.create { observer in observer.onNext("1") observer.onCompleted() observer.onNext("?") return Disposables.create() }.subscribe( onNext: { print($0) }, onError: { print($0) }, onCompleted: { print("Completed") }, onDisposed: { print("Disposed") } ) .disposed(by: disposeBag)
The create
operator takes a single parameter named subscribe. Its job is to provide the implementation of calling subscribe
on the observable. In other words, it defines all the events that will be emitted to subscribers.
PublishSubject. A PublishSubject
emits only new items to its subscriber, that is, it does not replay events. We will create a PublishSubject
instance named pubSubject
:
let pubSubject = PublishSubject<String>()
Note that the PublishSubject
initializer is empty, but we need to declare a type that is String in our case. Next, we will subscribe to it and print each emitted event. Subject is empty at this point, so the subscription will not yield anything:
pubSubject.subscribe { print($0) }
We will now use the on operator to add a new next event, an element, onto pubSubject
:
pubSubject.on(.next("First Event"))
The on
operator will notify all subscribers about the new event. Using the on
operator and passing a next event causes the subscription to emit that event. The on
operator has convenience methods to simplify the syntax. For example, instead of writing an on
and passing a next event to it, we can just write onNext
and pass the value you want to emit in the next event to observers. In the code, we can create an onNext
event, as follows:
pubSubject.onNext("Second Event")
The overall code till now is this:
executeProcedure(for: "PublishSubject") { let pubSubject = PublishSubject<String>() pubSubject.subscribe { print($0) } pubSubject.on(.next("First Event")) pubSubject.onNext("Second Event") }
BehaviorSubject. A BehaviorSubject
will replay the latest or initial value in a next event to new subscribers. We will create a BehaviorSubject
instance with an initial value of Test.
executeProcedure(for: "BehaviorSubject") { let disposeBag = DisposeBag() let behSubject = BehaviorSubject(value: "Test") let initialSubscripton = behSubject.subscribe(onNext: { print("Line number is \(#line) and value is" , $0) }) behSubject.onNext("Second Event") let subsequentSubscription = behSubject.subscribe(onNext: { print("Line number is \(#line) and value is" , $0) }) initialSubscripton.disposed(by: disposeBag) subsequentSubscription.disposed(by: disposeBag) }
ReplaySubject. ReplaySubject
will maintain and replay a buffer of the size you specify of the latest next events in the order they were emitted. To create a ReplaySubject
, you need to explicitly declare a type because the initializer does not take an initial value, so it can't infer the type and use the create(bufferSize:)
static convenience method to pass the number of elements you want replayed to new subscribers for the bufferSize
parameter.
executeProcedure(for: "ReplaySubject"){ let disposeBag = DisposeBag() let repSubject = ReplaySubject<String>.create(bufferSize: 3) repSubject.onNext("First") repSubject.onNext("Second") repSubject.onNext("Third") repSubject.onNext("Fourth") repSubject.subscribe(onNext: { print($0) }) .disposed(by: disposeBag) repSubject.onNext("Fifth") repSubject.subscribe(onNext: { print("New Subscription: ", $0) }) .disposed(by: disposeBag) }
Traits
Traits are intended to help provide clarity of intention. We write code once, but this code might potentially be read many times by us during the course of development and others while trying to maintain it. So we should make as much effort as possible to reduce the cognitive load in understanding what a block of code does, or help make the overall code base that much easier to maintain, adapt, and extend. This is why we should use traits in our code. Fundamentally, traits are just structures that wrap Observable sequences. To access a trait's Observable
sequence, we need to call asObservable()
on it.
There are three types of trait in RxSwift and they are as listed:
A single trait will only emit either a single next event containing an element, or it will emit an error event containing an error. It will not replay its single emitted error or next event to new subscribers. We can convert a raw Observable sequence to single by calling asSingle()
on it. There are a number of places where you might use a single, such as retrieving data from local disk or from a networking operation. They will either deliver the data that is expected or fail with an error.
struct User: Codable { var name: String } func getUsers() -> Single<[User]> { return Single<[User]>.create { observer in let request = URLRequest(url: URL(string: "URL")!) let task = URLSession.shared.dataTask(with: request) { (data, response, error) in do { let users:[User] = try JSONDecoder().decode([User].self, from: data ?? Data()) observer(.success(users)) } catch let error { observer(.error(error)) } } task.resume() return Disposables.create { task.cancel() } } } getUsers().subscribe(onSuccess: { (result) in print(result) }) { (error) in print(error) }
A completable trait will either only emit a completed event, or it will emit an error event containing an error. It cannot emit next events. It also does not replay its completed or error events. We can convert a raw Observable sequence to Completable by calling asCompletable()
on it. We will use a Completable when we only care if some operation has completed, such as a file read or some other complex operation that can fail.
Maybe trait is a mix of single and completable. It can emit a single next event, a completed event, or an error event. In other words, it can either emit an element or error and be done or emit a completed event without emitting anything else. Like single and completable, it does not replay, and we can convert a raw Observable sequence to a maybe by calling asMaybe()
on it. A maybe can be used just like a single in places where it might be possible that it won't emit an element or an error and could just complete.
Transforming operators
Transforming operators are used when you want to model the data coming from a sequence to suit your requirements; in other words, prepare the data to match the requirements of a subscriber. For example, to transform elements emitted from an Observable sequence, you use the map
operator; map
will transform each element as it is emitted from the source Observable sequence. This is similar to the map(_:)
method in the standard Swift library, except that this map
works with Observable sequence elements.
executeProcedure(for: "map") { Observable.of(10, 20, 30) .map { $0 * $0 } .subscribe(onNext:{ print($0) }) .dispose() }
Imagine an Observable sequence that consists of objects that are themselves Observables and you want to create a new sequence from those. This is where flatMap
comes into play. flatMap
merges the emission of these resulting Observables and emitting these merged results as its own sequence.
let sequence1 = Observable<Int>.of(1,2) let sequence2 = Observable<Int>.of(1,2) let sequenceOfSequences = Observable.of(sequence1,sequence2) sequenceOfSequences.flatMap{ return $0 }.subscribe(onNext:{ print($0) })
scan
starts with an initial seed value and is used to aggregate values just like reduce
in Swift.
Observable.of(1,2,3,4,5).scan(0) { seed, value in return seed + value }.subscribe(onNext:{ print($0) })
Filtering operators
Filtering operators can be divided into separate categories depending on the type of filtering they provide. For instance, operators that ignore certain events, operators that skip events based on certain criteria, or the opposite of skipping, operators that allow you to take elements. Then come operators that allow us to work with distinct elements.
The filter
operator applies a predicate to each emitted element and only allows them through if they pass.
Observable.of(2,30,22,5,60,1) .filter {$0 > 10} .subscribe(onNext:{ print($0) })
Like filter
, takeWhile
also applies a predicate to elements emitted from observable sequences. However, takeWhile
will terminate the sequence after the first time the specified condition is false, and all the remaining emitted element will be ignored. Think of takeWhile
as a gate; once the gate is closed, nothing else gets through.
executeProcedure(for: "takeWhile") { let disposeBag = DisposeBag() let integers = Observable.of(10, 20, 30, 40, 30, 20, 10) integers.takeWhile({ $0 < 40 }) .subscribe(onNext: { print($0) } .disposed(by: disposeBag) }
If you just want to emit next events if the value changed from previous ones you need to use distinctUntilChanged
.
Observable.of(1,2,2,1,3).distinctUntilChanged().subscribe(onNext:{ print($0) })
Combining && Merging
Sometimes we want to work with multiple Observable sequences, reacting to new elements emitted from one or more of those sequences. There are a handful of operators that combine Observable sequences in a variety of ways.
While working with Observables, one of the most obvious requirements is to provide an initial or starting value to an observer. Some examples of this use case come into use while working with locations service and network connectivity when it becomes essential to provide an initial state to the observer; startWith
is one such operator that allows us to prefix an initial value.
executeProcedure(for: "startWith") { let disposeBag = DisposeBag() Observable.of("String 2", "String 3", "String 4") .startWith("String 0", "String 1") .subscribe(onNext:{ print($0) }) }
Merging can be defined as combining two or more sequences in the simplest way possible. Merge
will combine emissions by multiple Observable sequences into a single new Observable sequence and emit each event in the order it is emitted by each source sequence.
executeProcedure(for: "merge") { let disposeBag = DisposeBag() let pubSubject1 = PublishSubject<String>() let pubSubject2 = PublishSubject<String>() let pubSubject3 = PublishSubject<String>() Observable.of(pubSubject1, pubSubject2, pubSubject3) .merge() .subscribe(onNext:{ print($0) }) .disposed(by: disposeBag) pubSubject1.onNext("First Element from Subject 1") pubSubject2.onNext("First Element from Subject 2") pubSubject3.onNext("First Element from Subject 3") pubSubject1.onNext("Second Element from Subject 1") pubSubject3.onNext("Second Element from Subject 3") pubSubject2.onNext("Second Element from Subject 2") }
merge
operator fails when it comes to combining elements of different types. The zip
operator lets you combine sequences of different types and apply transformations to zip
them together. It will combine up to eight Observable
sequences into a single sequence and will wait to emit elements from each one of the other source Observable
sequences at an index until they all have emitted elements at that index.
executeProcedure(for: "zip") { let disposeBag = DisposeBag() let a = Observable.of(1,2,3,4,5) let b = Observable.of("a","b","c","d") Observable.zip(a,b){ return ($0,$1) }.subscribe { print($0) }.disposed(by: disposeBag) }
Side Effects
If you want to register callbacks that will be executed when certain events take place on an observable sequence you need to use the doOn
operator. It will not modify the emitted elements but rather just pass them through.
You can use
do(onNext:)
- if you want to do something just if a next event happeneddo(onError:)
- if errors will be emitted anddo(onCompleted:)
- if the sequence finished successfully.Observable.of(1,2,3,4,5).do(onNext: { $0 * 10 // This has no effect on the actual subscription }).subscribe(onNext:{ print($0) })
RxCocoa
RxCocoa is RxSwift’s companion library holding all classes that specifically aid development for UIKit and Cocoa. Besides featuring some advanced classes, RxCocoa adds reactive extensions to many UI components so that you can subscribe to various UI events out of the box.
For example, it’s very easy to use RxCocoa to subscribe to the state changes of a UISwitch
, like so:
toggleSwitch.rx.isOn .subscribe(onNext: { isOn in print(isOn ? "It's ON" : "It's OFF") })
So, RxCocoa library is useful to make view components become reactive. There are three kinds of trait in RxCocoa, and they are as follows:
Driver intended use is to reactively bind an Observable sequence to a UIElement
. It will also replay its latest element to its new subscriber, if there is one. We can convert an Observable sequence to a driver by calling asDriver()
on it. As Driver cannot fail, if the Observable can fail, we will need to use one of the asDriver()
APIs such as asDriver(onErrorJustReturn:)
to provide a return value in the event that the underlying Observable sequence emits an error. For example, in the case of an Observable sequence of an array, we can just return an empty array.
Example
demoTextView.rx.text.orEmpty.asDriver() .map { "Char count: \($0.characters.count)" } .drive(demoTextFieldLabel.rx.text) .disposed(by: disposeBag)
ControlProperty is a reactive wrapper for a property of a UIElement
, such as UITextField
. ControlProperties are defined in the Rx namespace. This helps separate and distinguish them from the standard library properties and methods. Generally, but not always, they use the same name as the standard library implementations that they wrap. Like Driver
, a ControlProperty
will also replay its latest element to new subscribes if there is one, and ControlProperty will automatically emit a completed event when its control or view is about to be deallocated.
Examples
// TextField demoTextField.rx.text .bind(to: demoTextFieldLabel.rx.text) .disposed(by: disposeBag)
We have used bind(to:)
to bind next events directly to the corresponding label's rx.text
value.
// SegmentedControl segmentedControl.rx.value .skip(1) .bind(onNext: { [unowned self] in self.segmentedControlLabel.text = "Selected Segment: \ ($0)" }) .disposed(by:disposeBag) // Slider slider.rx.value .bind(to: progressView.rx.progress) .disposed(by: disposeBag) // DatePicker datePicker.rx.date .map{ [unowned self] in "Date picked: " + self.dateFormatter.string(from: $0) } .bind(to: datePickerLabel.rx.text) .disposed(by: disposeBag)
Similar to ControlProperty
, ControlEvent is a reactive wrapper for a UIElement
event. For example, UIButton
's Rx.tapControlEvent
wraps its touchUpInside
event. It is also defined in the Rx namespace to differentiate it from the standard events. A ControlEvent
will not replay its latest elements to its new subscribers though; it will only emit events as they occur to the current subscribers; for example, you would not want to replay an old tap event to a new subscriber, and a ControlEvent
will emit a completed event when its control or view is about to be deallocated.
Basic table view
Basic support for UITableView and UICollectionView is present in the RxCocoa framework.
@IBOutlet var tableView: UITableView! func bindTableView() { let cities = Observable.of(["Lisbon", "Copenhagen", "London", "Madrid", "Vienna"]) cities .bind(to: tableView.rx.items) { (tableView: UITableView, index: Int, element: String) in let cell = UITableViewCell(style: .default, reuseIdentifier: "cell") cell.textLabel?.text = element return cell } .disposed(by: disposeBag) }
That’s all. You don’t even need to set your UIViewController
as an UITableViewDataSource
.
Let's overview of what’s going on:
tableView.rx.items
is a binder method operating on observable sequences of elements (like Observable<[String]>).ObserverType
object which subscribes to your sequence, and sets itself as the dataSource
and delegate
of the table view.This is straightforward to use. But what if you want to capture the user selection? Again, RxCocoa is here to help:
tableView.rx.modelSelected(String.self) .subscribe(onNext: { model in print("\(model) was selected") }) .disposed(by: disposeBag)
Implementing the ReactiveX Search Bar.
class ViewController: UIViewController, UITableViewDataSource { var itemsToShow = [String]() let items = ["Saranac River", "Fort Moreau", "Oval", "Fort Brown", "Water Pollution Control Plant", "Fort Scott"] let disposeBag = DisposeBag() @IBOutlet weak var searchBar: UISearchBar! @IBOutlet weak var tableView: UITableView! func tableView (_ tableView: UITableView, numberOfRowsInSection section: Int) -> Int { return itemsToShow.count } func tableView(_ tableView: UITableView, cellForRowAt indexPath: IndexPath) -> UITableViewCell { let cell = tableView.dequeueReusableCell(withIdentifier: "prototypeCell", for: indexPath) cell.textLabel?.text = itemsToShow[indexPath.row] return cell } override func viewDidLoad() { super.viewDidLoad() searchBar.rx.text.orEmpty .subscribe(onNext: { [unowned self] query in self.itemsToShow = self.items.filter { $0.hasPrefix(query)} self.tableView.reloadData() }) } }
Schedule
Observable and the operators that we use on it will execute and notify the observer on the same thread on which the subscribe
operator is used. So far, we have done a lot of work on the main thread. Schedulers are an abstraction that lets us specify distinct queues on which to perform the work. It is always a good idea to move intensive work off the main thread to keep the app responsive for users. Schedulers make it easy to do this.
Scheduler can be serial or concurrent. We use serial schedulers if we want to ensure that the work is carried out on that queue in the order in which it was added to the queue. Otherwise, we can just use a concurrent queue.
There are built-in scheduler types that work with Grand Central Dispatch (GCD) queues and NSOperationQueues
, including SerialDispatchQueueScheduler
, which can dispatch work on a specified serial dispatch queue; ConcurrentDispatchQueueScheduler
, which will dispatch on a concurrent dispatch queue; and OperationQueueScheduler
, which will perform the work on a specified NSOperationQueues
.
We can also specify our own custom schedulers by conforming to the ImmediateScheduler
protocol.
There are a couple of Singleton schedulers that we can use for some common needs such as the following:
CurrentThreadScheduler
represents the current thread and is the default schedulerMainScheduler
represents the main thread and is typically used for all UI-related work.Example
let imageData = PublishSubject<Data>() let concurrentScheduler = ConcurrentDispatchQueueScheduler(qos: .background) imageData .observeOn(concurrentScheduler) .map { UIImage(data: $0) } .observeOn(MainScheduler.instance) .subscribe(onNext: { imageView.image = $0 }) .disposed(by: disposeBag) imageData.onNext(swiftImageData)
RxAlamofire
RxAlamofire adds an idiomatic Rx layer to Alamofire, making it straightforward to integrate into your observable workflow. Most of the RxAlamofire API revolves around extending SessionManager
.
It’s straightforward to perform requests using the default SessionManager
session. If you don’t need to reuse a customized session, this can be your go-to request mechanism to retrieve a request result as raw text:
string(.get, stringURL).subscribe(onNext: { print($0) }).disposed(by: disposeBag)
Most of the time you’ll want to deal with and decode JSON, as simply as this:
json(.get, stringURL).subscribe(onNext: { print($0) }).disposed(by: disposeBag)
The resulting observable emits the result as a decoded JSON object.
You can also obtain raw Data
:
data(.get, stringURL).subscribe(onNext: { print($0) }).disposed(by: disposeBag)
The examples above didn’t modify the default values for customized parameters, URL encoding and HTTP headers. But that’s easy as well:
json(.get, "http://api.openweathermap.org/data/2.5/weather", parameters: ["q": "London", "APPID": "{APIKEY}"]) .subscribe(onNext: { print($0) }) .disposed(by: disposeBag)