feat(storage:inkless): InitDisklessLog Diskless Controller API #528
Pull request overview
Adds a new Control Plane API to initialize a diskless log (and optional producer state) at a specific start offset, backed by a new Postgres function and schema changes, to support classic→diskless migration workflows.
Changes:
- Add `diskless_start_offset` to `logs` and introduce the `init_diskless_log_v1(...)` Postgres function + jOOQ regeneration (schema v11).
- Add ControlPlane API surface (`initDisklessLog`) with Postgres and in-memory implementations, plus query metrics.
- Extend integration/unit tests to cover `initDisklessLog` behavior and accommodate the new `logs` column.
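To make the shape of the new API concrete, here is a minimal in-memory sketch in Java. It is not the PR's code: the record fields, the `InitResult` values, and the rule that a second initialization of the same partition is rejected are all assumptions made for illustration. The one behavior taken from the PR's test is that a log initialized at offset 100 reports a log start offset and high watermark of 100.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-ins for the PR's request/response types; all field names are assumed.
record ProducerStateEntry(long producerId, short producerEpoch, int baseSequence, int lastSequence) {}

record InitRequest(String topicId, int partition, long disklessStartOffset,
                   List<ProducerStateEntry> producerStates) {}

// Assumed error values; the real InitDisklessLogErrorV1 enum may differ.
enum InitResult { NONE, ALREADY_INITIALIZED, INVALID_OFFSET }

// Assumed per-partition state: log start and high watermark both begin at the diskless start offset.
record LogState(long logStartOffset, long highWatermark, long disklessStartOffset,
                List<ProducerStateEntry> producerStates) {}

class InMemoryInitSketch {
    private final Map<String, LogState> logs = new HashMap<>();

    InitResult initDisklessLog(InitRequest request) {
        if (request.disklessStartOffset() < 0) {
            return InitResult.INVALID_OFFSET;
        }
        String key = request.topicId() + "-" + request.partition();
        if (logs.containsKey(key)) {
            // Assumption: initializing an existing log is an error rather than an overwrite.
            return InitResult.ALREADY_INITIALIZED;
        }
        long start = request.disklessStartOffset();
        logs.put(key, new LogState(start, start, start, List.copyOf(request.producerStates())));
        return InitResult.NONE;
    }

    LogState logState(String topicId, int partition) {
        return logs.get(topicId + "-" + partition);
    }
}
```

In this sketch the producer states are copied into the log state so that later sequence checks could start from the migrated values; how the real implementation stores them is not shown in this conversation.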
Reviewed changes
Copilot reviewed 14 out of 131 changed files in this pull request and generated 2 comments.
| File | Description |
|---|---|
| storage/inkless/src/test/java/io/aiven/inkless/control_plane/postgres/TopicsAndPartitionsCreateJobTest.java | Update expected LogsRecord shape for new diskless_start_offset column. |
| storage/inkless/src/test/java/io/aiven/inkless/control_plane/postgres/DeleteTopicJobTest.java | Update expected LogsRecord shape for new diskless_start_offset column. |
| storage/inkless/src/test/java/io/aiven/inkless/control_plane/postgres/DeleteRecordsJobTest.java | Update expected LogsRecord shape for new diskless_start_offset column. |
| storage/inkless/src/test/java/io/aiven/inkless/control_plane/postgres/CommitFileJobTest.java | Update expected LogsRecord shape for new diskless_start_offset column. |
| storage/inkless/src/test/java/io/aiven/inkless/control_plane/AbstractControlPlaneTest.java | Add test coverage for initDisklessLog API, including producer-state bootstrap. |
| storage/inkless/src/main/resources/db/migration/V11__Init_diskless_log.sql | Add diskless_start_offset column and init_diskless_log_v1 function + related UDT/ENUM types. |
| storage/inkless/src/main/jooq/org/jooq/generated/udt/records/ReleaseFileMergeWorkItemResponseV1Record.java | jOOQ regen: bump schema version to 11. |
| storage/inkless/src/main/jooq/org/jooq/generated/udt/records/ListOffsetsResponseV1Record.java | jOOQ regen: bump schema version to 11. |
| storage/inkless/src/main/jooq/org/jooq/generated/udt/records/ListOffsetsRequestV1Record.java | jOOQ regen: bump schema version to 11. |
| storage/inkless/src/main/jooq/org/jooq/generated/udt/records/InitDisklessLogResponseV1Record.java | jOOQ: new UDT record for init-diskless-log response. |
| storage/inkless/src/main/jooq/org/jooq/generated/udt/records/InitDisklessLogRequestV1Record.java | jOOQ: new UDT record for init-diskless-log request. |
| storage/inkless/src/main/jooq/org/jooq/generated/udt/records/InitDisklessLogProducerStateV1Record.java | jOOQ: new UDT record for producer-state bootstrap entries. |
| storage/inkless/src/main/jooq/org/jooq/generated/udt/records/FindBatchesResponseV1Record.java | jOOQ regen: bump schema version to 11. |
| storage/inkless/src/main/jooq/org/jooq/generated/udt/records/FindBatchesRequestV1Record.java | jOOQ regen: bump schema version to 11. |
| storage/inkless/src/main/jooq/org/jooq/generated/udt/records/FileMergeWorkItemResponseV1Record.java | jOOQ regen: bump schema version to 11. |
| storage/inkless/src/main/jooq/org/jooq/generated/udt/records/FileMergeWorkItemResponseFileV1Record.java | jOOQ regen: bump schema version to 11. |
| storage/inkless/src/main/jooq/org/jooq/generated/udt/records/FileMergeWorkItemResponseBatchV1Record.java | jOOQ regen: bump schema version to 11. |
| storage/inkless/src/main/jooq/org/jooq/generated/udt/records/EnforceRetentionResponseV1Record.java | jOOQ regen: bump schema version to 11. |
| storage/inkless/src/main/jooq/org/jooq/generated/udt/records/EnforceRetentionRequestV1Record.java | jOOQ regen: bump schema version to 11. |
| storage/inkless/src/main/jooq/org/jooq/generated/udt/records/DeleteRecordsResponseV1Record.java | jOOQ regen: bump schema version to 11. |
| storage/inkless/src/main/jooq/org/jooq/generated/udt/records/DeleteRecordsRequestV1Record.java | jOOQ regen: bump schema version to 11. |
| storage/inkless/src/main/jooq/org/jooq/generated/udt/records/CommitFileMergeWorkItemResponseV1Record.java | jOOQ regen: bump schema version to 11. |
| storage/inkless/src/main/jooq/org/jooq/generated/udt/records/CommitFileMergeWorkItemBatchV1Record.java | jOOQ regen: bump schema version to 11. |
| storage/inkless/src/main/jooq/org/jooq/generated/udt/records/CommitBatchResponseV1Record.java | jOOQ regen: bump schema version to 11. |
| storage/inkless/src/main/jooq/org/jooq/generated/udt/records/CommitBatchRequestV1Record.java | jOOQ regen: bump schema version to 11. |
| storage/inkless/src/main/jooq/org/jooq/generated/udt/records/BatchMetadataV1Record.java | jOOQ regen: bump schema version to 11. |
| storage/inkless/src/main/jooq/org/jooq/generated/udt/records/BatchInfoV1Record.java | jOOQ regen: bump schema version to 11. |
| storage/inkless/src/main/jooq/org/jooq/generated/udt/paths/ReleaseFileMergeWorkItemResponseV1Path.java | jOOQ regen: bump schema version to 11. |
| storage/inkless/src/main/jooq/org/jooq/generated/udt/paths/ListOffsetsResponseV1Path.java | jOOQ regen: bump schema version to 11. |
| storage/inkless/src/main/jooq/org/jooq/generated/udt/paths/ListOffsetsRequestV1Path.java | jOOQ regen: bump schema version to 11. |
| storage/inkless/src/main/jooq/org/jooq/generated/udt/paths/InitDisklessLogResponseV1Path.java | jOOQ: new UDT path for init-diskless-log response. |
| storage/inkless/src/main/jooq/org/jooq/generated/udt/paths/InitDisklessLogRequestV1Path.java | jOOQ: new UDT path for init-diskless-log request. |
| storage/inkless/src/main/jooq/org/jooq/generated/udt/paths/InitDisklessLogProducerStateV1Path.java | jOOQ: new UDT path for producer-state bootstrap entries. |
| storage/inkless/src/main/jooq/org/jooq/generated/udt/paths/FindBatchesResponseV1Path.java | jOOQ regen: bump schema version to 11. |
| storage/inkless/src/main/jooq/org/jooq/generated/udt/paths/FindBatchesRequestV1Path.java | jOOQ regen: bump schema version to 11. |
| storage/inkless/src/main/jooq/org/jooq/generated/udt/paths/FileMergeWorkItemResponseV1Path.java | jOOQ regen: bump schema version to 11. |
| storage/inkless/src/main/jooq/org/jooq/generated/udt/paths/FileMergeWorkItemResponseFileV1Path.java | jOOQ regen: bump schema version to 11. |
| storage/inkless/src/main/jooq/org/jooq/generated/udt/paths/FileMergeWorkItemResponseBatchV1Path.java | jOOQ regen: bump schema version to 11. |
| storage/inkless/src/main/jooq/org/jooq/generated/udt/paths/EnforceRetentionResponseV1Path.java | jOOQ regen: bump schema version to 11. |
| storage/inkless/src/main/jooq/org/jooq/generated/udt/paths/EnforceRetentionRequestV1Path.java | jOOQ regen: bump schema version to 11. |
| storage/inkless/src/main/jooq/org/jooq/generated/udt/paths/DeleteRecordsResponseV1Path.java | jOOQ regen: bump schema version to 11. |
| storage/inkless/src/main/jooq/org/jooq/generated/udt/paths/DeleteRecordsRequestV1Path.java | jOOQ regen: bump schema version to 11. |
| storage/inkless/src/main/jooq/org/jooq/generated/udt/paths/CommitFileMergeWorkItemResponseV1Path.java | jOOQ regen: bump schema version to 11. |
| storage/inkless/src/main/jooq/org/jooq/generated/udt/paths/CommitFileMergeWorkItemBatchV1Path.java | jOOQ regen: bump schema version to 11. |
| storage/inkless/src/main/jooq/org/jooq/generated/udt/paths/CommitBatchResponseV1Path.java | jOOQ regen: bump schema version to 11. |
| storage/inkless/src/main/jooq/org/jooq/generated/udt/paths/CommitBatchRequestV1Path.java | jOOQ regen: bump schema version to 11. |
| storage/inkless/src/main/jooq/org/jooq/generated/udt/paths/BatchMetadataV1Path.java | jOOQ regen: bump schema version to 11. |
| storage/inkless/src/main/jooq/org/jooq/generated/udt/paths/BatchInfoV1Path.java | jOOQ regen: bump schema version to 11. |
| storage/inkless/src/main/jooq/org/jooq/generated/udt/ReleaseFileMergeWorkItemResponseV1.java | jOOQ regen: bump schema version to 11. |
| storage/inkless/src/main/jooq/org/jooq/generated/udt/ListOffsetsResponseV1.java | jOOQ regen: bump schema version to 11. |
| storage/inkless/src/main/jooq/org/jooq/generated/udt/ListOffsetsRequestV1.java | jOOQ regen: bump schema version to 11. |
| storage/inkless/src/main/jooq/org/jooq/generated/udt/InitDisklessLogResponseV1.java | jOOQ: new UDT for init-diskless-log response. |
| storage/inkless/src/main/jooq/org/jooq/generated/udt/InitDisklessLogRequestV1.java | jOOQ: new UDT for init-diskless-log request. |
| storage/inkless/src/main/jooq/org/jooq/generated/udt/InitDisklessLogProducerStateV1.java | jOOQ: new UDT for producer-state bootstrap entries. |
| storage/inkless/src/main/jooq/org/jooq/generated/udt/FindBatchesResponseV1.java | jOOQ regen: bump schema version to 11. |
| storage/inkless/src/main/jooq/org/jooq/generated/udt/FindBatchesRequestV1.java | jOOQ regen: bump schema version to 11. |
| storage/inkless/src/main/jooq/org/jooq/generated/udt/FileMergeWorkItemResponseV1.java | jOOQ regen: bump schema version to 11. |
| storage/inkless/src/main/jooq/org/jooq/generated/udt/FileMergeWorkItemResponseFileV1.java | jOOQ regen: bump schema version to 11. |
| storage/inkless/src/main/jooq/org/jooq/generated/udt/FileMergeWorkItemResponseBatchV1.java | jOOQ regen: bump schema version to 11. |
| storage/inkless/src/main/jooq/org/jooq/generated/udt/EnforceRetentionResponseV1.java | jOOQ regen: bump schema version to 11. |
| storage/inkless/src/main/jooq/org/jooq/generated/udt/EnforceRetentionRequestV1.java | jOOQ regen: bump schema version to 11. |
| storage/inkless/src/main/jooq/org/jooq/generated/udt/DeleteRecordsResponseV1.java | jOOQ regen: bump schema version to 11. |
| storage/inkless/src/main/jooq/org/jooq/generated/udt/DeleteRecordsRequestV1.java | jOOQ regen: bump schema version to 11. |
| storage/inkless/src/main/jooq/org/jooq/generated/udt/CommitFileMergeWorkItemResponseV1.java | jOOQ regen: bump schema version to 11. |
| storage/inkless/src/main/jooq/org/jooq/generated/udt/CommitFileMergeWorkItemBatchV1.java | jOOQ regen: bump schema version to 11. |
| storage/inkless/src/main/jooq/org/jooq/generated/udt/CommitBatchResponseV1.java | jOOQ regen: bump schema version to 11. |
| storage/inkless/src/main/jooq/org/jooq/generated/udt/CommitBatchRequestV1.java | jOOQ regen: bump schema version to 11. |
| storage/inkless/src/main/jooq/org/jooq/generated/udt/BatchMetadataV1.java | jOOQ regen: bump schema version to 11. |
| storage/inkless/src/main/jooq/org/jooq/generated/udt/BatchInfoV1.java | jOOQ regen: bump schema version to 11. |
| storage/inkless/src/main/jooq/org/jooq/generated/tables/records/ProducerStateRecord.java | jOOQ regen: bump schema version to 11. |
| storage/inkless/src/main/jooq/org/jooq/generated/tables/records/LogsRecord.java | jOOQ: add disklessStartOffset accessor + constructor arg. |
| storage/inkless/src/main/jooq/org/jooq/generated/tables/records/ListOffsetsV1Record.java | jOOQ regen: bump schema version to 11. |
| storage/inkless/src/main/jooq/org/jooq/generated/tables/records/InitDisklessLogV1Record.java | jOOQ: new table-record wrapper for the init function. |
| storage/inkless/src/main/jooq/org/jooq/generated/tables/records/GetFileMergeWorkItemV1Record.java | jOOQ regen: bump schema version to 11. |
| storage/inkless/src/main/jooq/org/jooq/generated/tables/records/FindBatchesV2Record.java | jOOQ regen: bump schema version to 11. |
| storage/inkless/src/main/jooq/org/jooq/generated/tables/records/FindBatchesV1Record.java | jOOQ regen: bump schema version to 11. |
| storage/inkless/src/main/jooq/org/jooq/generated/tables/records/FilesRecord.java | jOOQ regen: bump schema version to 11. |
| storage/inkless/src/main/jooq/org/jooq/generated/tables/records/FileMergeWorkItemsRecord.java | jOOQ regen: bump schema version to 11. |
| storage/inkless/src/main/jooq/org/jooq/generated/tables/records/FileMergeWorkItemFilesRecord.java | jOOQ regen: bump schema version to 11. |
| storage/inkless/src/main/jooq/org/jooq/generated/tables/records/EnforceRetentionV2Record.java | jOOQ regen: bump schema version to 11. |
| storage/inkless/src/main/jooq/org/jooq/generated/tables/records/EnforceRetentionV1Record.java | jOOQ regen: bump schema version to 11. |
| storage/inkless/src/main/jooq/org/jooq/generated/tables/records/DeleteRecordsV1Record.java | jOOQ regen: bump schema version to 11. |
| storage/inkless/src/main/jooq/org/jooq/generated/tables/records/CommitFileV1Record.java | jOOQ regen: bump schema version to 11. |
| storage/inkless/src/main/jooq/org/jooq/generated/tables/records/BatchesRecord.java | jOOQ regen: bump schema version to 11. |
| storage/inkless/src/main/jooq/org/jooq/generated/tables/ProducerState.java | jOOQ regen: bump schema version to 11. |
| storage/inkless/src/main/jooq/org/jooq/generated/tables/Logs.java | jOOQ: add DISKLESS_START_OFFSET field + bump schema version. |
| storage/inkless/src/main/jooq/org/jooq/generated/tables/ListOffsetsV1.java | jOOQ regen: bump schema version to 11. |
| storage/inkless/src/main/jooq/org/jooq/generated/tables/InitDisklessLogV1.java | jOOQ: new function-table wrapper for init_diskless_log_v1. |
| storage/inkless/src/main/jooq/org/jooq/generated/tables/GetFileMergeWorkItemV1.java | jOOQ regen: bump schema version to 11. |
| storage/inkless/src/main/jooq/org/jooq/generated/tables/FindBatchesV2.java | jOOQ regen: bump schema version to 11. |
| storage/inkless/src/main/jooq/org/jooq/generated/tables/FindBatchesV1.java | jOOQ regen: bump schema version to 11. |
| storage/inkless/src/main/jooq/org/jooq/generated/tables/Files.java | jOOQ regen: bump schema version to 11. |
| storage/inkless/src/main/jooq/org/jooq/generated/tables/FileMergeWorkItems.java | jOOQ regen: bump schema version to 11. |
| storage/inkless/src/main/jooq/org/jooq/generated/tables/FileMergeWorkItemFiles.java | jOOQ regen: bump schema version to 11. |
| storage/inkless/src/main/jooq/org/jooq/generated/tables/EnforceRetentionV2.java | jOOQ regen: bump schema version to 11. |
| storage/inkless/src/main/jooq/org/jooq/generated/tables/EnforceRetentionV1.java | jOOQ regen: bump schema version to 11. |
| storage/inkless/src/main/jooq/org/jooq/generated/tables/DeleteRecordsV1.java | jOOQ regen: bump schema version to 11. |
| storage/inkless/src/main/jooq/org/jooq/generated/tables/CommitFileV1.java | jOOQ regen: bump schema version to 11. |
| storage/inkless/src/main/jooq/org/jooq/generated/tables/Batches.java | jOOQ regen: bump schema version to 11. |
| storage/inkless/src/main/jooq/org/jooq/generated/routines/ReleaseFileMergeWorkItemV1.java | jOOQ regen: bump schema version to 11. |
| storage/inkless/src/main/jooq/org/jooq/generated/routines/MarkFileToDeleteV1.java | jOOQ regen: bump schema version to 11. |
| storage/inkless/src/main/jooq/org/jooq/generated/routines/DeleteTopicV1.java | jOOQ regen: bump schema version to 11. |
| storage/inkless/src/main/jooq/org/jooq/generated/routines/DeleteFilesV1.java | jOOQ regen: bump schema version to 11. |
| storage/inkless/src/main/jooq/org/jooq/generated/routines/DeleteBatchV1.java | jOOQ regen: bump schema version to 11. |
| storage/inkless/src/main/jooq/org/jooq/generated/routines/CommitFileMergeWorkItemV1.java | jOOQ regen: bump schema version to 11. |
| storage/inkless/src/main/jooq/org/jooq/generated/routines/BatchTimestamp.java | jOOQ regen: bump schema version to 11. |
| storage/inkless/src/main/jooq/org/jooq/generated/enums/ReleaseFileMergeWorkItemErrorV1.java | jOOQ regen: bump schema version to 11. |
| storage/inkless/src/main/jooq/org/jooq/generated/enums/ListOffsetsResponseErrorV1.java | jOOQ regen: bump schema version to 11. |
| storage/inkless/src/main/jooq/org/jooq/generated/enums/InitDisklessLogErrorV1.java | jOOQ: new enum for init-diskless-log error values. |
| storage/inkless/src/main/jooq/org/jooq/generated/enums/FindBatchesResponseErrorV1.java | jOOQ regen: bump schema version to 11. |
| storage/inkless/src/main/jooq/org/jooq/generated/enums/FileStateT.java | jOOQ regen: bump schema version to 11. |
| storage/inkless/src/main/jooq/org/jooq/generated/enums/FileReasonT.java | jOOQ regen: bump schema version to 11. |
| storage/inkless/src/main/jooq/org/jooq/generated/enums/EnforceRetentionResponseErrorV1.java | jOOQ regen: bump schema version to 11. |
| storage/inkless/src/main/jooq/org/jooq/generated/enums/DeleteRecordsResponseErrorV1.java | jOOQ regen: bump schema version to 11. |
| storage/inkless/src/main/jooq/org/jooq/generated/enums/CommitFileMergeWorkItemErrorV1.java | jOOQ regen: bump schema version to 11. |
| storage/inkless/src/main/jooq/org/jooq/generated/enums/CommitBatchResponseErrorV1.java | jOOQ regen: bump schema version to 11. |
| storage/inkless/src/main/jooq/org/jooq/generated/UDTs.java | jOOQ: register new init-diskless-log UDTs + bump schema version. |
| storage/inkless/src/main/jooq/org/jooq/generated/Tables.java | jOOQ: register init-diskless-log function table + bump schema version. |
| storage/inkless/src/main/jooq/org/jooq/generated/Routines.java | jOOQ: add init-diskless-log routine wrapper + bump schema version. |
| storage/inkless/src/main/jooq/org/jooq/generated/Keys.java | jOOQ regen: bump schema version to 11. |
| storage/inkless/src/main/jooq/org/jooq/generated/Indexes.java | jOOQ regen: bump schema version to 11. |
| storage/inkless/src/main/jooq/org/jooq/generated/Domains.java | jOOQ regen: bump schema version to 11. |
| storage/inkless/src/main/jooq/org/jooq/generated/DefaultSchema.java | jOOQ: register init-diskless-log artifacts in schema + bump version. |
| storage/inkless/src/main/java/io/aiven/inkless/control_plane/postgres/PostgresControlPlaneMetrics.java | Add query metrics tracking for InitDisklessLog. |
| storage/inkless/src/main/java/io/aiven/inkless/control_plane/postgres/PostgresControlPlane.java | Wire initDisklessLog API to a new Postgres job. |
| storage/inkless/src/main/java/io/aiven/inkless/control_plane/postgres/InitDisklessLogJob.java | Implement Postgres-backed init job calling init_diskless_log_v1. |
| storage/inkless/src/main/java/io/aiven/inkless/control_plane/InitDisklessLogResponse.java | Add API response type for init operation. |
| storage/inkless/src/main/java/io/aiven/inkless/control_plane/InitDisklessLogRequest.java | Add API request type including diskless start offset and producer states. |
| storage/inkless/src/main/java/io/aiven/inkless/control_plane/InitDisklessLogProducerState.java | Add producer-state bootstrap entry model. |
| storage/inkless/src/main/java/io/aiven/inkless/control_plane/InMemoryControlPlane.java | Add in-memory implementation of initDisklessLog + track diskless start offset. |
| storage/inkless/src/main/java/io/aiven/inkless/control_plane/ControlPlane.java | Add initDisklessLog method to the ControlPlane interface with docs. |
Introduce a new Control Plane (aka Diskless Controller) API for initializing a diskless log from an already existing partition. This will be used for migrating a classic topic to diskless.
jeqo left a comment
latest changes look good; just a couple more comments
    }

    @Nested
    class InitDisklessLog {
Should we expand getLogInfo to properly validate disklessStartOffset?
Added a new commit for getting the disklessStartOffset from getLogInfo
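As a rough illustration of what validating `disklessStartOffset` through `getLogInfo` could look like, the sketch below uses a hypothetical response record with a fourth field. The name `LogInfoSketch`, the field order, and the helper `isFreshlyInitializedAt` are assumptions; the actual `GetLogInfoResponse` in the follow-up commit may be shaped differently.

```java
// Hypothetical response shape; the real GetLogInfoResponse may name or order fields differently.
record LogInfoSketch(long logStartOffset, long highWatermark, long byteSize, long disklessStartOffset) {

    static LogInfoSketch success(long logStartOffset, long highWatermark, long byteSize,
                                 long disklessStartOffset) {
        return new LogInfoSketch(logStartOffset, highWatermark, byteSize, disklessStartOffset);
    }

    // True when the log looks exactly as it should right after initialization at `offset`:
    // no data committed yet, and every offset pinned to the migration point.
    boolean isFreshlyInitializedAt(long offset) {
        return logStartOffset == offset
            && highWatermark == offset
            && disklessStartOffset == offset
            && byteSize == 0;
    }
}
```

A test could then check `LogInfoSketch.success(100, 100, 0, 100).isFreshlyInitializedAt(100)` instead of comparing only the first three values.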
        assertThat(controlPlane.getLogInfo(List.of(new GetLogInfoRequest(NEW_TOPIC_ID, 0))))
            .containsExactly(GetLogInfoResponse.success(100, 100, 0));
    }
we can acknowledge here that there is no means to list producer state (yet) and only log info is validated
yes right, I can add another control plane API for retrieving the producer states in another PR
Sure. Btw, there's a Kafka API, DESCRIBE_PRODUCERS, that may require this.
* feat(storage:inkless): InitDisklessLog Diskless Controller API

  Introduce a new Control Plane (aka Diskless Controller) API for initializing a diskless log from an already existing partition. This will be used for migrating a classic topic to diskless.

* feat(storage:inkless): return disklessStartOffset from getLogInfo

(cherry picked from commit acba06f)