OK
Mono Laboratorium

Mono Laboratorium

Mono Laboratorium

W poprzednim odcinku Spring + Kotlin pojawił się nowy typ Mono. Jest on dostarczony przez bibliotekę https://projectreactor.io/ , którą Spring wykorzystuje do operacji asynchronicznych.

Zagadnienie nie jest błahe i musimy przyjrzeć mu się bliżej. I tak jak przy nauce gry w bilard można albo liczyć kąty albo obserwować jak odbijają się kulki i wyrobić sobie intuicję co do ich zachowania po zderzeniu.

My zrobimy to drugie i zbadamy kilka konkretnych przypadków .Witamy w “Laboratorium”

Mono i Wątki

Przykład 1

Kod :

Mono
      .just("result")
      .log()
      .subscribe()

Wynik :

13:14:39.929 [main] INFO reactor.Mono.Just.1 - | onSubscribe([Synchronous Fuseable] Operators.ScalarSubscription)
13:14:39.933 [main] INFO reactor.Mono.Just.1 - | request(unbounded)
13:14:39.933 [main] INFO reactor.Mono.Just.1 - | onNext(result)
13:14:39.934 [main] INFO reactor.Mono.Just.1 - | onComplete()

Komentarz :

  • Wszystko działa w wątku main.
  • Pomimo, iż mamy tylko jeden element to widzimy w logach kilka operacji
    • Subskrypcja – bez tego w ogóle nie ruszy
    • Żądanie subskrybenta – w tym przypadku unbounded oznacza “bez limity” czyli dostaniemy tyle danych ile fabryka dała. Tutaj możemy sterować żądaniem aby nie zostać zalanymi przez zbyt szybką produkcję danych. Temat na osobny artykuł
    • Dostaliśmy kolejny element – ostatni bo to Mono
    • I akcja Complete

Na razie jest prosto ale przed nami więcej wątków.

Przykład 2

Kod :

Mono.just("result2")
      .log()
      .map(String::toUpperCase)
      .log()
      .subscribeOn(Schedulers.parallel())
      .subscribe()

Wynik :

Nic nie ma….

Komentarz:

W takim przypadku nic nie zobaczymy gdyż dane przepływają na osobnym wątku z puli “Schedulers.parallel” także main kończy się zanim cokolwiek zobaczymy. Jeśli teraz damy jakiegoś sleepa w main to wynik:

13:21:22.428 [parallel-1] INFO reactor.Mono.Just.1 - | onSubscribe([Synchronous Fuseable] Operators.ScalarSubscription)
13:21:22.432 [parallel-1] INFO reactor.Mono.MapFuseable.2 - | onSubscribe([Fuseable] FluxMapFuseable.MapFuseableSubscriber)
13:21:22.433 [parallel-1] INFO reactor.Mono.MapFuseable.2 - | request(unbounded)
13:21:22.434 [parallel-1] INFO reactor.Mono.Just.1 - | request(unbounded)
13:21:22.434 [parallel-1] INFO reactor.Mono.Just.1 - | onNext(result2)
13:21:22.434 [parallel-1] INFO reactor.Mono.MapFuseable.2 - | onNext(RESULT2)
13:21:22.435 [parallel-1] INFO reactor.Mono.Just.1 - | onComplete()
13:21:22.435 [parallel-1] INFO reactor.Mono.MapFuseable.2 - | onComplete()

Komentarz :


W logach widzimy, że teraz akcja odbywa się w wątku parallel-1
Mamy log zapięty w dwóch miejscach strumienia – przed i po map – możemy zauważyć, iż zdradzają one kilka szczegółów implementacji.


Implementacja map jest następująca :

public final <R> Mono<R> map(Function<? super T, ? extends R> mapper) {
  if (this instanceof Fuseable) {
    return onAssembly(new MonoMapFuseable<>(this, mapper));
  }
    return onAssembly(new MonoMap<>(this, mapper));
}

Jaką historię nam opowiada ten kod? Otóż budowanie Mono jest leniwe i generalnie jest dobry “ficzer” z punktu widzenia wykonywania kodu gdyż daje więcej możliwości optymalizacji ostatecznej wersji przed odpaleniem ale przygotuj się , iż miejsca “definicji” i “wykonania” są od siebie oddzielone co może utrudnić debugowanie.

Jest jeszcze to Fuseable. W skrócie jeśli strumień płynie sobie w jednym wątku bez blokowania i innych machinacji asynchronicznych to można wszystko scalić. Zobaczcie co się stanie z tym kodem.

Mono.just("result2")
      .delayElement(Duration.ofMillis(100))
      .map(String::toUpperCase)
      .log()
      .subscribeOn(Schedulers.parallel())
      .subscribe()

Wynik:

13:35:14.038 [parallel-1] INFO reactor.Mono.Map.1 - onSubscribe(FluxMap.MapSubscriber)
13:35:14.041 [parallel-1] INFO reactor.Mono.Map.1 - request(unbounded)
13:35:14.145 [parallel-2] INFO reactor.Mono.Map.1 - onNext(RESULT2)
13:35:14.145 [parallel-2] INFO reactor.Mono.Map.1 - onComplete()

Dodanie “delayElement” miesza na wątkach i już nie ma MapFuseable. Zauważ także zmianę wątków w logach. Przed nami ostatni przykład gdzie rozdzielimy subskrypcję i publikacje na inne wątki.

Przykład 3

Kod :

val s1 = Schedulers.newParallel("firstPool",4)
val s2 = Schedulers.newParallel("secondPool",3)
val m: Mono<String> = Mono.just("block and disposable")
      .log()
      .map(String::toUpperCase)
      .map {
println("mapping in ${Thread.currentThread().name}")
it + "mapped"
}
.subscribeOn(s1)
//Publikacja na innym wątku
m.publishOn(s2)
      .subscribe{println("In thread ${Thread.currentThread().name}")}
m.subscribe{println("second subscribe in  ${Thread.currentThread().name}")}
//Publikacja na innym wątku
m.publishOn(s2)
      .subscribe{println("third subscribe in  ${Thread.currentThread().name}")}

Trochę więcej niż poprzednio.

Wynik:

mapping in firstPool-3
mapping in firstPool-2
mapping in firstPool-1
second subscribe in  firstPool-2
13:40:13.489 [firstPool-2] INFO reactor.Mono.Just.1 - | onComplete()
13:40:13.490 [firstPool-1] INFO reactor.Mono.Just.1 - | onComplete()
13:40:13.490 [firstPool-3] INFO reactor.Mono.Just.1 - | onComplete()
third subscribe in  secondPool-5
In thread secondPool-4

Komentarz :

  • Operacje są wykonywane na “firstPool” każda w swoim wątku.
  • Zauważ, iż “second subscribe” jest wykonane po mapowaniu ale nadal na pierwszej puli.
  • Pierwsza i trzecia subskrypcja jest wykonana na drugiej puli i dodatkowo w odwrotnej kolejności niż je wywołaliśmy w kodzie – witamy w programowaniu wielowątkowym.

A kiedy to publikowanie w osobnym wątku się przydaje. Jeśli masz potrzebę użycia jednej puli do pobrania danych a innej do ogarnięcia odpowiedzi na przykład. No i w RxJava jest w zasadzie podobny mechanizm – tyle, że z innymi nazwami – i to się przydaje na androidzie bo tam wątki mogą mieć zupełnie inne uprawnienia i wtedy trzeba przekazać kontrolę.

Podsumowanie

Temat nie jest prosty ale trzeba go ogarnąć gdyż tego będzie coraz więcej w Springu. Nie ma ucieczki…

ul. Jaracza 62
90-251 Łódź
Bitnami