This assumes a basic familiarity with Scala. There are three interfaces:
Observable, Subscription and Observer.
These interfaces are similar to the
Publisher, Subscription and Subscriber interfaces from the reactive streams JVM implementation.
Observable =========> Publisher
Observer =========> Subscriber
Subscription =========> Subscription
Observable
It emits its result to the observer based on the demand requested by the subscription.
Subscription
It represents the one-to-one life cycle of an Observer subscribing to an Observable.
Observer
An Observer provides the mechanism for receiving push-based notifications from an Observable.
On subscription to an Observable[TResult], the Observer is passed the Subscription via onSubscribe(subscription: Subscription). Demand for results is signaled via the Subscription, and any results are passed to the onNext(result: TResult) method. If there is an error for any reason, onError(e: Throwable) is called and no more events are passed to the Observer. Alternatively, when the Observer has consumed all the results from the Observable, the onComplete() method is called.
Example (this one uses RxScala, where the completion callback is named onCompleted()):
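object ObservablesLifetime extends App {
  import rx.lang.scala._

  // Minimal logging helper so the example is self-contained.
  def log(msg: String): Unit = println(msg)

  val classics = List("Dangal", "Back to the future", "Die Hard")
  val o = Observable.from(classics)

  // Subscribe with an Observer that handles all three kinds of event.
  o.subscribe(new Observer[String] {
    override def onNext(m: String): Unit = log(s"Movies Watchlist - $m")
    override def onError(e: Throwable): Unit = log(s"Ooops - $e!")
    override def onCompleted(): Unit = log("No more movies.")
  })
}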
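The demand-driven life cycle described above can also be sketched without any library. The following is only an illustrative sketch: the trait shapes, ListObservable, RecordingObserver and DemandDemo are hypothetical names written for this example, not the driver's or RxScala's actual classes. It shows an Observable that emits only as many results as the Subscription has been asked for, and calls onComplete() once everything has been consumed. The list-backed source cannot fail, so onError is never triggered here.

```scala
import scala.collection.mutable.ListBuffer

// Hypothetical, library-free versions of the three interfaces.
trait Subscription {
  def request(n: Long): Unit      // signal demand for up to n more results
  def unsubscribe(): Unit
  def isUnsubscribed: Boolean
}

trait Observer[T] {
  def onSubscribe(subscription: Subscription): Unit
  def onNext(result: T): Unit
  def onError(e: Throwable): Unit
  def onComplete(): Unit
}

trait Observable[T] {
  def subscribe(observer: Observer[T]): Unit
}

// An Observable over an in-memory list that only emits what has been requested.
final class ListObservable[T](items: List[T]) extends Observable[T] {
  def subscribe(observer: Observer[T]): Unit = {
    val subscription = new Subscription {
      private var remaining = items
      private var cancelled = false
      private var completed = false

      def request(n: Long): Unit = {
        var demand = n
        while (demand > 0 && remaining.nonEmpty && !cancelled) {
          val next = remaining.head
          remaining = remaining.tail
          demand -= 1
          observer.onNext(next)
        }
        // Signal completion exactly once, after the last result was consumed.
        if (remaining.isEmpty && !cancelled && !completed) {
          completed = true
          observer.onComplete()
        }
      }
      def unsubscribe(): Unit = { cancelled = true }
      def isUnsubscribed: Boolean = cancelled
    }
    observer.onSubscribe(subscription)
  }
}

// Records every event it receives; asks for two results when subscribed.
final class RecordingObserver[T] extends Observer[T] {
  val events: ListBuffer[String] = ListBuffer.empty[String]
  var subscription: Option[Subscription] = None

  def onSubscribe(s: Subscription): Unit = { subscription = Some(s); s.request(2) }
  def onNext(result: T): Unit = events.append(s"onNext($result)")
  def onError(e: Throwable): Unit = events.append(s"onError(${e.getMessage})")
  def onComplete(): Unit = events.append("onComplete")
}

object DemandDemo {
  // Returns the events seen after the initial demand and after requesting more.
  def run(): (List[String], List[String]) = {
    val observer = new RecordingObserver[String]
    new ListObservable(List("Dangal", "Back to the future", "Die Hard")).subscribe(observer)
    val afterInitialDemand = observer.events.toList
    observer.subscription.foreach(_.request(5))
    (afterInitialDemand, observer.events.toList)
  }
}
```

Calling DemandDemo.run() first yields only the two requested titles; the third title and the completion event arrive only after more demand is signaled through the Subscription.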
The ScalaObservable implicit class also provides the following monadic operators to make chaining and working with Observable instances simpler:
GenerateHtmlObservable().andThen({
  case Success(html: String) => renderHtml(html)
  case Failure(t) => renderHttp500
})
The full list of available monadic operators is:
andThen: Allows the chaining of Observables.
collect: Collects all the results into a sequence.
fallbackTo: Allows falling back to an alternative Observable if there is a failure.
filter: Filters the results of the Observable.
flatMap: Creates a new Observable by applying a function to each result of the Observable.
foldLeft: Creates a new Observable that contains the single result of the applied accumulator function.
foreach: Applies a function to each emitted result.
head: Returns the head of the Observable in a Future.
map: Creates a new Observable by applying a function to each emitted result of the Observable.
recover: Creates a new Observable that will handle any matching throwable that this Observable might contain by assigning it a value of another Observable.
recoverWith: Creates a new Observable that will handle any matching throwable that this Observable might contain.
toFuture: Collects the Observable results and converts them to a Future.
transform: Creates a new Observable by applying the resultFunction function to each emitted result.
withFilter: Provides for-comprehension support to Observables.
zip: Zips the values of this and that Observable, and creates a new Observable holding the tuple of their results.
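To make the idea of "creating a new Observable" concrete, here is a minimal sketch of how three of the operators above (map, filter and foldLeft) could be built. It assumes a deliberately simplified, push-only Observable with no demand signaling; the traits, the Observables.from helper and OperatorDemo are hypothetical names for illustration and are not the ScalaObservable implementation. Exceptions thrown by the supplied functions are not routed to onError in this sketch.

```scala
import scala.collection.mutable.ListBuffer

// Hypothetical, simplified push-only interfaces (no Subscription or demand).
trait Observer[T] {
  def onNext(result: T): Unit
  def onError(e: Throwable): Unit
  def onComplete(): Unit
}

trait Observable[T] { self =>
  def subscribe(observer: Observer[T]): Unit

  // map: a new Observable that applies f to each emitted result.
  def map[U](f: T => U): Observable[U] = new Observable[U] {
    def subscribe(observer: Observer[U]): Unit =
      self.subscribe(new Observer[T] {
        def onNext(result: T): Unit = observer.onNext(f(result))
        def onError(e: Throwable): Unit = observer.onError(e)
        def onComplete(): Unit = observer.onComplete()
      })
  }

  // filter: a new Observable that only forwards results matching p.
  def filter(p: T => Boolean): Observable[T] = new Observable[T] {
    def subscribe(observer: Observer[T]): Unit =
      self.subscribe(new Observer[T] {
        def onNext(result: T): Unit = if (p(result)) observer.onNext(result)
        def onError(e: Throwable): Unit = observer.onError(e)
        def onComplete(): Unit = observer.onComplete()
      })
  }

  // foldLeft: a new Observable that emits the single accumulated result.
  def foldLeft[S](initial: S)(f: (S, T) => S): Observable[S] = new Observable[S] {
    def subscribe(observer: Observer[S]): Unit = {
      var acc = initial
      self.subscribe(new Observer[T] {
        def onNext(result: T): Unit = { acc = f(acc, result) }
        def onError(e: Throwable): Unit = observer.onError(e)
        def onComplete(): Unit = { observer.onNext(acc); observer.onComplete() }
      })
    }
  }
}

object Observables {
  // Emits every item of the list, then completes.
  def from[T](items: List[T]): Observable[T] = new Observable[T] {
    def subscribe(observer: Observer[T]): Unit = {
      items.foreach(item => observer.onNext(item))
      observer.onComplete()
    }
  }
}

object OperatorDemo {
  // Sums the lengths of the titles that contain a space.
  def run(): List[String] = {
    val events = ListBuffer.empty[String]
    Observables.from(List("Dangal", "Back to the future", "Die Hard"))
      .filter(_.contains(" "))
      .map(_.length)
      .foldLeft(0)(_ + _)
      .subscribe(new Observer[Int] {
        def onNext(result: Int): Unit = events.append(s"onNext($result)")
        def onError(e: Throwable): Unit = events.append(s"onError(${e.getMessage})")
        def onComplete(): Unit = events.append("onComplete")
      })
    events.toList
  }
}
```

Each operator returns a fresh Observable that wraps the previous one, which is what makes the chained style possible; nothing is emitted until the final subscribe call.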

