java – Thread-safe state check

Question:

  • Task: There are clients that connect to the server and periodically perform some operations. If for some time the client has not performed a single operation, then in the database its status changes to offline. When the operation is resumed, the status in the database changes to online. Clients live in different streams.
  • Load:

    • About 100-200 clients
    • average frequency of operations of the order of a second or two
    • idle time: 5 minutes
  • Algorithm:

    • There is a Map whose key is the client ID, and the value is the timeout. Clients shift this time each time they access the server.

    • There is a timer that runs once a minute, runs through the list of clients, finds those who have timed out, removes them from the list and changes their status in the database.

  • Implementation:

     public class ActiveClientManager implements Consumer<Integer> { private final Map<Integer, Long> clients; private final PoolDataSource pds; private final long timeout; private final Object dbMonitor; public ActiveClientManager(PoolDataSource pds, long timeout) { clients = new ConcurrentHashMap<>(); this.pds = pds; this.timeout = timeout; dbMonitor = new Object(); } // Действие клиента @Override public void accept(Integer clientID) { // Время после которого, клиент уходит в оффлайн long offlineTime = System.currentTimeMillis() + timeout; // Вставляем нового клиента, или обновляем существующего if (clients.put(clientID, offlineTime) == null) { // Если была произведена вставка, то меняем статус в базе try { try (Connection con = pds.getConnection()) { try (PreparedStatement stmt = con.prepareStatement("UPDATE clients SET state = 1 WHERE id = ?")) { stmt.setInt(1, clientID); // Перекрываем кислород таймеру synchronized (dbMonitor) { stmt.executeUpdate(); } } } } catch (SQLException e) { e.printStackTrace(); } } } // Обработка события таймера public void testTimeout() throws SQLException { long currentTime = System.currentTimeMillis(); // Сюда занесем всех клиентов, которые уже отвалились List<Integer> removeClients = new ArrayList<>(); clients.replaceAll((key, val) -> { if (val <= currentTime) { // если время уже прошло, то сохраняем нашего клиента removeClients.add(key); // и удаляем его из общего списка return null; } // иначе ничего не трогаем return val; }); // если есть кого удалять if (removeClients.size() > 0) { try (Connection con = pds.getConnection()) { try (PreparedStatement stmt = con.prepareStatement("UPDATE clients SET state = NULL WHERE id = ?")) { // блокируем добавление нового клиента synchronized (dbMonitor) { for (Integer client : removeClients) { // если клиент не появился опять в общем списке if (!clients.containsKey(client)) { // то сбрасываем ему статус stmt.setInt(1, client); stmt.executeUpdate(); } } } } } } } }

Actually a question – whether I missed, something in synchronization?

update

It turned out that the design

clients.replaceAll((key, val) -> {
    if (val <= currentTime) {
        // если время уже прошло, то сохраняем нашего клиента
        removeClients.add(key);
        // и удаляем его из общего списка
        return null;
    }
    // иначе ничего не трогаем
    return val;
});

does not work. If null is returned from the lambda , then the method throws a NullPointerException. Rewrote like this

clients.entrySet().removeIf((entry) -> {
    if (entry.getValue() <= currentTime) {
        removeClients.add(entry.getKey());
        return true;
    }
    return false;
});

Answer:

By synchronizing on a single dbMonitor object, you limit the parallelism of your solution. That is, clients will "wait" on the database even if they are with different clientId . If you use the built-in synchronization in ConcurrentHashMap , then you get a lot of "parallelism" of the ActiveClientManager 'a.

But of course, it is better to check the performance with tests. Try to create a manager and call the accept , testTimeout from different threads.

I do not pretend to be the absolute truth of the decision, I only pointed out what caught my eye.

Example:

public class ActiveClientManager implements Consumer<Integer> {
    private final Map<Integer, Long> clients = new ConcurrentHashMap<>();
    private final PoolDataSource pds;
    private final long timeout;

    public ActiveClientManager(PoolDataSource pds, long timeout) {
        this.pds = pds;
        this.timeout = timeout;
    }

    // Действие клиента
    @Override
    public void accept(Integer clientID) {
        // Время после которого, клиент уходит в оффлайн
        long offlineTime = System.currentTimeMillis() + timeout;
        // Вставляем нового клиента, или обновляем существующего
        // синхронизируемся только если один и тот же clientID
        clients.compute(clientID, (oldVal, newVal) -> {
            if (oldVal == null) {
                try {
                    try (Connection con = pds.getConnection()) {
                        try (PreparedStatement stmt = con.prepareStatement("UPDATE clients SET state = 1 WHERE id = ?")) {
                            stmt.setInt(1, clientID);
                            stmt.executeUpdate();
                        }
                    }
                } catch (SQLException e) {
                    e.printStackTrace();
                }
            }
            return offlineTime;
        });
    }

    // Обработка события таймера
    public void testTimeout() throws SQLException {
        long currentTime = System.currentTimeMillis();
        // Сюда занесем всех клиентов, которые уже отвалились
        // синхронизируемся на конерктном client
        clients.replaceAll((client, val) -> {
            if (val <= currentTime) {
                // если время уже прошло, то сохраняем нашего клиента
                try (Connection con = pds.getConnection()) {
                    try (PreparedStatement stmt = con.prepareStatement("UPDATE clients SET state = NULL WHERE id = ?")) {
                        // блокируем добавление нового клиента
                        stmt.setInt(1, client);
                        stmt.executeUpdate();
                    }
                } catch (SQLException e) {
                    e.printStackTrace();
                }
                // и удаляем его из общего списка
                return null;
            }
            // иначе ничего не трогаем
            return val;
        });
    }
}

Update According to the second implementation, it is evident that you have not decided what to do if the database does not work (or returns errors). In the first case – accept – you collect a list of exceptions, and try to update "at least some" clients, even if the database starts to "fail". And in the second case – testTimeout just log the message to the console. If I were you, I would throw RuntimeException in both cases, because it seems not logical to continue working with a non-working database. But it depends on how you deal with exceptions above in the code.

About two cycles, it is logical not to create a connection to the database if it is not needed. But two identical test conditions only complicate the code. And in the future, if you need to change the condition, you will have to change it in two places. I would leave connection creation inside.

It also seems that it is logical to move the code for working with the database into two separate methods or even combine them into one.

There is a feeling that the Consumer interface is superfluous here. This suggests an interface with two methods get(), put(). It is also not clear from the code who will call the testTimeout method. It seems that this method should be called by the manager himself, after a certain period of time, because this is his area of ​​​​responsibility – keeping client statuses up to date. Then you should create, for example, Executors.newScheduledThreadPool(1) , and in it a thread that will call the private method testTimeout .

updated version

public class ActiveClientManager implements Consumer<Integer> {
    private final Map<Integer, Long> clients = new ConcurrentHashMap<>();;
    private final PoolDataSource pds;
    private final long timeoutMSec;
    private final ScheduledExecutorService executorService = Executors.newSingleThreadScheduledExecutor();

    public ActiveClientManager(PoolDataSource pds, long timeoutMSec) throws SQLException {
        this.pds = pds;
        this.timeoutMSec = timeoutMSec;
        executorService.scheduleWithFixedDelay(this::testTimeout, 5 ,5, TimeUnit.MINUTES);
    }

    private void testTimeout() {
        long currentTime = System.currentTimeMillis();

        clients.entrySet().removeIf((entry) -> {
            // если время уже прошло
            if (entry.getValue() < currentTime) {
                // обновляем БД
                updateState(entry.getKey(), State.notactive);
                // удаляем этот элемент
                return true;
            }
            return false;
        });
    }

    private void updateState(Integer clientId, State state) {
        try (Connection con = pds.getConnection();
             PreparedStatement stmt = con.prepareStatement("UPDATE clients SET state = " + state.getSqlState()  + " WHERE id = ?");) {
            stmt.setInt(1, clientId);
            stmt.executeUpdate();
        } catch (SQLException e) {
          throw new RuntimeException(e);
        }
    }

    @Override
    public void accept(Integer clientID) {
        long offlineTime = System.currentTimeMillis() + timeoutMSec;
        clients.compute(clientID, (key, oldval) -> {
            // Если запись новая - обновляем БД
            if (oldval == null) {
                updateState(clientID, State.active);
            }
            return offlineTime;
        });
    }

    public enum State {
        active("1"), notactive("NULL");

        private final String sqlState;

        State(String sqlState) {
            this.sqlState = sqlState;
        }

        public String getSqlState() {
            return sqlState;
        }
    }
}
Scroll to Top