Tutorial - Java Concurrency with API as Example

pixkit

The Goal:
  • Show numerous ways of tackling concurrency in Java
  • Show real REST API examples
For this example we will use JSONPlaceholder's free online REST API, which is useful for testing and prototyping.
All the data it returns is fake, but the endpoints are real. We will be using its posts endpoint; the exact URL appears as API_URL in the Service class below.

Calling that endpoint returns something like:
Code:
{
  "userId": 1,
  "id": 1,
  "title": "sunt aut facere repellat provident occaecati excepturi optio reprehenderit",
  "body": "quia et suscipit\nsuscipit recusandae consequuntur expedita et cum\nreprehenderit molestiae ut ut quas totam\nnostrum rerum est autem sunt rem eveniet architecto"
}

Let's create a simple Service class whose sole responsibility is to call this endpoint with an arbitrary id and return the response. That's it and nothing else. It's a good example of SRP (Single Responsibility Principle).
Code:
/*
* This Java source file was generated by the Gradle 'init' task.
*/
package concurrency;

import org.apache.commons.io.IOUtils;
import org.apache.http.HttpResponse;
import org.apache.http.client.methods.HttpUriRequest;
import org.apache.http.client.methods.RequestBuilder;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClientBuilder;

import java.io.IOException;
import java.io.InputStream;
import java.io.StringWriter;
import java.nio.charset.Charset;

public class Service {

    private final static String API_URL = "https://jsonplaceholder.typicode.com/posts/%s";

    public static String getRecord(String id) throws IOException {
        HttpUriRequest request = RequestBuilder
                .get(String.format(API_URL, id))
                .build();
        CloseableHttpClient client = HttpClientBuilder
                .create()
                .build();
        HttpResponse response;

        try {
            response = client.execute(request);
            InputStream streamResponse = response.getEntity().getContent();
            StringWriter writer = new StringWriter();
            // Decode as UTF-8 (the JSON encoding) rather than the platform default charset
            IOUtils.copy(streamResponse, writer, java.nio.charset.StandardCharsets.UTF_8);
            return writer.toString();
        } finally {
            client.close();
        }
    }
}

We will start by calling the API three times with three different ids, one call after another. All our Sequential class does is iterate over the ids and call the endpoint for each one. There's nothing magical here.

Sequential calls.
Code:
package concurrency;

public class App {
    public static void main(String[] args) throws Exception {
        String[] ids = {"1", "2", "3"};
        Sequential sequential = new Sequential(ids);
        System.out.println(sequential.getRecords());
    }
}

Code:
/*
* This Java source file was generated by the Gradle 'init' task.
*/
package concurrency;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

public class Sequential {

    private String[] ids;

    public Sequential(String[] ids) {
        this.ids = ids;
    }

    public List<String> getRecords() throws IOException, InterruptedException {
        List<String> store = new ArrayList<>();
        for (String id: ids) {
            store.add(Service.getRecord(id));
        }
        return store;
    }
}
Is this concurrent? No. The calls run one after another, each waiting for the previous one to finish, but we do get three responses, so it's not bad.

Here's the response:
Code:
[{
  "userId": 1,
  "id": 1,
  "title": "sunt aut facere repellat provident occaecati excepturi optio reprehenderit",
  "body": "quia et suscipit\nsuscipit recusandae consequuntur expedita et cum\nreprehenderit molestiae ut ut quas totam\nnostrum rerum est autem sunt rem eveniet architecto"
}, {
  "userId": 1,
  "id": 2,
  "title": "qui est esse",
  "body": "est rerum tempore vitae\nsequi sint nihil reprehenderit dolor beatae ea dolores neque\nfugiat blanditiis voluptate porro vel nihil molestiae ut reiciendis\nqui aperiam non debitis possimus qui neque nisi nulla"
}, {
  "userId": 1,
  "id": 3,
  "title": "ea molestias quasi exercitationem repellat qui ipsa sit aut",
  "body": "et iusto sed quo iure\nvoluptatem occaecati omnis eligendi aut ad\nvoluptatem doloribus vel accusantium quis pariatur\nmolestiae porro eius odio et labore et velit aut"
}]

Now the question is: can I call the API endpoint concurrently (simultaneously) instead of sequentially? Why would I want to do that? Let's assume we have three API endpoints from three different providers: Microsoft's API, Apple's API, and VMware's API. All three are independent, so no call needs the result of another, and we can issue them at the same time instead of waiting for each one in turn.

If we want to make our calls concurrent, we can use threads and manage them via an ExecutorService.

Concurrent calls.
Code:
package concurrency;

public class App {
    public static void main(String[] args) throws Exception {
        String[] ids = {"1", "2", "3"};
        Scattered scattered = new Scattered(ids);
        System.out.println(scattered.getRecords());
    }
}

Code:
/*
* This Java source file was generated by the Gradle 'init' task.
*/
package concurrency;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class Scattered {

    private String[] ids;

    public Scattered(String[] ids) {
        this.ids = ids;
    }

    public List<String> getRecords() {
        ExecutorService executorService = Executors.newFixedThreadPool(ids.length);

        List<String> store = new ArrayList<>();
        for (String id: ids) {
            executorService.execute(new ScatteredWorker(id, store));
        }

        executorService.shutdown();
        return store;
    }
}

class ScatteredWorker implements Runnable {

    private String id;
    private List<String> store;

    public ScatteredWorker(String id, List<String> store) {
        this.id = id;
        this.store = store;
    }

    @Override
    public void run() {
        String response;
        try {
            response = Service.getRecord(this.id);
        } catch (IOException e) {
            response = null;
        }

        store.add(response);
    }
}

The response:
Code:
[]

Wow, it's empty! This is because getRecords() returns as soon as the three tasks have been handed to the executor. The main thread does not wait for the worker threads, so it prints the list before any response has arrived. The workers still do their work. We just don't see it as part of the response.
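To see this effect without touching the network, here is a minimal sketch. The class ReturnBeforeDone and the method slowFetch are made up for illustration; slowFetch just sleeps to imitate a slow Service.getRecord call. The list size is read right after the tasks are submitted and again after the workers have finished.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

final class ReturnBeforeDone {

    // Hypothetical stand-in for Service.getRecord: sleeps to simulate network latency.
    static String slowFetch(String id) {
        try {
            Thread.sleep(200);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return "record-" + id;
    }

    // Returns {size right after submitting, size after waiting for the workers}.
    static int[] sizesBeforeAndAfter(String[] ids) {
        ExecutorService pool = Executors.newFixedThreadPool(ids.length);
        List<String> store = Collections.synchronizedList(new ArrayList<>());
        for (String id : ids) {
            pool.execute(() -> store.add(slowFetch(id)));
        }
        int before = store.size(); // read while the workers are still sleeping
        pool.shutdown();           // no new tasks accepted; submitted ones keep running
        try {
            pool.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return new int[] {before, store.size()};
    }

    public static void main(String[] args) {
        int[] sizes = sizesBeforeAndAfter(new String[] {"1", "2", "3"});
        System.out.println("before=" + sizes[0] + ", after=" + sizes[1]);
    }
}
```

The first number is expected to be 0 because the main thread reads the list long before the simulated 200 ms delay is over. The second number is 3 once the pool has terminated.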

Is this bad? That depends on the use case. Say we have an online reporting system where you request a document and expect to get it later, e.g. a day after. Then this kind of fire-and-forget concurrency is acceptable. We don't need to return the response immediately; we can deliver it later as an email, an SMS, or door-to-door delivery.

But what if we want to return the response and still be concurrent? How do we do that? One way is to introduce a waiting time via Thread.sleep.

Concurrent calls but with Thread.sleep

Code:
/*
* This Java source file was generated by the Gradle 'init' task.
*/
package concurrency;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class ScatteredThreadSleep {

    private String[] ids;

    public ScatteredThreadSleep(String[] ids) {
        this.ids = ids;
    }

    public List<String> getRecords() {
        ExecutorService executorService = Executors.newFixedThreadPool(ids.length);

        List<String> store = Collections.synchronizedList(new ArrayList<>());

        // Below is not thread-safe
        // List<String> store = new ArrayList<>();

        for (String id: ids) {
            executorService.execute(new ScatteredThreadSleepWorker(id, store));
        }

        executorService.shutdown();

        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return store;
    }
}

class ScatteredThreadSleepWorker implements Runnable {

    private String id;
    private List<String> store;

    public ScatteredThreadSleepWorker(String id, List<String> store) {
        this.id = id;
        this.store = store;
    }

    @Override
    public void run() {
        String response;
        try {
            response = Service.getRecord(this.id);
        } catch (IOException e) {
            response = null;
        }

        store.add(response);
    }
}

The response
Code:
[{
  "userId": 1,
  "id": 2,
  "title": "qui est esse",
  "body": "est rerum tempore vitae\nsequi sint nihil reprehenderit dolor beatae ea dolores neque\nfugiat blanditiis voluptate porro vel nihil molestiae ut reiciendis\nqui aperiam non debitis possimus qui neque nisi nulla"
}, {
  "userId": 1,
  "id": 1,
  "title": "sunt aut facere repellat provident occaecati excepturi optio reprehenderit",
  "body": "quia et suscipit\nsuscipit recusandae consequuntur expedita et cum\nreprehenderit molestiae ut ut quas totam\nnostrum rerum est autem sunt rem eveniet architecto"
}, {
  "userId": 1,
  "id": 3,
  "title": "ea molestias quasi exercitationem repellat qui ipsa sit aut",
  "body": "et iusto sed quo iure\nvoluptatem occaecati omnis eligendi aut ad\nvoluptatem doloribus vel accusantium quis pariatur\nmolestiae porro eius odio et labore et velit aut"
}]

Nice. We got three records. Notice that they are not in order. That's because the three threads run concurrently, so whichever response arrives first becomes the first record in our list. If you run the program multiple times, you will see the order of the records change from run to run. This is okay if we don't care about the ordering.

By adding Thread.sleep, we essentially force the main thread to wait while the other threads complete their tasks. We also switched the list: List<String> store = new ArrayList<>(); is not thread-safe, which means concurrent add calls can interfere with each other, for example overwriting an existing value instead of appending a new record. To resolve that, we wrapped our list in a thread-safe list: List<String> store = Collections.synchronizedList(new ArrayList<>());
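Here is a small sketch of what the thread-safe wrapper buys us, with no network involved. The class SafeListDemo and the method fillConcurrently are illustrative names, not part of the tutorial's project. Several workers hammer the same list with add calls, and we check the final size.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

final class SafeListDemo {

    // Adds `perThread` items from each of `threads` workers and returns the final size.
    static int fillConcurrently(List<Integer> store, int threads, int perThread) {
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        for (int t = 0; t < threads; t++) {
            pool.execute(() -> {
                for (int i = 0; i < perThread; i++) {
                    store.add(i);
                }
            });
        }
        pool.shutdown();
        try {
            if (!pool.awaitTermination(30, TimeUnit.SECONDS)) {
                throw new IllegalStateException("workers did not finish in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting", e);
        }
        return store.size();
    }

    public static void main(String[] args) {
        List<Integer> safe = Collections.synchronizedList(new ArrayList<>());
        System.out.println(fillConcurrently(safe, 8, 10_000)); // prints 80000
        // Passing a plain new ArrayList<>() instead may lose elements or fail
        // inside a worker; the outcome varies from run to run.
    }
}
```

With only three adds, as in our API example, the unsafe list will usually appear to work, which is exactly why this kind of bug is easy to miss.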

We still have one glaring problem. We paused the main thread for 1000 milliseconds. If the call to the external website takes more than 1000 milliseconds, we may get a partial or empty response, just like in the previous example. And if the calls finish sooner, we wait longer than necessary. How do we solve this problem?

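One way of addressing this concern is to use Java's CompletableFuture. The good thing with this one is that we don't need a Thread.sleep and we don't need to create an ExecutorService ourselves: by default, runAsync runs the task on the shared ForkJoinPool.commonPool(), and allOf(...).get() blocks until every task has completed.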

CompletableFuture calls
Code:
/*
* This Java source file was generated by the Gradle 'init' task.
*/
package concurrency;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

public class Completable {

    private String[] ids;

    public Completable(String[] ids) {
        this.ids = ids;
    }

    public List<String> getRecords() {

        List<CompletableFuture<Void>> tasks = new ArrayList<>();

        List<String> store = Collections.synchronizedList(new ArrayList<>());

        // Below is not thread-safe
        // List<String> store = new ArrayList<>();

        for (String id: ids) {
            CompletableFuture<Void> task = CompletableFuture.runAsync(new CompletableWorker(id, store));
            tasks.add(task);
        }

        CompletableFuture<Void> allTasks = CompletableFuture.allOf(tasks.toArray(CompletableFuture[]::new));
        try {
            allTasks.get();
        } catch (Exception e) {
            e.printStackTrace();
        }

        return store;
    }
}

class CompletableWorker implements Runnable {

    private String id;
    private List<String> store;

    public CompletableWorker(String id, List<String> store) {
        this.id = id;
        this.store = store;
    }

    @Override
    public void run() {
        String response;
        try {
            response = Service.getRecord(this.id);
        } catch (IOException e) {
            response = null;
        }

        store.add(response);
    }
}

The response:
Code:
[{
  "userId": 1,
  "id": 3,
  "title": "ea molestias quasi exercitationem repellat qui ipsa sit aut",
  "body": "et iusto sed quo iure\nvoluptatem occaecati omnis eligendi aut ad\nvoluptatem doloribus vel accusantium quis pariatur\nmolestiae porro eius odio et labore et velit aut"
}, {
  "userId": 1,
  "id": 2,
  "title": "qui est esse",
  "body": "est rerum tempore vitae\nsequi sint nihil reprehenderit dolor beatae ea dolores neque\nfugiat blanditiis voluptate porro vel nihil molestiae ut reiciendis\nqui aperiam non debitis possimus qui neque nisi nulla"
}, {
  "userId": 1,
  "id": 1,
  "title": "sunt aut facere repellat provident occaecati excepturi optio reprehenderit",
  "body": "quia et suscipit\nsuscipit recusandae consequuntur expedita et cum\nreprehenderit molestiae ut ut quas totam\nnostrum rerum est autem sunt rem eveniet architecto"
}]

Cool. We got a similar response. Again the order is not guaranteed but you will get three items.
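If the ordering does matter, one possible variation (not from the original examples) is to let each task return its value with supplyAsync and then collect the results in submission order. A minimal sketch, assuming a made-up fakeFetch in place of Service.getRecord so it runs offline:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

final class OrderedCompletable {

    // Hypothetical stand-in for Service.getRecord; higher ids finish sooner on purpose.
    static String fakeFetch(String id) {
        try {
            Thread.sleep(Math.max(0L, 400L - 100L * Long.parseLong(id)));
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return "record-" + id;
    }

    static List<String> getRecords(String[] ids) {
        // Start every call first so they can run concurrently.
        List<CompletableFuture<String>> futures = new ArrayList<>();
        for (String id : ids) {
            futures.add(CompletableFuture.supplyAsync(() -> fakeFetch(id)));
        }
        // Then collect in submission order; join() blocks until that future is done.
        List<String> records = new ArrayList<>();
        for (CompletableFuture<String> future : futures) {
            records.add(future.join());
        }
        return records;
    }

    public static void main(String[] args) {
        System.out.println(getRecords(new String[] {"1", "2", "3"}));
        // prints [record-1, record-2, record-3]
    }
}
```

Because only the calling thread touches the result list, this version also needs no synchronized list at all.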

But with Java there are multiple ways to tackle the same problem. Let's try to solve it using Java's CountDownLatch. A CountDownLatch is basically a counter that starts at a number we choose. Every time a worker calls countDown(), the counter decreases by one, and once it reaches zero, every thread blocked in await() is released (in this case, the main thread that returns our payload).

CountDownLatch calls
Code:
/*
* This Java source file was generated by the Gradle 'init' task.
*/
package concurrency;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class Collected {

    private String[] ids;

    public Collected(String[] ids) {
        this.ids = ids;
    }

    public List<String> getRecords() throws InterruptedException {
        ExecutorService executorService = Executors.newFixedThreadPool(ids.length);
        CountDownLatch latch = new CountDownLatch(ids.length);

        List<String> store = Collections.synchronizedList(new ArrayList<>());

        // Below is not thread-safe
        // List<String> store = new ArrayList<>();

        for (String id: ids) {
            executorService.execute(new CollectedWorker(id, store, latch));
        }

        latch.await();
        executorService.shutdown();
        return store;
    }
}

class CollectedWorker implements Runnable {

    private CountDownLatch latch;
    private String id;
    private List<String> store;

    public CollectedWorker(String id, List<String> store, CountDownLatch latch) {
        this.id = id;
        this.store = store;
        this.latch = latch;
    }

    @Override
    public void run() {
        String response;
        try {
            response = Service.getRecord(this.id);
        } catch (IOException e) {
            response = null;
        }

        store.add(response);
        latch.countDown();
    }
}

The response
Code:
[{
  "userId": 1,
  "id": 3,
  "title": "ea molestias quasi exercitationem repellat qui ipsa sit aut",
  "body": "et iusto sed quo iure\nvoluptatem occaecati omnis eligendi aut ad\nvoluptatem doloribus vel accusantium quis pariatur\nmolestiae porro eius odio et labore et velit aut"
}, {
  "userId": 1,
  "id": 1,
  "title": "sunt aut facere repellat provident occaecati excepturi optio reprehenderit",
  "body": "quia et suscipit\nsuscipit recusandae consequuntur expedita et cum\nreprehenderit molestiae ut ut quas totam\nnostrum rerum est autem sunt rem eveniet architecto"
}, {
  "userId": 1,
  "id": 2,
  "title": "qui est esse",
  "body": "est rerum tempore vitae\nsequi sint nihil reprehenderit dolor beatae ea dolores neque\nfugiat blanditiis voluptate porro vel nihil molestiae ut reiciendis\nqui aperiam non debitis possimus qui neque nisi nulla"
}]

Similar response. I suggest you read the CountDownLatch Javadoc or search online to learn more about it. It's a really useful Java construct.
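As a starting point, here is a tiny sketch of the latch mechanics in isolation. LatchDemo and runAndWait are illustrative names. Two details differ from the example above and are my own suggestions: countDown() sits in a finally block so a failing worker cannot leave the main thread waiting forever, and await is given a timeout.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

final class LatchDemo {

    // Starts `workers` threads, waits until each has counted down,
    // and returns how many had finished when await() returned.
    static int runAndWait(int workers) {
        CountDownLatch latch = new CountDownLatch(workers);
        AtomicInteger finished = new AtomicInteger();
        for (int i = 0; i < workers; i++) {
            new Thread(() -> {
                try {
                    finished.incrementAndGet(); // the "work"
                } finally {
                    latch.countDown();          // always count down, even on failure
                }
            }).start();
        }
        try {
            // await(timeout) returns false instead of blocking forever if a worker hangs.
            if (!latch.await(5, TimeUnit.SECONDS)) {
                throw new IllegalStateException("timed out waiting for workers");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting", e);
        }
        return finished.get();
    }

    public static void main(String[] args) {
        System.out.println(runAndWait(3)); // prints 3
    }
}
```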
 
Continuation ...

Remember last time, when we had to wrap our list with a thread-safe list?

Code:
List<String> store = Collections.synchronizedList(new ArrayList<>());

// Below is not thread-safe
// List<String> store = new ArrayList<>();

Are there other ways of achieving the same goal without wrapping this list with a thread-safe list? There are! Why would you want to know this? Well, sometimes you need a screwdriver, sometimes a hammer, and sometimes a saw. All of them can be a weapon, but each has special cases where it is really useful by itself.

Using our old-time favorite, synchronized
Code:
/*
* This Java source file was generated by the Gradle 'init' task.
*/
package concurrency;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

public class CompletableWithSynchronized {

    private String[] ids;

    public CompletableWithSynchronized(String[] ids) {
        this.ids = ids;
    }

    public List<String> getRecords() {

        List<CompletableFuture<Void>> tasks = new ArrayList<>();

        // Below is not thread safe but we synchronized the add method so it becomes thread safe
        List<String> store = new ArrayList<>();

        for (String id: ids) {
            CompletableFuture<Void> task = CompletableFuture.runAsync(new CompletableWithSynchronizedWorker(id, store));
            tasks.add(task);
        }

        CompletableFuture<Void> allTasks = CompletableFuture.allOf(tasks.toArray(CompletableFuture[]::new));
        try {
            allTasks.get(5, TimeUnit.SECONDS);
        } catch (Exception e) {
            e.printStackTrace();
        }

        return store;
    }
}

class CompletableWithSynchronizedWorker implements Runnable {

    private String id;
    private List<String> store;

    public CompletableWithSynchronizedWorker(String id, List<String> store) {
        this.id = id;
        this.store = store;
    }

    @Override
    public void run() {
        String response;
        try {
            response = Service.getRecord(this.id);
        } catch (IOException e) {
            response = null;
        }

        synchronized (store) {
            store.add(response);
        }

    }
}

The relevant code is in the latter part. Notice how we wrapped store.add with synchronized (store). This means only one thread at a time can execute a block synchronized on the same store object; any other thread reaching such a block waits until the running thread leaves it. We are essentially forcing other threads to wait!
Code:
synchronized (store) {
            store.add(response);
        }
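To convince ourselves that the synchronized block is enough to protect a plain ArrayList, here is a rough stress sketch. SynchronizedAddDemo and fill are made-up names. Every worker synchronizes on the same list object, which is the part that matters.

```java
import java.util.ArrayList;
import java.util.List;

final class SynchronizedAddDemo {

    // Adds `perThread` items from each of `threads` workers and returns the final size.
    static int fill(int threads, int perThread) {
        List<Integer> store = new ArrayList<>(); // plain, unsynchronized list
        List<Thread> workers = new ArrayList<>();
        for (int t = 0; t < threads; t++) {
            Thread worker = new Thread(() -> {
                for (int i = 0; i < perThread; i++) {
                    synchronized (store) { // same monitor for every worker
                        store.add(i);
                    }
                }
            });
            workers.add(worker);
            worker.start();
        }
        try {
            for (Thread worker : workers) {
                worker.join(); // join() also makes the workers' writes visible here
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting", e);
        }
        return store.size();
    }

    public static void main(String[] args) {
        System.out.println(fill(4, 5_000)); // prints 20000
    }
}
```

If each worker synchronized on a different object, the blocks would not exclude each other and the list would be unprotected again.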


Another, more modern alternative is using Locks
Code:
/*
* This Java source file was generated by the Gradle 'init' task.
*/
package concurrency;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

public class CompletableWithLock {

    private String[] ids;

    public CompletableWithLock(String[] ids) {
        this.ids = ids;
    }
    private Lock lock = new ReentrantLock();

    public List<String> getRecords() {

        List<CompletableFuture<Void>> tasks = new ArrayList<>();

        // Below is not thread safe but we add a lock on the add method so it becomes thread safe
        List<String> store = new ArrayList<>();

        for (String id: ids) {
            CompletableFuture<Void> task = CompletableFuture.runAsync(new CompletableWithLockWorker(id, store, lock));
            tasks.add(task);
        }

        CompletableFuture<Void> allTasks = CompletableFuture.allOf(tasks.toArray(CompletableFuture[]::new));
        try {
            allTasks.get(5, TimeUnit.SECONDS);
        } catch (Exception e) {
            e.printStackTrace();
        }

        return store;
    }
}

class CompletableWithLockWorker implements Runnable {

    private String id;
    private List<String> store;
    private Lock lock;

    public CompletableWithLockWorker(String id, List<String> store, Lock lock) {
        this.id = id;
        this.store = store;
        this.lock = lock;
    }

    @Override
    public void run() {
        String response;
        try {
            response = Service.getRecord(this.id);
        } catch (IOException e) {
            response = null;
        }

        lock.lock();
        try {
            store.add(response);
        } finally {
            // Release the lock even if add() throws
            lock.unlock();
        }
    }
}

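Again the important difference is in the latter part. In real code, put unlock() in a finally block so the lock is released even if the guarded code throws:
Code:
        lock.lock();
        try {
            store.add(response);
        } finally {
            lock.unlock();
        }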

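In both examples, we get the same response: three items. But why would I use a lock over synchronized? With synchronized, the lock is acquired and released automatically at the start and end of the block. With a Lock, you decide where to call lock() and unlock(), even in different methods, and you get extras such as tryLock(), timed waits, and lockInterruptibly(). The price is that you must remember to unlock. Which one is better?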
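Here is a small sketch of something a Lock can do that synchronized cannot: give up instead of waiting. TryLockDemo, addIfFree, and attemptWhileHeld are illustrative names, not part of the tutorial's project.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

final class TryLockDemo {

    // Adds the item only if the lock is free right now; never blocks.
    static boolean addIfFree(Lock lock, List<String> store, String item) {
        if (!lock.tryLock()) {
            return false;      // another thread holds the lock; caller may retry or skip
        }
        try {
            store.add(item);
            return true;
        } finally {
            lock.unlock();     // released even if add() throws
        }
    }

    // Holds the lock on the calling thread while a second thread attempts the add.
    static boolean attemptWhileHeld(Lock lock, List<String> store) {
        boolean[] added = new boolean[1];
        lock.lock();
        try {
            Thread other = new Thread(() -> added[0] = addIfFree(lock, store, "blocked"));
            other.start();
            other.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            lock.unlock();
        }
        return added[0];
    }

    public static void main(String[] args) {
        Lock lock = new ReentrantLock();
        List<String> store = new ArrayList<>();
        System.out.println(addIfFree(lock, store, "first")); // prints true
        System.out.println(attemptWhileHeld(lock, store));   // prints false
        System.out.println(store);                           // prints [first]
    }
}
```

For our three API calls this is overkill, but it shows the kind of control you trade the simplicity of synchronized for.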

There is a saying in chess, "When I am White, I win because I am White. When I am Black, I win because I am Bogolyubov." - Efim Bogoljubov
 
Here's a Node.js (JavaScript) implementation, equivalent to our Sequential example:

Code:
const fetch = require('node-fetch');

const API_URL = 'https://jsonplaceholder.typicode.com/posts/';
const ids = [1, 2, 3];

// Aggregates our records
async function getRecords() {
    const list = [];
    for (let id of ids) {
        const response = await getRecord(id);
        list.push(response);
    }
    return list;
}

// Calls the endpoint
async function getRecord(id) {
    const response = await fetch(API_URL+id);
    const json = await response.json();
    return json;
}

// Our main function
getRecords()
    .then( response => console.log(response) );

The response:
Code:
[ { userId: 1,
    id: 1,
    title: 'sunt aut facere repellat provident occaecati excepturi optio reprehenderit',
    body: 'quia et suscipit\nsuscipit recusandae consequuntur expedita et cum\nreprehenderit molestiae ut ut quas totam\nnostrum rerum est autem sunt rem eveniet architecto' },
  { userId: 1,
    id: 2,
    title: 'qui est esse',
    body: 'est rerum tempore vitae\nsequi sint nihil reprehenderit dolor beatae ea dolores neque\nfugiat blanditiis voluptate porro vel nihil molestiae ut reiciendis\nqui aperiam non debitis possimus qui neque nisi nulla' },
  { userId: 1,
    id: 3,
    title: 'ea molestias quasi exercitationem repellat qui ipsa sit aut',
    body: 'et iusto sed quo iure\nvoluptatem occaecati omnis eligendi aut ad\nvoluptatem doloribus vel accusantium quis pariatur\nmolestiae porro eius odio et labore et velit aut' } ]

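Node.js runs our JavaScript on a single thread, so we don't have to worry about threads or locks. Note that awaiting inside the loop issues the requests one at a time, like our Sequential class; starting all the requests first and awaiting them together with Promise.all would let them overlap. Honestly, this is simpler than all the Java examples we tackled. So pick your poison. I like both, but I tend to go with the simpler route if possible.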
 