Processing Transaction Logs in AWS

Have you ever wondered how mobile applications send large volumes of data to be processed and analyzed on the server side? One such use case is collecting information about the operations that end users perform in their application. From a support team's perspective, this information is essential when customers report issues: the team can find the particular case in the system and, following the recommendations, respond to the customer with the proper outcome.

There are different ways to collect this type of information. One of them is simply logging actions to the output stream and then reading and processing them in an application. Data manipulation in the application layer may be time-consuming, which is why finding the right solution for such a use case is so important. The solution has to let an application process data reliably (meaning it is resistant to server downtime) and resume processing quickly after such incidents. Nowadays, building scalable systems is a must, so the chosen service has to meet those requirements. If a mobile application is part of the system, gathering data from billions of users may be a challenging task, because the system has to respond quickly and withstand heavy traffic.

All of this can be achieved with the AWS Cloud. Logs from each application instance can be aggregated into correlated log groups in CloudWatch Logs (a service that monitors, stores, and centralizes logs from system components). One of the features of log groups is the ability to subscribe to them: once a new log event appears in a group, all relevant subscribers are notified and can fetch that piece of data. Services like Kinesis and Lambda are thereby fed with real-time data. Subscriptions also support filters, which is extremely helpful when a dedicated logger name is used for business logs.

Figure 1. CloudWatch Logs Insights
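As an illustrative sketch, such a subscription can also be created programmatically with the AWS SDK for Java v2; the log group, filter pattern, stream ARN, and role ARN below are placeholder values, not taken from the article:

import software.amazon.awssdk.services.cloudwatchlogs.CloudWatchLogsClient;
import software.amazon.awssdk.services.cloudwatchlogs.model.PutSubscriptionFilterRequest;

public class SubscriptionSetup {

    public static void main(String[] args) {
        try (CloudWatchLogsClient logs = CloudWatchLogsClient.create()) {
            // Forward log events matching the pattern from the log group
            // to a Kinesis stream; the role lets CloudWatch Logs write to it.
            logs.putSubscriptionFilter(PutSubscriptionFilterRequest.builder()
                    .logGroupName("/app/transactions")
                    .filterName("business-events")
                    .filterPattern("BUSINESS")
                    .destinationArn("arn:aws:kinesis:eu-west-1:111111111111:stream/transactions")
                    .roleArn("arn:aws:iam::111111111111:role/CWLtoKinesisRole")
                    .build());
        }
    }
}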

Kinesis Data Streams is one of the possible destinations for data from CloudWatch Logs. It is a scalable solution for distributing data to application instances. The service is built around streams, each consisting of a set of shards that may be thought of as buckets for data. The number of shards determines how many input and output operations are possible on a stream. To read from a stream, an application can use the AWS SDK, which takes care of assigning shards to application instances, allowing programmers to focus solely on application logic.

Figure 2. Kinesis Data Streams High-Level Architecture
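For illustration, creating a stream with a chosen shard count takes a single SDK call; the stream name and shard count here are arbitrary assumptions:

import software.amazon.awssdk.services.kinesis.KinesisClient;
import software.amazon.awssdk.services.kinesis.model.CreateStreamRequest;

public class StreamSetup {

    public static void main(String[] args) {
        try (KinesisClient kinesis = KinesisClient.create()) {
            // More shards mean proportionally more read/write throughput.
            kinesis.createStream(CreateStreamRequest.builder()
                    .streamName("transactions") // placeholder stream name
                    .shardCount(2)
                    .build());
        }
    }
}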

Processing data from a mobile application directly when the HTTP request arrives is not a reasonable solution, because such requests are too slow to process synchronously. The preferable approach is to put logs onto a stream and process them asynchronously. This affects when processed actions appear in the system, since they may show up after some delay, but it is a fair compromise between user experience and system performance. The AWS SDK provides the concept of producers, which also makes it possible to put correlated data into one shard using a partition key, so that items are processed in FIFO order. AWS provides two libraries for this task: Kinesis Client and Kinesis Producer. The main difference between them, as far as sending data to a stream is concerned, is the buffering feature and the built-in failure handling in Kinesis Producer.

public void send(Event event) {
    System.out.println("Send event " + event);
    // Build the request: the partition key decides the target shard,
    // and the payload must be provided as raw bytes.
    kinesisClient.putRecord(PutRecordRequest.builder()
                    .partitionKey(partitionKey)
                    .data(SdkBytes.fromByteArray(event.getBytes()))
                    .streamName(senderConfig.getStreamName())
                    .build())
            .join(); // block until the asynchronous call completes
}

Figure 3. Kinesis Client listing

The code snippet above shows how to send an Event object, which can be any kind of data serializable to bytes, because Kinesis Client requires that format. PutRecordRequest puts the record into a shard according to the partitionKey provided in the request, addressed to a stream whose name also has to be specified.
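The article does not show the Event class itself; a minimal sketch consistent with both listings could wrap a UTF-8 string payload:

import java.nio.charset.StandardCharsets;

public class Event {

    private final String payload;

    public Event(String payload) {
        this.payload = payload;
    }

    // Serialize to bytes, the format Kinesis requires.
    public byte[] getBytes() {
        return payload.getBytes(StandardCharsets.UTF_8);
    }

    // Rebuild an event from bytes read out of a shard.
    public static Event of(byte[] bytes) {
        return new Event(new String(bytes, StandardCharsets.UTF_8));
    }

    @Override
    public String toString() {
        return payload;
    }
}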

@SneakyThrows
@Override
public void send(Event event) {
    System.out.println("Send event " + event);
    // addUserRecord buffers the record and returns a future that
    // completes once the record has been sent to the stream.
    kinesisProducer.addUserRecord(
                    new UserRecord(
                            senderConfig.getStreamName(),
                            partitionKey,
                            ByteBuffer.wrap(event.getBytes())))
            .get(); // block until the buffered record is flushed
}

Figure 4. Kinesis Producer listing

In this example the same Event is sent using Kinesis Producer. UserRecord has similar requirements regarding parameters as those provided to PutRecordRequest. The operations here are synchronous because of the get() and join() calls, but they can also be performed asynchronously.
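As a sketch of the asynchronous variant with the SDK client, a callback can be attached to the returned future instead of blocking on it (the fields are the same ones assumed in the Kinesis Client listing):

public void sendAsync(Event event) {
    kinesisClient.putRecord(PutRecordRequest.builder()
                    .partitionKey(partitionKey)
                    .data(SdkBytes.fromByteArray(event.getBytes()))
                    .streamName(senderConfig.getStreamName())
                    .build())
            .whenComplete((response, error) -> {
                // Runs when the call completes, without blocking the sender thread.
                if (error != null) {
                    System.err.println("Failed to send event " + event + ": " + error);
                } else {
                    System.out.println("Sent event, sequence number: " + response.sequenceNumber());
                }
            });
}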

To fetch data from a stream, Kinesis Client is a top-notch solution. Shard distribution is no longer the programmer's concern, because the library handles it. Even when an application instance fails, the failure is detected automatically and the shards allocated to that instance are immediately reassigned to a healthy one. Data from a shard are processed starting from the last created checkpoint, which is a pointer to a record in the shard. The programmer decides when a checkpoint should be created, which may happen even after every processed item, but this affects performance because saving the state requires a database operation.

@Override
public void processRecords(ProcessRecordsInput processRecordsInput) {
    System.out.println("Processing shard " + shardId + " record(s) " + processRecordsInput.records().size());
    for (KinesisClientRecord kinesisClientRecord : processRecordsInput.records()) {
        try {
            Event event = Event.of(getBytes(kinesisClientRecord));
            System.out.println(now() + " Processing shard: " + shardId + " event: " + event);
            processEvent(event);
            // Checkpoint after every record so it is not redelivered after a
            // restart; each checkpoint persists state, which costs throughput.
            processRecordsInput.checkpointer()
                    .checkpoint(kinesisClientRecord.sequenceNumber(), kinesisClientRecord.subSequenceNumber());
            System.out.println(now() + " End processing shard: " + shardId + " event: " + event);
        } catch (Exception e) {
            System.err.println("Error during event processing " + e);
            throw new RuntimeException(e);
        }
    }
}

Figure 5. Kinesis data processing

The Kinesis library delivers records inside ProcessRecordsInput. Each item can then be processed in any way, which is represented here by the processEvent method. Invoking checkpoint with the record's sequence number stores the information that the data were processed, ensuring the Event will not be returned again by the library.
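To run such a processor, it has to be wired into a KCL Scheduler. The bootstrap sketch below assumes the processRecords method above lives in a hypothetical EventRecordProcessor class implementing ShardRecordProcessor; the stream and application names are placeholders:

import java.util.UUID;
import software.amazon.awssdk.services.cloudwatch.CloudWatchAsyncClient;
import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient;
import software.amazon.awssdk.services.kinesis.KinesisAsyncClient;
import software.amazon.kinesis.common.ConfigsBuilder;
import software.amazon.kinesis.coordinator.Scheduler;

public class ConsumerBootstrap {

    public static void main(String[] args) {
        KinesisAsyncClient kinesis = KinesisAsyncClient.create();
        DynamoDbAsyncClient dynamo = DynamoDbAsyncClient.create();         // stores leases and checkpoints
        CloudWatchAsyncClient cloudWatch = CloudWatchAsyncClient.create(); // publishes consumer metrics

        ConfigsBuilder configs = new ConfigsBuilder(
                "transactions",                    // placeholder stream name
                "transaction-log-processor",       // application name, also the lease table name
                kinesis,
                dynamo,
                cloudWatch,
                UUID.randomUUID().toString(),      // unique worker identifier
                () -> new EventRecordProcessor()); // hypothetical processor from Figure 5

        Scheduler scheduler = new Scheduler(
                configs.checkpointConfig(),
                configs.coordinatorConfig(),
                configs.leaseManagementConfig(),
                configs.lifecycleConfig(),
                configs.metricsConfig(),
                configs.processorConfig(),
                configs.retrievalConfig());

        // The scheduler acquires shard leases and invokes processRecords.
        new Thread(scheduler).start();
    }
}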

Kinesis Data Streams is a cutting-edge solution for overcoming performance problems whenever asynchronous processing is acceptable and real-time processing is needed. It integrates easily with CloudWatch, which is beneficial when logs are the data to be processed. It also ensures durability, and it offers flexibility in scaling a stream up and down by changing the number of shards, so that no data is lost before its expiration. A stream can have multiple consumers, so actions on the same event can be completely independent and concurrent.

References:

https://docs.aws.amazon.com/streams/latest/dev/key-concepts.html

https://docs.aws.amazon.com/streams/latest/dev/developing-producers-with-kpl.html

https://docs.aws.amazon.com/streams/latest/dev/shared-throughput-kcl-consumers.html

https://docs.aws.amazon.com/AmazonCloudWatch/latest/logs/SubscriptionFilters.html#DestinationKinesisExample
