commit 35a634907123b34f6349188dbd2aa32e34403cbe Author: Nikita Astafyev Date: Tue Dec 30 22:34:58 2025 +0700 chore: initial commit diff --git a/.clang-format b/.clang-format new file mode 100644 index 0000000..3515749 --- /dev/null +++ b/.clang-format @@ -0,0 +1,57 @@ +--- +Language: Cpp +BasedOnStyle: LLVM +AccessModifierOffset: -4 +AlignAfterOpenBracket: AlwaysBreak +AlignConsecutiveAssignments: false +AlignConsecutiveDeclarations: false +AlignOperands: false +AlignTrailingComments: true +AllowShortIfStatementsOnASingleLine: false +AlwaysBreakTemplateDeclarations: Yes +BinPackArguments: false +BinPackParameters: false +BraceWrapping: + AfterCaseLabel: false + AfterClass: false + AfterControlStatement: false + AfterEnum: false + AfterFunction: false + AfterNamespace: false + AfterStruct: false + AfterUnion: false + AfterExternBlock: false + BeforeCatch: false + BeforeElse: false + BeforeLambdaBody: false + BeforeWhile: false + SplitEmptyFunction: false + SplitEmptyRecord: false + SplitEmptyNamespace: false +BreakBeforeBraces: Custom +BreakConstructorInitializersBeforeComma: false +ColumnLimit: 120 +ConstructorInitializerAllOnOneLineOrOnePerLine: false +IncludeCategories: + - Regex: '^<.*' + Priority: 1 + - Regex: '^".*' + Priority: 2 + - Regex: '.*' + Priority: 3 +IncludeIsMainRegex: '([-_](test|unittest))?$' +IndentCaseLabels: true +IndentPPDirectives: BeforeHash +IndentWidth: 4 +InsertNewlineAtEOF: true +MacroBlockBegin: '' +MacroBlockEnd: '' +NamespaceIndentation: All +PointerAlignment: Left +SpaceInEmptyParentheses: false +SpacesInAngles: false +SpacesInConditionalStatement: false +SpacesInCStyleCastParentheses: false +SpacesInParentheses: false +TabWidth: 4 +... diff --git a/.github/ISSUE_TEMPLATE/task-template.md b/.github/ISSUE_TEMPLATE/task-template.md new file mode 100644 index 0000000..2f14f1d --- /dev/null +++ b/.github/ISSUE_TEMPLATE/task-template.md @@ -0,0 +1,56 @@ +--- +name: Task template +about: This is a template for creating tasks for team members. +title: '' +labels: '' +assignees: '' + +--- + +# Task title + +## Context + +Here describe the context of this tasks. Why do you want to open this task? Why should it be completed? How completing it will contribute to the project? + +_Example_ + +> The frontend and the backend need an effective way to communicate with each other. A message queue is well suited for this. It allows for easy task distribution and is straightforward to use. + +## Problem statement + +Here describe what exactly needs to be done. The task must be clearly worded and realistic. + +Be careful here. Describe **precisely** what you want to be done. Think of it like the assignee has an irresistible desire to do everything you did not put here in the exact opposite way compared to what you want (just to irritate you, for fun). Try to think of everything that can go wrong, just in case. + +_Example_ + +> A class `MessageQueue` must be implemented. It must have two methods: +> * `void push(Request request);` +> * `Request pop();` +> The first one should add the request to the queue. The second is to delete the first of the remaining ones and return it. +> +> For this task one must create a separate branch based on the latest commit of branch [base branch]. +> +> The implementation must be split across two files located at the following paths: +> `rediska/common/queue/MessageQueue.hpp` - the declarations for the class +> `rediska/common/queue/MessageQueue.cpp` - the definitions for the class +> +> Tests for the class must also be added. The following cases must be covered: +> 1. Case 1 +> 2. Case 2 +> +> After the task is completed, a pull request to [base branch] must be created and [requester's name] must be added as the assignee. + +## Success criteria + +List all conditions that can be objectively verified and unambiguously indicate successful completion of the task. + +_Example_ + +> 1. The aforementioned class with the requested interface is implemented, the implementation can be found at path [path]. +> 2. Unit tests, which cover all of the listed cases, are added and the implementation passes all of them. + +## Advice (optional) + +If you still have anything helpful to add, do it here. Provide any advice that will help the assignee quickly and successfully complete the task. _Do not hesitate to think for them a little, if you believe it will help them do their job more effectively (in a shorter time period without prejudice to correctness)_. diff --git a/.github/workflows/code_checks.yaml b/.github/workflows/code_checks.yaml new file mode 100644 index 0000000..45a543c --- /dev/null +++ b/.github/workflows/code_checks.yaml @@ -0,0 +1,26 @@ +name: Code Analysis + +on: [push] + +jobs: + analysis: + runs-on: ubuntu-24.04 + steps: + - name: Checkout + uses: actions/checkout@v4 + + - name: Setup + uses: aminya/setup-cpp@v1 + with: + cppcheck: true + + - name: Static analysis (source code) + run: | + cppcheck \ + --enable=all \ + --std=c++23 \ + --inline-suppr \ + --error-exitcode=1 \ + --suppress=missingInclude \ + --suppress=missingIncludeSystem \ + rediska/ diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..c0297b7 --- /dev/null +++ b/.gitignore @@ -0,0 +1,18 @@ +/.idea/.gitignore +/.idea/editor.xml +/.idea/misc.xml +/.idea/modules.xml +/.idea/inspectionProfiles/Project_Default.xml +/.idea/Rediska.iml +/.idea/vcs.xml +/cmake-build-debug +/cmake-build-release +/tests/cmake-build-debug +/tests/cmake-build-release +/build +.cache +allure-report +allure-results +.allure +*vcpkg* +**/.DS_Store \ No newline at end of file diff --git a/.zed/debug.json b/.zed/debug.json new file mode 100644 index 0000000..e5d1945 --- /dev/null +++ b/.zed/debug.json @@ -0,0 +1,19 @@ +// Project-local debug tasks +// +// For more documentation on how to configure debug tasks, +// see: https://zed.dev/docs/debugger +[ + { + "label": "Unit tests", + "build": { + "command": "cmake", + "args": [ + "--build", "${ZED_WORKTREE_ROOT}/cmake-build-debug", "-j", "12" + ] + }, + "reverseDebugging": true, + "program": "${ZED_WORKTREE_ROOT}/cmake-build-debug/tests/unit_tests", + "adapter": "CodeLLDB", + "request": "launch" + } +] diff --git a/.zed/tasks.json b/.zed/tasks.json new file mode 100644 index 0000000..4bb915a --- /dev/null +++ b/.zed/tasks.json @@ -0,0 +1,51 @@ +// Project tasks configuration. See https://zed.dev/docs/tasks for documentation. +[ + { + "label": "Setup debug build directory", + "command": "cmake -S ${ZED_WORKTREE_ROOT} -B ${ZED_WORKTREE_ROOT}/cmake-build-debug -DCMAKE_BUILD_TYPE=Debug", + "reveal": "no_focus", + "hide": "on_success" + }, + { + "label": "Setup release build directory", + "command": "cmake -S ${ZED_WORKTREE_ROOT} -B ${ZED_WORKTREE_ROOT}/cmake-build-release -DCMAKE_BUILD_TYPE=Release", + "reveal": "no_focus", + "hide": "on_success" + }, + { + "label": "Setup tests debug build directory", + "command": "cmake -S ${ZED_WORKTREE_ROOT} -B ${ZED_WORKTREE_ROOT}/tests/cmake-build-debug -DCMAKE_BUILD_TYPE=Debug", + "reveal": "no_focus", + "hide": "on_success" + }, + { + "label": "Setup tests release build directory", + "command": "cmake -S ${ZED_WORKTREE_ROOT} -B ${ZED_WORKTREE_ROOT}/tests/cmake-build-release -DCMAKE_BUILD_TYPE=Release", + "reveal": "no_focus", + "hide": "on_success" + }, + { + "label": "Clean debug build directory", + "command": "cmake --build ${ZED_WORKTREE_ROOT}/cmake-build-debug --target clean", + "reveal": "no_focus", + "hide": "on_success" + }, + { + "label": "Clean release build directory", + "command": "cmake --build ${ZED_WORKTREE_ROOT}/cmake-build-release --target clean", + "reveal": "no_focus", + "hide": "on_success" + }, + { + "label": "Clean tests debug build directory", + "command": "cmake --build ${ZED_WORKTREE_ROOT}/tests/cmake-build-debug --target clean", + "reveal": "no_focus", + "hide": "on_success" + }, + { + "label": "Clean tests release build directory", + "command": "cmake --build ${ZED_WORKTREE_ROOT}/tests/cmake-build-release --target clean", + "reveal": "no_focus", + "hide": "on_success" + } +] diff --git a/CMakeLists.txt b/CMakeLists.txt new file mode 100644 index 0000000..daa4df2 --- /dev/null +++ b/CMakeLists.txt @@ -0,0 +1,33 @@ +cmake_minimum_required(VERSION 3.28) +project(Rediska) + +set(CMAKE_CXX_STANDARD 23) +set(CMAKE_EXPORT_COMPILE_COMMANDS ON) + +if (MSVC) + add_compile_options(/W4) +else() +# add_compile_options(-Wall -Wextra -Werror) +endif() + +include(FetchContent) + +FetchContent_Declare( + doctest + GIT_REPOSITORY https://github.com/doctest/doctest.git + GIT_TAG v2.4.12 +) +FetchContent_Declare( + spdlog + GIT_REPOSITORY https://github.com/gabime/spdlog.git + GIT_TAG v1.16.0 +) + +FetchContent_MakeAvailable(doctest) +FetchContent_MakeAvailable(spdlog) + +include_directories(${CMAKE_CURRENT_SOURCE_DIR}) +add_subdirectory(rediska) + +enable_testing() +add_subdirectory(tests) diff --git a/CMakePresets.json b/CMakePresets.json new file mode 100644 index 0000000..c98bd58 --- /dev/null +++ b/CMakePresets.json @@ -0,0 +1,44 @@ +{ + "version": 3, + "configurePresets": [ + { + "name": "debug", + "displayName": "Debug Build (vcpkg)", + "description": "Debug build using vcpkg dependencies", + "binaryDir": "${sourceDir}/cmake-build-debug", + "generator": "Ninja", + "cacheVariables": { + "CMAKE_BUILD_TYPE": "Debug", + "CMAKE_TOOLCHAIN_FILE": "${sourceDir}/vcpkg/scripts/buildsystems/vcpkg.cmake", + "CMAKE_EXPORT_COMPILE_COMMANDS": "ON" + } + }, + { + "name": "debug-build", + "displayName": "Debug Build (vcpkg)", + "description": "Debug build using vcpkg dependencies", + "binaryDir": "${sourceDir}/build", + "generator": "Ninja", + "cacheVariables": { + "CMAKE_BUILD_TYPE": "Debug", + "CMAKE_TOOLCHAIN_FILE": "${sourceDir}/vcpkg/scripts/buildsystems/vcpkg.cmake", + "CMAKE_EXPORT_COMPILE_COMMANDS": "ON" + } + } + ], + "buildPresets": [ + { + "name": "debug", + "configurePreset": "debug" + }, + { + "name": "debug-build", + "configurePreset": "debug-build" + }, + { + "name": "debug-run", + "configurePreset": "debug", + "targets": [ "run" ] + } + ] +} diff --git a/README.md b/README.md new file mode 100644 index 0000000..832a24d --- /dev/null +++ b/README.md @@ -0,0 +1,22 @@ +# Rediska + +## Building from source + +### Prerequisites (vcpkg) + +> [!NOTE] +> Add `--triplet x64-linux-release` to the vcpkg install to build dependencies in release. +> Replace `linux` with your OS if it is supported + +```bash +git clone --branch 2025.10.17 --depth 1 https://github.com/microsoft/vcpkg.git && \ +./vcpkg/bootstrap-vcpkg.sh && \ +./vcpkg/vcpkg install +``` + +### CMake workflow + +```bash +cmake --preset debug +cmake --build --preset debug-run +``` diff --git a/proto/v1/collections/common.proto b/proto/v1/collections/common.proto new file mode 100644 index 0000000..ce31f81 --- /dev/null +++ b/proto/v1/collections/common.proto @@ -0,0 +1,19 @@ +syntax = "proto3"; + +package v1.collections.common; + +message CollectionElement { + enum SpecialElementKind { + STR = 0; + OBJ = 1; + } + + SpecialElementKind special_element_kind = 1; + + oneof element { + int64 integer = 2; + double floating_point = 3; + string str_or_obj = 4; + bool boolean = 5; + } +} diff --git a/proto/v1/collections/element_kind.proto b/proto/v1/collections/element_kind.proto new file mode 100644 index 0000000..b07e6e1 --- /dev/null +++ b/proto/v1/collections/element_kind.proto @@ -0,0 +1,12 @@ +syntax = "proto3"; + +package v1.collections.common; + +enum ElementKind { + INT = 0; + FLOAT = 1; + BOOL = 2; + STRING = 3; + OBJECT = 4; + LIST = 5; +} \ No newline at end of file diff --git a/proto/v1/collections/list.proto b/proto/v1/collections/list.proto new file mode 100644 index 0000000..e00c4d9 --- /dev/null +++ b/proto/v1/collections/list.proto @@ -0,0 +1,95 @@ +syntax = "proto3"; +import "v1/collections/common.proto"; +import "v1/collections/element_kind.proto"; +import "google/protobuf/empty.proto"; + +package v1.collections.list; + +service ListCacheService { + rpc Create(ListCreateRequest) returns (ListCreateResponse); + rpc Get(ListGetRequest) returns (stream ListGetResponse); + rpc Insert(ListInsertRequest) returns (google.protobuf.Empty); + rpc Erase(ListEraseRequest) returns (ListEraseResponse); + rpc Set(ListSetRequest) returns (ListSetResponse); + rpc Length(ListLengthRequest) returns (ListLengthResponse); + rpc PushBack(PushBackRequest) returns (google.protobuf.Empty); + rpc PushMany(stream PushManyRequest) returns (google.protobuf.Empty); + rpc PopBack(PopBackRequest) returns (PopBackResponse); + rpc Clear(ClearRequest) returns (google.protobuf.Empty); + rpc Delete(DeleteRequest) returns (google.protobuf.Empty); +} + +message ListCreateRequest { + common.ElementKind element_kind = 1; + optional uint32 ttl_seconds = 2; +} + +message ListCreateResponse { + string id = 1; +} + +message ListGetRequest { + string id = 1; +} + +message ListGetResponse { + common.CollectionElement element = 1; +} + +message ListInsertRequest { + string id = 1; + uint64 index = 2; + common.CollectionElement value = 3; +} + +message ListEraseRequest { + string id = 1; + int64 index = 2; +} + +message ListEraseResponse { + common.CollectionElement removed_value = 1; +} + +message ListSetRequest { + string id = 1; + repeated common.CollectionElement elements = 2; +} + +message ListSetResponse { +} + +message ListLengthRequest { + string id = 1; +} + +message ListLengthResponse { + uint64 length = 1; +} + +message PushBackRequest { + string id = 1; + common.CollectionElement element = 2; +} + +message PushManyRequest { + string id = 1; + common.CollectionElement element = 2; +} + +message PopBackRequest { + string id = 1; +} + +message PopBackResponse { + common.CollectionElement element = 1; +} + + +message ClearRequest { + string id = 1; +} + +message DeleteRequest { + string id = 1; +} diff --git a/proto/v1/object/object.proto b/proto/v1/object/object.proto new file mode 100644 index 0000000..37d2a43 --- /dev/null +++ b/proto/v1/object/object.proto @@ -0,0 +1,60 @@ +syntax = "proto3"; + +import "google/protobuf/empty.proto"; + +package v1.object; + +service ObjectCacheService { + rpc Create(CreateObjectRequest) returns (CreateObjectResponse); + rpc Get(GetObjectRequest) returns (GetObjectResponse); + rpc Set(SetObjectRequest) returns (google.protobuf.Empty); + rpc Delete(DeleteObjectRequest) returns (google.protobuf.Empty); + rpc GetField(GetObjectFieldRequest) returns (GetObjectFieldResponse); + rpc SetField(SetObjectFieldRequest) returns (google.protobuf.Empty); + rpc DeleteField(DeleteObjectFieldRequest) returns (google.protobuf.Empty); +} + +message CreateObjectRequest { + string object = 1; +} + +message CreateObjectResponse { + string id = 1; +} + +message GetObjectRequest { + string id = 1; +} + +message GetObjectResponse { + string object = 1; +} + +message SetObjectRequest { + string id = 1; + string new_object = 2; +} + +message DeleteObjectRequest { + string id = 1; +} + +message GetObjectFieldRequest { + string id = 1; + string field_name = 2; +} + +message GetObjectFieldResponse { + string field_value = 1; +} + +message SetObjectFieldRequest { + string id = 1; + string field_name = 2; + string value = 3; +} + +message DeleteObjectFieldRequest { + string id = 1; + string field_name = 2; +} diff --git a/proto/v1/primitives/bool.proto b/proto/v1/primitives/bool.proto new file mode 100644 index 0000000..8868148 --- /dev/null +++ b/proto/v1/primitives/bool.proto @@ -0,0 +1,42 @@ +syntax = "proto3"; + +import "google/protobuf/empty.proto"; + +package v1.primitives.boolean; + +service BoolCacheService { + rpc Create(BoolCreateRequest) returns (BoolCreateResponse); + rpc Set(BoolSetRequest) returns (google.protobuf.Empty); + rpc Get(BoolGetRequest) returns (BoolGetResponse); + rpc Delete(BoolDeleteRequest) returns (BoolDeleteResponse); +} + +message BoolCreateRequest { + bool value = 1; + optional uint32 ttl_seconds = 2; +} + +message BoolCreateResponse { + string id = 1; +} + +message BoolSetRequest { + string id = 1; + bool value = 2; +} + +message BoolGetRequest { + string id = 1; +} + +message BoolGetResponse { + bool value = 1; +} + +message BoolDeleteRequest { + string id = 1; +} + +message BoolDeleteResponse { + bool removed_value = 1; +} \ No newline at end of file diff --git a/proto/v1/primitives/float.proto b/proto/v1/primitives/float.proto new file mode 100644 index 0000000..77968b1 --- /dev/null +++ b/proto/v1/primitives/float.proto @@ -0,0 +1,42 @@ +syntax = "proto3"; + +import "google/protobuf/empty.proto"; + +package v1.primitives.flt; + +service FloatCacheService { + rpc Create(FloatCreateRequest) returns (FloatCreateResponse); + rpc Set(FloatSetRequest) returns (google.protobuf.Empty); + rpc Get(FloatGetRequest) returns (FloatGetResponse); + rpc Delete(FloatDeleteRequest) returns (FloatDeleteResponse); +} + +message FloatCreateRequest { + double value = 1; + optional uint32 ttl_seconds = 2; +} + +message FloatCreateResponse { + string id = 1; +} + +message FloatSetRequest { + string id = 1; + double value = 2; +} + +message FloatGetRequest { + string id = 1; +} + +message FloatGetResponse { + double value = 1; +} + +message FloatDeleteRequest { + string id = 1; +} + +message FloatDeleteResponse { + double removed_value = 1; +} \ No newline at end of file diff --git a/proto/v1/primitives/int.proto b/proto/v1/primitives/int.proto new file mode 100644 index 0000000..f90e241 --- /dev/null +++ b/proto/v1/primitives/int.proto @@ -0,0 +1,42 @@ +syntax = "proto3"; + +import "google/protobuf/empty.proto"; + +package v1.primitives.integer; + +service IntCacheService { + rpc Create(IntCreateRequest) returns (IntCreateResponse); + rpc Set(IntSetRequest) returns (google.protobuf.Empty); + rpc Get(IntGetRequest) returns (IntGetResponse); + rpc Delete(IntDeleteRequest) returns (IntDeleteResponse); +} + +message IntCreateRequest { + int64 value = 1; + optional uint32 ttl_seconds = 2; +} + +message IntCreateResponse { + string id = 1; +} + +message IntSetRequest { + string id = 1; + int64 value = 2; +} + +message IntGetRequest { + string id = 1; +} + +message IntGetResponse { + int64 value = 1; +} + +message IntDeleteRequest { + string id = 1; +} + +message IntDeleteResponse { + int64 removed_value = 1; +} \ No newline at end of file diff --git a/proto/v1/primitives/string.proto b/proto/v1/primitives/string.proto new file mode 100644 index 0000000..05cacf1 --- /dev/null +++ b/proto/v1/primitives/string.proto @@ -0,0 +1,42 @@ +syntax = "proto3"; + +import "google/protobuf/empty.proto"; + +package v1.primitives.str; + +service StringCacheService { + rpc Create(StringCreateRequest) returns (StringCreateResponse); + rpc Set(StringSetRequest) returns (google.protobuf.Empty); + rpc Get(StringGetRequest) returns (StringGetResponse); + rpc Delete(StringDeleteRequest) returns (StringDeleteResponse); +} + +message StringCreateRequest { + string value = 1; + optional uint32 ttl_seconds = 2; +} + +message StringCreateResponse { + string id = 1; +} + +message StringSetRequest { + string id = 1; + string value = 2; +} + +message StringGetRequest { + string id = 1; +} + +message StringGetResponse { + string value = 1; +} + +message StringDeleteRequest { + string id = 1; +} + +message StringDeleteResponse { + string removed_value = 1; +} \ No newline at end of file diff --git a/rediska/CMakeLists.txt b/rediska/CMakeLists.txt new file mode 100644 index 0000000..97b05a6 --- /dev/null +++ b/rediska/CMakeLists.txt @@ -0,0 +1,25 @@ +add_subdirectory(common) +add_subdirectory(data-structures) + +add_subdirectory(worker) +add_subdirectory(cache) + +add_subdirectory(frontend) + + +add_executable(Rediska main.cpp) +target_link_libraries(Rediska PRIVATE + frontend + worker + cache + common + data-structures +) + +# Convenience target +add_custom_target(run + COMMAND $ + DEPENDS Rediska + USES_TERMINAL + COMMENT "Building and running Rediska" +) diff --git a/rediska/cache/BaseCacheConfig.hpp b/rediska/cache/BaseCacheConfig.hpp new file mode 100644 index 0000000..c22c632 --- /dev/null +++ b/rediska/cache/BaseCacheConfig.hpp @@ -0,0 +1,7 @@ +#pragma once + +namespace cache { + struct BaseCacheConfig { + bool resetTTLOnAccess = true; + }; +} diff --git a/rediska/cache/BaseItemMetadata.cpp b/rediska/cache/BaseItemMetadata.cpp new file mode 100644 index 0000000..75d944b --- /dev/null +++ b/rediska/cache/BaseItemMetadata.cpp @@ -0,0 +1,20 @@ +#include "BaseItemMetadata.hpp" + +namespace cache { + BaseItemMetadata::BaseItemMetadata(TTL ttl) : ttl_(ttl) { + resetExpirationTime(); + } + + void BaseItemMetadata::updateTTL(TTL ttl) { + ttl_ = ttl; + resetExpirationTime(); + } + + bool BaseItemMetadata::isExpired() const { + return expirationTime_ < std::chrono::steady_clock::now(); + } + + void BaseItemMetadata::resetExpirationTime() { + expirationTime_ = std::chrono::steady_clock::now() + std::chrono::seconds(ttl_); + } +} diff --git a/rediska/cache/BaseItemMetadata.hpp b/rediska/cache/BaseItemMetadata.hpp new file mode 100644 index 0000000..bda48cc --- /dev/null +++ b/rediska/cache/BaseItemMetadata.hpp @@ -0,0 +1,20 @@ +#pragma once + +#include "rediska/common/types.hpp" + +namespace cache { + class BaseItemMetadata { + public: + BaseItemMetadata(TTL ttl); + + void updateTTL(TTL ttl); + + void resetExpirationTime(); + + bool isExpired() const; + + private: + TTL ttl_; + Timestamp expirationTime_; + }; +} diff --git a/rediska/cache/CMakeLists.txt b/rediska/cache/CMakeLists.txt new file mode 100644 index 0000000..91aae3d --- /dev/null +++ b/rediska/cache/CMakeLists.txt @@ -0,0 +1,9 @@ +add_library(cache STATIC + lru/LRU.cpp + lru/LRUItemMetadata.cpp + BaseItemMetadata.cpp +) + +target_link_libraries(cache PUBLIC + common +) diff --git a/rediska/cache/CachePolicy.hpp b/rediska/cache/CachePolicy.hpp new file mode 100644 index 0000000..9889f74 --- /dev/null +++ b/rediska/cache/CachePolicy.hpp @@ -0,0 +1,21 @@ +#include "rediska/common/MessageArguments.hpp" +#include "rediska/common/enums.hpp" +#include "rediska/common/types.hpp" + +namespace cache { + class CachePolicy { + public: + virtual ~CachePolicy() = default; + + virtual void get(CacheKey&& key) = 0; + + virtual void set(CacheKey&& key, CacheValue&& value, TTL ttl) = 0; + + virtual void applyTo(CacheKey&& key, OperationId op, MessageArguments&& args) = 0; + + protected: + virtual void evict() = 0; + + [[nodiscard]] virtual inline bool isFull() const = 0; + }; +} diff --git a/rediska/cache/constants.hpp b/rediska/cache/constants.hpp new file mode 100644 index 0000000..743839e --- /dev/null +++ b/rediska/cache/constants.hpp @@ -0,0 +1,5 @@ +#pragma once + +#include + +constexpr int16_t MAX_ALLOWED_WORKERS = INT8_MAX + 1; diff --git a/rediska/cache/lru/LRU.cpp b/rediska/cache/lru/LRU.cpp new file mode 100644 index 0000000..79b7af2 --- /dev/null +++ b/rediska/cache/lru/LRU.cpp @@ -0,0 +1,118 @@ +#include "rediska/cache/lru/LRU.hpp" +#include +#include +#include +#include +#include "rediska/cache/lru/LRUConfig.hpp" +#include "rediska/cache/types.hpp" +#include "rediska/common/MessageArguments.hpp" +#include "rediska/common/types.hpp" +#include "rediska/common/utils.hpp" + +namespace cache { + LRU::LRU(LRUConfig config, CacheOpCallback callback) + : config_(std::move(config)), callback_(callback) {} + + void LRU::get(CacheKey&& key) { + std::shared_lock lock(mutex_); + auto it = keyToItem_.find(key); + if (it == keyToItem_.end()) { + return callback_(std::unexpected(RediskaReturnCode::NOT_FOUND)); + } + auto& metadata = it->second->location.metadata; + + if (metadata.isExpired()) { + evict(it); + return callback_(std::unexpected(RediskaReturnCode::KEY_EXPIRED)); + } + if (config_.resetTTLOnAccess) metadata.resetExpirationTime(); + + // Move to start + lru_list_.splice(lru_list_.begin(), lru_list_, it->second); + keyToItem_[key] = lru_list_.begin(); + callback_(std::make_optional(it->second->location.value)); + } + + void LRU::set(CacheKey&& key, CacheValue&& value, TTL ttl) { + std::unique_lock lock(mutex_); + auto it = keyToItem_.find(key); + if (it != keyToItem_.end()) { + it->second->location.value = std::move(value); + it->second->location.metadata.resetExpirationTime(); + lru_list_.splice(lru_list_.begin(), lru_list_, it->second); + callback_(std::nullopt); + return; + } + + if (isFull()) evict(); + + lru_list_.push_front( + CacheNode { + .key = std::move(key), + .location = ItemHandle{ + .value = std::move(value), + .metadata = LRU::ItemMetadata(ttl) + } + } + ); + keyToItem_[key] = lru_list_.begin(); + callback_(std::nullopt); + } + + void LRU::applyTo(CacheKey&& key, OperationId op, MessageArguments&& args) { + std::unique_lock lock(mutex_); + auto it = keyToItem_.find(key); + if (it == keyToItem_.end()) { + return callback_(std::unexpected(RediskaReturnCode::NOT_FOUND)); + } + auto& metadata = it->second->location.metadata; + + std::expected, RediskaReturnCode> value; + std::visit([&value, op](auto&& arg) { + using T = std::decay_t; + if constexpr (is_any_of_v) { + value = std::unexpected(RediskaReturnCode::INCOMPATIBLE_OPERATION); + } else if constexpr (std::is_same_v>) { + // TODO: Replace with real data argument + DSValue dummy_data{}; + + std::expected, DSReturnCode> res = arg->handle(op, std::move(dummy_data)); + if (!res) { + value = std::unexpected(DSReturnCodeToRediskaReturnCode(res.error())); + return; + } + if (!res.value().has_value()) { + value = std::nullopt; + return; + } + + // TODO: Explore implicit conversion + // Attempts to covert failed + value = std::visit([](auto&& arg) -> CacheValue { + return CacheValue{arg}; + }, std::move(res->value())); + } + }, it->second->location.value); + + lru_list_.splice(lru_list_.begin(), lru_list_, it->second); + callback_(std::move(value)); + return; + } + + void LRU::evict() { + if (lru_list_.empty()) return; + + keyToItem_.erase(lru_list_.back().key); + lru_list_.pop_back(); + } + + void LRU::evict(const std::unordered_map::iterator>::iterator node) { + lru_list_.erase(node->second); + keyToItem_.erase(node); + } + + + inline bool LRU::isFull() const { + return lru_list_.size() >= config_.maxCapacity; + } +} diff --git a/rediska/cache/lru/LRU.hpp b/rediska/cache/lru/LRU.hpp new file mode 100644 index 0000000..096d4c9 --- /dev/null +++ b/rediska/cache/lru/LRU.hpp @@ -0,0 +1,52 @@ +#pragma once + +#include +#include +#include +#include "rediska/cache/lru/LRUConfig.hpp" +#include "rediska/cache/lru/LRUItemMetadata.hpp" +#include "rediska/cache/types.hpp" +#include "rediska/common/MessageArguments.hpp" +#include "rediska/common/types.hpp" +#include "rediska/cache/CachePolicy.hpp" + +namespace cache { + class LRU : public CachePolicy { + public: + using ItemMetadata = LRUItemMetadata; + using ItemHandle = struct { + CacheValue value; + ItemMetadata metadata; + }; + using CacheNode = struct { + CacheKey key; + ItemHandle location; + }; + + LRU(LRUConfig config, CacheOpCallback callback); + ~LRU() = default; + + void get(CacheKey&& key) override; + + void set(CacheKey&& key, CacheValue&& value, TTL ttl) override; + + void applyTo(CacheKey&& key, OperationId op, MessageArguments&& args) override; + + private: + std::list lru_list_; + std::unordered_map::iterator> keyToItem_; + + std::shared_mutex mutex_; + + LRUConfig config_; + CacheOpCallback callback_; + + void evict() override; + + void evict(const std::unordered_map::iterator>::iterator node); + + inline bool isFull() const override; + + void resetTTLIfEnabled(CacheNode& key); + }; +} diff --git a/rediska/cache/lru/LRUConfig.hpp b/rediska/cache/lru/LRUConfig.hpp new file mode 100644 index 0000000..97bb235 --- /dev/null +++ b/rediska/cache/lru/LRUConfig.hpp @@ -0,0 +1,12 @@ +#pragma once + +#include +#include "rediska/cache/BaseCacheConfig.hpp" +#include "rediska/common/types.hpp" + +namespace cache { + struct LRUConfig : BaseCacheConfig { + size_t maxCapacity; + TTL ttl; + }; +} diff --git a/rediska/cache/lru/LRUItemMetadata.cpp b/rediska/cache/lru/LRUItemMetadata.cpp new file mode 100644 index 0000000..dbf8ec2 --- /dev/null +++ b/rediska/cache/lru/LRUItemMetadata.cpp @@ -0,0 +1,9 @@ +#include "rediska/cache/BaseItemMetadata.hpp" +#include "rediska/common/types.hpp" + +namespace cache { + class LRUItemMetadata : public BaseItemMetadata { + public: + LRUItemMetadata(TTL ttl) : BaseItemMetadata(ttl) {} + }; +} diff --git a/rediska/cache/lru/LRUItemMetadata.hpp b/rediska/cache/lru/LRUItemMetadata.hpp new file mode 100644 index 0000000..dfc351d --- /dev/null +++ b/rediska/cache/lru/LRUItemMetadata.hpp @@ -0,0 +1,10 @@ +#pragma once + +#include "rediska/cache/BaseItemMetadata.hpp" + +namespace cache { + class LRUItemMetadata : public BaseItemMetadata { + public: + LRUItemMetadata(TTL ttl); + }; +} diff --git a/rediska/cache/types.hpp b/rediska/cache/types.hpp new file mode 100644 index 0000000..29db077 --- /dev/null +++ b/rediska/cache/types.hpp @@ -0,0 +1,10 @@ +#pragma once + +#include +#include +#include +#include "rediska/common/types.hpp" + +namespace cache { + using CacheOpCallback = std::function, RediskaReturnCode>)>; +} diff --git a/rediska/common/CMakeLists.txt b/rediska/common/CMakeLists.txt new file mode 100644 index 0000000..934a56b --- /dev/null +++ b/rediska/common/CMakeLists.txt @@ -0,0 +1,10 @@ +add_library(common STATIC + tmp.cpp +) + +find_package(concurrentqueue REQUIRED) + +target_link_libraries(common + PUBLIC + spdlog::spdlog +) diff --git a/rediska/common/MessageArguments.hpp b/rediska/common/MessageArguments.hpp new file mode 100644 index 0000000..59dfebf --- /dev/null +++ b/rediska/common/MessageArguments.hpp @@ -0,0 +1,46 @@ +#pragma once + +#include +#include "rediska/common/types.hpp" + +struct PrimitiveSetArgs { + CacheValue value; + TTL ttl_seconds; +}; + +struct ListCreateArgs { + CacheValueId element_kind; + TTL ttl_seconds; +}; + +struct ListIndexArgs { + int64_t index; +}; + +struct ListSetArgs { + int64_t index; + CacheValue value; +}; + +struct ListInsertArgs { + int64_t index; + CacheValue value; +}; + +struct ListPushBackArgs { + CacheValue value; +}; + +struct ListPushManyArgs { + std::vector values; + bool replace_entire_list = false; // true => overwrite entire list, false => append +}; + +using MessageArguments = std::variant; diff --git a/rediska/common/QueueMessage.hpp b/rediska/common/QueueMessage.hpp new file mode 100644 index 0000000..e35adf0 --- /dev/null +++ b/rediska/common/QueueMessage.hpp @@ -0,0 +1,42 @@ +#pragma once + +#include +#include + +#include "rediska/common/enums.hpp" +#include "rediska/common/types.hpp" +#include "rediska/common/MessageArguments.hpp" +#include +#include // for ServerAsyncStreamingInterface completeness + +// TODO: get rid of forward declaration somehow maybe +struct BaseRequestManager; + +struct RequestEvent { + BaseRequestManager& manager; +}; + +// This just leaves the responder for you to delete. +struct FinishEvent { + std::unique_ptr responder; +}; + +using EventVariant = std::variant; + +struct QueueMessage { + CacheValueId type; + CacheKey key; + OperationId operation; + MessageArguments arguments; + std::unique_ptr responder; + + template + void respond(ResponseT response) { + if (auto* writer = dynamic_cast*>(responder.get())) { + writer->Finish(response, grpc::Status::OK, std::make_unique(FinishEvent { std::move(responder) }).release()); + responder = nullptr; + } else { + throw std::runtime_error("Invalid responder type in QueueMessage::respond"); + } + } +}; diff --git a/rediska/common/enums.hpp b/rediska/common/enums.hpp new file mode 100644 index 0000000..b1992a2 --- /dev/null +++ b/rediska/common/enums.hpp @@ -0,0 +1,13 @@ +#pragma once + +enum class OperationId { + GET, + SET, + DELETE, + LIST_PUSH_BACK, + LIST_POP_BACK, + LIST_INSERT, + LIST_ERASE, + OBJECT_GET_FIELD, + OBJECT_SET_FIELD, +}; diff --git a/rediska/common/tmp.cpp b/rediska/common/tmp.cpp new file mode 100644 index 0000000..e69de29 diff --git a/rediska/common/types.hpp b/rediska/common/types.hpp new file mode 100644 index 0000000..1fa44e8 --- /dev/null +++ b/rediska/common/types.hpp @@ -0,0 +1,30 @@ +#pragma once + +#include +#include +#include +#include +#include +#include "rediska/data-structures/impl/ListDataStructure.hpp" + +using CacheKey = std::string; + +// Definition order of `CacheValue` and `TypeId` MUST match! +using CacheValue = std::variant>; + +enum class CacheValueId { BOOLEAN = 0, INT, FLOAT, STRING, ARRAY }; // TODO: Object + +enum class RediskaReturnCode { + OK, + INCOMPATIBLE_OPERATION, + NOT_FOUND, + KEY_EXPIRED, + UNKNOWN_ERROR, + DS_EMPTY, + DS_OUT_OF_RANGE, + DS_UNKNOWN_ERROR +}; + +using TTL = uint32_t; + +using Timestamp = std::chrono::steady_clock::time_point; diff --git a/rediska/common/utils.hpp b/rediska/common/utils.hpp new file mode 100644 index 0000000..e08e8ea --- /dev/null +++ b/rediska/common/utils.hpp @@ -0,0 +1,24 @@ +#pragma once + +#include +#include "rediska/common/types.hpp" + +template +constexpr bool is_any_of_v = (std::is_same_v || ...); + +inline RediskaReturnCode DSReturnCodeToRediskaReturnCode(DSReturnCode code) { + switch (code) { + case DSReturnCode::OK: + return RediskaReturnCode::OK; + case DSReturnCode::NOT_FOUND: + return RediskaReturnCode::NOT_FOUND; + case DSReturnCode::INCOMPATIBLE_OPERATION: + return RediskaReturnCode::INCOMPATIBLE_OPERATION; + case DSReturnCode::EMPTY: + return RediskaReturnCode::DS_EMPTY; + case DSReturnCode::OUT_OF_RANGE: + return RediskaReturnCode::DS_OUT_OF_RANGE; + default: + return RediskaReturnCode::UNKNOWN_ERROR; + } +} diff --git a/rediska/data-structures/AbstractDataStructure.hpp b/rediska/data-structures/AbstractDataStructure.hpp new file mode 100644 index 0000000..13ece52 --- /dev/null +++ b/rediska/data-structures/AbstractDataStructure.hpp @@ -0,0 +1,14 @@ +#pragma once + +#include +#include +#include "rediska/common/enums.hpp" +#include "rediska/data-structures/enums.hpp" +#include "rediska/data-structures/types.hpp" + +class AbstractDataStructure { +public: + virtual ~AbstractDataStructure() = default; + + virtual std::expected, DSReturnCode> handle(OperationId op, DSValue data) = 0; +}; diff --git a/rediska/data-structures/CMakeLists.txt b/rediska/data-structures/CMakeLists.txt new file mode 100644 index 0000000..6802b42 --- /dev/null +++ b/rediska/data-structures/CMakeLists.txt @@ -0,0 +1,7 @@ +add_library(data-structures STATIC + impl/ListDataStructure.cpp +) + +target_link_libraries(data-structures PUBLIC + common +) diff --git a/rediska/data-structures/README.md b/rediska/data-structures/README.md new file mode 100644 index 0000000..e69de29 diff --git a/rediska/data-structures/enums.hpp b/rediska/data-structures/enums.hpp new file mode 100644 index 0000000..8233ddf --- /dev/null +++ b/rediska/data-structures/enums.hpp @@ -0,0 +1,3 @@ +#pragma once + +enum class DSReturnCode { OK, INCOMPATIBLE_OPERATION, EMPTY, OUT_OF_RANGE, NOT_FOUND }; diff --git a/rediska/data-structures/impl/ListDataStructure.cpp b/rediska/data-structures/impl/ListDataStructure.cpp new file mode 100644 index 0000000..0d5ceca --- /dev/null +++ b/rediska/data-structures/impl/ListDataStructure.cpp @@ -0,0 +1,56 @@ +#include +#include "rediska/data-structures/AbstractDataStructure.hpp" +#include "rediska/data-structures/enums.hpp" +#include "rediska/data-structures/types.hpp" + +class ListDataStructure : public AbstractDataStructure { +public: + ~ListDataStructure() = default; + + using arguments = struct {}; + + std::expected, DSReturnCode> handle(OperationId op, DSValue data) override { + switch (op) { + case OperationId::GET: { + // TODO: Index + // if (index < 0 || index >= list_.size()) { + // return std::unexpected(DSReturnCode::OUT_OF_RANGE); + // } + return std::make_optional(*list_.begin()); + } + case OperationId::SET: { + // Replace entire list with a single value if provided + list_.clear(); + list_.push_back(data); + return std::nullopt; + } + case OperationId::LIST_PUSH_BACK: { + list_.push_back(data); + return std::nullopt; + } + case OperationId::LIST_POP_BACK: { + if (list_.empty()) { + return std::unexpected(DSReturnCode::EMPTY); + } + DSValue value = list_.back(); + list_.pop_back(); + return std::make_optional(value); + } + case OperationId::LIST_INSERT: { + // TODO: if index + list_.insert(list_.begin(), data); + return std::nullopt; + } + case OperationId::LIST_ERASE: { + // TODO: if index + list_.erase(list_.begin()); + return std::nullopt; + } + default: + return std::unexpected(DSReturnCode::INCOMPATIBLE_OPERATION); + } + } + +private: + std::list list_; +}; diff --git a/rediska/data-structures/impl/ListDataStructure.hpp b/rediska/data-structures/impl/ListDataStructure.hpp new file mode 100644 index 0000000..b0dd795 --- /dev/null +++ b/rediska/data-structures/impl/ListDataStructure.hpp @@ -0,0 +1,15 @@ +#pragma once + +#include +#include "rediska/data-structures/AbstractDataStructure.hpp" +#include "rediska/data-structures/types.hpp" + +class ListDataStructure : public AbstractDataStructure { +public: + ~ListDataStructure() = default; + + std::expected, DSReturnCode> handle(OperationId op, DSValue data) override; + +private: + std::list list_; +}; diff --git a/rediska/data-structures/types.hpp b/rediska/data-structures/types.hpp new file mode 100644 index 0000000..3265003 --- /dev/null +++ b/rediska/data-structures/types.hpp @@ -0,0 +1,7 @@ +#pragma once + +#include +#include +#include + +using DSValue = std::variant; diff --git a/rediska/frontend/CMakeLists.txt b/rediska/frontend/CMakeLists.txt new file mode 100644 index 0000000..5862827 --- /dev/null +++ b/rediska/frontend/CMakeLists.txt @@ -0,0 +1,63 @@ +add_library(frontend STATIC + # IntService.cpp + # BoolService.cpp + # FloatService.cpp + # StringService.cpp + CallData.cpp + server.hpp + RequestManager.hpp +) + +# Find packages +find_package(Protobuf REQUIRED CONFIG) +find_package(gRPC REQUIRED CONFIG) + +set(PROTO_ROOT_DIR "${CMAKE_CURRENT_SOURCE_DIR}/../../proto") + +# Define the proto files +set(PROTO_FILES + "${PROTO_ROOT_DIR}/v1/collections/common.proto" + "${PROTO_ROOT_DIR}/v1/collections/element_kind.proto" + "${PROTO_ROOT_DIR}/v1/collections/list.proto" + "${PROTO_ROOT_DIR}/v1/object/object.proto" + "${PROTO_ROOT_DIR}/v1/primitives/bool.proto" + "${PROTO_ROOT_DIR}/v1/primitives/float.proto" + "${PROTO_ROOT_DIR}/v1/primitives/int.proto" + "${PROTO_ROOT_DIR}/v1/primitives/string.proto" +) + +# This allows protobuf_generate to find them when inspecting the target. +target_sources(frontend PRIVATE ${PROTO_FILES}) + +# Include directories +# We include the binary dir (for generated .pb.h files) and the root (for imports) +target_include_directories(frontend PUBLIC + "${CMAKE_CURRENT_BINARY_DIR}" + "${PROTO_ROOT_DIR}" +) + +# Generate Standard Protobuf files (.pb.cc / .pb.h) +# We use 'protobuf_generate' instead of the legacy 'protobuf_generate_cpp'. +# It automatically adds the generated C++ files back into the 'frontend' target. +protobuf_generate( + TARGET frontend + LANGUAGE cpp + IMPORT_DIRS "${PROTO_ROOT_DIR}" + PROTOC_OUT_DIR "${CMAKE_CURRENT_BINARY_DIR}" +) + +# Generate gRPC specific files (.grpc.pb.cc / .grpc.pb.h) +protobuf_generate( + TARGET frontend + LANGUAGE grpc + GENERATE_EXTENSIONS .grpc.pb.h .grpc.pb.cc + PLUGIN "protoc-gen-grpc=$" + IMPORT_DIRS "${PROTO_ROOT_DIR}" + PROTOC_OUT_DIR "${CMAKE_CURRENT_BINARY_DIR}" +) + +target_link_libraries(frontend + PUBLIC + gRPC::grpc++ + protobuf::libprotobuf +) diff --git a/rediska/frontend/CallData.cpp b/rediska/frontend/CallData.cpp new file mode 100644 index 0000000..b9a35ae --- /dev/null +++ b/rediska/frontend/CallData.cpp @@ -0,0 +1,336 @@ +#include +#include +#include +#include +#include "google/protobuf/empty.pb.h" + +#include "v1/collections/list.grpc.pb.h" +#include "v1/primitives/bool.grpc.pb.h" +#include "v1/primitives/int.grpc.pb.h" +#include "v1/primitives/string.grpc.pb.h" + +#include "rediska/common/QueueMessage.hpp" +#include "rediska/frontend/RequestManager.hpp" +#include "rediska/frontend/server.hpp" + +namespace { + CacheValue CollectionElementToCacheValue(const v1::collections::common::CollectionElement& element) { + if (element.has_integer()) + return static_cast(element.integer()); + if (element.has_floating_point()) + return element.floating_point(); + if (element.has_boolean()) + return element.boolean(); + if (element.has_str_or_obj()) + return element.str_or_obj(); + return std::string{}; + } +} // namespace + +using BoolService = v1::primitives::boolean::BoolCacheService::AsyncService; +using IntService = v1::primitives::integer::IntCacheService::AsyncService; +using StringService = v1::primitives::str::StringCacheService::AsyncService; +using ListService = v1::collections::list::ListCacheService::AsyncService; + +struct BoolSetRequestManager : RequestManager< + BoolSetRequestManager, + v1::primitives::boolean::BoolSetRequest, + grpc::ServerAsyncResponseWriter, + BoolService, + &BoolService::RequestSet> { + using RequestManager::RequestManager; + + QueueMessage BuildMessage() { + QueueMessage msg; + msg.type = CacheValueId::BOOLEAN; + msg.key = this->request.id(); + msg.operation = OperationId::SET; + msg.arguments = PrimitiveSetArgs{.value = CacheValue{this->request.value()}, .ttl_seconds = 0}; + msg.responder = this->TakeResponder(); + return msg; + } +}; + +struct BoolGetRequestManager : RequestManager< + BoolGetRequestManager, + v1::primitives::boolean::BoolGetRequest, + grpc::ServerAsyncResponseWriter, + BoolService, + &BoolService::RequestGet> { + using RequestManager::RequestManager; + + QueueMessage BuildMessage() { + QueueMessage msg; + msg.type = CacheValueId::BOOLEAN; + msg.key = this->request.id(); + msg.operation = OperationId::GET; + msg.arguments = std::monostate{}; + msg.responder = this->TakeResponder(); + return msg; + } +}; + +struct BoolDeleteRequestManager : RequestManager< + BoolDeleteRequestManager, + v1::primitives::boolean::BoolDeleteRequest, + grpc::ServerAsyncResponseWriter, + BoolService, + &BoolService::RequestDelete> { + using RequestManager::RequestManager; + + QueueMessage BuildMessage() { + QueueMessage msg; + msg.type = CacheValueId::BOOLEAN; + msg.key = this->request.id(); + msg.operation = OperationId::DELETE; + msg.arguments = std::monostate{}; + msg.responder = this->TakeResponder(); + return msg; + } +}; + +struct IntSetRequestManager : RequestManager< + IntSetRequestManager, + v1::primitives::integer::IntSetRequest, + grpc::ServerAsyncResponseWriter, + IntService, + &IntService::RequestSet> { + using RequestManager::RequestManager; + + QueueMessage BuildMessage() { + QueueMessage msg; + msg.type = CacheValueId::INT; + msg.key = this->request.id(); + msg.operation = OperationId::SET; + msg.arguments = PrimitiveSetArgs{.value = CacheValue{this->request.value()}, .ttl_seconds = 0}; + msg.responder = this->TakeResponder(); + return msg; + } +}; + +struct IntGetRequestManager : RequestManager< + IntGetRequestManager, + v1::primitives::integer::IntGetRequest, + grpc::ServerAsyncResponseWriter, + IntService, + &IntService::RequestGet> { + using RequestManager::RequestManager; + + QueueMessage BuildMessage() { + QueueMessage msg; + msg.type = CacheValueId::INT; + msg.key = this->request.id(); + msg.operation = OperationId::GET; + msg.arguments = std::monostate{}; + msg.responder = this->TakeResponder(); + return msg; + } +}; + +struct IntDeleteRequestManager : RequestManager< + IntDeleteRequestManager, + v1::primitives::integer::IntDeleteRequest, + grpc::ServerAsyncResponseWriter, + IntService, + &IntService::RequestDelete> { + using RequestManager::RequestManager; + + QueueMessage BuildMessage() { + QueueMessage msg; + msg.type = CacheValueId::INT; + msg.key = this->request.id(); + msg.operation = OperationId::DELETE; + msg.arguments = std::monostate{}; + msg.responder = this->TakeResponder(); + return msg; + } +}; + +struct StringSetRequestManager : RequestManager< + StringSetRequestManager, + v1::primitives::str::StringSetRequest, + grpc::ServerAsyncResponseWriter, + StringService, + &StringService::RequestSet> { + using RequestManager::RequestManager; + + QueueMessage BuildMessage() { + QueueMessage msg; + msg.type = CacheValueId::STRING; + msg.key = this->request.id(); + msg.operation = OperationId::SET; + msg.arguments = PrimitiveSetArgs{.value = CacheValue{this->request.value()}, .ttl_seconds = 0}; + msg.responder = this->TakeResponder(); + return msg; + } +}; + +struct StringGetRequestManager : RequestManager< + StringGetRequestManager, + v1::primitives::str::StringGetRequest, + grpc::ServerAsyncResponseWriter, + StringService, + &StringService::RequestGet> { + using RequestManager::RequestManager; + + QueueMessage BuildMessage() { + QueueMessage msg; + msg.type = CacheValueId::STRING; + msg.key = this->request.id(); + msg.operation = OperationId::GET; + msg.arguments = std::monostate{}; + msg.responder = this->TakeResponder(); + return msg; + } +}; + +struct StringDeleteRequestManager : RequestManager< + StringDeleteRequestManager, + v1::primitives::str::StringDeleteRequest, + grpc::ServerAsyncResponseWriter, + StringService, + &StringService::RequestDelete> { + using RequestManager::RequestManager; + + QueueMessage BuildMessage() { + QueueMessage msg; + msg.type = CacheValueId::STRING; + msg.key = this->request.id(); + msg.operation = OperationId::DELETE; + msg.arguments = std::monostate{}; + msg.responder = this->TakeResponder(); + return msg; + } +}; + +struct ListSetRequestManager : RequestManager< + ListSetRequestManager, + v1::collections::list::ListSetRequest, + grpc::ServerAsyncResponseWriter, + ListService, + &ListService::RequestSet> { + using RequestManager::RequestManager; + QueueMessage BuildMessage() { + std::vector values; + values.reserve(static_cast(this->request.elements_size())); + for (const auto& el : this->request.elements()) + values.emplace_back(CollectionElementToCacheValue(el)); + QueueMessage msg; + msg.type = CacheValueId::ARRAY; + msg.key = this->request.id(); + msg.operation = OperationId::SET; + msg.arguments = ListPushManyArgs{.values = std::move(values), .replace_entire_list = true}; + msg.responder = this->TakeResponder(); + return msg; + } +}; + +struct ListGetRequestManager : RequestManager< + ListGetRequestManager, + v1::collections::list::ListGetRequest, + grpc::ServerAsyncWriter, + ListService, + &ListService::RequestGet> { + using RequestManager::RequestManager; + + QueueMessage BuildMessage() { + QueueMessage msg; + msg.type = CacheValueId::ARRAY; + msg.key = this->request.id(); + msg.operation = OperationId::GET; + msg.arguments = std::monostate{}; + msg.responder = this->TakeResponder(); + return msg; + } +}; + +struct ListDeleteRequestManager : RequestManager< + ListDeleteRequestManager, + v1::collections::list::DeleteRequest, + grpc::ServerAsyncResponseWriter, + ListService, + &ListService::RequestDelete> { + using RequestManager::RequestManager; + + QueueMessage BuildMessage() { + QueueMessage msg; + msg.type = CacheValueId::ARRAY; + msg.key = this->request.id(); + msg.operation = OperationId::DELETE; + msg.arguments = std::monostate{}; + msg.responder = this->TakeResponder(); + return msg; + } +}; + +void RunFrontendServer(const std::string& address, std::function callback_enqueue_message) { + grpc::ServerBuilder builder; + BoolService bool_service; + IntService int_service; + StringService string_service; + ListService list_service; + + builder.AddListeningPort(address, grpc::InsecureServerCredentials()); + builder.RegisterService(&bool_service); + builder.RegisterService(&int_service); + builder.RegisterService(&string_service); + builder.RegisterService(&list_service); + + std::unique_ptr cq = builder.AddCompletionQueue(); + std::unique_ptr server = builder.BuildAndStart(); + + BoolSetRequestManager _BoolSetRequestManager(bool_service); + _BoolSetRequestManager.ListenForOne(*cq); + BoolGetRequestManager _BoolGetRequestManager(bool_service); + _BoolGetRequestManager.ListenForOne(*cq); + BoolDeleteRequestManager _BoolDeleteRequestManager(bool_service); + _BoolDeleteRequestManager.ListenForOne(*cq); + + IntSetRequestManager _IntSetRequestManager(int_service); + _IntSetRequestManager.ListenForOne(*cq); + IntGetRequestManager _IntGetRequestManager(int_service); + _IntGetRequestManager.ListenForOne(*cq); + IntDeleteRequestManager _IntDeleteRequestManager(int_service); + _IntDeleteRequestManager.ListenForOne(*cq); + + StringSetRequestManager _StringSetRequestManager(string_service); + _StringSetRequestManager.ListenForOne(*cq); + StringGetRequestManager _StringGetRequestManager(string_service); + _StringGetRequestManager.ListenForOne(*cq); + StringDeleteRequestManager _StringDeleteRequestManager(string_service); + _StringDeleteRequestManager.ListenForOne(*cq); + + ListSetRequestManager _ListSetRequestManager(list_service); + _ListSetRequestManager.ListenForOne(*cq); + ListGetRequestManager _ListGetRequestManager(list_service); + _ListGetRequestManager.ListenForOne(*cq); + ListDeleteRequestManager _ListDeleteRequestManager(list_service); + _ListDeleteRequestManager.ListenForOne(*cq); + + void* tag; + bool ok; + while (cq->Next(&tag, &ok)) { + if (!tag) + continue; + // taking ownership as per the manager api + std::unique_ptr event(static_cast(tag)); + + if (!ok) { + // idk could happen, but we do take ownership of the event anyway + (void)event; + continue; + } + + if (auto* req = std::get_if(event.get())) { + auto& manager = req->manager; + if (auto msg = manager.ConsumeMessage(); msg.has_value()) { + callback_enqueue_message(std::move(*msg)); + } + manager.ListenForOne(*cq); + } else if (auto* fin = std::get_if(event.get())) { + // we just destroy + std::cout << "Finished a request\n" << std::flush; + (void)fin; + } + } +} diff --git a/rediska/frontend/RequestManager.hpp b/rediska/frontend/RequestManager.hpp new file mode 100644 index 0000000..5338b23 --- /dev/null +++ b/rediska/frontend/RequestManager.hpp @@ -0,0 +1,52 @@ +#pragma once + +#include +#include +#include +#include +#include "rediska/common/QueueMessage.hpp" + +// One global context kept alive as long as the server lives. +inline grpc::ServerContext& GlobalServerContext() { + static grpc::ServerContext ctx; + return ctx; +} + +// The thing recieved from the completion queue in case of a request. +struct BaseRequestManager { + virtual void ListenForOne(grpc::ServerCompletionQueue& cq) = 0; + virtual std::optional ConsumeMessage() = 0; + virtual ~BaseRequestManager() = default; +}; + +// This is basically storage for an incoming request. +template +class RequestManager : public BaseRequestManager { +protected: + RequestT request; + std::unique_ptr responder; + ServiceT& service; + + std::unique_ptr TakeResponder() { + return std::unique_ptr(std::move(responder)); + } + +public: + explicit RequestManager(ServiceT& svc) : service(svc) { } + + // Sets up listening for a new request. + // You must ensure this object lives until the request is consumed, or grpc will be unhappy. + // The tag you recieve from the queue is a EventVariant* that you must take ownership of. + void ListenForOne(grpc::ServerCompletionQueue& cq) override { + request = RequestT{}; + // FIXME: just leak this stupid shit for now idk how to fix I'm done with this stupid language + auto* ctx = new grpc::ServerContext(); + responder = std::make_unique(ctx); + (service.*RequestMethod)(ctx, &request, responder.get(), &cq, &cq, std::make_unique(RequestEvent{*this}).release()); + } + + std::optional ConsumeMessage() override { + QueueMessage msg = static_cast(this)->BuildMessage(); + return msg; + } +}; diff --git a/rediska/frontend/server.hpp b/rediska/frontend/server.hpp new file mode 100644 index 0000000..cb38ba9 --- /dev/null +++ b/rediska/frontend/server.hpp @@ -0,0 +1,11 @@ +#pragma once + +#include +#include +#include "rediska/common/QueueMessage.hpp" + +// Starts the gRPC frontend server on the given address and blocks. +// Each incoming request is transformed into a QueueMessage with its responder +// moved inside; the provided callback decides when/how to finish it. +void RunFrontendServer(const std::string& address, + std::function on_request); diff --git a/rediska/main.cpp b/rediska/main.cpp new file mode 100644 index 0000000..e2877d1 --- /dev/null +++ b/rediska/main.cpp @@ -0,0 +1,5 @@ +#include "rediska/worker/FrontendWorker.cpp" + +int main() { + return run_frontend_server(); +} diff --git a/rediska/worker/AsyncWorker.cpp b/rediska/worker/AsyncWorker.cpp new file mode 100644 index 0000000..5bbf0c6 --- /dev/null +++ b/rediska/worker/AsyncWorker.cpp @@ -0,0 +1,157 @@ +#include "rediska/worker/AsyncWorker.hpp" +#include +#include "v1/primitives/bool.grpc.pb.h" +#include "v1/primitives/bool.pb.h" +#include "v1/primitives/int.grpc.pb.h" +#include "v1/primitives/int.pb.h" +#include "v1/primitives/string.grpc.pb.h" +#include "v1/primitives/string.pb.h" +#include "v1/collections/list.grpc.pb.h" +#include "v1/collections/list.pb.h" + +namespace rediska::worker { + +AsyncWorker::AsyncWorker(size_t num_workers, size_t cache_capacity) + : num_workers_(num_workers) { + cache_config_.maxCapacity = cache_capacity; + cache_config_.ttl = 0; + cache_config_.resetTTLOnAccess = true; + InitializeCache(); +} + +AsyncWorker::~AsyncWorker() { + Stop(); +} + +void AsyncWorker::Start() { + if (running_) return; + + running_ = true; + workers_.reserve(num_workers_); + + for (size_t i = 0; i < num_workers_; ++i) { + workers_.emplace_back(&AsyncWorker::WorkerLoop, this); + } +} + +void AsyncWorker::Stop() { + if (!running_) return; + + running_ = false; + queue_cv_.notify_all(); + + for (auto& worker : workers_) { + if (worker.joinable()) { + worker.join(); + } + } + + workers_.clear(); +} + +void AsyncWorker::Enqueue(QueueMessage msg) { + { + std::lock_guard lock(queue_mutex_); + message_queue_.push(std::move(msg)); + } + queue_cv_.notify_one(); +} + +void AsyncWorker::WorkerLoop() { + while (running_) { + QueueMessage msg; + { + std::unique_lock lock(queue_mutex_); + queue_cv_.wait(lock, [this] { return !message_queue_.empty() || !running_; }); + + if (!running_) break; + + msg = std::move(message_queue_.front()); + message_queue_.pop(); + } + + ProcessMessage(msg); + } +} + +void AsyncWorker::ProcessMessage(QueueMessage& msg) { + if (!msg.responder) return; + + try { + switch (msg.operation) { + case OperationId::SET: { + if (msg.type == CacheValueId::ARRAY) { + v1::collections::list::ListSetResponse response; + msg.respond(response); + } else { + google::protobuf::Empty response; + msg.respond(response); + } + break; + } + case OperationId::GET: { + cache_->get(std::string(msg.key)); + + // заглушка, пока кэш не интегрирован полностью TODO + if (msg.type == CacheValueId::BOOLEAN) { + v1::primitives::boolean::BoolGetResponse response; + response.set_value(false); + msg.respond(response); + } else if (msg.type == CacheValueId::INT) { + v1::primitives::integer::IntGetResponse response; + response.set_value(0); + msg.respond(response); + } else if (msg.type == CacheValueId::STRING) { + v1::primitives::str::StringGetResponse response; + response.set_value("cached_value"); + msg.respond(response); + } else if (msg.type == CacheValueId::ARRAY) { + v1::collections::list::ListGetResponse response; + msg.respond(response); + } + break; + } + case OperationId::DELETE: { + if (msg.type == CacheValueId::BOOLEAN) { + v1::primitives::boolean::BoolDeleteResponse response; + response.set_removed_value(false); + msg.respond(response); + } else if (msg.type == CacheValueId::INT) { + v1::primitives::integer::IntDeleteResponse response; + response.set_removed_value(0); + msg.respond(response); + } else if (msg.type == CacheValueId::STRING) { + v1::primitives::str::StringDeleteResponse response; + response.set_removed_value(""); + msg.respond(response); + } else if (msg.type == CacheValueId::ARRAY) { + google::protobuf::Empty response; + msg.respond(response); + } + break; + } + case OperationId::LIST_PUSH_BACK: + case OperationId::LIST_POP_BACK: + case OperationId::LIST_INSERT: + case OperationId::LIST_ERASE: { + google::protobuf::Empty response; + msg.respond(response); + break; + } + default: + break; + } + } catch (const std::exception& e) { + std::cout << "Error processing message: " << e.what() << std::endl; + } +} + +void AsyncWorker::InitializeCache() { + auto callback = [this](auto result) { + // TODO + }; + + cache_ = std::make_unique(cache_config_, callback); +} + +} // namespace rediska::worker \ No newline at end of file diff --git a/rediska/worker/AsyncWorker.hpp b/rediska/worker/AsyncWorker.hpp new file mode 100644 index 0000000..a2b7298 --- /dev/null +++ b/rediska/worker/AsyncWorker.hpp @@ -0,0 +1,42 @@ +#pragma once + +#include +#include +#include +#include +#include +#include +#include +#include +#include "rediska/cache/lru/LRU.hpp" +#include "rediska/common/QueueMessage.hpp" +#include "rediska/common/types.hpp" + +namespace rediska::worker { + +class AsyncWorker { +public: + explicit AsyncWorker(size_t num_workers = 4, size_t cache_capacity = 1000); + ~AsyncWorker(); + + void Start(); + void Stop(); + void Enqueue(QueueMessage msg); + bool IsRunning() const { return running_; } + +private: + void WorkerLoop(); + void ProcessMessage(QueueMessage& msg); + void InitializeCache(); + + std::vector workers_; + std::queue message_queue_; + std::mutex queue_mutex_; + std::condition_variable queue_cv_; + std::atomic running_{false}; + size_t num_workers_; + std::unique_ptr cache_; + cache::LRUConfig cache_config_; +}; + +} // namespace rediska::worker diff --git a/rediska/worker/AsyncWorkerMain.cpp b/rediska/worker/AsyncWorkerMain.cpp new file mode 100644 index 0000000..6dd0b84 --- /dev/null +++ b/rediska/worker/AsyncWorkerMain.cpp @@ -0,0 +1,14 @@ +#include "rediska/worker/AsyncWorker.hpp" +#include "rediska/frontend/server.hpp" +#include + +int run_async_server() { + rediska::worker::AsyncWorker worker(4, 1000); + worker.Start(); + + RunFrontendServer("0.0.0.0:50051", [&worker](QueueMessage msg) { + worker.Enqueue(std::move(msg)); + }); + + return 0; +} \ No newline at end of file diff --git a/rediska/worker/CMakeLists.txt b/rediska/worker/CMakeLists.txt new file mode 100644 index 0000000..ed31c39 --- /dev/null +++ b/rediska/worker/CMakeLists.txt @@ -0,0 +1,15 @@ +add_library(worker STATIC + tmp.cpp + AsyncWorker.cpp + AsyncWorker.hpp + AsyncWorkerMain.cpp + FrontendWorker.cpp + CacheWorker.cpp +) + +target_link_libraries(worker PUBLIC + common + frontend + cache + data-structures +) diff --git a/rediska/worker/CacheWorker.cpp b/rediska/worker/CacheWorker.cpp new file mode 100644 index 0000000..b6ccb8b --- /dev/null +++ b/rediska/worker/CacheWorker.cpp @@ -0,0 +1,183 @@ +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include "google/protobuf/empty.pb.h" +#include "v1/primitives/bool.grpc.pb.h" +#include "v1/primitives/bool.pb.h" +#include "v1/primitives/int.grpc.pb.h" +#include "v1/primitives/int.pb.h" +#include "v1/primitives/string.grpc.pb.h" +#include "v1/primitives/string.pb.h" +#include "v1/collections/list.grpc.pb.h" +#include "v1/collections/list.pb.h" +#include "rediska/common/QueueMessage.hpp" +#include "rediska/frontend/RequestManager.hpp" +#include "rediska/frontend/server.hpp" +#include "rediska/cache/lru/LRU.hpp" +#include "rediska/data-structures/impl/ListDataStructure.hpp" + +namespace { +class CacheWorker { +public: + CacheWorker(size_t cache_capacity = 1000) { + cache_config_.maxCapacity = cache_capacity; + cache_config_.ttl = 0; + cache_config_.resetTTLOnAccess = true; + + auto callback = [this](auto result) { + }; + + cache_ = std::make_unique(cache_config_, callback); + } + + void Enqueue(QueueMessage msg) { + std::lock_guard lock(mtx_); + queue_.push(std::move(msg)); + cv_.notify_one(); + } + + void Run() { + for (;;) { + QueueMessage msg; + { + std::unique_lock lock(mtx_); + cv_.wait(lock, [&] { return !queue_.empty(); }); + msg = std::move(queue_.front()); + queue_.pop(); + } + + std::cout << "[cache-worker] key=" << msg.key + << " type=" << static_cast(msg.type) + << " op=" << static_cast(msg.operation) << std::endl; + + if (!msg.responder) continue; + + try { + switch (msg.operation) { + case OperationId::SET: { + if (msg.type == CacheValueId::ARRAY) { + auto list = std::make_shared(); + + if (std::holds_alternative(msg.arguments)) { + auto& args = std::get(msg.arguments); + for (auto& value : args.values) { + list->handle(OperationId::LIST_PUSH_BACK, DSValue{}); + } + } + + cache_->set(std::string(msg.key), CacheValue{list}, 0); + + v1::collections::list::ListSetResponse response; + msg.respond(response); + } else { + + CacheValue value; + if (msg.arguments.index() != 0) { + auto& args = std::get(msg.arguments); + value = args.value; + } + + cache_->set(std::string(msg.key), std::move(value), 0); + msg.respond(google::protobuf::Empty{}); + } + break; + } + case OperationId::GET: { + cache_->get(std::string(msg.key)); + + // я не нашел пока полный callback, тут заглушки (TODO) + if (msg.type == CacheValueId::BOOLEAN) { + v1::primitives::boolean::BoolGetResponse response; + response.set_value(true); + msg.respond(response); + } else if (msg.type == CacheValueId::INT) { + v1::primitives::integer::IntGetResponse response; + response.set_value(42); + msg.respond(response); + } else if (msg.type == CacheValueId::STRING) { + v1::primitives::str::StringGetResponse response; + response.set_value("cached_string_value"); + msg.respond(response); + } else if (msg.type == CacheValueId::ARRAY) { + v1::collections::list::ListGetResponse response; + msg.respond(response); + } + break; + } + case OperationId::DELETE: { + cache_->applyTo(std::string(msg.key), OperationId::DELETE, std::move(msg.arguments)); + + if (msg.type == CacheValueId::BOOLEAN) { + v1::primitives::boolean::BoolDeleteResponse response; + response.set_removed_value(true); + msg.respond(response); + } else if (msg.type == CacheValueId::INT) { + v1::primitives::integer::IntDeleteResponse response; + response.set_removed_value(42); + msg.respond(response); + } else if (msg.type == CacheValueId::STRING) { + v1::primitives::str::StringDeleteResponse response; + response.set_removed_value("deleted_string"); + msg.respond(response); + } else if (msg.type == CacheValueId::ARRAY) { + msg.respond(google::protobuf::Empty{}); + } + break; + } + case OperationId::LIST_PUSH_BACK: { + cache_->applyTo(std::string(msg.key), OperationId::LIST_PUSH_BACK, std::move(msg.arguments)); + msg.respond(google::protobuf::Empty{}); + break; + } + case OperationId::LIST_POP_BACK: { + cache_->applyTo(std::string(msg.key), OperationId::LIST_POP_BACK, std::move(msg.arguments)); + + v1::collections::list::PopBackResponse response; + msg.respond(response); + break; + } + case OperationId::LIST_INSERT: { + cache_->applyTo(std::string(msg.key), OperationId::LIST_INSERT, std::move(msg.arguments)); + msg.respond(google::protobuf::Empty{}); + break; + } + case OperationId::LIST_ERASE: { + cache_->applyTo(std::string(msg.key), OperationId::LIST_ERASE, std::move(msg.arguments)); + msg.respond(google::protobuf::Empty{}); + break; + } + default: + break; + } + } catch (const std::exception& e) { + std::cout << "Error processing message: " << e.what() << std::endl; + } + } + } + +private: + std::queue queue_; + std::mutex mtx_; + std::condition_variable cv_; + std::unique_ptr cache_; + cache::LRUConfig cache_config_; +}; +} // namespace + +int run_cache_server() { + CacheWorker worker(1000); + std::thread t([&] { worker.Run(); }); + + RunFrontendServer("0.0.0.0:50051", [&](QueueMessage msg) { + worker.Enqueue(std::move(msg)); + }); + + t.join(); + return 0; +} diff --git a/rediska/worker/FrontendWorker.cpp b/rediska/worker/FrontendWorker.cpp new file mode 100644 index 0000000..755e5ac --- /dev/null +++ b/rediska/worker/FrontendWorker.cpp @@ -0,0 +1,123 @@ +#include +#include +#include +#include +#include +#include +#include "google/protobuf/empty.pb.h" +#include "v1/primitives/bool.grpc.pb.h" +#include "v1/primitives/int.grpc.pb.h" +#include "v1/primitives/string.grpc.pb.h" +#include "v1/collections/list.grpc.pb.h" +#include "rediska/common/QueueMessage.hpp" +#include "rediska/frontend/RequestManager.hpp" +#include "rediska/frontend/server.hpp" +#include "rediska/cache/lru/LRU.hpp" + +namespace { +class FrontendWorker { +public: + void Enqueue(QueueMessage msg) { + std::lock_guard lock(mtx_); + queue_.push(std::move(msg)); + cv_.notify_one(); + } + + void Run() { + for (;;) { + QueueMessage msg; + { + std::unique_lock lock(mtx_); + cv_.wait(lock, [&] { return !queue_.empty(); }); + msg = std::move(queue_.front()); + queue_.pop(); + } + + std::cout << "[worker] key=" << msg.key + << " type=" << static_cast(msg.type) + << " op=" << static_cast(msg.operation) << std::endl; + + if (!msg.responder) continue; + + try { + switch (msg.operation) { + case OperationId::SET: { + if (msg.type == CacheValueId::ARRAY) { + v1::collections::list::ListSetResponse response; + msg.respond(response); + } else { + msg.respond(google::protobuf::Empty{}); + } + break; + } + case OperationId::GET: { + if (msg.type == CacheValueId::BOOLEAN) { + v1::primitives::boolean::BoolGetResponse response; + response.set_value(false); + msg.respond(response); + } else if (msg.type == CacheValueId::INT) { + v1::primitives::integer::IntGetResponse response; + response.set_value(0); + msg.respond(response); + } else if (msg.type == CacheValueId::STRING) { + v1::primitives::str::StringGetResponse response; + response.set_value("cached_value"); + msg.respond(response); + } else if (msg.type == CacheValueId::ARRAY) { + v1::collections::list::ListGetResponse response; + msg.respond(response); + } + break; + } + case OperationId::DELETE: { + if (msg.type == CacheValueId::BOOLEAN) { + v1::primitives::boolean::BoolDeleteResponse response; + response.set_removed_value(false); + msg.respond(response); + } else if (msg.type == CacheValueId::INT) { + v1::primitives::integer::IntDeleteResponse response; + response.set_removed_value(0); + msg.respond(response); + } else if (msg.type == CacheValueId::STRING) { + v1::primitives::str::StringDeleteResponse response; + response.set_removed_value(""); + msg.respond(response); + } else if (msg.type == CacheValueId::ARRAY) { + msg.respond(google::protobuf::Empty{}); + } + break; + } + case OperationId::LIST_PUSH_BACK: + case OperationId::LIST_POP_BACK: + case OperationId::LIST_INSERT: + case OperationId::LIST_ERASE: { + msg.respond(google::protobuf::Empty{}); + break; + } + default: + break; + } + } catch (const std::exception& e) { + std::cout << "Error processing message: " << e.what() << std::endl; + } + } + } + +private: + std::queue queue_; + std::mutex mtx_; + std::condition_variable cv_; +}; +} // namespace + +int run_frontend_server() { + FrontendWorker worker; + std::thread t([&] { worker.Run(); }); + + RunFrontendServer("0.0.0.0:50051", [&](QueueMessage msg) { + worker.Enqueue(std::move(msg)); + }); + + t.join(); + return 0; +} diff --git a/rediska/worker/tmp.cpp b/rediska/worker/tmp.cpp new file mode 100644 index 0000000..de144db --- /dev/null +++ b/rediska/worker/tmp.cpp @@ -0,0 +1,86 @@ +#include +#include +#include +#include +#include +#include +#include "google/protobuf/empty.pb.h" +#include "v1/primitives/bool.grpc.pb.h" +#include "v1/primitives/int.grpc.pb.h" +#include "v1/primitives/string.grpc.pb.h" +#include "v1/collections/list.grpc.pb.h" +#include "rediska/common/QueueMessage.hpp" +#include "rediska/frontend/RequestManager.hpp" +#include "rediska/frontend/server.hpp" + +namespace { +class PrintWorker { +public: + void Enqueue(QueueMessage msg) { + std::lock_guard lock(mtx_); + queue_.push(std::move(msg)); + cv_.notify_one(); + } + + void Run() { + for (;;) { + QueueMessage msg; + { + std::unique_lock lock(mtx_); + cv_.wait(lock, [&] { return !queue_.empty(); }); + msg = std::move(queue_.front()); + queue_.pop(); + } + + std::cout << "[worker] key=" << msg.key + << " type=" << static_cast(msg.type) + << " op=" << static_cast(msg.operation) << std::endl; + + if (!msg.responder) continue; + + switch (msg.operation) { + case OperationId::SET: + case OperationId::DELETE: { + msg.respond(google::protobuf::Empty{}); + break; + } + case OperationId::GET: { + if (msg.type == CacheValueId::BOOLEAN) { + v1::primitives::boolean::BoolGetResponse r; r.set_value(false); + msg.respond(r); + } else if (msg.type == CacheValueId::INT) { + v1::primitives::integer::IntGetResponse r; r.set_value(0); + msg.respond(r); + } else if (msg.type == CacheValueId::STRING) { + v1::primitives::str::StringGetResponse r; r.set_value("test"); + msg.respond(r); + } else if (msg.type == CacheValueId::ARRAY) { + v1::collections::list::ListGetResponse r; + msg.respond(r); + } + break; + } + default: + break; + } + } + } + +private: + std::queue queue_; + std::mutex mtx_; + std::condition_variable cv_; +}; +} // namespace + +int run_print_server() { + PrintWorker worker; + std::thread t([&] { worker.Run(); }); + + RunFrontendServer("0.0.0.0:50051", [&](QueueMessage msg) { + worker.Enqueue(std::move(msg)); + }); + + t.join(); + return 0; +} diff --git a/scripts/send_test_requests.py b/scripts/send_test_requests.py new file mode 100644 index 0000000..e6a24ae --- /dev/null +++ b/scripts/send_test_requests.py @@ -0,0 +1,72 @@ +#!/usr/bin/env python3 +""" +Quick smoke sender for the temp print server. +Requires python -m pip install grpcio grpcio-tools. +""" +import subprocess +import sys +import tempfile +from pathlib import Path +import grpc + + +def generate_stubs(proto_root: Path, out_dir: Path): + protos = [ + "v1/primitives/bool.proto", + "v1/primitives/int.proto", + "v1/primitives/string.proto", + "v1/collections/list.proto", + "v1/collections/common.proto", + "v1/collections/element_kind.proto", + ] + args = [ + sys.executable, + "-m", + "grpc_tools.protoc", + f"-I{proto_root}", + f"--python_out={out_dir}", + f"--grpc_python_out={out_dir}", + ] + protos + subprocess.check_call(args, cwd=proto_root) + + +def main(): + repo_root = Path(__file__).resolve().parent.parent + proto_root = repo_root / "proto" + with tempfile.TemporaryDirectory() as td: + out_dir = Path(td) + generate_stubs(proto_root, out_dir) + sys.path.insert(0, str(out_dir)) + + from v1.primitives import bool_pb2_grpc as bool_grpc, bool_pb2 + from v1.primitives import int_pb2_grpc as int_grpc, int_pb2 + from v1.primitives import string_pb2_grpc as str_grpc, string_pb2 + from v1.collections import list_pb2_grpc as list_grpc, list_pb2 + + channel = grpc.insecure_channel("localhost:50051") + + bool_stub = bool_grpc.BoolCacheServiceStub(channel) + int_stub = int_grpc.IntCacheServiceStub(channel) + str_stub = str_grpc.StringCacheServiceStub(channel) + list_stub = list_grpc.ListCacheServiceStub(channel) + + print("Sending Bool Set") + bool_stub.Set(bool_pb2.BoolSetRequest(id="b1", value=True)) + + print("Sending Int Set/Get/Delete") + int_stub.Set(int_pb2.IntSetRequest(id="i1", value=42)) + int_resp = int_stub.Get(int_pb2.IntGetRequest(id="i1")) + print("Int get response:", int_resp.value) + int_stub.Delete(int_pb2.IntDeleteRequest(id="i1")) + + print("Sending String Set") + str_stub.Set(string_pb2.StringSetRequest(id="s1", value="hello")) + + print("Sending List Set") + list_stub.Set(list_pb2.ListSetRequest(id="l1", elements=[])) + + print("Done. Check server stdout for printed messages.") + + +if __name__ == "__main__": + main() diff --git a/tests/BasicTests.cpp b/tests/BasicTests.cpp new file mode 100644 index 0000000..61bc6a6 --- /dev/null +++ b/tests/BasicTests.cpp @@ -0,0 +1,171 @@ +#include +#include +#include +#include +#include + +#include "v1/primitives/bool.grpc.pb.h" +#include "v1/primitives/int.grpc.pb.h" +#include "v1/primitives/string.grpc.pb.h" +#include "v1/collections/list.grpc.pb.h" + +class RediskaBasicTest : public ::testing::Test { +protected: + void SetUp() override { + channel_ = grpc::CreateChannel(grpc::InsecureChannel("localhost:50051")); + bool_stub_ = std::make_unique(channel_); + int_stub_ = std::make_unique(channel_); + string_stub_ = std::make_unique(channel_); + list_stub_ = std::make_unique(channel_); + } + + void TearDown() override { + channel_->Shutdown(); + } + + std::shared_ptr channel_; + std::unique_ptr bool_stub_; + std::unique_ptr int_stub_; + std::unique_ptr string_stub_; + std::unique_ptr list_stub_; +}; + +TEST_F(RediskaBasicTest, BooleanOperations) { + // Test SET operation + bool_stub_->Set(v1::primitives::boolean::BoolSetRequest(id="test_bool", value=true)); + + // Test GET operation + auto response = bool_stub_->Get(v1::primitives::BoolGetRequest(id="test_bool")); + EXPECT_TRUE(response.value()); + + // Test DELETE operation + auto delete_response = bool_stub_->Delete(v1::primitives::BoolDeleteRequest(id="test_bool")); + EXPECT_TRUE(delete_response.removed_value()); +} + +TEST_F(RediskaBasicTest, IntegerOperations) { + // Test SET operation + int_stub_->Set(v1::primitives::integer::IntSetRequest(id="test_int", value=42)); + + // Test GET operation + auto response = int_stub_->Get(v1::primitives::integer::IntGetRequest(id="test_int")); + EXPECT_EQ(response.value(), 42); + + // Test DELETE operation + auto delete_response = int_stub_->Delete(v1::primitives::integer::IntDeleteRequest(id="test_int")); + EXPECT_EQ(delete_response.removed_value(), 42); +} + +TEST_F(RediskaBasicTest, StringOperations) { + // Test SET operation + string_stub_->Set(v1::primitives::str::StringSetRequest(id="test_string", value="hello")); + + // Test GET operation + auto response = string_stub_->Get(v1::primitives::str::StringGetRequest(id="test_string")); + EXPECT_EQ(response.value(), "hello"); + + // Test DELETE operation + auto delete_response = string_stub_->Delete(v1::primitives::str::StringDeleteRequest(id="test_string")); + EXPECT_EQ(delete_response.removed_value(), "hello"); +} + +TEST_FediskaBasicTest, ListOperations) { + // Test CREATE operation + auto create_response = list_stub_->Create(v1::collections::list::ListCreateRequest( + element_kind=v1::collections::common::ElementKind::INT, + ttl_seconds=3600 + )); + std::string list_id = create_response.id; + + // Test SET operation + std::vector elements; + elements.push_back(v1::collections::common::CollectionElement(integer=1)); + elements.push_back(v1::collections::common::CollectionElement(integer=2)); + elements.push_back(v1::collections::common::CollectionElement(integer=3)); + + list_stub_->Set(v1::collections::list::ListSetRequest(id=list_id, elements=elements)); + + // Test GET operation (streaming) + auto get_response = list_stub_->Get(v1::collections::list::ListGetRequest(id=list_id)); + int count = 0; + for (auto element : get_response) { + EXPECT_EQ(element.integer(), count + 1); + count++; + } + EXPECT_EQ(count, 3); + + // Test LENGTH operation + auto length_response = list_stub_->Length(v1::collections::list::ListLengthRequest(id=list_id)); + EXPECT_EQ(length_response.length(), 3); + + // Test DELETE operation + list_stub_->Delete(v1::collections::DeleteRequest(id=list_id)); +} + +TEST_FediskaBasicTest, ConcurrentOperations) { + const int num_threads = 10; + const int operations_per_thread = 100; + + std::vector threads; + std::vector> exceptions; + + // Запускаем несколько потоков одновременно + for (int i = 0; i < num_threads; ++i) { + threads.emplace_back([&, i, operations_per_thread]() { + try { + for (int j = 0; j < operations_per_thread; ++j) { + std::string key = "test_" + std::to_string(i * operations_per_thread + j); + + if (j % 3 == 0) { + bool_stub_->Set(v1::primitives::boolean::BoolSetRequest(key, true)); + } else if (j % 3 == 1) { + int_stub_->Set(v1::primitives::integer::IntSetRequest(key, j)); + } else { + string_stub_->Set(v1::primitives::str::StringSetRequest(key, "value_" + std::to_string(j))); + } + } + } catch (const std::exception& e) { + exceptions.emplace_back(std::current_exception()); + } + }); + } + + // Ждем завершения всех потоков + for (auto& thread : threads) { + if (thread.joinable()) { + thread.join(); + } + } + + // Проверяем, что не было исключений + EXPECT_TRUE(exceptions.empty()); +} + +TEST_FediskaBasicTest, ServerStability) { + // Тест на стабильность сервера при множественных запросов + const int num_requests = 1000; + + for (int i = 0; i < num_requests; ++i) { + std::string key = "stability_test_" + std::to_string(i); + + if (i % 4 == 0) { + bool_stub_->Set(v1::primitives::boolean::BoolSetRequest(key, true)); + } else if (i % 4 == 1) { + int_stub_->Set(v1::primitives::integer::IntSetRequest(key, i)); + } else if (i % 4 == 2) { + string_stub_->Set(v1::primitives::str::StringSetRequest(key, "value_" + std::to_string(i))); + } else { + list_stub_->Set(v1::collections::ListSetRequest( + id="list_" + std::to_string(i), + elements={ + v1::collections::common::CollectionElement(integer=i) + } + )); + } + + if (i % 10 == 0) { + auto response = bool_stub_->Get(v1::primitives::boolean::BoolGetRequest(key)); + EXPECT_TRUE(response.value()); + } + } +} \ No newline at end of file diff --git a/tests/CMakeLists.txt b/tests/CMakeLists.txt new file mode 100644 index 0000000..eefd353 --- /dev/null +++ b/tests/CMakeLists.txt @@ -0,0 +1,39 @@ +add_executable(unit_tests + main.cpp +) + +target_link_libraries(unit_tests + PRIVATE + worker + cache + frontend + doctest::doctest +) + +list(APPEND CMAKE_MODULE_PATH "${doctest_SOURCE_DIR}/scripts/cmake") +include(doctest) +list(POP_BACK CMAKE_MODULE_PATH) + +doctest_discover_tests(unit_tests + TEST_PREFIX "" + TEST_SUFFIX "" +) + +find_program(CPPCHECK_EXECUTABLE cppcheck) +if(CPPCHECK_EXECUTABLE) + add_test( + NAME static_analysis + COMMAND ${CPPCHECK_EXECUTABLE} + --enable=all + --std=c++23 + --error-exitcode=1 + --inline-suppr + ${CMAKE_SOURCE_DIR}/rediska + ) + set_tests_properties(static_analysis PROPERTIES + LABELS "static" + TIMEOUT 120 + ) +else() + message(WARNING "Cppcheck not found — skipping static analysis test") +endif() diff --git a/tests/README.md b/tests/README.md new file mode 100644 index 0000000..d25156f --- /dev/null +++ b/tests/README.md @@ -0,0 +1,39 @@ +# Creating a test group + +1. **Create a new test file** \ + Place it under `/tests/frontend/` or `/tests/backend/`, following the [naming convention](#naming-convention) below. + +2. **Include the testing framework** \ + Add the following at the top of the file: + ```C++ + #include + ``` + +3. **Register the file in the build system** \ + Add the new test file to `/tests/CMakeLists.txt` so it’s compiled and executed as part of the test suite. + +4. **Start writing tests!** \ + See [doctest docs](https://github.com/doctest/doctest/blob/master/doc/markdown/readme.md) for more details. + +# Naming convention + +File name should match against the following pattern: + +`___tests.cpp` + +Where: +- `` - the component or subsystem under test. + - Backend modules: `cache` | `worker` + - Frontend modules: `work in progress` +- `` - a concise, kebab-case description of the specific functionality. +- `` - one of the following: + - `unit` + - `integration` + - `e2e` + +# Examples + +| Path | Description | +| :--------------------------------------------- | :------------------------------------------------------------- | +| `backend/cache_lru_unit_tests.cpp` | Unit tests for LRU eviction logic in the cache module | +| `backend/worker_handle-request_unit_tests.cpp` | Unit tests for the request-handling logic in the worker module | diff --git a/tests/backend/.keep b/tests/backend/.keep new file mode 100644 index 0000000..e69de29 diff --git a/tests/frontend/.keep b/tests/frontend/.keep new file mode 100644 index 0000000..e69de29 diff --git a/tests/main.cpp b/tests/main.cpp new file mode 100644 index 0000000..0a3f254 --- /dev/null +++ b/tests/main.cpp @@ -0,0 +1,2 @@ +#define DOCTEST_CONFIG_IMPLEMENT_WITH_MAIN +#include