RxJava – Reactive Extensions for the JVM – a library for composing asynchronous and event-based programs using observable sequences for the Java VM.

Overview

RxJava: Reactive Extensions for the JVM

RxJava is a Java VM implementation of Reactive Extensions: a library for composing asynchronous and event-based programs by using observable sequences.

It extends the observer pattern to support sequences of data/events and adds operators that allow you to compose sequences together declaratively while abstracting away concerns about things like low-level threading, synchronization, thread-safety and concurrent data structures.

Version 3.x (Javadoc)

  • single dependency: Reactive-Streams
  • Java 8+ (Android desugar friendly)
  • Java 8 lambda-friendly API
  • fixed API mistakes and many limits of RxJava 2
  • intended to be a replacement for RxJava 2 with relatively few binary incompatible changes
  • non-opinionated about the source of concurrency (threads, pools, event loops, fibers, actors, etc.)
  • async or synchronous execution
  • virtual time and schedulers for parameterized concurrency
  • test and diagnostic support via test schedulers, test consumers and plugin hooks

Learn more about RxJava in general on the Wiki Home.

ℹ️ Please read the What's different in 3.0 for details on the changes and migration information when upgrading from 2.x.

Version 2.x

The 2.x version is end-of-life as of February 28, 2021. No further development, support, maintenance, PRs and updates will happen. The Javadoc of the very last version, 2.2.21, will remain accessible.

Version 1.x

The 1.x version is end-of-life as of March 31, 2018. No further development, support, maintenance, PRs and updates will happen. The Javadoc of the very last version, 1.3.8, will remain accessible.

Getting started

Setting up the dependency

The first step is to include RxJava 3 in your project, for example, as a Gradle dependency:

implementation "io.reactivex.rxjava3:rxjava:3.x.y"

(Please replace x and y with the latest version numbers, which are listed on Maven Central.)

Hello World

The second is to write the Hello World program:

package rxjava.examples;

import io.reactivex.rxjava3.core.*;

public class HelloWorld {
    public static void main(String[] args) {
        Flowable.just("Hello world").subscribe(System.out::println);
    }
}

Note that RxJava 3 components now live under io.reactivex.rxjava3 and the base classes and interfaces live under io.reactivex.rxjava3.core.

Base classes

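RxJava 3 features several base classes you can discover operators on:

  • io.reactivex.rxjava3.core.Flowable: 0..N flows, supporting Reactive-Streams and backpressure
  • io.reactivex.rxjava3.core.Observable: 0..N flows, no backpressure
  • io.reactivex.rxjava3.core.Single: a flow of exactly 1 item or an error
  • io.reactivex.rxjava3.core.Completable: a flow without items but only a completion or error signal
  • io.reactivex.rxjava3.core.Maybe: a flow with no items, exactly one item or an error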

Some terminology

Upstream, downstream

The dataflows in RxJava consist of a source, zero or more intermediate steps, followed by a data consumer or combinator step (where the step is responsible for consuming the dataflow by some means):

source.operator1().operator2().operator3().subscribe(consumer);

source.flatMap(value -> source.operator1().operator2().operator3());

Here, if we imagine ourselves on operator2, looking to the left towards the source is called the upstream. Looking to the right towards the subscriber/consumer is called the downstream. This is often more apparent when each element is written on a separate line:

source
  .operator1()
  .operator2()
  .operator3()
  .subscribe(consumer)

Objects in motion

In RxJava's documentation, emission, emits, item, event, signal, data and message are considered synonyms and represent the object traveling along the dataflow.

Backpressure

When the dataflow runs through asynchronous steps, each step may perform different things at different speeds. A slow step can be overwhelmed, which usually manifests itself as increased memory usage due to temporary buffering, or as the need to skip or drop data. To avoid this, so-called backpressure is applied: a form of flow control where the steps can express how many items they are ready to process. This allows constraining the memory usage of the dataflows in situations where there is generally no way for a step to know how many items the upstream will send to it.

In RxJava, the dedicated Flowable class is designated to support backpressure and Observable is dedicated to the non-backpressured operations (short sequences, GUI interactions, etc.). The other types, Single, Maybe and Completable don't support backpressure nor should they; there is always room to store one item temporarily.
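The request-based flow control described above can be observed with a small sketch. It uses RxJava's TestSubscriber as the consumer and asks a Flowable for items in bounded amounts. The class name BackpressureSketch and the helper receivedAfter are made up for this illustration.

```java
import io.reactivex.rxjava3.core.Flowable;
import io.reactivex.rxjava3.subscribers.TestSubscriber;

import java.util.ArrayList;
import java.util.List;

public class BackpressureSketch {

    // Hypothetical helper: requests `initial` items up front, then optionally `extra` more,
    // and returns what the consumer has received so far.
    static List<Integer> receivedAfter(long initial, long extra) {
        TestSubscriber<Integer> consumer = Flowable.range(1, 10).test(initial);
        if (extra > 0) {
            consumer.requestMore(extra);
        }
        return new ArrayList<>(consumer.values());
    }

    public static void main(String[] args) {
        System.out.println(receivedAfter(3, 0)); // [1, 2, 3]
        System.out.println(receivedAfter(3, 2)); // [1, 2, 3, 4, 5]
    }
}
```

The source could emit ten items, but it only emits as many as the downstream has requested.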

Assembly time

The preparation of dataflows by applying various intermediate operators happens in the so-called assembly time:

Flowable<Integer> flow = Flowable.range(1, 5)
    .map(v -> v * v)
    .filter(v -> v % 3 == 0);

At this point, the data is not flowing yet and no side-effects are happening.

Subscription time

This is a temporary state when subscribe() is called on a flow that establishes the chain of processing steps internally:

flow.subscribe(System.out::println);

This is when the subscription side-effects are triggered (see doOnSubscribe). Some sources block or start emitting items right away in this state.

Runtime

This is the state when the flows are actively emitting items, errors or completion signals:

Observable.create(emitter -> {
     while (!emitter.isDisposed()) {
         long time = System.currentTimeMillis();
         emitter.onNext(time);
         if (time % 2 != 0) {
             emitter.onError(new IllegalStateException("Odd millisecond!"));
             break;
         }
     }
})
.subscribe(System.out::println, Throwable::printStackTrace);

Practically, this is when the body of the given example above executes.
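To see the three phases in order, the following sketch records a log entry at each one. It reuses the range/map/filter flow from the assembly-time example; the class name LifecycleSketch and the log list are assumptions added for illustration.

```java
import io.reactivex.rxjava3.core.Flowable;

import java.util.ArrayList;
import java.util.List;

public class LifecycleSketch {

    static List<String> run() {
        List<String> log = new ArrayList<>();

        // Assembly time: operators are applied, but nothing runs yet.
        Flowable<Integer> flow = Flowable.range(1, 5)
            .map(v -> v * v)
            .filter(v -> v % 3 == 0)
            .doOnSubscribe(s -> log.add("subscribed"))
            .doOnNext(v -> log.add("item " + v))
            .doOnComplete(() -> log.add("completed"));
        log.add("assembled");

        // Subscription time, immediately followed by runtime,
        // because this source is synchronous.
        flow.subscribe();
        return log;
    }

    public static void main(String[] args) {
        System.out.println(run()); // [assembled, subscribed, item 9, completed]
    }
}
```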

Simple background computation

One of the common use cases for RxJava is to run some computation or network request on a background thread and show the results (or error) on the UI thread:

import io.reactivex.rxjava3.schedulers.Schedulers;

Flowable.fromCallable(() -> {
    Thread.sleep(1000); //  imitate expensive computation
    return "Done";
})
  .subscribeOn(Schedulers.io())
  .observeOn(Schedulers.single())
  .subscribe(System.out::println, Throwable::printStackTrace);

Thread.sleep(2000); // <--- wait for the flow to finish

This style of chaining methods is called a fluent API which resembles the builder pattern. However, RxJava's reactive types are immutable; each of the method calls returns a new Flowable with added behavior. To illustrate, the example can be rewritten as follows:

Flowable<String> source = Flowable.fromCallable(() -> {
    Thread.sleep(1000); //  imitate expensive computation
    return "Done";
});

Flowable<String> runBackground = source.subscribeOn(Schedulers.io());

Flowable<String> showForeground = runBackground.observeOn(Schedulers.single());

showForeground.subscribe(System.out::println, Throwable::printStackTrace);

Thread.sleep(2000);

Typically, you can move computations or blocking IO to some other thread via subscribeOn. Once the data is ready, you can make sure it gets processed on the foreground or GUI thread via observeOn.
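One way to check which thread each part runs on is to capture the thread names inside the flow. The sketch below does that and uses blockingFirst to wait for the result instead of sleeping; ThreadHopSketch and threadNames are hypothetical names, and the exact thread names printed depend on the RxJava version.

```java
import io.reactivex.rxjava3.core.Flowable;
import io.reactivex.rxjava3.schedulers.Schedulers;

public class ThreadHopSketch {

    // Returns { name of the thread that ran the callable, name of the thread after observeOn }.
    static String[] threadNames() {
        return Flowable.fromCallable(() -> Thread.currentThread().getName())
            .subscribeOn(Schedulers.io())      // the callable runs on an io() thread
            .observeOn(Schedulers.single())    // everything below runs on the single() thread
            .map(producer -> new String[] { producer, Thread.currentThread().getName() })
            .blockingFirst();                  // blocks the caller until the item arrives
    }

    public static void main(String[] args) {
        String[] names = threadNames();
        System.out.println("computed on: " + names[0]);
        System.out.println("observed on: " + names[1]);
    }
}
```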

Schedulers

RxJava operators don't work with Threads or ExecutorServices directly but with so-called Schedulers that abstract away sources of concurrency behind a uniform API. RxJava 3 features several standard schedulers accessible via the Schedulers utility class.

  • Schedulers.computation(): Run computation intensive work on a fixed number of dedicated threads in the background. Most asynchronous operators use this as their default Scheduler.
  • Schedulers.io(): Run I/O-like or blocking operations on a dynamically changing set of threads.
  • Schedulers.single(): Run work on a single thread in a sequential and FIFO manner.
  • Schedulers.trampoline(): Run work in a sequential and FIFO manner in one of the participating threads, usually for testing purposes.

These are available on all JVM platforms but some specific platforms, such as Android, have their own typical Schedulers defined: AndroidSchedulers.mainThread(), SwingScheduler.instance() or JavaFXSchedulers.gui().

In addition, there is an option to wrap an existing Executor (and its subtypes such as ExecutorService) into a Scheduler via Schedulers.from(Executor). This can be used, for example, to have a larger but still fixed pool of threads (unlike computation() and io() respectively).
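As a minimal sketch of wrapping an Executor, the example below creates a fixed pool of four daemon threads and runs a callable on it through Schedulers.from. The pool size, the "custom-pool-" thread name prefix and the class name CustomSchedulerSketch are arbitrary choices for this illustration.

```java
import io.reactivex.rxjava3.core.Flowable;
import io.reactivex.rxjava3.core.Scheduler;
import io.reactivex.rxjava3.schedulers.Schedulers;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;

public class CustomSchedulerSketch {

    static String workerThreadName() {
        AtomicInteger id = new AtomicInteger();
        // A fixed pool whose threads carry a recognizable (made-up) name.
        ExecutorService pool = Executors.newFixedThreadPool(4, task -> {
            Thread t = new Thread(task, "custom-pool-" + id.incrementAndGet());
            t.setDaemon(true);
            return t;
        });
        Scheduler scheduler = Schedulers.from(pool);
        try {
            return Flowable.fromCallable(() -> Thread.currentThread().getName())
                .subscribeOn(scheduler)
                .blockingFirst();
        } finally {
            // The caller owns the executor and is responsible for shutting it down.
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(workerThreadName());
    }
}
```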

The Thread.sleep(2000); at the end of the background computation examples is no accident. In RxJava the default Schedulers run on daemon threads, which means once the Java main thread exits, they all get stopped and background computations may never happen. Sleeping for some time in these example situations lets you see the output of the flow on the console with time to spare.

Concurrency within a flow

Flows in RxJava are sequential in nature, split into processing stages that may run concurrently with each other:

Flowable.range(1, 10)
  .observeOn(Schedulers.computation())
  .map(v -> v * v)
  .blockingSubscribe(System.out::println);

This example flow squares the numbers from 1 to 10 on the computation Scheduler and consumes the results on the "main" thread (more precisely, the caller thread of blockingSubscribe). However, the lambda v -> v * v doesn't run in parallel for this flow; it receives the values 1 to 10 on the same computation thread one after the other.
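The claim that a single stage stays on one thread can be checked by collecting the distinct thread names seen inside map. This is only a sketch; SequentialStageSketch and threadsUsedByMap are names invented for the example.

```java
import io.reactivex.rxjava3.core.Flowable;
import io.reactivex.rxjava3.schedulers.Schedulers;

import java.util.List;

public class SequentialStageSketch {

    // Collects the distinct names of the threads on which the map lambda ran.
    static List<String> threadsUsedByMap() {
        return Flowable.range(1, 10)
            .observeOn(Schedulers.computation())
            .map(v -> Thread.currentThread().getName())
            .distinct()
            .toList()
            .blockingGet();
    }

    public static void main(String[] args) {
        // A single entry: all ten values were mapped on the same computation thread.
        System.out.println(threadsUsedByMap());
    }
}
```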

Parallel processing

Processing the numbers 1 to 10 in parallel is a bit more involved:

Flowable.range(1, 10)
  .flatMap(v ->
      Flowable.just(v)
        .subscribeOn(Schedulers.computation())
        .map(w -> w * w)
  )
  .blockingSubscribe(System.out::println);

Practically, parallelism in RxJava means running independent flows and merging their results back into a single flow. The operator flatMap does this by first mapping each number from 1 to 10 into its own individual Flowable, then running them and merging the computed squares.

Note, however, that flatMap doesn't guarantee any order and the items from the inner flows may end up interleaved. There are alternative operators:

  • concatMap that maps and runs one inner flow at a time and
  • concatMapEager which runs all inner flows "at once" but the output flow will be in the order those inner flows were created.

Alternatively, the Flowable.parallel() operator and the ParallelFlowable type help achieve the same parallel processing pattern:

Flowable.range(1, 10)
  .parallel()
  .runOn(Schedulers.computation())
  .map(v -> v * v)
  .sequential()
  .blockingSubscribe(System.out::println);
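The ordering behavior of these alternatives can be compared in a short sketch. concatMapEager keeps the order in which the inner flows were created, while the output of parallel() followed by sequential() has no guaranteed order and is therefore sorted here before comparison. The class and method names are hypothetical.

```java
import io.reactivex.rxjava3.core.Flowable;
import io.reactivex.rxjava3.schedulers.Schedulers;

import java.util.List;

public class ParallelOrderSketch {

    // Inner flows run concurrently, but results are emitted in source order.
    static List<Integer> eagerSquares() {
        return Flowable.range(1, 10)
            .concatMapEager(v ->
                Flowable.just(v)
                    .subscribeOn(Schedulers.computation())
                    .map(w -> w * w))
            .toList()
            .blockingGet();
    }

    // Rails run concurrently; the merged order is not guaranteed, so sort it.
    static List<Integer> parallelSquaresSorted() {
        return Flowable.range(1, 10)
            .parallel()
            .runOn(Schedulers.computation())
            .map(v -> v * v)
            .sequential()
            .toSortedList()
            .blockingGet();
    }

    public static void main(String[] args) {
        System.out.println(eagerSquares());          // [1, 4, 9, 16, 25, 36, 49, 64, 81, 100]
        System.out.println(parallelSquaresSorted()); // [1, 4, 9, 16, 25, 36, 49, 64, 81, 100]
    }
}
```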

Dependent sub-flows

flatMap is a powerful operator and helps in a lot of situations. For example, given a service that returns a Flowable, we'd like to call another service with values emitted by the first service:

Flowable<Inventory> inventorySource = warehouse.getInventoryAsync();

inventorySource
    .flatMap(inventoryItem -> erp.getDemandAsync(inventoryItem.getId())
            .map(demand -> "Item " + inventoryItem.getName() + " has demand " + demand))
    .subscribe(System.out::println);
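The snippet above relies on warehouse and erp objects that are not shown. A self-contained variant might look like the sketch below, where the Inventory class, the two service methods and their return values are stand-ins invented for illustration.

```java
import io.reactivex.rxjava3.core.Flowable;

import java.util.List;

public class DependentSubFlowSketch {

    // Hypothetical domain type standing in for the real inventory record.
    static final class Inventory {
        final int id;
        final String name;

        Inventory(int id, String name) {
            this.id = id;
            this.name = name;
        }
    }

    // Hypothetical first service: emits the inventory items.
    static Flowable<Inventory> getInventoryAsync() {
        return Flowable.just(new Inventory(1, "bolt"), new Inventory(2, "nut"));
    }

    // Hypothetical second service: looks up the demand for one item id.
    static Flowable<Integer> getDemandAsync(int id) {
        return Flowable.just(id * 10);
    }

    static List<String> report() {
        return getInventoryAsync()
            .flatMap(item -> getDemandAsync(item.id)
                .map(demand -> "Item " + item.name + " has demand " + demand))
            .toList()
            .blockingGet();
    }

    public static void main(String[] args) {
        // The stub services are synchronous, so the order is stable here.
        report().forEach(System.out::println);
    }
}
```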

Continuations

Sometimes, when an item has become available, one would like to perform some dependent computations on it. These are sometimes called continuations and, depending on what should happen and what types are involved, may require various operators to accomplish.

Dependent

The most typical scenario is, given a value, to invoke another service, await its result and continue with it:

service.apiCall()
.flatMap(value -> service.anotherApiCall(value))
.flatMap(next -> service.finalCall(next))

It is also often the case that later sequences require values from earlier mappings. This can be achieved by moving the outer flatMap into the inner parts of the previous flatMap, for example:

service.apiCall()
.flatMap(value ->
    service.anotherApiCall(value)
    .flatMap(next -> service.finalCallBoth(value, next))
)

Here, the original value will be available inside the inner flatMap, courtesy of lambda variable capture.
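A runnable version of this nesting, assuming the service calls return Single values, could look as follows. The three service methods are hypothetical stubs.

```java
import io.reactivex.rxjava3.core.Single;

public class NestedContinuationSketch {

    // Hypothetical service calls returning fixed or derived values.
    static Single<Integer> apiCall() {
        return Single.just(2);
    }

    static Single<Integer> anotherApiCall(int value) {
        return Single.just(value * 10);
    }

    static Single<String> finalCallBoth(int value, int next) {
        return Single.just(value + " -> " + next);
    }

    static String run() {
        return apiCall()
            .flatMap(value ->
                anotherApiCall(value)
                    // `value` is captured from the enclosing lambda.
                    .flatMap(next -> finalCallBoth(value, next)))
            .blockingGet();
    }

    public static void main(String[] args) {
        System.out.println(run()); // 2 -> 20
    }
}
```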

Non-dependent

In other scenarios, the result(s) of the first source/dataflow is irrelevant and one would like to continue with another, quasi-independent source. Here, flatMap works as well:

Observable continued = sourceObservable.flatMapSingle(ignored -> someSingleSource);
continued.map(v -> v.toString())
  .subscribe(System.out::println, Throwable::printStackTrace);

However, the continuation in this case stays Observable instead of the likely more appropriate Single. (This is understandable because from the perspective of flatMapSingle, sourceObservable is a multi-valued source and thus the mapping may result in multiple values as well.)

Often though there is a way that is somewhat more expressive (and also lower overhead) by using Completable as the mediator and its operator andThen to resume with something else:

sourceObservable
  .ignoreElements()           // returns Completable
  .andThen(someSingleSource)
  .map(v -> v.toString())

The only dependency between the sourceObservable and the someSingleSource is that the former should complete normally in order for the latter to be consumed.
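The sketch below illustrates that dependency: the continuation is consumed when the first source completes normally and is skipped when it fails. The fallback via onErrorReturn is added here only to make the failure case visible; AndThenSketch and continueAfter are made-up names.

```java
import io.reactivex.rxjava3.core.Observable;
import io.reactivex.rxjava3.core.Single;

public class AndThenSketch {

    static String continueAfter(Observable<Integer> source) {
        return source
            .ignoreElements()                   // Completable: only completion or error remains
            .andThen(Single.just("continued"))  // consumed only after normal completion
            .onErrorReturn(error -> "skipped: " + error.getMessage())
            .blockingGet();
    }

    public static void main(String[] args) {
        System.out.println(continueAfter(Observable.range(1, 3)));
        // continued
        System.out.println(continueAfter(Observable.error(new IllegalStateException("boom"))));
        // skipped: boom
    }
}
```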

Deferred-dependent

Sometimes, there is an implicit data dependency between the previous sequence and the new sequence that, for some reason, was not flowing through the "regular channels". One would be inclined to write such continuations as follows:

AtomicInteger count = new AtomicInteger();

Observable.range(1, 10)
  .doOnNext(ignored -> count.incrementAndGet())
  .ignoreElements()
  .andThen(Single.just(count.get()))
  .subscribe(System.out::println);

Unfortunately, this prints 0 because Single.just(count.get()) is evaluated at assembly time when the dataflow hasn't even run yet. We need something that defers the evaluation of this Single source until runtime when the main source completes:

AtomicInteger count = new AtomicInteger();

Observable.range(1, 10)
  .doOnNext(ignored -> count.incrementAndGet())
  .ignoreElements()
  .andThen(Single.defer(() -> Single.just(count.get())))
  .subscribe(System.out::println);

or

AtomicInteger count = new AtomicInteger();

Observable.range(1, 10)
  .doOnNext(ignored -> count.incrementAndGet())
  .ignoreElements()
  .andThen(Single.fromCallable(() -> count.get()))
  .subscribe(System.out::println);
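The difference between the eager and the deferred variant can be put side by side in one runnable sketch. DeferredCountSketch and countWith are hypothetical names.

```java
import io.reactivex.rxjava3.core.Observable;
import io.reactivex.rxjava3.core.Single;

import java.util.concurrent.atomic.AtomicInteger;

public class DeferredCountSketch {

    static int countWith(boolean deferred) {
        AtomicInteger count = new AtomicInteger();

        Single<Integer> result;
        if (deferred) {
            // count.get() is called only when this Single is subscribed to.
            result = Single.fromCallable(count::get);
        } else {
            // count.get() is called right here, at assembly time.
            result = Single.just(count.get());
        }

        return Observable.range(1, 10)
            .doOnNext(ignored -> count.incrementAndGet())
            .ignoreElements()
            .andThen(result)
            .blockingGet();
    }

    public static void main(String[] args) {
        System.out.println(countWith(false)); // 0
        System.out.println(countWith(true));  // 10
    }
}
```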

Type conversions

Sometimes, a source or service returns a different type than the flow that is supposed to work with it. For example, in the inventory example above, getDemandAsync could return a Single<DemandRecord>. If the code example is left unchanged, this will result in a compile-time error (often, however, with a misleading error message about a missing overload).

In such situations, there are usually two options to fix the transformation: 1) convert to the desired type or 2) find and use an overload of the specific operator supporting the different type.

Converting to the desired type

Each reactive base class features operators that can perform such conversions, including the protocol conversions, to match some other type. The following matrix shows the available conversion options:

| From \ To | Flowable | Observable | Single | Maybe | Completable |
|---|---|---|---|---|---|
| Flowable | | toObservable | first, firstOrError, single, singleOrError, last, lastOrError¹ | firstElement, singleElement, lastElement | ignoreElements |
| Observable | toFlowable² | | first, firstOrError, single, singleOrError, last, lastOrError¹ | firstElement, singleElement, lastElement | ignoreElements |
| Single | toFlowable³ | toObservable | | toMaybe | ignoreElement |
| Maybe | toFlowable³ | toObservable | toSingle | | ignoreElement |
| Completable | toFlowable | toObservable | toSingle | toMaybe | |

1: When turning a multi-valued source into a single-valued source, one should decide which of the many source values should be considered as the result.

2: Turning an Observable into Flowable requires an additional decision: what to do with the potential unconstrained flow of the source Observable? There are several strategies available (such as buffering, dropping, keeping the latest) via the BackpressureStrategy parameter or via standard Flowable operators such as onBackpressureBuffer, onBackpressureDrop, onBackpressureLatest which also allow further customization of the backpressure behavior.

3: When there is only (at most) one source item, there is no problem with backpressure as it can be always stored until the downstream is ready to consume.
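A few of these conversions are sketched below, including how the BackpressureStrategy choice affects what a slow consumer eventually receives. The consumer here requests 2 items first and 10 more later; ConversionSketch and viaStrategy are names chosen for the example.

```java
import io.reactivex.rxjava3.core.BackpressureStrategy;
import io.reactivex.rxjava3.core.Flowable;
import io.reactivex.rxjava3.core.Observable;
import io.reactivex.rxjava3.subscribers.TestSubscriber;

import java.util.ArrayList;
import java.util.List;

public class ConversionSketch {

    // Observable -> Flowable: the strategy decides what happens to items
    // that arrive while the downstream has no outstanding requests.
    static List<Integer> viaStrategy(BackpressureStrategy strategy) {
        TestSubscriber<Integer> consumer =
            Observable.range(1, 5).toFlowable(strategy).test(2);
        consumer.requestMore(10);
        return new ArrayList<>(consumer.values());
    }

    public static void main(String[] args) {
        System.out.println(viaStrategy(BackpressureStrategy.BUFFER)); // [1, 2, 3, 4, 5]
        System.out.println(viaStrategy(BackpressureStrategy.DROP));   // [1, 2]

        // Flowable -> Single and Flowable -> Maybe: pick which item is the result.
        System.out.println(Flowable.range(1, 5).firstOrError().blockingGet()); // 1
        System.out.println(Flowable.range(1, 5).lastElement().blockingGet());  // 5
    }
}
```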

Using an overload with the desired type

Many frequently used operators have overloads that can deal with the other types. These are usually named with the suffix of the target type:

| Operator | Overloads |
|---|---|
| flatMap | flatMapSingle, flatMapMaybe, flatMapCompletable, flatMapIterable |
| concatMap | concatMapSingle, concatMapMaybe, concatMapCompletable, concatMapIterable |
| switchMap | switchMapSingle, switchMapMaybe, switchMapCompletable |

The reason these operators have a suffix instead of simply having the same name with different signature is type erasure. Java doesn't consider signatures such as operator(Function<T, Single<R>>) and operator(Function<T, Maybe<R>>) different (unlike C#) and due to erasure, the two operators would end up as duplicate methods with the same signature.
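Both ways of fixing a type mismatch, converting the inner source or using a suffixed overload, are sketched below for a service that returns a Single. The getDemandAsync stub and the class name are hypothetical.

```java
import io.reactivex.rxjava3.core.Flowable;
import io.reactivex.rxjava3.core.Single;

import java.util.List;

public class SuffixedOverloadSketch {

    // Hypothetical service that returns a Single rather than a Flowable.
    static Single<Integer> getDemandAsync(int itemId) {
        return Single.just(itemId * 10);
    }

    // Option 1: convert the inner Single to the type flatMap expects.
    static List<Integer> viaConversion() {
        return Flowable.just(1, 2, 3)
            .flatMap(id -> getDemandAsync(id).toFlowable())
            .toList()
            .blockingGet();
    }

    // Option 2: use the overload whose suffix names the inner type.
    static List<Integer> viaOverload() {
        return Flowable.just(1, 2, 3)
            .flatMapSingle(id -> getDemandAsync(id))
            .toList()
            .blockingGet();
    }

    public static void main(String[] args) {
        // The stub service is synchronous, so the order is stable here.
        System.out.println(viaConversion()); // [10, 20, 30]
        System.out.println(viaOverload());   // [10, 20, 30]
    }
}
```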

Operator naming conventions

Naming in programming is one of the hardest things, as names are expected to be short, expressive, capturing and easily memorable. Unfortunately, the target language (and pre-existing conventions) may not give too much help in this regard (unusable keywords, type erasure, type ambiguities, etc.).

Unusable keywords

In the original Rx.NET, the operator that emits a single item and then completes is called Return(T). Since the Java convention is to have a lowercase letter start a method name, this would have been return(T) which is a keyword in Java and thus not available. Therefore, RxJava chose to name this operator just(T). The same limitation exists for the operator Switch, which had to be named switchOnNext. Yet another example is Catch which was named onErrorResumeNext.

Type erasure

Many operators that expect the user to provide some function returning a reactive type can't be overloaded because the type erasure around a Function<T, X> turns such method signatures into duplicates. RxJava chose to name such operators by appending the type as suffix as well:

Flowable<R> flatMap(Function<? super T, ? extends Publisher<? extends R>> mapper)

Flowable<R> flatMapMaybe(Function<? super T, ? extends MaybeSource<? extends R>> mapper)

Type ambiguities

Even though certain operators have no problems from type erasure, their signature may turn out to be ambiguous, especially if one uses Java 8 and lambdas. For example, there are several overloads of concatWith taking the various other reactive base types as arguments (for providing convenience and performance benefits in the underlying implementation):

Flowable<T> concatWith(Publisher<? extends T> other);

Flowable<T> concatWith(SingleSource<? extends T> other);

Both Publisher and SingleSource appear as functional interfaces (types with one abstract method) and may encourage users to try to provide a lambda expression:

someSource.concatWith(s -> Single.just(2))
.subscribe(System.out::println, Throwable::printStackTrace);

Unfortunately, this approach doesn't work and the example does not print 2 at all. In fact, since version 2.1.10, it doesn't even compile because at least 4 concatWith overloads exist and the compiler finds the code above ambiguous.

The user in such situations probably wanted to defer some computation until the someSource has completed, thus the correct unambiguous operator should have been defer:

someSource.concatWith(Single.defer(() -> Single.just(2)))
.subscribe(System.out::println, Throwable::printStackTrace);
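A self-contained variant of the deferred concatWith is sketched below. To show that the deferred part really runs after the first source has completed, it appends the number of items seen so far; the counter and the class name are additions made for this illustration.

```java
import io.reactivex.rxjava3.core.Flowable;
import io.reactivex.rxjava3.core.Single;

import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

public class ConcatWithDeferSketch {

    static List<Integer> run() {
        AtomicInteger seen = new AtomicInteger();
        return Flowable.just(1, 2, 3)
            .doOnNext(v -> seen.incrementAndGet())
            // Passing a Single (not a lambda) selects the SingleSource overload;
            // defer postpones seen.get() until the first source has completed.
            .concatWith(Single.defer(() -> Single.just(seen.get())))
            .toList()
            .blockingGet();
    }

    public static void main(String[] args) {
        System.out.println(run()); // [1, 2, 3, 3]
    }
}
```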

Sometimes, a suffix is added to avoid logical ambiguities that may compile but produce the wrong type in a flow:

Flowable<T> merge(Publisher<? extends Publisher<? extends T>> sources);

Flowable<T> mergeArray(Publisher<? extends T>... sources);

This can also get ambiguous when functional interface types get involved as the type argument T.

Error handling

Dataflows can fail, at which point the error is emitted to the consumer(s). Sometimes though, multiple sources may fail, at which point there is a choice whether or not to wait for all of them to complete or fail. To indicate this opportunity, many operator names are suffixed with the DelayError words (while others feature a delayError or delayErrors boolean flag in one of their overloads):

Flowable<T> concat(Publisher<? extends Publisher<? extends T>> sources);

Flowable<T> concatDelayError(Publisher<? extends Publisher<? extends T>> sources);

Of course, suffixes of various kinds may appear together:

Flowable<T> concatArrayEagerDelayError(Publisher<? extends T>... sources);
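The effect of the DelayError suffix can be seen by concatenating a failing source with a healthy one. In this sketch, which uses concatArray and concatArrayDelayError, the plain variant stops at the first error while the delaying variant still delivers the items of the second source before signaling the error. The class and method names are hypothetical.

```java
import io.reactivex.rxjava3.core.Flowable;
import io.reactivex.rxjava3.subscribers.TestSubscriber;

import java.util.ArrayList;
import java.util.List;

public class DelayErrorSketch {

    // Returns the items the consumer received before the flow terminated with an error.
    static List<Integer> itemsBeforeError(boolean delayError) {
        Flowable<Integer> failing = Flowable.error(new IllegalStateException("boom"));
        Flowable<Integer> healthy = Flowable.just(1, 2);

        Flowable<Integer> combined = delayError
            ? Flowable.concatArrayDelayError(failing, healthy)
            : Flowable.concatArray(failing, healthy);

        TestSubscriber<Integer> consumer = combined.test();
        consumer.assertError(IllegalStateException.class); // both variants end with the error
        return new ArrayList<>(consumer.values());
    }

    public static void main(String[] args) {
        System.out.println(itemsBeforeError(false)); // []
        System.out.println(itemsBeforeError(true));  // [1, 2]
    }
}
```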

Base class vs base type

The base classes can be considered heavy due to the sheer number of static and instance methods on them. RxJava 3's design was heavily influenced by the Reactive Streams specification; therefore, the library features a class and an interface for each reactive type:

| Type | Class | Interface | Consumer |
|---|---|---|---|
| 0..N backpressured | Flowable | Publisher¹ | Subscriber |
| 0..N unbounded | Observable | ObservableSource² | Observer |
| 1 element or error | Single | SingleSource | SingleObserver |
| 0..1 element or error | Maybe | MaybeSource | MaybeObserver |
| 0 element or error | Completable | CompletableSource | CompletableObserver |

1: The org.reactivestreams.Publisher is part of the external Reactive Streams library. It is the main type to interact with other reactive libraries through a standardized mechanism governed by the Reactive Streams specification.

2: The naming convention of the interface was to append Source to the semi-traditional class name. There is no FlowableSource since Publisher is provided by the Reactive Streams library (and subtyping it wouldn't have helped with interoperation either). These interfaces are, however, not standard in the sense of the Reactive Streams specification and are currently RxJava specific only.
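One practical consequence is that an API can accept the lightweight interface and wrap it into the operator-rich class only when needed. The sketch below does this with Flowable.fromPublisher and Single.wrap; the methods sum and length, and the hand-written SingleSource, are illustrative assumptions.

```java
import io.reactivex.rxjava3.core.Flowable;
import io.reactivex.rxjava3.core.Single;
import io.reactivex.rxjava3.core.SingleSource;
import io.reactivex.rxjava3.disposables.Disposable;
import org.reactivestreams.Publisher;

public class BaseTypeSketch {

    // Accepts any Reactive Streams Publisher, not only RxJava's Flowable.
    static int sum(Publisher<Integer> numbers) {
        return Flowable.fromPublisher(numbers)
            .reduce(0, Integer::sum)
            .blockingGet();
    }

    // Accepts the RxJava-specific base interface of Single.
    static int length(SingleSource<String> text) {
        return Single.wrap(text)
            .map(String::length)
            .blockingGet();
    }

    public static void main(String[] args) {
        // A minimal hand-written SingleSource that follows the
        // onSubscribe-then-onSuccess protocol.
        SingleSource<String> handWritten = observer -> {
            observer.onSubscribe(Disposable.empty());
            observer.onSuccess("hello");
        };

        System.out.println(sum(Flowable.range(1, 4))); // 10
        System.out.println(length(handWritten));       // 5
    }
}
```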

R8 and ProGuard settings

By default, RxJava itself doesn't require any ProGuard/R8 settings and should work without problems. Unfortunately, the Reactive Streams dependency since version 1.0.3 has embedded Java 9 class files in its JAR that can cause warnings with the plain ProGuard:

Warning: org.reactivestreams.FlowAdapters$FlowPublisherFromReactive: can't find superclass or interface java.util.concurrent.Flow$Publisher
Warning: org.reactivestreams.FlowAdapters$FlowToReactiveProcessor: can't find superclass or interface java.util.concurrent.Flow$Processor
Warning: org.reactivestreams.FlowAdapters$FlowToReactiveSubscriber: can't find superclass or interface java.util.concurrent.Flow$Subscriber
Warning: org.reactivestreams.FlowAdapters$FlowToReactiveSubscription: can't find superclass or interface java.util.concurrent.Flow$Subscription
Warning: org.reactivestreams.FlowAdapters: can't find referenced class java.util.concurrent.Flow$Publisher

It is recommended to set up the following -dontwarn entry in the application's proguard-ruleset file:

-dontwarn java.util.concurrent.Flow*

For R8, the RxJava jar includes the META-INF/proguard/rxjava3.pro with the same no-warning clause and should apply automatically.

Further reading

For further details, consult the wiki.

Communication

Versioning

Version 3.x is in development. Bugfixes will be applied to both 2.x and 3.x branches, but new features will only be added to 3.x.

Minor 3.x increments (such as 3.1, 3.2, etc) will occur when non-trivial new functionality is added or significant enhancements or bug fixes occur that may have behavioral changes that may affect some edge cases (such as dependence on behavior resulting from a bug). An example of an enhancement that would classify as this is adding reactive pull backpressure support to an operator that previously did not support it. This should be backwards compatible but does behave differently.

Patch 3.x.y increments (such as 3.0.0 -> 3.0.1, 3.3.1 -> 3.3.2, etc) will occur for bug fixes and trivial functionality (like adding a method overload). New functionality marked with an @Beta or @Experimental annotation can also be added in the patch releases to allow rapid exploration and iteration of unstable new functionality.

@Beta

APIs marked with the @Beta annotation at the class or method level are subject to change. They can be modified in any way, or even removed, at any time. If your code is a library itself (i.e. it is used on the CLASSPATH of users outside your control), you should not use beta APIs, unless you repackage them (e.g. using ProGuard, shading, etc).

@Experimental

APIs marked with the @Experimental annotation at the class or method level will almost certainly change. They can be modified in any way, or even removed, at any time. You should not use or rely on them in any production code. They are purely to allow broad testing and feedback.

@Deprecated

APIs marked with the @Deprecated annotation at the class or method level will remain supported until the next major release but it is recommended to stop using them.

io.reactivex.rxjava3.internal.*

All code inside the io.reactivex.rxjava3.internal.* packages is considered private API and should not be relied upon at all. It can change at any time.

Full Documentation

Binaries

Binaries and dependency information for Maven, Ivy, Gradle and others can be found at http://search.maven.org.

Example for Gradle:

implementation 'io.reactivex.rxjava3:rxjava:x.y.z'

and for Maven:

<dependency>
    <groupId>io.reactivex.rxjava3</groupId>
    <artifactId>rxjava</artifactId>
    <version>x.y.z</version>
</dependency>

and for Ivy:

<dependency org="io.reactivex.rxjava3" name="rxjava" rev="x.y.z" />

Snapshots

Snapshots are available via https://oss.jfrog.org/libs-snapshot/io/reactivex/rxjava3/rxjava/

repositories {
    maven { url 'https://oss.jfrog.org/libs-snapshot' }
}

dependencies {
    implementation 'io.reactivex.rxjava3:rxjava:3.0.0-SNAPSHOT'
}

JavaDoc snapshots are available at http://reactivex.io/RxJava/3.x/javadoc/snapshot

Build

To build:

$ git clone git@github.com:ReactiveX/RxJava.git
$ cd RxJava/
$ ./gradlew build

Further details on building can be found on the Getting Started page of the wiki.

Bugs and Feedback

For bugs, questions and discussions please use the GitHub Issues.

LICENSE

Copyright (c) 2016-present, RxJava Contributors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
Comments
  • Idiomatic Scala Support

    As of version 0.11.0 Scala support is provided through the use of implicits. Conversations on Twitter are bringing up other possible improvements. Let's use this issue to discuss.

    opened by benjchristensen 96
  • 2.x: Handling null values

    With the upcoming RxJava2 release one of the important changes is that null is no longer accepted as a stream element.

    Honestly, I have mixed feelings about this change and part of me understands that it will enforce clean APIs, but I can see a number of use cases when this might be a problem.

    For instance, in my app I have an in-memory cache:

    @Nullable CacheItem findCacheItem(long id);
    

    CacheItem might not be present in cache, so method might return null value.

    The way it is used with Rx* is as follows:

    Observable<CacheItem> getStream(final long id) {
        return Observable.fromCallable(new Callable<CacheItem>() {
            @Override public CacheItem call() throws Exception {
                return findCacheItem(id);
            }
        });
    }
    

    So with this approach, I might get null in my stream, which is a totally valid situation, so it is handled properly on the receiving side. Let's say the UI changes its state if the item is not present in the cache:

    Observable.just(user)
              .map(user -> user.getName())
              .map(name -> convertNameToId(name))
              .flatMap(id -> getStream(id))
              .map(cacheItem -> getUserInfoFromCacheItem(cacheItem))
              .subscribe(
                  userInfo -> {
                      if(userInfo != null) showUserInfo();
                      else showPrompt();
                  }
              );
    

    With RxJava2 I am no longer allowed to post null down the stream, so I either need to wrap my CacheItem into some other class and make my stream produce that wrapper instead or make quite big architectural changes.

    Wrapping every single stream element into nullable counterpart doesn't look right to me.

    Am I missing something fundamental here?

    It seems that situations like mine are quite common, so I'm curious what the recommended strategy is to tackle this problem given the new "no null" policy in RxJava2.

    Question 
    opened by paveldudka 89
  • OnSubscribeRedo - fix race conditions

    While searching for the cause of #2863 I bumped into this race condition (which doesn't fix #2863):

    If a request is made between L238 and L239 then consumerCapacity may become Long.MAX_VALUE on arriving at L239 in which case we don't wish to decrement it. To fix, used compareAndSet.

    What is interesting about this fix is that in the test loop of 5000 in OperatorRetryTest I see many more occurrences of the failure on average (3 -> 50) presumably because the extra time to perform the compareAndSet action has expanded the window for the race condition causing the failures.

    Bug 
    opened by davidmoten 83
  • Observer reference is not released on unsubscribe

    With the following code http://pastebin.com/VUZ5aApe and using the latest version of rxjava (0.18.4),

    rotations should result in the activity calling unsubscribe on my subscription.

    This should release the reference to my observer, which should in turn release the reference to my activity.

    However, the memory tests I've done show that as I rotate the phone, my activity count keeps going up, which means the reference to my activity is not being released.

    (let me know if the pastebin expires for some reason, I'll get something back up)

    opened by fdoyle 74
  • Roadmap to 1.0

    I want everyone to know where we're heading and what's left before we hit 1.0.

    From the beginning we have allowed ourselves to do breaking changes on each 0.x release as we were aware that the design was not finished when we started this project. We are getting close to being done. Our goal is to release 1.0 in the coming months after stabilizing the API so that it can be relied upon without breaking every couple months.

    Project Structure

    When we hit 1.0 we intend on splitting out the language adaptors into their own top-level projects such as RxScala, RxClojure, RxGroovy, RxKotlin, RxJRuby etc.

    This will allow each project to iterate as needed at its own pace, especially since some will need to continue iterating while the RxJava core stabilizes. For example, if RxScala needs breaking changes it can bump its major version while RxJava does not. This is particularly important to RxScala for handling changes such as Scala 2.10 -> 2.11 -> 2.12 etc.

    Major contrib modules will also be moved out, such as RxAndroid which also needs its own life-cycle.

    Outstanding Work

    The major items of work to be finished before 1.0 are:

    • ~~Backpressure: https://github.com/Netflix/RxJava/issues/1000~~ [Completed]
    • ~~Serialization Behavior: https://github.com/Netflix/RxJava/issues/998~~ [Completed in new merge implementation]
    • ~~Scheduler API: https://github.com/Netflix/RxJava/issues/997~~ Completed
    • ~~Remove all deprecated methods/classes~~ [Completed] #1053 #1621
    • ~~Finish migrating all operators to using lift and chaining the Subscription via Subscriber (current priority)~~ [Completed]

    The primary goal is to nail down the public API. New functionality can come in 1.1, 1.2, etc. The secondary goal is for all operators to work as advertised (regarding unsubscribe, back pressure and non-blocking). There will always be bugs, that's why 1.x.y will still be active after release, but the desire is to not need to ship 2.x soon after 1.x as this is a low level library that once entrenched becomes hard to migrate (we create significant pain at Netflix on each 0.x release).

    Going Forward

    Please comment if you feel there are other critical things to achieve before 1.0. The fastest way to getting us to 1.0 is helping us achieve the work stated above.

    Information 
    opened by benjchristensen 59
  • Version 0.17.0 Release Notes [Preview]

    Version 0.17.0 Release Notes [Preview]


    Version 0.17.0 contains some significant signature changes that allow us to significantly improve handling of synchronous Observables and simplify Schedulers. Many of the changes have backwards compatible deprecated methods to ease the migration while some are breaking.

    The new signatures related to Observable in this release are:

    // A new create method takes `OnSubscribe` instead of `OnSubscribeFunc`
    public final static <T> Observable<T> create(OnSubscribe<T> f)
    
    // The new OnSubscribe type accepts a Subscriber instead of Observer and does not return a Subscription
    public static interface OnSubscribe<T> extends Action1<Subscriber<? super T>>
    
    // Subscriber is an Observer + Subscription
    public abstract class Subscriber<T> implements Observer<T>, Subscription
    
    // The main `subscribe` behavior receives a Subscriber instead of Observer
    public final Subscription subscribe(Subscriber<? super T> observer)
    
    // Subscribing with an Observer however is still appropriate
    // and the Observer is automatically converted into a Subscriber
    public final Subscription subscribe(Observer<? super T> observer)
    
    // A new 'lift' function allows composing Operator implementations together
    public <R> Observable<R> lift(Func1<Subscriber<? super R>, Subscriber<? super T>> lift)
    
    

    Also changed is the Scheduler interface which is much simpler:

    public abstract class Scheduler {
        public Subscription schedule(Action1<Scheduler.Inner> action);
        public Subscription schedule(Action1<Scheduler.Inner> action, long delayTime, TimeUnit unit);
        public Subscription schedulePeriodically(Action1<Scheduler.Inner> action, long initialDelay, long period, TimeUnit unit);
        public long now();
        public int degreeOfParallelism();
    
        public static class Inner implements Subscription {
            public abstract void schedule(Action1<Scheduler.Inner> action, long delayTime, TimeUnit unit);
            public abstract void schedule(Action1<Scheduler.Inner> action);
            public long now();
        }
    }
    

    This release applies many lessons learned over the past year and seeks to streamline the API before we hit 1.0.

    As shown in the code above the changes fall into 2 major sections:

    1) Lift/OnSubscribe/Subscriber

    Changes that allow unsubscribing from synchronous Observables without needing to add concurrency.

    2) Schedulers

    Simplification of the Scheduler interface, making clearer the concept of "outer" and "inner" Schedulers for recursion.

    Lift/OnSubscribe/Subscriber

    New types Subscriber and OnSubscribe along with the new lift operator have been added. The reasons and benefits are as follows:

    1) Synchronous Unsubscribe

    RxJava versions up until 0.16.x are unable to unsubscribe from a synchronous Observable such as this:

    Observable<Integer> oi = Observable.create(new OnSubscribeFunc<Integer>() {
    
        @Override
        public Subscription onSubscribe(Observer<? super Integer> observer) {
            // pre-0.17 style: the loop has no way to notice an unsubscribe
            for (int i = 1; i < 1000000; i++) {
                observer.onNext(i);
            }
            observer.onCompleted();
            return Subscriptions.empty();
        }
    });
    

    Subscribing to this Observable will always emit all 1,000,000 values even if unsubscribed such as via oi.take(10).

    Version 0.17.0 fixes this issue by injecting the Subscription into the OnSubscribe function to allow code like this:

    Observable<Integer> oi = Observable.create(new OnSubscribe<Integer>() {
    
        @Override
        public void call(Subscriber<? super Integer> subscriber) {
            // we now receive a Subscriber instead of Observer
            for (int i = 1; i < 1000000; i++) {
                // the OnSubscribe can now check for isUnsubscribed
                if (subscriber.isUnsubscribed()) {
                    return;
                }
                subscriber.onNext(i);
            }
            subscriber.onCompleted();
        }
    
    });
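
    The mechanism can be tried without RxJava on the classpath. The following is a minimal sketch, not the library's code: MiniSubscriber, emitRange and run are hypothetical names that stand in for Subscriber, the OnSubscribe body and a take-like consumer. It only shows how a flag carried by the subscriber lets a synchronous loop stop early on the same thread.

```java
import java.util.concurrent.atomic.AtomicBoolean;

final class SyncUnsubscribeSketch {

    /** Hypothetical stand-in for Subscriber: an observer that also carries an unsubscribed flag. */
    abstract static class MiniSubscriber<T> {
        private final AtomicBoolean unsubscribed = new AtomicBoolean();

        abstract void onNext(T value);

        void onCompleted() { }

        final void unsubscribe() { unsubscribed.set(true); }

        final boolean isUnsubscribed() { return unsubscribed.get(); }
    }

    /** Synchronous source; returns how many values were actually emitted. */
    static int emitRange(int count, MiniSubscriber<Integer> subscriber) {
        int emitted = 0;
        for (int i = 1; i <= count; i++) {
            if (subscriber.isUnsubscribed()) {
                return emitted; // stop on the same thread, no scheduler needed
            }
            subscriber.onNext(i);
            emitted++;
        }
        subscriber.onCompleted();
        return emitted;
    }

    /** Subscribes a consumer that unsubscribes itself after `limit` values. Returns {received, emitted}. */
    static int[] run(int count, final int limit) {
        final int[] received = {0};
        MiniSubscriber<Integer> subscriber = new MiniSubscriber<Integer>() {
            @Override
            void onNext(Integer value) {
                if (++received[0] >= limit) {
                    unsubscribe();
                }
            }
        };
        int emitted = emitRange(count, subscriber);
        return new int[] {received[0], emitted};
    }

    public static void main(String[] args) {
        int[] result = run(1_000_000, 10);
        System.out.println("received=" + result[0] + ", emitted=" + result[1]);
    }
}
```

    With the flag check in place the source stops after the tenth value instead of running through the whole range.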
    

    Subscribing to this will now correctly only emit 10 onNext and unsubscribe:

    // subscribe with an Observer
    oi.take(10).subscribe(new Observer<Integer>() {
    
        @Override
        public void onCompleted() {
    
        }
    
        @Override
        public void onError(Throwable e) {
    
        }
    
        @Override
        public void onNext(Integer t) {
            System.out.println("Received: " + t);
        }
    
    });
    

    Or the new Subscriber type can be used and the Subscriber itself can unsubscribe:

    // or subscribe with a Subscriber which supports unsubscribe
    oi.subscribe(new Subscriber<Integer>() {
    
        @Override
        public void onCompleted() {
    
        }
    
        @Override
        public void onError(Throwable e) {
    
        }
    
        @Override
        public void onNext(Integer t) {
            System.out.println("Received: " + t);
            if (t >= 10) {
                // a Subscriber can unsubscribe
                this.unsubscribe();
            }
        }
    
    });
    

    2) Custom Operator Chaining

    Because Java doesn't support extension methods, the only approach to applying custom operators without getting them added to rx.Observable is using static methods. This has meant code like this:

    MyCustomOperators.operate(observable.map(...).filter(...).take(5)).map(...).subscribe()
    

    In reality we want:

    observable.map(...).filter(...).take(5).myCustomOperator().map(...).subscribe()
    

    Using the newly added lift we can get quite close to this:

    observable.map(...).filter(...).take(5).lift(MyCustomOperator.operate()).map(...).subscribe()
    

    Here is how the proposed lift method looks if all operators were applied with it:

    Observable<String> os = OBSERVABLE_OF_INTEGERS.lift(TAKE_5).lift(MAP_INTEGER_TO_STRING);
    

    Along with the lift function comes a new Operator signature:

    public interface Operator<R, T> extends Func1<Subscriber<? super R>, Subscriber<? super T>>
    

    All operator implementations in the rx.operators package will over time be migrated to this new signature.
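
    To make the shape of lift concrete, here is a self-contained sketch under stated assumptions. Sink, Op and Source are hypothetical stand-ins for Subscriber, Operator and Observable; only onNext is modelled, so completion, errors and unsubscription are left out. The point is the direction of the function: an operator receives the downstream consumer and returns the consumer that the upstream will feed.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Function;

final class LiftSketch {

    /** Hypothetical stand-in for Subscriber; only onNext is modelled. */
    interface Sink<T> {
        void onNext(T value);
    }

    /** Stand-in for Operator<R, T>: takes the downstream sink of R, returns the upstream sink of T. */
    interface Op<R, T> extends Function<Sink<? super R>, Sink<? super T>> { }

    /** Stand-in for Observable. */
    static final class Source<T> {
        private final Consumer<Sink<? super T>> onSubscribe;

        Source(Consumer<Sink<? super T>> onSubscribe) {
            this.onSubscribe = onSubscribe;
        }

        void subscribe(Sink<? super T> sink) {
            onSubscribe.accept(sink);
        }

        /** Wraps the downstream sink with the operator, then subscribes this source to the result. */
        <R> Source<R> lift(Op<R, T> op) {
            return new Source<R>(downstream -> subscribe(op.apply(downstream)));
        }
    }

    static <T, R> Op<R, T> map(Function<? super T, ? extends R> f) {
        return downstream -> value -> downstream.onNext(f.apply(value));
    }

    /** Forwards the first `limit` values; a real take would also unsubscribe upstream. */
    static <T> Op<T, T> take(final int limit) {
        return downstream -> new Sink<T>() {
            int count = 0;

            @Override
            public void onNext(T value) {
                if (count < limit) {
                    count++;
                    downstream.onNext(value);
                }
            }
        };
    }

    static List<String> demo() {
        Source<Integer> numbers = new Source<>(sink -> {
            for (int i = 1; i <= 10; i++) {
                sink.onNext(i);
            }
        });
        Op<Integer, Integer> firstThree = take(3);
        Op<String, Integer> toText = map(i -> "#" + i);
        List<String> out = new ArrayList<>();
        numbers.lift(firstThree).lift(toText).subscribe(out::add);
        return out;
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

    Because each lift returns a new source, custom operators chain in reading order just like built-in ones.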

    3) Simpler Operator Implementations

    The lift operator injects the necessary Observer and Subscription instances (via the new Subscriber type) and eliminates (for most use cases) the need for manual subscription management. Because the Subscription is available in scope, there is no longer any need for awkward patterns such as creating a Subscription, closing over it and returning it, all while accounting for synchronous versus asynchronous execution.

    For example, the body of fromIterable is simply:

    public void call(Subscriber<? super T> o) {
        for (T i : is) {
            if (o.isUnsubscribed()) {
                return;
            }
            o.onNext(i);
        }
        o.onCompleted();
    }
    

    The take operator is:

    public final class OperatorTake<T> implements Operator<T, T> {
    
        final int limit;
    
        public OperatorTake(int limit) {
            this.limit = limit;
        }
    
        @Override
        public Subscriber<? super T> call(final Subscriber<? super T> o) {
            CompositeSubscription parent = new CompositeSubscription();
            if (limit == 0) {
                o.onCompleted();
                parent.unsubscribe();
            }
            return new Subscriber<T>(parent) {
    
                int count = 0;
                boolean completed = false;
    
                @Override
                public void onCompleted() {
                    if (!completed) {
                        o.onCompleted();
                    }
                }
    
                @Override
                public void onError(Throwable e) {
                    if (!completed) {
                        o.onError(e);
                    }
                }
    
                @Override
                public void onNext(T i) {
                    if (!isUnsubscribed()) {
                        o.onNext(i);
                        if (++count >= limit) {
                            completed = true;
                            o.onCompleted();
                            unsubscribe();
                        }
                    }
                }
    
            };
        }
    
    }
    

    4) Recursion/Loop Performance

    The fromIterable use case is 20x faster when implemented as a loop instead of recursive scheduler (see https://github.com/Netflix/RxJava/commit/a18b8c1a572b7b9509b7a7fe1a5075ce93657771).

    In several places we can remove the recursive scheduling originally used for unsubscribe support and use a loop instead.

    Schedulers

    Schedulers were greatly simplified to a design based around Action1<Inner>.

    public abstract class Scheduler {
        public Subscription schedule(Action1<Scheduler.Inner> action);
        public Subscription schedule(Action1<Scheduler.Inner> action, long delayTime, TimeUnit unit);
        public Subscription schedulePeriodically(Action1<Scheduler.Inner> action, long initialDelay, long period, TimeUnit unit);
        public long now();
        public int degreeOfParallelism();
    
        public static class Inner implements Subscription {
            public abstract void schedule(Action1<Scheduler.Inner> action, long delayTime, TimeUnit unit);
            public abstract void schedule(Action1<Scheduler.Inner> action);
            public long now();
        }
    }
    

    This design change originated from four findings:

    1. It was very easy to cause memory leaks or inadvertent parallel execution since the distinction between outer and inner scheduling was not obvious.

    To solve this the new design explicitly has the outer Scheduler and then Scheduler.Inner for recursion.

    2. The passing of state is not useful since scheduling over network boundaries with this model does not work.

    In this new design all state passing signatures have been removed. This was determined while implementing a RemoteScheduler that attempted to use observeOn to transition execution from one machine to another. This does not work because of the requirement for serializing/deserializing the state of the entire execution stack. Migration of work over the network has been found to be better suited to explicit boundaries established by Subjects. Thus, the complications within the Schedulers are unnecessary.

    3. The number of overloads with different ways of doing the same things was confusing.

    This new design removes all but the essential and simplest methods.

    4. A scheduled task could not do work in a loop and easily be unsubscribed which generally meant less efficient recursive scheduling.

    This new design applies similar principles as done with lift/create/OnSubscribe/Subscriber and injects the Subscription via the Inner interface so a running task can check isUnsubscribed().

    With this new design, the simplest execution of a single task is:

    Schedulers.newThread().schedule(new Action1<Inner>() {
    
        @Override
        public void call(Inner inner) {
            doWork();
        }
    
    });
    

    Recursion is easily invoked:

    Schedulers.newThread().schedule(new Action1<Inner>() {
    
        @Override
        public void call(Inner inner) {
            doWork();
            // recurse until unsubscribed (the schedule will do nothing if unsubscribed)
            inner.schedule(this);
        }
    
    });
    

    Because Action1<Inner> is used on both the outer and inner levels, a recursive action can simply refer to this and reschedule itself.

    Similar to the new lift/create pattern with Subscriber the Inner is also a Subscription so it allows efficient loops with unsubscribe support:

    Schedulers.newThread().schedule(new Action1<Inner>() {
    
        @Override
        public void call(Inner inner) {
            while(!inner.isUnsubscribed()) {
                doWork();
            }
        }
    
    });
    

    An action can now unsubscribe the Scheduler.Inner:

    Schedulers.newThread().schedule(new Action1<Inner>() {
    
        @Override
        public void call(Inner inner) {
            while(!inner.isUnsubscribed()) {
                int i = doOtherWork();
                if(i > 100) {
                    // an Action can cause the Scheduler to unsubscribe and stop
                    inner.unsubscribe();
                }
            }
        }
    
    });
    

    Typically just stopping is sufficient:

    Schedulers.newThread().schedule(new Action1<Inner>() {
    
        @Override
        public void call(Inner inner) {
            int i = doOtherWork();
            if (i < 10) {
                // recurse until done 10
                inner.schedule(this);
            }
        }
    
    });
    

    but if other work in other tasks is being done and you want to unsubscribe conditionally you could:

    Schedulers.newThread().schedule(new Action1<Inner>() {
    
        @Override
        public void call(Inner inner) {
            int i = doOtherWork();
            if (i < 10) {
                // recurse until done 10
                inner.schedule(this);
            } else {
                inner.unsubscribe();
            }
        }
    
    });
    

    and the recursion can be delayed:

    Schedulers.newThread().schedule(new Action1<Inner>() {
    
        @Override
        public void call(Inner inner) {
            doWork();
            // recurse until unsubscribed ... but delay the recursion
            inner.schedule(this, 500, TimeUnit.MILLISECONDS);
        }
    
    });
    

    The methods on the Inner never return a Subscription because an Inner always represents a single thread, event loop, actor, etc. and is controlled by the Subscription returned by the initial Scheduler.schedule method. This is part of clarifying the contract.

    Thus an unsubscribe controlled from the outside would be done like this:

    Subscription s = Schedulers.newThread().schedule(new Action1<Inner>() {
    
        @Override
        public void call(Inner inner) {
            while(!inner.isUnsubscribed()) {
                doWork();
            }
        }
    
    });
    
    // unsubscribe from outside
    s.unsubscribe();
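
    The outer/inner contract described above can be approximated with plain java.util.concurrent types. This is a sketch under assumptions, not the 0.17 implementation: Inner here is a hypothetical class backed by a single-threaded ScheduledExecutorService, Consumer<Inner> stands in for Action1<Inner>, and the static schedule method plays the role of the outer Scheduler.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;

final class InnerSchedulerSketch {

    /** Hypothetical stand-in for Scheduler.Inner: one worker thread plus an unsubscribed flag. */
    static final class Inner {
        private final ScheduledExecutorService worker = Executors.newSingleThreadScheduledExecutor();
        private final AtomicBoolean unsubscribed = new AtomicBoolean();

        void schedule(Consumer<Inner> action) {
            schedule(action, 0, TimeUnit.MILLISECONDS);
        }

        void schedule(Consumer<Inner> action, long delay, TimeUnit unit) {
            if (unsubscribed.get()) {
                return; // scheduling after unsubscribe does nothing
            }
            try {
                worker.schedule(() -> {
                    if (!unsubscribed.get()) {
                        action.accept(this); // inside a lambda, 'this' is the Inner
                    }
                }, delay, unit);
            } catch (RejectedExecutionException e) {
                // lost a race with unsubscribe(); dropping the task is the intended outcome
            }
        }

        void unsubscribe() {
            if (unsubscribed.compareAndSet(false, true)) {
                worker.shutdownNow(); // cancels anything still queued
            }
        }

        boolean isUnsubscribed() {
            return unsubscribed.get();
        }
    }

    /** Stand-in for the outer Scheduler.schedule: creates an Inner and hands it to the action. */
    static Inner schedule(Consumer<Inner> action) {
        Inner inner = new Inner();
        inner.schedule(action);
        return inner; // plays the role of the returned Subscription
    }

    /** Reschedules an anonymous action until it has run `limit` times, then unsubscribes. */
    static int countRuns(final int limit) {
        final AtomicInteger runs = new AtomicInteger();
        final CountDownLatch done = new CountDownLatch(1);
        schedule(new Consumer<Inner>() {
            @Override
            public void accept(Inner inner) {
                if (runs.incrementAndGet() < limit) {
                    inner.schedule(this); // 'this' is the action itself
                } else {
                    inner.unsubscribe();
                    done.countDown();
                }
            }
        });
        try {
            done.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return runs.get();
    }

    public static void main(String[] args) {
        System.out.println("runs=" + countRuns(10));
    }
}
```

    Only the outer call hands back something to unsubscribe. The inner schedule methods return nothing because everything they enqueue lives and dies with that one worker.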
    

    Migration Path

    1) Lift/OnSubscribe/Subscriber

    The lift function will not be used by most and is additive so will not affect backwards compatibility. The Subscriber type is also additive and for most use cases does not need to be used directly, the Observer interface can continue being used.

    The previous create(OnSubscribeFunc f) signature has been deprecated, so existing code will still work but will now produce warnings. Please begin migrating code, as this signature will be deleted prior to the 1.0 release.

    Code such as this:

    Observable.create(new OnSubscribeFunc<Integer>() {
    
        @Override
        public Subscription onSubscribe(Observer<? super Integer> o) {
            o.onNext(1);
            o.onNext(2);
            o.onCompleted();
            return Subscriptions.empty();
        }
    });
    

    should change to this:

    Observable.create(new OnSubscribe<Integer>() {
    
        @Override
        public void call(Subscriber<? super Integer> subscriber) {
            subscriber.onNext(1);
            subscriber.onNext(2);
            subscriber.onCompleted();
        }
    });
    

    If concurrency was being injected:

    Observable.create(new OnSubscribeFunc<Integer>() {
    
        @Override
        public Subscription onSubscribe(final Observer<? super Integer> o) {
            final BooleanSubscription s = new BooleanSubscription();
            Thread t = new Thread(new Runnable() {
    
                @Override
                public void run() {
                    int i = 0;
                    while (!s.isUnsubscribed()) {
                        o.onNext(i++);
                    }
                }
    
            });
            t.start();
            return s;
        }
    });
    

    you may no longer need it and can implement like this instead:

    Observable.create(new OnSubscribe<Integer>() {
    
        @Override
        public void call(Subscriber<? super Integer> subscriber) {
            int i = 0;
            while (!subscriber.isUnsubscribed()) {
                subscriber.onNext(i++);
            }
        }
    });
    

    or can just simplify the Subscription management:

    Observable.create(new OnSubscribe<Integer>() {
    
        @Override
        public void call(final Subscriber<? super Integer> subscriber) {
            Thread t = new Thread(new Runnable() {
    
                @Override
                public void run() {
                    int i = 0;
                    while (!subscriber.isUnsubscribed()) {
                        subscriber.onNext(i++);
                    }
                }
    
            });
            t.start();
        }
    });
    

    or use a Scheduler:

    Observable.create(new OnSubscribe<Integer>() {
    
        @Override
        public void call(final Subscriber<? super Integer> subscriber) {
            Schedulers.io().schedule(new Action1<Inner>() {
    
                @Override
                public void call(Inner inner) {
                    int i = 0;
                    while (!subscriber.isUnsubscribed()) {
                        subscriber.onNext(i++);
                    }
                }
    
            });
        }
    });
    

    or use subscribeOn which now works to make synchronous Observables async while supporting unsubscribe (this didn't work before):

    Observable.create(new OnSubscribe<Integer>() {
    
        @Override
        public void call(Subscriber<? super Integer> subscriber) {
            int i = 0;
            while (!subscriber.isUnsubscribed()) {
                subscriber.onNext(i++);
            }
        }
    }).subscribeOn(Schedulers.newThread());
    

    2) Schedulers

    Custom Scheduler implementations will need to be re-implemented and any direct use of the Scheduler interface will also need to be updated.

    3) Subscription

    If you have custom Subscription implementations you will see they now need an isUnsubscribed() method.

    You can either add this method, or wrap your function using Subscriptions.create and it will handle the isUnsubscribed behavior and execute your function when unsubscribe() is called.
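
    As a rough illustration of the wrapping approach, the sketch below shows one way such a wrapper could track isUnsubscribed for you. It is not the source of Subscriptions.create: MiniSubscription and wrap are hypothetical names, and running the action at most once is an assumption of this sketch rather than documented behavior.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

final class SubscriptionWrapperSketch {

    /** Hypothetical stand-in for the Subscription interface after 0.17. */
    interface MiniSubscription {
        void unsubscribe();

        boolean isUnsubscribed();
    }

    /** Wraps a cleanup action; the flag flips once and the action runs at most once (an assumption). */
    static MiniSubscription wrap(final Runnable onUnsubscribe) {
        final AtomicBoolean done = new AtomicBoolean();
        return new MiniSubscription() {
            @Override
            public void unsubscribe() {
                if (done.compareAndSet(false, true)) {
                    onUnsubscribe.run();
                }
            }

            @Override
            public boolean isUnsubscribed() {
                return done.get();
            }
        };
    }

    /** Unsubscribes twice and reports how often the cleanup action ran. */
    static int demo() {
        AtomicInteger cleanups = new AtomicInteger();
        MiniSubscription subscription = wrap(cleanups::incrementAndGet);
        subscription.unsubscribe();
        subscription.unsubscribe();
        return cleanups.get();
    }

    public static void main(String[] args) {
        System.out.println("cleanups=" + demo());
    }
}
```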

    The Future...

    This is hopefully the last of the major refactors to rxjava-core and we're approaching version 1.0. We have most if not all operators from Rx.Net that we want or intend to port. We think we have got the create/subscribe signatures as we want and the Subscription and Scheduler interfaces are now clean.

    We need to improve on some of the Subject implementations still, particularly ReplaySubject. We are beginning to focus after this release on cleaning up all of the operator implementations, stabilizing, fixing bugs and performance tuning.

    We appreciate your usage, feedback and contributions and hope the library is creating value for you!

    opened by benjchristensen 57
  • Proposed Scheduler Interface Change for 0.18 (yes, again)

    Proposed Scheduler Interface Change for 0.18 (yes, again)

    Reviewing the Scheduler interface changes of 0.17 with @headinthebox revealed that we're not 100% happy with the outcome, particularly after learning that Java 8 does not allow referencing this from within a lambda.

    The Scheduler interface as of 0.17 is:

    class Scheduler {
        public abstract Subscription schedule(Action1<Scheduler.Inner> action);
        public abstract Subscription schedule(Action1<Scheduler.Inner> action, final long delayTime, final TimeUnit unit);
        public Subscription scheduleRecursive(Action1<Recurse> action);
        public Subscription schedulePeriodically(Action1<Scheduler.Inner> action, long initialDelay, long period, TimeUnit unit);
        public int degreeOfParallelism();
        public long now();
    
        public static final class Recurse {
            public void schedule();
            public void schedule(long delay, TimeUnit unit);
        }
    
        public abstract static class Inner implements Subscription {
            public abstract void schedule(Action1<Scheduler.Inner> action, long delayTime, TimeUnit unit);
            public abstract void schedule(Action1<Scheduler.Inner> action);
            public long now();
        }
    }
    

    We have determined two problems with this:

    1. Inner/Outer Dance

    In practice we have found that usage is always one of two things, either you just interact with the outer and don't care about the Inner, or you immediately need the Inner and have to do an awkward first scheduling just to get access to the Inner. (See here and weep.)

    2. Recursion

    The Action1<Scheduler.Inner> signature was chosen and put on both outer and inner so that an inner class could refer to itself using this to simply reschedule itself from the outer onto the inner.

    It was assumed this would work in Java 8 lambdas but unfortunately we did not prove it.

    This works with anonymous classes:

    Schedulers.newThread().schedule(new Action1<Inner>() {
    
        @Override
        public void call(Inner inner) {
            System.out.println("do stuff");
            // recurse
            inner.schedule(this);
        }
    
    });
    

    but this does not with lambdas:

    Schedulers.newThread().schedule((inner) -> {
        System.out.println("do stuff");
        inner.schedule(this); // doesn't compile
    });
    

    So we end up with this:

    Schedulers.newThread().scheduleRecursive((recurse) -> {
        System.out.println("do stuff");
        recurse.schedule();
    });
    

    At that point it's clear that Inner is not working well and we have Recurse to fix the problem.
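
    The root cause is how Java resolves this: inside an anonymous class it names the anonymous instance, while inside a lambda it names the enclosing instance (and is unavailable in a static context). The sketch below demonstrates both cases with standard library types only, then shows a Recurse-style handle in miniature. Again and runRecursive are hypothetical names invented for this illustration, and the runner is a simple synchronous loop rather than a scheduler.

```java
import java.util.function.Consumer;
import java.util.function.Supplier;

final class LambdaThisSketch {

    /** Hypothetical Recurse-style handle: lets an action request another run without naming itself. */
    interface Again {
        void schedule();
    }

    /** In an anonymous class, 'this' is the anonymous instance itself. */
    Supplier<Object> anonymous() {
        return new Supplier<Object>() {
            @Override
            public Object get() {
                return this;
            }
        };
    }

    /** In a lambda, 'this' is the enclosing LambdaThisSketch instance. */
    Supplier<Object> lambda() {
        return () -> this;
    }

    /** Runs the action, and runs it again for as long as it keeps calling Again.schedule(). */
    static int runRecursive(Consumer<Again> action) {
        final boolean[] again = {true};
        int runs = 0;
        while (again[0]) {
            again[0] = false;
            action.accept(() -> again[0] = true);
            runs++;
        }
        return runs;
    }

    /** A lambda that recurses through the handle instead of through 'this'. */
    static int demoRecursion() {
        final int[] calls = {0};
        return runRecursive(again -> {
            if (++calls[0] < 3) {
                again.schedule();
            }
        });
    }

    public static void main(String[] args) {
        LambdaThisSketch outer = new LambdaThisSketch();
        Supplier<Object> anonymous = outer.anonymous();
        Supplier<Object> lambda = outer.lambda();
        System.out.println(anonymous.get() == anonymous); // the anonymous class sees itself
        System.out.println(lambda.get() == outer);        // the lambda sees the enclosing object
        System.out.println(demoRecursion());
    }
}
```

    Passing a handle into the action is what lets a lambda recurse at all, which is the role Recurse plays in the signatures discussed here.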

    Thus, the proposed changes (breaking again) are:

    class Scheduler {
        public final Subscription schedule(Action1<Recurse> action);
        public final Subscription schedule(Action1<Recurse> action, final long delayTime, final TimeUnit unit);
        public final Subscription schedulePeriodically(Action1<Recurse> action, long initialDelay, long period, TimeUnit unit);
        public abstract Inner createInner(); // for advanced use cases like `observeOn`
        public int degreeOfParallelism();
        public long now();
    
        // now the primary interface
        public static final class Recurse {
            public final void schedule();
            public final void schedule(long delay, TimeUnit unit);
            public final void schedule(Action1<Recurse> action);
            public final void schedule(Action1<Recurse> action, final long delayTime, final TimeUnit unit);
        }
    
        // now mostly an implementation detail except for advanced use cases
        public abstract static class Inner implements Subscription {
            public abstract void schedule(Action1<Recurse> action, long delayTime, TimeUnit unit);
            public abstract void schedule(Action1<Recurse> action);
            public long now();
        }
    }
    

    The name of Recurse is up for debate. It may be possible to merge Recurse and Inner but I haven't figured it out yet. The reason is that Inner is a single instance representing a thread or event-loop whereas Recurse represents an Action or work. Thus a given Inner could have multiple Recurse actions scheduled on to it. Because a Recurse represents an Action, it can recurse by invoking schedule(), which just reschedules itself.

    This would better support Java 8 lambdas and simplify recursion, while also better supporting (via the createInner() method) the more complicated use cases like observeOn where current code is very awkward.

    This needs to be the last refactor of this so we nail it down and stop breaking things and can get to 1.0.

    Let the discussion begin ...

    opened by benjchristensen 55
  • Profiling Memory Usage and Object Creation

    Profiling Memory Usage and Object Creation

    We need to spend time profiling memory and object allocation and finding places where we can improve.

    I would really appreciate help diving into this and finding problem areas. Even if you don't fix them, but just identify use cases, operators, etc., that would be very valuable.

    This is partly a result of the fact that in Netflix production we have seen an increase in YoungGen GCs since 0.17.x.

    The areas to start should probably be:

    • Observable.create
    • Observable.lift
    • Subscriber
    • CompositeSubscription
    • map
    • flatMap

    If you can or want to get involved in this please comment here so we all can collaborate together.

    opened by benjchristensen 54
  • Experimental Proposal of rx.Task

    Experimental Proposal of rx.Task

    Adds rx.Task as a "scalar Observable" for representing work with a single return value.

    See https://github.com/ReactiveX/RxJava/issues/1594 rx.Future/Task

    This provides a type similar to Future in that it represents a scalar unit of work, but it is lazy like an Observable and many Tasks can be combined into an Observable stream. Note how Task.zip returns Task<R> whereas Task.merge returns Observable<R>.
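
    The "lazy like an Observable" property is the key difference from Future, and it can be shown in a few lines. The following is a sketch only: LazyTask is a hypothetical stand-in with just subscribe and map, not the proposed rx.Task, and it ignores errors and asynchrony. It shows that creating the task runs nothing and that each subscription runs the work again.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;
import java.util.function.Function;

final class LazyScalarSketch {

    /** Hypothetical stand-in for the proposed Task: a lazy source of exactly one value. */
    static final class LazyTask<T> {
        private final Consumer<Consumer<? super T>> onExecute;

        LazyTask(Consumer<Consumer<? super T>> onExecute) {
            this.onExecute = onExecute;
        }

        /** Nothing runs until subscribe is called; every call runs the work again. */
        void subscribe(Consumer<? super T> onSuccess) {
            onExecute.accept(onSuccess);
        }

        /** Transforms the single value; still lazy, because it only wraps the subscription. */
        <R> LazyTask<R> map(Function<? super T, ? extends R> f) {
            return new LazyTask<R>(downstream -> subscribe(value -> downstream.accept(f.apply(value))));
        }
    }

    /** Returns {executions before subscribing, mapped value, executions after two subscriptions}. */
    static int[] demo() {
        final AtomicInteger executions = new AtomicInteger();
        LazyTask<Integer> task = new LazyTask<>(subscriber -> {
            executions.incrementAndGet();
            subscriber.accept(21);
        });
        int before = executions.get();
        final int[] mapped = {0};
        task.map(x -> x * 2).subscribe(value -> mapped[0] = value);
        task.subscribe(value -> { });
        return new int[] {before, mapped[0], executions.get()};
    }

    public static void main(String[] args) {
        int[] result = demo();
        System.out.println(result[0] + " " + result[1] + " " + result[2]);
    }
}
```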

    NOTE: This is for experimentation and feedback at this time.

    Items requiring review and work that I'm particularly aware of:

    • naming of OnExecute
    • naming of TaskObserver (this one in particular I don't like)
    • design and implementation of Task.Promise
    • should the public lift use the Observable.Operator or should that only be for internal reuse?
    • should we have a public lift that uses a Task.Operator?
    • the Task.toObservable implementation right now is efficient but will likely break something so it likely needs to change to use subscribe
    • implementation of this merge variant: Task<T> merge(final Task<? extends Task<? extends T>> source)
    • several operators currently just wrap as Observable to reuse existing operators ... is that okay performance wise?
    • Javadocs

    Examples of using this class:

    import rx.Observable;
    import rx.Task;
    import rx.Task.Promise;
    
    public class TaskExamples {
    
        public static void main(String... args) {
            // scalar synchronous value
            Task<String> t1 = Task.create(t -> {
                t.onSuccess("Hello World!");
            });
    
            // scalar synchronous value using helper method
            Task<Integer> t2 = Task.just(1);
    
            // synchronous error
            Task<String> error = Task.create(t -> {
                t.onError(new RuntimeException("failed!"));
            });
    
            // executing
            t1.subscribe(System.out::println);
            t2.subscribe(System.out::println);
            error.subscribe(System.out::println, e -> System.out.println(e.getMessage()));
    
            // scalar Tasks for request/response like a Future
            getData(1).subscribe(System.out::println);
            getDataUsingPromise(2).subscribe(System.out::println);
    
            // combining Tasks into another Task
            Task<String> zipped = Task.zip(t1, t2, (a, b) -> a + " -- " + b);
    
            // combining Tasks into an Observable stream
            Observable<String> merged = Task.merge(t1, t2.map(String::valueOf), getData(3));
            Observable<String> mergeWith = t1.mergeWith(t2.map(String::valueOf));
    
            zipped.subscribe(v -> System.out.println("zipped => " + v));
            merged.subscribe(v -> System.out.println("merged => " + v));
            mergeWith.subscribe(v -> System.out.println("mergeWith => " + v));
        }
    
        /**
         * Example of an async scalar execution using Task.create
         * <p>
         * This shows the lazy, idiomatic approach for Rx exactly like an Observable except scalar.
         *
         * @param arg
         * @return
         */
        public static Task<String> getData(int arg) {
            return Task.create(s -> {
                new Thread(() -> {
                    try {
                        Thread.sleep(500);
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                    // deliver value
                    s.onSuccess("Data=" + arg);
                }).start();
            });
        }
    
        /**
         * Example of an async scalar execution using a Task.Promise
         * <p>
         * This shows how an eager (hot) process would work like using a Future.
         *
         * @param arg
         * @return
         */
        public static Task<String> getDataUsingPromise(int arg) {
            Task.Promise<String> p = Promise.create();
    
            new Thread(() -> {
                try {
                    Thread.sleep(500);
                } catch (Exception e) {
                    e.printStackTrace();
                }
                // deliver value
                p.onSuccess("Data=" + arg);
            }).start();
    
            return p.getTask();
        }
    }
    
    Enhancement 
    opened by benjchristensen 50
  • Stacktraces and subscribeOn/observeOn

    Stacktraces and subscribeOn/observeOn

    RxJava is not nice when it comes to stacktraces.

    The worst thing is observeOn that schedules execution on different threads. Every time we use observeOn we get our stacktrace from the process root, all the call history gets erased.

    The idea is to make schedulers save the caller's stack, wrap action calls and attach the stacktrace to each exception that has been thrown by scheduled actions.
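
    The idea can be prototyped outside RxJava with a plain executor. The sketch below is one possible shape, not an existing RxJava feature: traced is a hypothetical helper that captures a Throwable at scheduling time and attaches it as a suppressed exception to whatever the task throws later.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

final class TracingSchedulerSketch {

    /** Captures the caller's stack now and attaches it to whatever the task throws later. */
    static Runnable traced(final Runnable task) {
        // This is the costly part: one stack walk per scheduled task.
        final Throwable callSite = new Throwable("scheduled from");
        return () -> {
            try {
                task.run();
            } catch (RuntimeException e) {
                e.addSuppressed(callSite);
                throw e;
            }
        };
    }

    /** Runs a failing task on another thread and returns the exception that came back. */
    static Throwable demo() {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        try {
            Future<?> result = pool.submit(traced(() -> {
                throw new IllegalStateException("boom");
            }));
            result.get();
            return null;
        } catch (ExecutionException e) {
            return e.getCause();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return e;
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        Throwable failure = demo();
        // The "Suppressed:" section of the printed trace names the scheduling call site.
        failure.printStackTrace();
    }
}
```

    Capturing the stack on every schedule call is where the performance cost mentioned below comes from, which is why a switch to disable it seems worthwhile.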

    Pros: traceable exceptions

    Cons: performance cost; leaks because of recursion

    I think we could have a Schedulers.io(boolean trace) alternative that saves the stacktrace, so we could call Schedulers.io(DEBUG) to turn stack tracing off in production for performance-critical parts.

    How do you guys find this idea?

    Question 
    opened by konmik 46
  • Adding super/extends so that Observable is covariant

    Adding super/extends so that Observable is covariant

    Ok, so this pull request changes a lot of lines. It's mostly generalizing all the FuncXs to be used like FuncX[-T1, -T2, ..., -TX, +R] (contravariant parameters, covariant return type) and all the Observers to be used "in a contravariant way". A few of the Observable uses are covariant, now, too (mostly zip).
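
    For readers less familiar with the notation, FuncX[-T1, ..., +R] translates into Java's use-site wildcards as ? super on parameters and ? extends on results. The sketch below uses java.util.function.Function in place of Func1 (an assumption made so it compiles without RxJava); map and mapInvariant are hypothetical helpers written only to contrast the two signatures.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Function;

final class VarianceSketch {

    /** Invariant signature: accepts only a Function<Integer, Number>, nothing more general. */
    static List<Number> mapInvariant(List<Integer> in, Function<Integer, Number> f) {
        List<Number> out = new ArrayList<>();
        for (Integer value : in) {
            out.add(f.apply(value));
        }
        return out;
    }

    /** Use-site variance: contravariant parameter (? super T), covariant result (? extends R). */
    static <T, R> List<R> map(List<? extends T> in, Function<? super T, ? extends R> f) {
        List<R> out = new ArrayList<>();
        for (T value : in) {
            out.add(f.apply(value));
        }
        return out;
    }

    static List<Number> demo() {
        // Accepts any Object and returns an Integer: more general in, more specific out.
        Function<Object, Integer> digits = o -> o.toString().length();
        List<Integer> ints = Arrays.asList(7, 42, 100);
        // mapInvariant(ints, digits) would not compile:
        // a Function<Object, Integer> is not a Function<Integer, Number>.
        return VarianceSketch.<Integer, Number>map(ints, digits);
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

    The wildcard version accepts strictly more callers, at the price of the noisier signatures the pull request complains about.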

    This is the pull request for #326.

    This doesn't look very good in the code (thanks Java). Also, it doesn't seem to make Scala interop easier at all (at least not yet).

    Please take a look. I'm not exactly happy with the result. - Maybe I'm doing something wrong here? - I've still got hope that there's an easier way...

    The pull request compiles and tests ok for me (except for the Clojure module, but that's another story and not due to my changes).

    opened by jmhofer 46
  • Bump actions/cache from 3.0.11 to 3.2.2

    Bump actions/cache from 3.0.11 to 3.2.2

    Bumps actions/cache from 3.0.11 to 3.2.2.

    Release notes

    Sourced from actions/cache's releases.

    v3.2.2

    Full Changelog: https://github.com/actions/cache/compare/v3.2.1...v3.2.2

    v3.2.1

    Full Changelog: https://github.com/actions/cache/compare/v3.2.0...v3.2.1

    v3.2.0

    ... (truncated)

    Changelog

    Sourced from actions/cache's changelog.

    3.0.11

    • Update toolkit version to 3.0.5 to include @actions/core@^1.10.0
    • Update @actions/cache to use updated saveState and setOutput functions from @actions/core@^1.10.0

    3.1.0-beta.1

    • Update @actions/cache on windows to use gnu tar and zstd by default and fallback to bsdtar and zstd if gnu tar is not available. (issue)

    3.1.0-beta.2

    • Added support for fallback to gzip to restore old caches on windows.

    3.1.0-beta.3

    • Bug fixes for bsdtar fallback if gnutar not available and gzip fallback if cache saved using old cache action on windows.

    3.2.0-beta.1

    • Added two new actions - restore and save for granular control on cache.

    3.2.0

    • Released the two new actions - restore and save for granular control on cache

    3.2.1

    • Update @actions/cache on windows to use gnu tar and zstd by default and fallback to bsdtar and zstd if gnu tar is not available. (issue)
    • Added support for fallback to gzip to restore old caches on windows.
    • Added logs for cache version in case of a cache miss.

    3.2.2

    • Reverted the changes made in 3.2.1 to use gnu tar and zstd by default on windows.
    Commits
    • 4723a57 Revert compression changes related to windows but keep version logging (#1049)
    • d1507cc Merge pull request #1042 from me-and/correct-readme-re-windows
    • 3337563 Merge branch 'main' into correct-readme-re-windows
    • 60c7666 save/README.md: Fix typo in example (#1040)
    • b053f2b Fix formatting error in restore/README.md (#1044)
    • 501277c README.md: remove outdated Windows cache tip link
    • c1a5de8 Upgrade codeql to v2 (#1023)
    • 9b0be58 Release compression related changes for windows (#1039)
    • c17f4bf GA for granular cache (#1035)
    • ac25611 docs: fix an invalid link in workarounds.md (#929)
    • Additional commits viewable in compare view

    dependencies github_actions 
    opened by dependabot[bot] 1
  • ThreadDeath is deprecated for removal in Java 20

    ThreadDeath is deprecated for removal in Java 20

    src/main/java/io/reactivex/rxjava3/exceptions/Exceptions.java:70: warning: [removal] ThreadDeath in java.lang has been deprecated and marked for removal
            } else if (t instanceof ThreadDeath) {
                                    ^
    src/main/java/io/reactivex/rxjava3/exceptions/Exceptions.java:71: warning: [removal] ThreadDeath in java.lang has been deprecated and marked for removal
                throw (ThreadDeath) t;
                       ^
    src/test/java/io/reactivex/rxjava3/exceptions/ExceptionsTest.java:143: warning: [removal] ThreadDeath in java.lang has been deprecated and marked for removal
        @Test(expected = ThreadDeath.class)
                         ^
    src/test/java/io/reactivex/rxjava3/exceptions/ExceptionsTest.java:164: warning: [removal] ThreadDeath in java.lang has been deprecated and marked for removal
                    throw new ThreadDeath();
                              ^
    src/test/java/io/reactivex/rxjava3/exceptions/ExceptionsTest.java:179: warning: [removal] ThreadDeath in java.lang has been deprecated and marked for removal
                Exceptions.throwIfFatal(new ThreadDeath());
                                            ^
    src/test/java/io/reactivex/rxjava3/exceptions/ExceptionsTest.java:181: warning: [removal] ThreadDeath in java.lang has been deprecated and marked for removal
            } catch (ThreadDeath ex) {
                     ^
    
    Enhancement 3.x 
    opened by akarnokd 0
  • 3.x: Add onDropped callbacks to operators

    3.x: Add onDropped callbacks to operators

    Add onDropped callback overloads to most operators that drop items that can't be recovered by other means.

    • [x] throttleLatest PR #7457
    • [x] throttleFirst PR #7482
    • [x] throttleLast / sample PR #7488
    • [ ] throttleWithTimeout / debounce (PR TBD)
    • [ ] onBackpressureLatest (PR TBD)
    • [ ] onBackpressureBuffer (PR TBD)
      • 📓 The current overloads with Action are somewhat unhelpful, however, we'll have to add overloads in a way that avoids lambda ambiguity.
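
    To make the purpose of such a callback concrete, the following JDK-only sketch models a throttleFirst-like gate that hands every item it discards to an onDropped consumer instead of losing it silently. ThrottleFirstSketch and its constructor are hypothetical and are not the RxJava operator or its signature; time is passed in explicitly so that the behavior is deterministic.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical model of the idea, not the RxJava implementation.
final class ThrottleFirstSketch<T> {
    private final long windowMillis;
    private final Consumer<? super T> downstream;
    private final Consumer<? super T> onDropped;
    private long windowEnd = Long.MIN_VALUE;

    ThrottleFirstSketch(long windowMillis,
                        Consumer<? super T> downstream,
                        Consumer<? super T> onDropped) {
        this.windowMillis = windowMillis;
        this.downstream = downstream;
        this.onDropped = onDropped;
    }

    /** Forwards the first item of each window; every other item goes to onDropped. */
    void onNext(T item, long nowMillis) {
        if (nowMillis >= windowEnd) {
            windowEnd = nowMillis + windowMillis;
            downstream.accept(item);
        } else {
            // Without this callback the item would be unrecoverable.
            onDropped.accept(item);
        }
    }

    public static void main(String[] args) {
        List<String> delivered = new ArrayList<>();
        List<String> dropped = new ArrayList<>();
        ThrottleFirstSketch<String> gate =
                new ThrottleFirstSketch<String>(100, delivered::add, dropped::add);

        gate.onNext("a", 0);   // opens a window that ends at 100
        gate.onNext("b", 50);  // inside the window: dropped
        gate.onNext("c", 120); // window elapsed: delivered
        System.out.println(delivered + " " + dropped); // prints [a, c] [b]
    }
}
```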
    Enhancement 3.x 
    opened by akarnokd 7
  • Feature Request: Add unfold construct to Flowable/Observable

    Feature Request: Add unfold construct to Flowable/Observable

    Version 3.1.4

    Scala as of 2.13 and Akka (another reactive framework) have added various unfold patterns, and I'm finding a need for this currently. Here's the use case - pagination of responses to a "cursor"-like REST api.

    Essentially, I'd like to make a Flowable that makes requests to get another page - where it starts without a "continuation" parameter, and keeps making requests (honoring backpressure) until the response comes without a "continuation".

    Flowable.create doesn't seem to deal with the backpressure well, and Flowable.generate is blocking, which seems disappointing as Flowable.fromFuture is great at dealing with async http clients.

    This unfold pattern seems to offer what would be very helpful to the cause - where a new request isn't made until the subscriber asks for more, and the termination state is known from some inspection of the previous element in the feed (the http response, or the json parsing of it).

    Here's a stab at making these using RxJava constructs; it would be nice if Flowable.unfold and Flowable.unfoldMaybe (and the Observable equivalents) could be added to RxJava itself.

    import java.util.Optional;
    import java.util.function.Function;
    
    import io.reactivex.rxjava3.core.Flowable;
    import io.reactivex.rxjava3.core.Maybe;
    import io.reactivex.rxjava3.core.MaybeSource;
    
    public class Unfold {
        public static class Pair<L, R> {
            private final L left;
            private final R right;
    
            public Pair(L left, R right) {
                this.left = left;
                this.right = right;
            }
    
            public L getLeft() {
                return left;
            }
    
            public R getRight() {
                return right;
            }
        }
    
        public static <S, E> Flowable<E> flowableUnfold(S state, Function<S, Optional<Pair<S, E>>> generator) {
            return Flowable.defer(() -> Flowable.fromOptional(generator.apply(state)))
                    .flatMap(pair -> Flowable.just(pair.getRight())
                            .concatWith(flowableUnfold(pair.getLeft(), generator)));
        }
    
        public static <S, E> Flowable<E> flowableUnfoldMaybe(S state, Function<S, ? extends MaybeSource<Pair<S, E>>> generator) {
            return Flowable.defer(() -> Flowable.fromMaybe(generator.apply(state)))
                    .flatMap(pair -> Flowable.just(pair.getRight())
                            .concatWith(flowableUnfoldMaybe(pair.getLeft(), generator)));
        }
    }
    

    And some Scala to test it:

    @RunWith(classOf[JUnitRunner])
    class UnfoldTest extends AnyFlatSpec with Matchers {
      "flowableUnfold" should "work with range of 0 to 10" in {
        val flowable = Unfold.flowableUnfold[(Int, Int), Int]((0, 10), {
          case (start: Int, end: Int) if start < end =>
            Option(new Pair(start + 1 -> end, start)).asJava
          case _ => None.asJava
        })
        flowable.toList.blockingGet().asScala shouldBe (0 until 10)
      }
    
      "flowableUnfoldMaybe" should "work with range of 0 to 10" in {
        val flowable = Unfold.flowableUnfoldMaybe[(Int, Int), Int]((0, 10), {
          case (start: Int, end: Int) if start < end =>
            Maybe.fromCallable(() => new Pair(start + 1 -> end, start))
          case _ => Maybe.empty()
        })
        flowable.toList.blockingGet().asScala shouldBe (0 until 10)
      }
    }
    
    Feature-Request 3.x 
    opened by scr-oath 17
  • UnicastSubjectTest > fusedNoConcurrentCleanDueToCancel FAILED

    UnicastSubjectTest > fusedNoConcurrentCleanDueToCancel FAILED

    io.reactivex.rxjava3.subjects.UnicastSubjectTest > fusedNoConcurrentCleanDueToCancel FAILED
        org.junit.runners.model.TestTimedOutException: test timed out after 5 minutes
            at io.reactivex.rxjava3.subjects.UnicastSubject.drainFused(UnicastSubject.java:416)
            at io.reactivex.rxjava3.subjects.UnicastSubject.drain(UnicastSubject.java:464)
            at io.reactivex.rxjava3.subjects.UnicastSubject.onNext(UnicastSubject.java:319)
            at io.reactivex.rxjava3.subjects.UnicastSubjectTest.fusedNoConcurrentCleanDueToCancel(UnicastSubjectTest.java:481)
    
    Test-Failures 3.x Investigating 
    opened by akarnokd 0
Releases(v3.1.5)
  • v3.1.5(Jun 1, 2022)

  • v3.1.4(Mar 21, 2022)

  • v3.1.3(Nov 23, 2021)

  • v3.1.2(Oct 12, 2021)

    Maven JavaDocs

    Compatibility

    • Add full Java 9 module descriptor. (#7241)

    Bugfixes

    • Fix missing nullability on Single.subscribe(BiConsumer). (#7331)

    Documentation

    • Fix javadoc wording of {Publish|Behavior}Processor::offer(). (#7328)
    • Indicate takeUntil stops on completion of other. (#7341)

    Other

    • Update assert messages format to be compliant with GradleRunner and JUnitRunner. (#7345)
  • v3.1.1(Aug 30, 2021)

    Maven JavaDocs

    API promotions

    • The operator fusion-related interfaces and two atomic queue implementations have been promoted to standard, thus officially supported in the io.reactivex.rxjava3.operators package. (#7320)

    Bugfixes

    • Specify proper OSGi unique bundle symbolic name of io.reactivex.rxjava3.rxjava. (#7319)
    • Fix ExecutorScheduler initializing Schedulers prematurely when using RxJavaPlugins.createExecutorScheduler. (#7323)
    • Fix the LambdaConsumerIntrospection of Completable's lambda-based observer to use the same missing onError indicator as the other types' lambda-based consumers. (#7326)
  • v3.1.0(Aug 9, 2021)

    Maven JavaDocs

    ⚠️ With this release, the minimum required Android API level is API 21 (Android 5.0).

    ⚠️ Note that the 3.0.x patch line won't be developed further.

    API promotions

    • Flowable.onBackpressureReduce() + 1 (#7296)
    • RxJavaPlugins.getOnParallelSubscribe() and RxJavaPlugins.setOnParallelSubscribe() (#7296)
    • TestScheduler([...] boolean useOnScheduleHook) (#7296)

    API additions

    • subscribe([...], DisposableContainer) for better Disposable management and reference cleanup. (#7298)
    • RxJavaPlugins.createExecutorScheduler() for creating an Executor-based Scheduler before the Schedulers class (and thus the standard schedulers) gets initialized. (#7306)

    Behavior changes

    • The scheduler purge thread has been removed. Removing cancelled timed operations is now managed by the setRemoveOnCancelPolicy of the underlying ScheduledExecutorService. (#7293)
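
    The JDK policy mentioned above can be observed directly on a ScheduledThreadPoolExecutor, the class that declares setRemoveOnCancelPolicy. The sketch below is JDK-only and is not RxJava's internal code; RemoveOnCancelSketch and queuedAfterCancel are hypothetical names. It schedules a task far in the future, cancels it, and reports how many tasks are still sitting in the executor's queue.

```java
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical demonstration, not RxJava code.
final class RemoveOnCancelSketch {

    /** Schedules a far-future task, cancels it, and returns the number of tasks left in the queue. */
    static int queuedAfterCancel(boolean removeOnCancel) {
        ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(1);
        try {
            executor.setRemoveOnCancelPolicy(removeOnCancel);
            ScheduledFuture<?> future = executor.schedule(() -> { }, 1, TimeUnit.HOURS);
            future.cancel(false);
            return executor.getQueue().size();
        } finally {
            executor.shutdownNow();
        }
    }

    public static void main(String[] args) {
        // Default policy: the cancelled task stays queued until its delay elapses.
        System.out.println(queuedAfterCancel(false)); // prints 1
        // With the policy enabled, cancel() removes the task immediately.
        System.out.println(queuedAfterCancel(true));  // prints 0
    }
}
```

    With the policy enabled, cancelled timed tasks do not accumulate, so no separate purge thread is needed to clear them out.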

    Documentation

    • Fixed wording of the fair parameter of Schedulers.from. (#7301)
    • Update withLatestFrom javadoc about upstream early complete (#7289)

    Other

    • @NonNull annotations on generic type arguments were made consistent across. (#7302, #7303)
  • v3.0.13(Jun 1, 2021)

    Maven JavaDocs

    ⚠️ RxJava is now signed with a new private key. The new public key fingerprint is 1D9AA7F9E1E2824728B8CD1794B291AEF984A085.

    Documentation

    • Fix wording of *OnSubscribe interfaces (#7274)

    Other

    • Mitigated the security risks caused by the Codecov backdoor (#7237).
    • Improve the build process (#7225, #7253, #7255, #7257, #7258, #7260, #7261, #7262, #7264, #7263)
    • Upgrade to Gradle 7.0 (#7259)
  • v3.0.13-RC5(May 26, 2021)

  • v3.0.13-RC4(May 3, 2021)

  • v3.0.13-RC3(Apr 26, 2021)

  • v3.0.13-RC2(Apr 17, 2021)

  • v3.0.13-RC1(Apr 17, 2021)

  • v3.0.12(Apr 8, 2021)

    Maven JavaDocs

    Bugfix

    • CompositeException.printStackTrace to write directly into PrintStream/PrintWriter. (#7212)

    Documentation

    • Fix wrong reference in Single.flattenStreamAsObservable javadoc. (#7206)
    • Fix style violating Javadoc. (#7210)

    Other

    • Fix POM_URL (#7214)
    • Upgrade Gradle to 6.8.3 (#7208)
    • Bump gradle to 6.8.3 & optimize gradle config (#7207)
    • Added Javadoc checks to Checkstyle. Fix violating Javadoc. (#7210)
    • Modernize gradle plugin block, change maven to maven-publish (#7219)
  • v3.0.12-RC1(Apr 8, 2021)

  • v3.0.11(Mar 6, 2021)

    Maven JavaDocs

    ℹ️ RxJava 2 is now end-of-life (EOL) and no further development or support will be provided by the project.

    Enhancement

    • Add onSubscribe hook to ParallelFlowable operators (#7191)

    Bugfix

    • Allow Single.zip and Maybe.zip result to be garbage collected (#7196)
    • Direct scheduling via Schedulers.from to honor the interruptibleWorker setting (#7203)

    Documentation

    • Fix typos in Schedulers.java (#7178)

    Other

    • Release to Sonatype directly (#7181)
    • Upgrade to Gradle 6.8.2 (#7184)
    • Cleanup of source code headers (#7205)
  • v3.0.11-RC5(Feb 16, 2021)

  • v3.0.11-RC4(Feb 13, 2021)

  • v2.2.21(Feb 13, 2021)

    Maven JavaDocs

    ⚠️ This is the last planned update for the 2.x version line. After February 28, 2021, 2.x becomes End-of-Life (EoL); no further patches, bugfixes, enhancements, documentation or support will be provided by the project.

    Enhancements

    • Add a system parameter to allow scheduled worker release in the Io Scheduler. (#7162)
    • Add a system parameter to allow Schedulers to use System.nanoTime() for now(). (#7170)
  • v3.0.11-RC3(Feb 5, 2021)

  • v3.0.11-RC2(Feb 5, 2021)

    Specify the staging profile name to be "io.reactivex" so the close operation finds the repo.

    Unfortunately, there is no other way to test the release process.

  • v3.0.11-RC1(Feb 5, 2021)

  • v3.0.10(Feb 1, 2021)

    Maven JavaDocs

    Enhancement

    • Add a system parameter to allow scheduled worker release in the Io Scheduler. (#7160)
    • Add TestScheduler option to use onSchedule hook. (#7163)
    • Add a system parameter to allow Schedulers to use System.nanoTime() for now(). (#7169)
    • Add fusion support to concatMap{Maybe|Single|Completable}. (#7165)

    Documentation

    • Update marbles of amb(), ambArray() and ambWith() (#7144)
    • Fix take() mentioning the old limit() operator (#7145)
    • Document Schedulers.from vs. RejectedExecutionException behavior. (#7150)
    • Update documentation for NewThreadWorker.scheduleActual method. (#7164)
    • Improve Javadocs style of Schedulers. (#7168)

    Other

    • onBackpressureReduce internals cleanup (#7151)
    • Workaround for FutureTask.toString recursion on JDK 10+. (#7173)
  • v3.0.9(Dec 30, 2020)

  • v3.0.8(Dec 2, 2020)

  • v3.0.8-RC3(Dec 2, 2020)

    Maven JavaDocs

    This is a pre-release for 3.0.8 to verify the release process still works after the switch to GitHub actions (#7114).

    Bugfixes

    • Remove unnecessary cancel/dispose calls from terminating using (#7121)

    Documentation

    • Flowable scan/scanWith backpressure documentation update (#7110)
  • v3.0.8-RC2(Dec 2, 2020)

    Maven JavaDocs

    This is a pre-release for 3.0.8 to verify the release process still works after the switch to GitHub actions (#7114).

    Bugfixes

    • Remove unnecessary cancel/dispose calls from terminating using (#7121)

    Documentation

    • Flowable scan/scanWith backpressure documentation update (#7110)
  • v3.0.8-RC1(Dec 2, 2020)

    Maven JavaDocs

    This is a pre-release for 3.0.8 to verify the release process still works after the switch to GitHub actions (#7114).

    Bugfixes

    • Remove unnecessary cancel/dispose calls from terminating using (#7121)

    Documentation

    • Flowable scan/scanWith backpressure documentation update (#7110)
  • v3.0.7(Oct 7, 2020)

    Maven JavaDocs

    Bugfixes

    • Fix Observable.toFlowable(ERROR) not cancelling on MissingBackpressureException. (#7083)
    • Fix Flowable.concatMap backpressure with scalars. (#7089)

    Documentation

    • fromRunnable/fromAction javadoc improvements. (#7071)
    • Patch out duplicate @NonNull annotation in generated javadocs. (#7073)
    • Clarify the documentation for scan operators. (#7093)
  • v2.2.20(Oct 6, 2020)

    Maven JavaDocs

    ⚠️ The 2.x version line is now in maintenance mode and will be supported only through bugfixes until February 28, 2021. No new features, behavior changes or documentation adjustments will be accepted or applied to 2.x. It is recommended to migrate to 3.x within this time period.

    Bugfixes

    • Fix Observable.flatMap with maxConcurrency hangs (#6960)
    • Fix Observable.toFlowable(ERROR) not cancelling upon MissingBackpressureException (#7084)
    • Fix Flowable.concatMap backpressure with scalars. (#7091)
  • v3.0.6(Aug 20, 2020)

Owner
ReactiveX
Reactive Extensions for Async Programming
Powerful event-bus optimized for high throughput in multi-threaded applications. Features: Sync and Async event publication, weak/strong references, event filtering, annotation driven

MBassador MBassador is a light-weight, high-performance event bus implementing the publish subscribe pattern. It is designed for ease of use and aims

Benjamin Diedrichsen 931 Dec 23, 2022
Kotlin/JVM compensated summation of Double sequences to calculate sum, mean, standard deviation

precise Implements compensated summation for sequences of Double. Reduces rounding errors associated with limited precision of floating-point numbers.

Artёm IG 0 Apr 20, 2022
Run Kotlin/JS libraries in Kotlin/JVM and Kotlin/Native programs

Zipline This library streamlines using Kotlin/JS libraries from Kotlin/JVM and Kotlin/Native programs. It makes it possible to do continuous deploymen

Cash App 1.5k Dec 30, 2022
Sample app to try compose and showcase principles from Composing (UI) beyond the UI

Composing clocks sample app This is a sample app to show how to build an app that follows the practices described in the series of articles Compose (U

Jordi Saumell 84 Dec 15, 2022
Postman is a reactive One-tap SMS verification library. This library allows the usage of RxJava with The SMS User Consent API

What is Postman? Postman is a reactive One-tap SMS verification library. This library allows the usage of RxJava with The SMS User Consent API Usage P

Cafer Mert Ceyhan 129 Dec 24, 2022
Showcase project of Functional Reactive Programming on Android, using RxJava.

FunctionalAndroidReference FunctionalAndroidReference is a showcase project of Functional Reactive Programming on Android, using RxJava. It's a compan

Paco 278 Nov 18, 2022
Pet project using Clean Architecture + MVVM + Reactive Extensions + Android Architecture Components. The data are fetched from LondonTheatreDirect API. 🎭

Theatre Pet project using Clean Architecture + MVVM + Reactive Extensions + Android Architecture Components. The data is fetched from LondonTheatreDir

André Mion 646 Jan 9, 2023
🔥The Android Startup library provides a straightforward, performant way to initialize components at the application startup. Both library developers and app developers can use Android Startup to streamline startup sequences and explicitly set the order of initialization.

🔥The Android Startup library provides a straightforward, performant way to initialize components at the application startup. Both library developers and app developers can use Android Startup to streamline startup sequences and explicitly set the order of initialization.

Rouse 1.3k Dec 30, 2022
Event-driven application uses React, reactive Spring Boot WebFlux, R2DBC, MySQL and Liquibase

Product delivery Event-driven application uses React, reactive Spring Boot WebFlux, R2DBC, MySQL and Liquibase Status: IN PROGRESS if [[ "" != `docker

Maksim Kostromin 2 Aug 17, 2022
Reactive extensions for SimpleNoSQL

RxSimpleNoSQL Reactive extensions for SimpleNoSQL. Manipulate entities using Observables and Completables. Examples Suppose we have the following enti

xmartlabs 37 Aug 29, 2021
A Kotlin library used to analyse discrete Markov chains, in order to generate plausible sequences

Markov Markov is a Kotlin library used to analyse discrete Markov chains, in order to generate plausible sequences. Using This project is still under

Xavier F. Gouchet 0 Nov 14, 2021
This prototype app provides a list of events to be held under an organization (school, college, club, etc.) and the users can manually set event reminders at their scheduled time so that they do not miss an event.

E-CELL NITS Sample App This prototype app provides a list of events to be held under E-Cell NIT Silchar (for example, Srijan 2.0) and the users can ma

Ritam Nath 1 Nov 7, 2021
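LiteGo is a Java-based asynchronous concurrency library. It has a smart executor that lets you freely set the maximum number of tasks running concurrently and the number of threads in the waiting queue. It can also set waiting policies and overload strategies.

LiteGo: a "mini" asynchronous concurrency library for Android. LiteGo is a Java-based asynchronous concurrency library. At its core is a "mini" executor that lets you freely set the maximum number of concurrent tasks at any one time and the number of threads waiting in the queue, as well as the queueing policy and the overload policy. LiteGo can be handed Runnable, Callable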

马天宇 189 Nov 10, 2022
Event pattern & event properties framework

Renetik Android - Event & Property https://github.com/renetik/renetik-android-event Documentation Framework to enjoy, improve and speed up your applic

Renetik 2 Nov 2, 2022
Create beautiful film credit sequences—without pain.

Cinecred Create beautiful film credit sequences—without pain. Visit the website at https://loadingbyte.com/cinecred/ for further information about the

Felix Mujkanovic 7 Dec 24, 2022
Gits-android-extensions - A collection of Kotlin extensions to simplify Android development

gits-android-extensions A collection of Kotlin extensions to simplify Android de

GITS Indonesia 3 Feb 3, 2022
This project aims to simplify creation of basic Arduino programs by just editing a UI on Android.

ArdUI A video explanation If you are more a fan of video explanations, go to this YouTube video Project aim This project aims to simplify creation of ba

Targist 21 Nov 19, 2022
Tool for exporting Old School RuneScape environments so they can be used in 3D modeling programs like Blender.

OSRS Environment Exporter Tool for exporting Old School RuneScape environments so that they can be used in 3D modeling programs like Blender. Download

Connor 21 Dec 29, 2022