Reactive Extension (Rx) Adaptor for Netty

Overview

Project Status

2018-02-14

  • 1.0.x will be the RxNetty-2 update. It is currently pending an RFC for API changes.
  • 0.5.x is the current release branch. This is no longer under active development but will have major patches applied and will accept pull requests.
  • 0.4.x is now considered legacy and will only have critical patches applied.

Branch Status

This is the current branch for RxNetty and is now API stable.

Motivations

Motivations and a detailed status of the breaking changes in 0.5.x can be found here.

Getting Started

The best place to start exploring this library is to look at the examples for some common use cases addressed by RxNetty.

A very simple HTTP server example can be found here, and the corresponding HTTP client is here.
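
If you just want a feel for the API before opening those examples, the sketch below is a minimal, self-contained "Hello World" written against the 0.5.x API. Treat it as an illustrative outline rather than a verified sample; the linked examples are the authoritative reference, and method names may differ slightly across releases.

import io.netty.buffer.ByteBuf;
import io.reactivex.netty.protocol.http.client.HttpClient;
import io.reactivex.netty.protocol.http.server.HttpServer;
import rx.Observable;

import java.nio.charset.StandardCharsets;

public final class HelloWorld {
    public static void main(String[] args) {
        // Start a server on an ephemeral port that answers every request with a fixed string.
        HttpServer<ByteBuf, ByteBuf> server = HttpServer.newServer()
                .start((request, response) ->
                        response.writeString(Observable.just("Hello World!")));

        // Issue a GET against the server we just started and print the response body.
        String reply = HttpClient.newClient("127.0.0.1", server.getServerPort())
                .createGet("/hello")
                .flatMap(response -> response.getContent()
                        .map(buf -> buf.toString(StandardCharsets.UTF_8)))
                .toBlocking()
                .first();

        System.out.println(reply);
        server.shutdown();
    }
}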

Binaries

Binaries and dependency information for Maven, Ivy, Gradle and others can be found at http://search.maven.org.

Example for Maven:

<dependency>
    <groupId>io.reactivex</groupId>
    <artifactId>rxnetty-http</artifactId>
    <version>x.y.z</version>
</dependency>

and for Ivy:

<dependency org="io.reactivex" name="rxnetty-http" rev="x.y.z" />

and for Gradle:

implementation 'io.reactivex:rxnetty-http:x.y.z'
Unintentional release artifacts

There are two artifacts in Maven Central, 0.5.0 and 0.5.1, which were unintentionally released from the 0.4.x branch. Do not use them. More details can be found here.

Build

To build:

$ git clone https://github.com/ReactiveX/RxNetty.git -b 0.5.x
$ cd RxNetty/
$ ./gradlew build

Bugs and Feedback

For bugs, questions, and discussions please use GitHub Issues.

LICENSE

Copyright 2014 Netflix, Inc.

Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License.

Comments
  • Metrics

    Metrics

    In order to use RxNetty effectively in production, we need metrics that provide insight into the clients and servers created by RxNetty. Currently, RxNetty does not publish any metrics.

    Pluggability

    RxNetty strives to be unopinionated in most respects, apart from the fact that it uses Netty for the network stack and RxJava for async interfaces. It would therefore not be prudent for us to bind to a particular metrics framework like Servo, Yammer Metrics, etc. Instead, we should provide hooks into the system so that anyone can provide a plugin for their metrics framework of choice.

    Hooks

    There are a few ways of providing hooks for the metrics system.

    Event based

    This essentially means that we publish events for any interesting state change inside RxNetty via Observables, from which people can infer these metrics. The current RxClient connection pool implementation provides such a hook here.
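
    As a concrete illustration of the event-based approach (the event type and names below are hypothetical, not an existing RxNetty API), a plugin would subscribe to a stream of pool events and derive a gauge from them:

    import java.util.concurrent.atomic.AtomicLong;

    import rx.Observable;

    // Hypothetical event type, used purely for illustration.
    enum PoolEvent { ACQUIRE, RELEASE, EVICTION }

    class EventBasedMetricsPlugin {
        private final AtomicLong liveConnections = new AtomicLong();

        // The plugin subscribes to the event stream and infers a "live connections" gauge.
        void wireTo(Observable<PoolEvent> poolEvents) {
            poolEvents.subscribe(event -> {
                if (event == PoolEvent.ACQUIRE) {
                    liveConnections.incrementAndGet();
                } else {
                    liveConnections.decrementAndGet();
                }
            });
        }

        long liveConnections() {
            return liveConnections.get();
        }
    }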

    Advantages

    The advantage of this approach is that we do not have to maintain any state inside RxNetty, and we avoid problems such as counter overflow, which, without a metrics framework, would create more work for us. A subtle advantage is that we do not have to decide upfront which metrics are useful for everyone; people can infer new metrics from the available events.

    Disadvantages

    The disadvantage is that every plugin has to reinvent the wheel of understanding these events and correlating them to infer the metrics. This gets complex when a metric has to be inferred from a set of events rather than a single one.

    Data based

    We can also provide a data-based plugin mechanism, wherein we store all the metrics of interest and provide a hook to transform and send these metrics to the metrics framework of choice.
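
    As a rough illustration of the data-based approach (again with hypothetical names, not an existing RxNetty API), RxNetty would own the metric values and hand snapshots to a pluggable publisher, which only has to transform and forward them:

    import java.util.Map;

    // Hypothetical hook: RxNetty would call publish(...) with the metrics it maintains.
    interface MetricsPublisher {
        void publish(Map<String, Long> counters, Map<String, Long> gauges);
    }

    // A plugin for a concrete framework only transforms and forwards the values.
    class LoggingMetricsPublisher implements MetricsPublisher {
        @Override
        public void publish(Map<String, Long> counters, Map<String, Long> gauges) {
            counters.forEach((name, value) -> System.out.println("counter " + name + " = " + value));
            gauges.forEach((name, value) -> System.out.println("gauge " + name + " = " + value));
        }
    }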

    Advantages

    • Fewer things to do for every plugin provider.
    • No one has to understand the infrastructure to create the metrics.

    Disadvantages

    • We would have to create basic metrics constructs like rolling counters, percentiles, etc., which come for free with any metrics framework.

    Required metrics

    Client

    All required client-side metrics are defined in issue #96

    Server

    All required server-side metrics are defined in issue #97

    enhancement 
    opened by NiteshKant 25
  • "java.io.IOException: Connection closed by peer before sending a response" when using a timeout on the Observable

    Hi,

    I'm currently using rxnetty as a proxy server as explained in issue #405. Lately, I wanted to add a global timeout on this service:

    ...
                .compose(myTransformer) // This does some filtering and may return a ProxyResponse early to avoid the call to the final endpoint
                .switchIfEmpty(
                    content.compose(o -> finalEndpoint.submit(uri, o))
                )
                .timeout(TIMEOUT, TimeUnit.MILLISECONDS, Observable.just(TIMEOUT_RESPONSE))
                .flatMap(proxyResponse -> {
                    response.setStatus(proxyResponse.status);
                    if (proxyResponse.headers != null) {
                        for (Entry<String, String> header : proxyResponse.headers) {
                            response.getHeaders().add(header.getKey(), header.getValue());
                        }
                    }
                    return proxyResponse.content
                        .map(ByteBuf::retain)
                        .<Void> map(bb -> {
                            response.write(bb);
                            return null;
                        });
                })
    ...
    

    Adding this gets me the following error on our canary but I don't seem to be able to reproduce it on our dev server.

    java.io.IOException: Connection closed by peer before sending a response.
            at io.reactivex.netty.protocol.http.client.ClientRequestResponseConverter.<clinit>(ClientRequestResponseConverter.java:91)
            at io.reactivex.netty.protocol.http.client.ClientRequiredConfigurator.configureNewPipeline(ClientRequiredConfigurator.java:42)
            at io.reactivex.netty.pipeline.PipelineConfiguratorComposite.configureNewPipeline(PipelineConfiguratorComposite.java:55)
            at io.reactivex.netty.pipeline.PipelineConfiguratorComposite.configureNewPipeline(PipelineConfiguratorComposite.java:55)
            at io.reactivex.netty.client.RxClientImpl$2.initChannel(RxClientImpl.java:127)
            at io.netty.channel.ChannelInitializer.channelRegistered(ChannelInitializer.java:68)
            at io.netty.channel.ChannelHandlerInvokerUtil.invokeChannelRegisteredNow(ChannelHandlerInvokerUtil.java:32)
            at io.netty.channel.DefaultChannelHandlerInvoker.invokeChannelRegistered(DefaultChannelHandlerInvoker.java:50)
            at io.netty.channel.AbstractChannelHandlerContext.fireChannelRegistered(AbstractChannelHandlerContext.java:114)
            at io.netty.channel.DefaultChannelPipeline.fireChannelRegistered(DefaultChannelPipeline.java:833)
            at io.netty.channel.AbstractChannel$AbstractUnsafe.register0(AbstractChannel.java:503)
            at io.netty.channel.AbstractChannel$AbstractUnsafe.access$100(AbstractChannel.java:417)
            at io.netty.channel.AbstractChannel$AbstractUnsafe$1.run(AbstractChannel.java:477)
            at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:339)
            at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:356)
            at io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:742)
            at io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:137)
            at java.lang.Thread.run(Thread.java:745)
    

    Am I doing something wrong with the timeout? Is there some other way to set up a timeout? My main concern is to have a timeout on the service, but I thought I could apply it to the whole proxy server, as that's essentially what I'd like to do.

    Thanks

    opened by Crystark 17
  • Connection pool refactoring

    Connection pool refactoring

    Made the following changes to ChannelPool:

    • Improved the abstraction earlier provided by ChannelPool by converting it to a "ConnectionPool" to pool ObservableConnection. This fits in better with the rest of the code.
    • Modified the interface to "assume" that a pool connects to one server. This also fits in better with the existing client abstraction.
    • Provided a shutdown method on the pool. (Issue #80, #81)
    • Modified the pool stats by not storing any monotonically increasing values as they overflow. Instead provided various callbacks which can be used to infer these stats.
    • Removed the use of semaphores, as they are easily replaced by an AtomicInteger. Also modified the method to update the max connections limit as atomic increments/decrements rather than a set, which is hard to make thread-safe and consistent without synchronization; a sketch of this idea follows the list. (Discussed partially under issue #83)
    • Fixed issues #82, #76 and #79
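
    The following is an editorial sketch of the AtomicInteger idea described above (it is not the actual RxNetty class, and the names are invented for illustration): permits are acquired with a compare-and-set loop, and the limit itself is adjusted by deltas rather than being set outright.

    import java.util.concurrent.atomic.AtomicInteger;

    // Illustrative only: a max-connections limit backed by AtomicIntegers.
    final class MaxConnectionsLimit {
        private final AtomicInteger maxConnections;
        private final AtomicInteger acquired = new AtomicInteger();

        MaxConnectionsLimit(int maxConnections) {
            this.maxConnections = new AtomicInteger(maxConnections);
        }

        boolean tryAcquire() {
            for (;;) {
                int current = acquired.get();
                if (current >= maxConnections.get()) {
                    return false; // at the limit, no permit available
                }
                if (acquired.compareAndSet(current, current + 1)) {
                    return true;
                }
            }
        }

        void release() {
            acquired.decrementAndGet();
        }

        // The limit is changed by a delta, which stays consistent without locking.
        void adjustLimit(int delta) {
            maxConnections.addAndGet(delta);
        }
    }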

    There are three pending tasks after this change:

    • Provide a "cleanup thread" (using a threadpool) to cleanup idle connections.
    • Provide a composite pool stats object encompassing multiple connection pools.
    • Provide another PoolLimitDeterminationStrategy for max connections across pools which can be used to enforce connection limits over a pool of servers.
    opened by NiteshKant 17
  • RxNetty client hangs on creating a new connection *sometimes*

    RxNetty client hangs on creating a new connection *sometimes*

    We use RxNetty (0.5) in production for some of our non-critical services as a way of trying it out. So far we're loving it, but we did encounter an issue with the RxNetty client. In our test suite, some tests posting data using the RxNetty client are blocked indefinitely. This seems to be entirely chance-based. After digging a little deeper, it seems that whenever one of these tests fails, RxNetty internally blocks on creating a new HTTP connection, in particular in HttpClientRequestImpl#OnSubscribeFuncImpl.

    public OnSubscribeFuncImpl(final TcpClient<?, HttpClientResponse<O>> client, RawRequest<I, O> rawRequest) {
        this.client = client;
        ConnToResponseFunc<I, O> connToResponseFunc = new ConnToResponseFunc<>(rawRequest);
        Observable<HttpClientResponse<O>> source = this.client.createConnectionRequest()
                .doOnNext(x -> System.out.println("Established connection!"))        // This never gets printed when the test hangs
                .take(1)
                .switchMap(connToResponseFunc);
    
        if (null != rawRequest.getRedirector()) {
            source = source.switchMap(rawRequest.getRedirector());
        }
    
        this.source = source;
    }
    

    Digging even deeper we get to the ClientState class. I added some logging statements here as well to see what's happening. But in all cases all log statements get printed, suggesting that the subscriber is successfully subscribed.

    @Override
    public ConnectionObservable<R, W> newConnection(final SocketAddress hostAddress) {
        return ConnectionObservable.createNew(new OnSubcribeFunc<R, W>() {
                private final ListenersHolder<ClientEventListener> listeners = new ListenersHolder<>();
    
                @Override
                public void call(final Subscriber<? super Connection<R, W>> subscriber) {
                    //TODO: Optimize this by not creating a new bootstrap every time.
                    System.out.println("Bootstrapping...");                                // This gets printed
                    final Bootstrap nettyBootstrap = newBootstrap(listeners);
    
                    System.out.println("Connecting...");                                   // This gets printed
                    final ChannelFuture connectFuture = nettyBootstrap.connect(hostAddress);
    
                    System.out.println("Adding listener...");                              // This gets printed
                    connectFuture.addListener(new ChannelFutureListener() {
                        @Override
                        public void operationComplete(ChannelFuture future) throws Exception {
                            System.out.println("Operation completed...");                  // This gets printed
                            connectFuture.channel()
                                    .pipeline()
                                    .fireUserEventTriggered(new ClientConnectionSubscriberEvent<>(connectFuture, subscriber));
                            System.out.println("Fired event...");                          // This gets printed
                        }
                    });
                }
    
                @Override
                public Subscription subscribeForEvents(ClientEventListener eventListener) {
                    System.out.println("Subscribed for events...");                        // This gets printed
                    return listeners.subscribe(eventListener);
                }
            });
        }
    }
    

    We're pretty sure it's due to the client, because we've tested the RxNetty client against both an RxNetty server and a RestEasy server. In both cases we encounter these problems. When using the RestEasy client against the RxNetty server we don't have this issue.

    I'll investigate further, but if you have any ideas/suggestions they're more than welcome :smile:

    Tested on OSX 10.10.2 and Ubuntu 14.04 with Java 1.8.0_66-b17.

    Updates

    • It seems that even though the ClientConnectionSubscriberEvent is fired in the ClientState class, in case of the blocked tests, this event is never received by the ClientConnectionToChannelBridge class, whereas when the tests succeed this event is received.
    opened by michaeldejong 16
  • Handling multiple HTTP based protocols on a single server.

    Handling multiple HTTP based protocols on a single server.

    In some offline discussions and on pull request #196, it came to the fore that serving multiple higher-level protocols (SSE, WebSockets, plain request-response) over HTTP on a single HTTP server is not very intuitive.

    The following is the use case:

    • Let's assume we have a single HTTP server on port 8080.
    • URI /myapp/ws serves WebSocket traffic.
    • URI /myapp/sse serves SSE traffic.
    • URI /myapp/req-resp serves simple HTTP request-response traffic.

    In order to implement a RequestHandler for the above routing capabilities, one would have to do something like:

     new RequestHandler<ByteBuf, ByteBuf>() {
         @Override
         public Observable<Void> handle(HttpServerRequest<ByteBuf> request,
                                        final HttpServerResponse<ByteBuf> response) {
             String uri = request.getUri();
             if (uri.startsWith("myapp/sse")) {
                 ChannelPipeline pipeline =
                         response.getChannelHandlerContext().pipeline();
                 // Configure pipeline for SSE
             } else {
                 printRequestHeader(request);
                 response.writeString("Welcome!!");
                 return response.close(false);
             }
         }
     }
    

    There are multiple issues here:

    • The above is non-intuitive, i.e., it is unclear how to get a handle on Netty's pipeline.
    • The existing PipelineConfigurator implementations assume that they are only called at the creation of the pipeline and not incrementally during request processing, e.g. the SSE configurator is:
            serverPipelineConfigurator.configureNewPipeline(pipeline);
            pipeline.addLast(SSE_ENCODER_HANDLER_NAME, SERVER_SENT_EVENT_ENCODER);
            pipeline.addLast(SSE_RESPONSE_HEADERS_COMPLETER, new ChannelOutboundHandlerAdapter() {
                @Override
                public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
                    if (HttpServerResponse.class.isAssignableFrom(msg.getClass())) {
                        @SuppressWarnings("rawtypes")
                        HttpServerResponse rxResponse = (HttpServerResponse) msg;
                        String contentTypeHeader = rxResponse.getHeaders().get(CONTENT_TYPE);
                        if (null == contentTypeHeader) {
                            rxResponse.getHeaders().set(CONTENT_TYPE, "text/event-stream");
                        }
                    }
                    super.write(ctx, msg, promise);
                }
            });
    

    It configures the underlying (HTTP) configurator first.

    • The pipeline.addLast() semantics will not work as RxNetty adds its own handlers at the end of the pipeline and that is an invariant that has to be maintained in order for the callbacks to work correctly.
    opened by NiteshKant 16
  • Handling read timeout

    Handling read timeout

    Netty's handling of read timeouts is not very intuitive: you need to add a ReadTimeoutHandler to the pipeline. However, channel reads can still happen even after the ReadTimeoutHandler has fired a ReadTimeoutException down the pipeline.
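
    For reference, this is the plain Netty mechanism being discussed, shown as a small sketch (the handler name and timeout value are arbitrary):

    import io.netty.channel.ChannelInitializer;
    import io.netty.channel.socket.SocketChannel;
    import io.netty.handler.timeout.ReadTimeoutHandler;

    import java.util.concurrent.TimeUnit;

    // A ReadTimeoutHandler fires a ReadTimeoutException down the pipeline when no read
    // happens within the configured period; reads already in flight can still arrive.
    class TimeoutAwareInitializer extends ChannelInitializer<SocketChannel> {
        @Override
        protected void initChannel(SocketChannel ch) {
            ch.pipeline().addFirst("readTimeout", new ReadTimeoutHandler(30, TimeUnit.SECONDS));
            // ... application handlers would be added after this.
        }
    }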

    With blocking sockets, setting a read timeout ensures that socket resources are released once the timeout has passed. What role does a read timeout play with non-blocking sockets?

    Do we still care about read timeouts? Should they be handled in RxNetty, Ribbon, or through Observable APIs by the caller?

    opened by allenxwang 14
  • Seeing weird occasional response times

    Seeing weird occasional response times

    Hi,

    I'm seeing some unexpectedly high response times (around 1 second, occasionally). We started seeing these in our regular tests with our services, even with a really low number of concurrent requests. Bumping the number of concurrent requests above 10 starts to give a lot of timeouts.

    I managed to make a small example to reproduce it and I see the high response times even with only 1 thread sending requests. Am I using it completely wrong or is this expected?

    Small test program:

      public static void main(String[] args) throws Exception
        {
            HttpClient<ByteBuf, ByteBuf> rxNetty = RxNetty.createHttpClient("127.0.0.1", 1337);
    
            HttpServer<ByteBuf, ByteBuf> server = RxNetty.createHttpServer(1337, (HttpServerRequest<ByteBuf> request, HttpServerResponse<ByteBuf> response) -> {
                response.setStatus(HttpResponseStatus.OK);
                response.writeString("OK");
                return response.close();
            });
            server.start();
            //Just set the logging to INFO to not log too much
            LoggerContext ctx = (LoggerContext) LogManager.getContext(false);
            Configuration config = ctx.getConfiguration();
            LoggerConfig loggerConfig = config.getLoggerConfig(LogManager.ROOT_LOGGER_NAME);
            loggerConfig.setLevel(Level.INFO);
            ctx.updateLoggers();
    
            List<Thread> threads = new ArrayList<>();
            int threadCount = 1;
            for (int i = 0; i < threadCount; i++)
            {
                Runnable runnable = new Runnable()
                {
                    @Override
                    public void run()
                    {
                        for (int x = 0; x < 10000; x++)
                        {
                            try
                            {
                                long start = System.currentTimeMillis();
                                HttpClientRequest<ByteBuf> httpRequest = HttpClientRequest.createGet("/test");
    
                                Observable<String> observable = rxNetty.submit(httpRequest)
                                        .flatMap((HttpClientResponse<ByteBuf> r) ->
                                                r.getContent().map(b -> b.toString(StandardCharsets.UTF_8))
                                        );
                                String response = observable.toBlocking().last();
                                long dur = System.currentTimeMillis() - start;
                                if (dur > 100)
                                {
                                    System.out.println(x + ":" + dur);
                                }
                            }
                            catch (Exception ex)
                            {
                                ex.printStackTrace();
                            }
                        }
                    }
                };
                Thread t = new Thread(runnable);
                t.start();
                threads.add(t);
            }
            for (Thread t : threads)
            {
                t.join();
            }
            server.shutdown();
        }
    
    opened by OskarKjellin 13
  • SSE Output Wrong

    SSE Output Wrong

    I am trying to create a Turbine server (https://github.com/Netflix/Turbine/tree/2.x) that outputs a stream to the Hystrix dashboard (https://github.com/Netflix/Hystrix/tree/master/hystrix-dashboard). The output when using ServerSentEvent does not work.

    Here is the manually written code that works and then the one using ServerSentEvent that doesn't work.

    // this outputs correct data
    return response.writeStringAndFlush("data: " + JsonUtility.mapToJson(data) + "\n\n");
    // this doesn't ...
    // return response.writeAndFlush(new ServerSentEvent("", "data", JsonUtility.mapToJson(data)));
    

    It appears related work was done in https://github.com/ReactiveX/RxNetty/issues/141 but the comments in that one suggest it may be the cause of this issue.

    Using ServerSentEvent it emits like this:

    event: 
    data: {"currentCorePoolSize":1070,"current ..
    

    what is needed is:

    data: {"currentCorePoolSize":1070,"current ..
    
    opened by benjchristensen 13
  • problems using netflix wrapper for rxnetty

    problems using netflix wrapper for rxnetty

    I am using the Netflix client that wraps RxNetty.

    After running this code:

    rx.Observable<HttpClientResponse<ByteBuf>> responseObservable = httpClient.submit(httpRequest, retryHandler, clientConfig);
    

    I then try to read the response content:

    HttpClientResponse response = responseObservable.toBlocking().first();
    ByteBuf buf = response.getContent().toBlocking().first();
                int length = buf.readableBytes();
    
                if (buf.hasArray()) {
                    responseBody = buf.array();
                } else {
                    responseBody = new byte[length];
                    buf.readBytes(responseBody);
                }
    

    this sometimes works and sometimes fails on:

    io.netty.util.IllegalReferenceCountException: refCnt: 0

    at io.netty.buffer.AbstractByteBuf.ensureAccessible(AbstractByteBuf.java:1190)
    at io.netty.buffer.AbstractByteBuf.checkReadableBytes0(AbstractByteBuf.java:1176)
    at io.netty.buffer.AbstractByteBuf.checkReadableBytes(AbstractByteBuf.java:1172)
    at io.netty.buffer.AbstractByteBuf.readBytes(AbstractByteBuf.java:693)
    at io.netty.buffer.CompositeByteBuf.readBytes(CompositeByteBuf.java:1496)
    at io.netty.buffer.CompositeByteBuf.readBytes(CompositeByteBuf.java:42)
    at io.netty.buffer.AbstractByteBuf.readBytes(AbstractByteBuf.java:701)
    at io.netty.buffer.CompositeByteBuf.readBytes(CompositeByteBuf.java:1491)
    at io.netty.buffer.CompositeByteBuf.readBytes(CompositeByteBuf.java:42)
    
    not a bug 
    opened by yairogen 12
  • Required Client side metrics

    Required Client side metrics

    As there was no insight initially into which metrics should be provided, this comment has been updated post-implementation to describe the available metrics. The following metrics are available through the out-of-the-box Servo metrics plugin.

    TCP
    • Live Connections: The number of open connections from this client to a server. This is a gauge.
    • Connection count: The total number of connections ever created by this client. This is a monotonically increasing counter.
    • Pending Connections: The number of connections that are pending. This is a gauge.
    • Failed connects: Total number of connect failures.
    • Connection times: Time taken to establish a connection.
    • Pending connection close: Number of connections which are requested to be closed but are not yet closed. This is a gauge.
    • Failed connection close: Number of times when the connection close failed. This is a monotonically increasing counter.
    • Pending pool acquires: For clients with a connection pool, the number of acquires that are pending. This is a gauge.
    • Failed pool acquires: For clients with a connection pool, the number of acquires that failed. This is a monotonically increasing counter.
    • Pool acquire times: For clients with a connection pool, time taken to acquire a connection from the pool.
    • Pending pool releases: For clients with a connection pool, the number of releases that are pending. This is a gauge.
    • Failed pool releases: For clients with a connection pool, the number of releases that failed. This is a monotonically increasing counter.
    • Pool release times: For clients with a connection pool, time taken to release a connection to the pool.
    • Pool acquires: For clients with a connection pool, the total number of acquires from the pool.
    • Pool evictions: For clients with a connection pool, the total number of evictions from the pool.
    • Pool reuse: For clients with a connection pool, the total number of times a connection from the pool was reused.
    • Pool releases: For clients with a connection pool, the total number of releases to the pool.
    • Pending Writes: Writes that are pending to be written over the socket. This includes writes which are not flushed. This is a gauge.
    • Pending Flushes: Flushes that are issued but are not over yet. This is a gauge.
    • Bytes Written: Total number of bytes written. This is a monotonically increasing counter.
    • Write Times: The time taken to finish a write.
    • Bytes Read: The total number of bytes read. This is a monotonically increasing counter.
    • Failed Writes: The total number of writes that failed. This is a monotonically increasing counter.
    • Failed Flushes: The total number of flushes that failed. This is a monotonically increasing counter.
    • Flush times: The time taken to finish a flush.
    HTTP

    HTTP contains all the metrics that are available from TCP. The following metrics are specific to HTTP:

    • Request backlog: The number of requests that have been submitted but not started processing. This is a gauge.
    • Inflight requests: The number of requests that have been started processing but not yet finished processing. This is a gauge.
    • Processed Requests: Total number of requests processed. This is a monotonically increasing counter.
    • Request Write Times: Time taken to write requests, including headers and content.
    • Response Read Times: Time taken to read a response.
    • Failed Responses: Total number of responses that failed, i.e. the request was sent but the response was an error.
    • Failed request writes: Total number of requests for which the writes failed.
    UDP

    UDP contains all the metrics that are available from TCP.

    enhancement 
    opened by NiteshKant 11
  • Server onError Handling

    Server onError Handling

    The server does not have default error handling when Observable<Void> calls onError.

    Right now I have to manually do it or errors are swallowed:

                        return testEndpoint(request, response).onErrorFlatMap(error -> {
                            return writeError(request, response, "Unknown error: " + error.getMessage());
                        });
    
    opened by benjchristensen 11
  • Improve GRADLE build Performance

    Improve GRADLE build Performance

    Parallel builds. This project contains multiple modules. Parallel builds can improve the build speed by executing tasks in parallel. We can enable this feature by setting org.gradle.parallel=true.

    Configuration on demand. Configuration on demand tells Gradle to configure only the modules that are relevant to the requested tasks, instead of configuring all of them. We can enable this feature by setting org.gradle.configureondemand=true.

    Gradle caching. Shared caches can reduce the number of tasks you need to execute by reusing outputs already generated elsewhere. This can significantly decrease build times. We can enable this feature by setting org.gradle.caching=true.

    Gradle daemon. The daemon is a long-lived process that helps avoid the cost of JVM startup for every build. Since Gradle 3.0, the Gradle daemon is enabled by default. For older versions, you should enable it by setting org.gradle.daemon=true.
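
    Collected together, these settings would typically live in a gradle.properties file at the project root, for example:

    # gradle.properties (a sketch collecting the settings described above)
    org.gradle.parallel=true
    org.gradle.configureondemand=true
    org.gradle.caching=true
    # Only needed for Gradle versions older than 3.0
    org.gradle.daemon=true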

    If there are any inappropriate modifications in this PR, please reply and I will change them.

    opened by xiayingfeng 0
  • HTTPS Server with RxNetty and existing certificate

    HTTPS Server with RxNetty and existing certificate

    Hi! I'm trying to write a microservice using RxNetty and am stuck on implementing HTTPS support for it. I have a certificate chain in PEM format and a private key for the domain ikeybase.ru, which are already in use in the Apache server that serves the website and in some other services on this domain. I have the following code for starting the server:

        fun start() {
            server = HttpServer.newServer(environment.serverConfiguration.port)
            environment.serverConfiguration.sslConfiguration.let { ssl ->
                if(ssl != null) {
                    val certFile = File(ssl.certificatePath)
                    val keyFile = File(ssl.privatePath)
                    val sslContext = SslContextBuilder.forServer(certFile.inputStream(), keyFile.inputStream())
                        .sslProvider(SslProvider.JDK)
                        .clientAuth(ClientAuth.REQUIRE)
                        .build()
                    val sslEngine = sslContext.newEngine(UnpooledByteBufAllocator.DEFAULT, "ikeybase.ru", 2228)
                    server = server.secure(sslEngine)
                    server.start { httpRequest, httpResponse ->
                        router.route(httpRequest, httpResponse)
                    }
                } else {
                    server.start { httpRequest, httpResponse ->
                        router.route(httpRequest, httpResponse)
                    }
                }
            }
        }
    

    where ssl.certificatePath is the path to the PEM CA bundle and ssl.privatePath is the path to the private key, converted from the *.key file to PKCS#8 format with the following command:

    openssl pkcs8 -topk8 -inform PEM -outform PEM -in ikeybase.key -out ikeybase.pk8 -nocrypt
    

    After starting the server with sslConfiguration defined, the server seems to start normally; however, when I try to access any content, nothing happens. Router.route is not called, and I get the following error on the first service query and no output on subsequent ones. When sslConfiguration is not defined, the server runs perfectly:

    // HTTPS on (sslConfiguration defined):
    curl -i "https://ikeybase.ru:2228/"
    curl: (35) error:0407E086:rsa routines:RSA_verify_PKCS1_PSS_mgf1:last octet invalid
    curl -v "https://ikeybase.ru:2228/"
    *   Trying 94.188.60.102:2228...
    * TCP_NODELAY set
    * Connected to ikeybase.ru (94.188.60.102) port 2228 (#0)
    * ALPN, offering h2
    * ALPN, offering http/1.1
    * successfully set certificate verify locations:
    *   CAfile: /etc/ssl/certs/ca-certificates.crt
      CApath: /etc/ssl/certs
    * TLSv1.3 (OUT), TLS handshake, Client hello (1):
    * TLSv1.3 (IN), TLS handshake, Server hello (2):
    * TLSv1.2 (IN), TLS handshake, Certificate (11):
    * TLSv1.2 (IN), TLS handshake, Server key exchange (12):
    * TLSv1.2 (OUT), TLS alert, decrypt error (563):
    * error:0407E086:rsa routines:RSA_verify_PKCS1_PSS_mgf1:last octet invalid
    * Closing connection 0
    curl: (35) error:0407E086:rsa routines:RSA_verify_PKCS1_PSS_mgf1:last octet invalid
    // HTTPS off (no sslConfiguration defined):
    curl -i "http://ikeybase.ru:2228/"
    HTTP/1.1 200 OK
    content-length: 190
    
    {
      "serverName": "iKey Share",
      "serverVersion": "v0.0.1",
      "instanceName": "iKey Share",
      "httpErrorsAmountSinceStarted": 0,
      "uptime": "00 days : 00 hours : 00 minutes : 16 seconds"
    }
    

    Gradle setup:

        // Server dependencies
        //compile group: 'io.netty', name: 'netty-all', version: '4.1.59.Final'
        compile group: 'io.netty', name: 'netty-all', version: '4.1.31.Final'
        compile 'io.reactivex:rxnetty-http:0.5.3'
        compile 'io.reactivex:rxnetty-tcp:0.5.3'
        compile 'io.reactivex:rxnetty-common:0.5.3'
        compile 'io.reactivex:rxnetty-spectator-http:0.5.3'
        compile 'io.reactivex:rxnetty-spectator-tcp:0.5.3'
        compile 'io.reactivex:rxjava:1.2.+'
    

    How can I use an HTTPS certificate and private key to enable HTTPS support for server-side RxNetty? Any help is appreciated, thanks in advance!

    opened by ikey-ru 0
  • ResourceLeakDetector,LEAK: ByteBuf.release()

    ResourceLeakDetector,LEAK: ByteBuf.release()

    Hi, we are using Netty version 4.1.29.Final. The Maven dependencies are below:

    <dependency>
        <groupId>io.netty</groupId>
        <artifactId>netty-all</artifactId>
        <version>${netty.version}</version>
    </dependency>
    <dependency>
        <groupId>io.reactivex</groupId>
        <artifactId>rxnetty</artifactId>
        <version>0.4.20</version>
    </dependency>
    <dependency>
        <groupId>io.reactivex</groupId>
        <artifactId>rxnetty-servo</artifactId>
        <version>0.4.20</version>
    </dependency>
    <dependency>
        <groupId>io.reactivex</groupId>
        <artifactId>rxjava</artifactId>
        <version>1.3.8</version>
    </dependency>
    <dependency>
        <groupId>io.reactivex</groupId>
        <artifactId>rxjava-computation-expressions</artifactId>
        <version>0.21.0</version>
    </dependency>

    Seeing the below exception.

    We tried the option '-Dio.netty.leakDetectionLevel=advanced', as part of the advice provided in https://netty.io/wiki/reference-counted-objects.html.

    But that didn't help; we couldn't see which class in our project is causing the leak. Maybe the leak is in the Netty library itself? Please suggest any solutions.
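
    For reference, the same effect as the system property can also be obtained programmatically; the sketch below simply raises Netty's leak-detection level before anything else starts (PARANOID samples every buffer and records access points):

    import io.netty.util.ResourceLeakDetector;

    public final class LeakDetectionSetup {
        public static void main(String[] args) {
            // Equivalent in effect to -Dio.netty.leakDetectionLevel=paranoid.
            ResourceLeakDetector.setLevel(ResourceLeakDetector.Level.PARANOID);
            // ... start the RxNetty client/server after this point so that all
            // allocated ByteBufs are tracked with full access records.
        }
    }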

    2018-07-30T07:32:39,230,level=ERROR,thread=rxnetty-nio-eventloop-4-2,category=ResourceLeakDetector,LEAK: ByteBuf.release() was not called before it's garbage-collected. See http://netty.io/wiki/reference-counted-objects.html for more information.
    Recent access records:
    Created at:
        io.netty.buffer.PooledByteBufAllocator.newHeapBuffer(PooledByteBufAllocator.java:314)
        io.netty.buffer.AbstractByteBufAllocator.heapBuffer(AbstractByteBufAllocator.java:166)
        io.netty.buffer.AbstractByteBufAllocator.heapBuffer(AbstractByteBufAllocator.java:157)
        io.netty.handler.codec.compression.JdkZlibDecoder.decode(JdkZlibDecoder.java:180)
        io.netty.handler.codec.ByteToMessageDecoder.decodeRemovalReentryProtection(ByteToMessageDecoder.java:489)
        io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:428)
        io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:265)
        io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
        io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
        io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340)
        io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1434)
        io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
        io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
        io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:965)
        io.netty.channel.embedded.EmbeddedChannel.writeInbound(EmbeddedChannel.java:325)
        io.netty.handler.codec.http.HttpContentDecoder.decode(HttpContentDecoder.java:234)
        io.netty.handler.codec.http.HttpContentDecoder.decodeContent(HttpContentDecoder.java:155)
        io.netty.handler.codec.http.HttpContentDecoder.decode(HttpContentDecoder.java:147)
        io.netty.handler.codec.http.HttpContentDecoder.decode(HttpContentDecoder.java:46)
        io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:88)
        io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
        io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
        io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340)
        io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:102)
        io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
        io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
        io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340)
        io.netty.channel.CombinedChannelDuplexHandler$DelegatingChannelHandlerContext.fireChannelRead(CombinedChannelDuplexHandler.java:438)
        io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:310)
        io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:284)
        io.netty.channel.CombinedChannelDuplexHandler.channelRead(CombinedChannelDuplexHandler.java:253)
        io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
        io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
        io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340)
        io.netty.channel.ChannelInboundHandlerAdapter.channelRead(ChannelInboundHandlerAdapter.java:86)
        io.reactivex.netty.metrics.BytesInspector.channelRead(BytesInspector.java:59)
        io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
        io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
        io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340)
        io.reactivex.netty.pipeline.InternalReadTimeoutHandler.channelRead(InternalReadTimeoutHandler.java:108)
        io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
        io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
        io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340)
        io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1434)
        io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
        io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
        io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:965)
        io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:163)
        io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:645)
        io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:580)
        io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:497)
        io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:459)
        io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:884)
        io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
        java.lang.Thread.run(Thread.java:748)

    opened by kotlapraveen 0
  • [SECURITY] unsafeSecure() should not be used in samples

    [SECURITY] unsafeSecure() should not be used in samples

    Insecure example code leads to insecure production code

    By offering "working" code that is insecure by default, you are inherently suggesting that users use insecure coding practices by default.

    For example, the samples recommend the use of unsafeSecure().

    This is itself a security risk to the users of a library.

    opened by JLLeitschuh 1
  • HTTP POST example (REST)

    HTTP POST example (REST)

    Hi all,

    I am after a working HTTP POST example. I can get the TCP example to send data to the server, but all the HTTP examples are too simplified. Is POST not working? Everything I tried leads to empty content being sent to the server.

    I saw you have a Router in the pipeline; when will that be released? My plan is to build a simple REST server, so POST, PUT, GET, ... examples are desired.
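
    As an editorial sketch only (not taken from the RxNetty docs), a POST with a body on the 0.5.x client might look roughly like the following; the host, port, path, payload and the exact content-write method are assumptions that may need adjusting for your version:

    import io.netty.buffer.ByteBuf;
    import io.reactivex.netty.protocol.http.client.HttpClient;
    import rx.Observable;

    import java.nio.charset.StandardCharsets;

    public final class PostSketch {
        public static void main(String[] args) {
            String reply = HttpClient.newClient("127.0.0.1", 8080)     // assumed host/port
                    .createPost("/resource")                           // assumed path
                    .writeStringContent(Observable.just("{\"name\":\"value\"}"))
                    .flatMap(response -> response.getContent()
                            .map((ByteBuf buf) -> buf.toString(StandardCharsets.UTF_8)))
                    .toBlocking()
                    .first();
            System.out.println(reply);
        }
    }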

    Thanks.

    opened by gadieichhorn 4
  • Request for documentation on how backpressure works

    Request for documentation on how backpressure works

    I want to learn how backpressure works in RxNetty. Could you please provide such documentation, as mentioned in the FAQs? I would appreciate it if you could help me.

    opened by cytnju 4