Thursday, 28 December 2017

Observable in Scala

This post assumes you are familiar with the basics of Scala. There are three interfaces: Observable, Subscription and Observer.
These interfaces are similar to the Publisher, Subscription and Subscriber interfaces from the Reactive Streams JVM implementation.

Observable  =========> Publisher
Observer      =========> Subscriber
Subscription =========> Subscription


Observable  

It emits its results to the Observer based on the demand requested through the Subscription.

Subscription

It represents the one-to-one life cycle of an Observer subscribing to an Observable.

Observer

An Observer provides the mechanism for receiving push-based notifications from Observables.

On subscription to an Observable[TResult], the Observer is passed the Subscription via the onSubscribe(subscription: Subscription) method.

Demand for results is signaled via the Subscription, and any results are passed to the onNext(result: TResult) method. If there is an error for any reason, onError(e: Throwable) will be called and no more events will be passed to the Observer.

Alternatively, when the Observer has consumed all the results from the Observable the onComplete() method will be called.
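The life cycle described above can be sketched in a few lines of plain Scala. The traits below are simplified, hypothetical stand-ins written for this post, not the definitions from any particular library, and ListObservable and DemandDemo are illustrative names. The sketch only shows how demand drives emission: nothing is pushed until the Observer calls request on the Subscription it received in onSubscribe. Error handling via onError is left out to keep it short.

```scala
import scala.collection.mutable.ListBuffer

// Hypothetical, simplified versions of the three interfaces (assumptions, not a library API).
trait Subscription {
  def request(n: Long): Unit      // signal demand for up to n more results
  def unsubscribe(): Unit         // stop receiving results
  def isUnsubscribed: Boolean
}

trait Observer[T] {
  def onSubscribe(subscription: Subscription): Unit
  def onNext(result: T): Unit
  def onError(e: Throwable): Unit
  def onComplete(): Unit
}

trait Observable[T] {
  def subscribe(observer: Observer[T]): Unit
}

// An Observable over a fixed list that emits only as many items as were requested.
final class ListObservable[T](items: List[T]) extends Observable[T] {
  def subscribe(observer: Observer[T]): Unit = {
    val subscription = new Subscription {
      private var remaining = items
      private var cancelled = false
      private var completed = false

      def request(n: Long): Unit = {
        var demand = n
        while (demand > 0 && remaining.nonEmpty && !cancelled) {
          val next = remaining.head
          remaining = remaining.tail
          demand -= 1
          observer.onNext(next)
        }
        // Signal completion exactly once, after the last item has been delivered.
        if (remaining.isEmpty && !cancelled && !completed) {
          completed = true
          observer.onComplete()
        }
      }

      def unsubscribe(): Unit = { cancelled = true }
      def isUnsubscribed: Boolean = cancelled
    }
    // One Subscription per Observer: the one-to-one life cycle.
    observer.onSubscribe(subscription)
  }
}

object DemandDemo {
  // Records the order of events so the effect of demand is visible.
  def run(): List[String] = {
    val events = ListBuffer.empty[String]
    var sub: Option[Subscription] = None

    val movies = new ListObservable(List("Dangal", "Back to the future", "Die Hard"))
    movies.subscribe(new Observer[String] {
      def onSubscribe(s: Subscription): Unit = { sub = Some(s); s.request(2) }
      def onNext(m: String): Unit = events += s"next: $m"
      def onError(e: Throwable): Unit = events += s"error: $e"
      def onComplete(): Unit = events += "complete"
    })

    events += "requesting more"
    sub.foreach(_.request(10))
    events.toList
  }
}
```

Calling DemandDemo.run() records the first two movies, then the "requesting more" marker, then the third movie and the completion event, because the third item is held back until the second request.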

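Example (this one uses the RxScala library, rx.lang.scala, where the completion callback is named onCompleted):

import rx.lang.scala._

object ObservablesLifetime extends App {
  // Minimal logging helper; the original snippet calls log without defining it.
  def log(msg: String): Unit = println(s"${Thread.currentThread.getName}: $msg")

  val classics = List("Dangal", "Back to the future", "Die Hard")
  val o = Observable.from(classics)

  // Each movie is delivered to onNext; onCompleted fires once the list is exhausted.
  o.subscribe(new Observer[String] {
    override def onNext(m: String): Unit = log(s"Movies Watchlist - $m")
    override def onError(e: Throwable): Unit = log(s"Ooops - $e!")
    override def onCompleted(): Unit = log("No more movies.")
  })
}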

The ScalaObservable implicit class also provides monadic operators to make chaining and working with Observable instances simpler, for example:

GenerateHtmlObservable().andThen({
  case Success(html: String) => renderHtml(html)
  case Failure(t) => renderHttp500
})


The full list of available monadic operators is:

  • andThen: Allows the chaining of Observables.
  • collect : Collects all the results into a sequence.
  • fallbackTo : Allows falling back to an alternative Observable if there is a failure.
  • filter : Filters results of the Observable.
  • flatMap : Create a new Observable by applying a function to each result of the Observable.
  • foldLeft : Creates a new Observable that contains the single result of the applied accumulator function.
  • foreach : Applies a function to each emitted result.
  • head : Returns the head of the Observable in a Future.
  • map : Creates a new Observable by applying a function to each emitted result of the Observable.
  • recover : Creates a new Observable that will handle any matching throwable that this Observable might contain.
  • recoverWith : Creates a new Observable that will handle any matching throwable that this Observable might contain by assigning it a value of another Observable.
  • toFuture : Collects the Observable results and converts to a Future.
  • transform : Creates a new Observable by applying the resultFunction function to each emitted result.
  • withFilter : Provides for-comprehensions support to Observables.
  • zip : Zips the values of this and that Observable, and creates a new Observable holding the tuple of their results.
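Many of these operator names (andThen, recover, recoverWith, fallbackTo, map, zip and others) also exist on scala.concurrent.Future. As a rough analogy only, the sketch below uses Future as a stand-in to show how a few of them behave there; a Future yields a single value while an Observable may emit many, so this illustrates the operators' general shape rather than the Observable implementation. OperatorSketch and Results are hypothetical names introduced for the example.

```scala
import java.util.concurrent.atomic.AtomicReference
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._
import scala.util.{Failure, Success}

object OperatorSketch {
  final case class Results(
    recovered: Int,
    recoveredWith: Int,
    fallenBack: Int,
    zipped: (String, Int),
    andThenValue: Int,
    andThenNote: String
  )

  // Blocks for the result; acceptable in a demo, usually avoided in real code.
  private def await[A](f: Future[A]): A = Await.result(f, 5.seconds)

  def run(): Results = {
    def failing: Future[Int] = Future.failed(new IllegalStateException("boom"))

    // recover: the handler maps a matching failure to a plain value.
    val recovered = await(failing.recover { case _: IllegalStateException => 0 })

    // recoverWith: the handler maps a matching failure to another Future.
    val recoveredWith =
      await(failing.recoverWith { case _: IllegalStateException => Future.successful(1) })

    // fallbackTo: use the alternative if the first one fails.
    val fallenBack = await(failing.fallbackTo(Future.successful(2)))

    // zip: pair up the results of two sources.
    val zipped = await(Future.successful("a").zip(Future.successful(1)))

    // andThen: run a side effect on the outcome, then pass the original result through.
    val note = new AtomicReference[String]("not run")
    val andThenValue = await(Future.successful(42).andThen {
      case Success(v) => note.set(s"saw $v")
      case Failure(t) => note.set(s"failed: $t")
    })

    Results(recovered, recoveredWith, fallenBack, zipped, andThenValue, note.get)
  }
}
```

On Future, the practical difference between recover and recoverWith is the handler's return type: a plain value in the first case, another Future in the second.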

Monads in Scala

Monads belong to the advanced Scala concepts. A monad is not a class or a trait; it is a concept. It is an object that wraps another object. A Mon...