concurrency – Transaction logging algorithm

Question:

While working on my current project I ran into a complex transaction that has to work flawlessly but is not atomic in itself. The application is a small billing service that scales horizontally and uses a single distributed data store (Cassandra).

Among other things, the application works with a balance entity:

{
    "accountId": "admin",
    "currencyBalances": {
        "RUB": 200.22,
        "USD": 10
    },
    "createdAt": 0,
    "updatedAt": 1400000000
}

and a deposit or withdrawal operation that is applied to the balance:

{
    "accountId": "admin",
    "currency": "RUB",
    "amount": -100.0
}

When such an operation (a transaction) is created, it must be reflected in two places: as a record of the operation itself and as a change to the balance. The store does not let me do this atomically, so I need an algorithm that restores the integrity of the system if it fails between the two writes.

The task itself is trivial: write a log entry stating that integrity must be established for operation X -> create record X -> establish integrity (if the application instance crashes, the log of uncommitted operations is read and replayed) -> delete the log entry. However, I am limited by constraints of the application and the services around it:

  • The store offers a transactional mechanism only for operations on a single key, so I cannot make two writes atomically. Because of this, I cannot update the balance and record that update in a neighboring table in one step: the system may crash between the two writes, and the store may be unavailable while either of them is executed.
  • All entities are stored as a log of manipulations over them (event sourcing). The balance might look like this:

     [
       { "entityType": "doe.john.Balance", "entityId": "admin", "eventType": "doe.john.BalanceChangedEvent", "event": { "timestamp": 1400000000, "currency": "RUB", "amount": 500, "operationId": "408279ae-ee9a-46f3-a9d2-88b0e13d476e" }, "version": 1 },
       { "entityType": "doe.john.Balance", "entityId": "admin", "eventType": "doe.john.BalanceChangedEvent", "event": { "timestamp": 1400000001, "currency": "RUB", "amount": 100, "operationId": "87fb18b0-3b01-477f-b959-21b22d54af1f" }, "version": 2 },
       { "entityType": "doe.john.Balance", "entityId": "admin", "eventType": "doe.john.BalanceChangedEvent", "event": { "timestamp": 1400000002, "currency": "EUR", "amount": 10, "operationId": "464608fd-d737-454a-8278-9c56fa52f9d6" }, "version": 3 }
     ]

    which is ultimately folded, by applying the events in order, into

     { "accountId": "admin", "currencyBalances": { "RUB": 600, "EUR": 10 }, "updatedAt": 1400000002, "createdAt": 1400000000 }

    This takes away my ability to recalculate a user's entire balance: I can only append new manipulations, not change old ones (much like the restriction on deleting related entities in an RDBMS). Likewise, I cannot check whether an operation has already been applied to the balance: I can read the operationId of a specific manipulation, but I cannot search by it (manipulations are written as JSON strings, and the store supports lookups only by exact values). I cannot list every operationId in the assembled balance entity either, because that list is potentially endless. Besides the disadvantages described above, this format implies appending in optimistic-locking mode (the next version is calculated and used in INSERT ... IF NOT EXISTS ), so updates made by another process are trivially detected.

  • Distributed locks are theoretically possible, but they could kill performance completely, so we would like to avoid them for as long as possible. Setting flags on records (e.g. "this record is currently being edited") is pointless because of the storage architecture.
  • The application scales horizontally, and the exact number of nodes at any given time is unknown. A logged operation may be executed by several nodes at once; until now this was accepted as a necessary evil, and all operations were simply made idempotent.
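
To make the event-log constraint concrete, here is a minimal in-memory sketch of it, not the actual service code. The names EventStore, insert_if_not_exists and fold_balance are hypothetical, and the dictionary only imitates the behaviour of a conditional INSERT ... IF NOT EXISTS on a single key. It shows the log being folded into a balance and a second writer losing the race for an already taken version.

```python
from dataclasses import dataclass, field


@dataclass
class EventStore:
    """Hypothetical in-memory stand-in for the event table, keyed by (entity_id, version)."""
    rows: dict = field(default_factory=dict)

    def insert_if_not_exists(self, entity_id, version, event):
        # Imitates a conditional insert on a single key: returns False
        # when another writer has already taken this version.
        key = (entity_id, version)
        if key in self.rows:
            return False
        self.rows[key] = event
        return True

    def events(self, entity_id):
        # Events of one entity, ordered by version.
        keys = sorted(k for k in self.rows if k[0] == entity_id)
        return [self.rows[k] for k in keys]


def fold_balance(account_id, events):
    """Rebuild the balance by applying the events in version order."""
    balances = {}
    for e in events:
        balances[e["currency"]] = balances.get(e["currency"], 0) + e["amount"]
    return {
        "accountId": account_id,
        "currencyBalances": balances,
        "createdAt": events[0]["timestamp"] if events else None,
        "updatedAt": events[-1]["timestamp"] if events else None,
    }


store = EventStore()
store.insert_if_not_exists("admin", 1, {"timestamp": 1400000000, "currency": "RUB", "amount": 500})
store.insert_if_not_exists("admin", 2, {"timestamp": 1400000001, "currency": "RUB", "amount": 100})
store.insert_if_not_exists("admin", 3, {"timestamp": 1400000002, "currency": "EUR", "amount": 10})

balance = fold_balance("admin", store.events("admin"))

# A second writer that also computed "next version = 3" is rejected
# and has to re-read the log before retrying.
conflict = store.insert_if_not_exists("admin", 3, {"timestamp": 1400000003, "currency": "USD", "amount": 1})
```

The sketch covers only the optimistic append; it deliberately has no lookup by operationId, since the store described above cannot provide one.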

Thus, I need to make two writes (from the application's point of view, perform two manipulations), but I have trouble determining whether the second write / manipulation has been performed (I can fetch a manipulation by a specific id / version, but I cannot determine whether one exists for a given operationId ). One way or another, I need a journaling algorithm that records the upcoming operation in advance and can then establish the integrity of the system no matter how many times it is invoked.

At the moment I have an idea for only one algorithm:

  1. The process obtains the current version of the balance.
  2. The process gets the current record of the operation. If it says the operation has been fully applied, the process considers integrity established and stops.
  3. The process checks whether the journal table contains a record stating which balance version (manipulation) is expected to reflect the operation.
    • If such a record exists, the process checks whether the corresponding balance version exists.
      • If that version (manipulation) exists and really corresponds to applying the current operation, the process goes straight to step 5.
      • If that version exists but records the application of a different operation, the process overwrites the journal record with a new version.
      • If that version does not exist yet (which is possible only if it immediately follows the current one), the process does nothing at this stage.
    • If no such record exists, the process creates it.
  4. The process creates the manipulation and tries to apply it to the balance. If that fails, the balance version has changed and the process must start over from the beginning.
  5. The process updates the operation to indicate that it has been fully applied.

As you can see, the algorithm is absolutely wild, and I am not at all sure that step 3 is safe. I am sure I am reinventing the wheel here and that there are much better ways to handle this situation; if anyone knows them, I would like to hear about them.

UPD

Thanks to my colleague's magical abilities (which I completely lack), I managed to find a reasonable solution and turn the whole problem into a simple state machine that seems to work. The following conditions are added to the situation described above:

  1. An operation can be in one of the states PENDING (set at creation), PROCESSING, PROCESSED.
  2. The balance keeps a set of operations that are currently being processed; it is assumed that in a normally working system this set will not grow indefinitely because of time limits.

The task is performed as follows (∋ means that the operation is present in the balance's set of outstanding operations):

n | Operation status | ∋ | Next step
1 | ?                | ? | Read the balance
2 | ?                | - | Read the operation
3 | PENDING          | - | Add the operation to the outstanding set
4 | PENDING          | + | Change the operation status to PROCESSING
5 | PROCESSING       | + | Apply the operation, removing it from the outstanding set at the same time
6 | PROCESSING       | - | Change the operation status to PROCESSED
7 | PROCESSED        | - | Yay!

What remains is a formal proof that this machine works (or does not work); I am bad at this and do not yet know how to do it. The order of transitions 0-1 and 1-2 is needed to prevent transition 2-3 from happening after transition 5-6. Each entity update is atomic; if the entity is in some other state, its version will have changed, which resets the algorithm to the beginning.
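
The table can be sketched roughly as follows. This is an illustrative in-memory model, not the real implementation: Store, Versioned, Conflict, step and process are hypothetical names, the balance is kept as one versioned record instead of an event log, and the compare-and-set in write only imitates the single-key atomic update described above. It exercises sequential re-runs only (a simulated crash, a resume, and a repeated call) and says nothing about concurrent interleavings, which remain the open part of the question.

```python
import copy
from dataclasses import dataclass


class Conflict(Exception):
    """Raised when an entity's version changed since it was read."""


@dataclass
class Versioned:
    value: dict
    version: int = 0


class Store:
    """Hypothetical store with single-key compare-and-set semantics."""

    def __init__(self):
        self.items = {}

    def read(self, key):
        item = self.items[key]
        return copy.deepcopy(item.value), item.version

    def write(self, key, value, expected_version):
        if self.items[key].version != expected_version:
            raise Conflict(key)
        self.items[key] = Versioned(value, expected_version + 1)


def step(store, op_id, account_id):
    """Perform one transition from the table; return True once PROCESSED."""
    balance, b_ver = store.read(("balance", account_id))    # row 1
    op, o_ver = store.read(("operation", op_id))            # row 2
    listed = op_id in balance["outstanding"]
    status = op["status"]

    if status == "PENDING" and not listed:                   # row 3
        balance["outstanding"].add(op_id)
        store.write(("balance", account_id), balance, b_ver)
    elif status == "PENDING" and listed:                     # row 4
        op["status"] = "PROCESSING"
        store.write(("operation", op_id), op, o_ver)
    elif status == "PROCESSING" and listed:                  # row 5
        # Apply the amount and drop the id in one atomic balance update.
        balance["outstanding"].discard(op_id)
        cur = op["currency"]
        balance["amounts"][cur] = balance["amounts"].get(cur, 0) + op["amount"]
        store.write(("balance", account_id), balance, b_ver)
    elif status == "PROCESSING" and not listed:              # row 6
        op["status"] = "PROCESSED"
        store.write(("operation", op_id), op, o_ver)
    else:                                                    # row 7
        return True
    return False


def process(store, op_id, account_id, max_steps=20):
    """Drive the machine to PROCESSED, restarting on version conflicts."""
    for _ in range(max_steps):
        try:
            if step(store, op_id, account_id):
                return
        except Conflict:
            continue  # version changed: start again from reading the balance
    raise RuntimeError("operation did not reach PROCESSED")


store = Store()
store.items[("balance", "admin")] = Versioned({"amounts": {"RUB": 600}, "outstanding": set()})
store.items[("operation", "op-1")] = Versioned({"status": "PENDING", "currency": "RUB", "amount": -100})

# One node performs two transitions and "crashes"; another node finishes
# the job, and a third call repeats it without applying the amount again.
step(store, "op-1", "admin")
step(store, "op-1", "admin")
process(store, "op-1", "admin")
process(store, "op-1", "admin")

final_balance, _ = store.read(("balance", "admin"))
final_op, _ = store.read(("operation", "op-1"))
```

In this model the withdrawal of 100 RUB is applied once even though processing is started three times, because the amount changes only in row 5, where the same balance write also removes the operation from the outstanding set.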

Answer:

Maybe a batch transaction will help you.
