feat(produce): add PipelinedWriter to eliminate lock contention #526
Introduce PipelinedWriter as an alternative to Writer that separates request validation from file writing using a lock-free queue pattern.

Architecture:
- Validation phase: concurrent, lock-free request processing
- Writing phase: single-threaded batch consumption from queue
- Configurable via produce.pipelined.writer.enabled (default: false)

New components:
- PipelinedWriter: main orchestrator with validation/writing threads
- ValidatedRequest: immutable validated request container
- ProduceWriter: interface abstracting Writer and PipelinedWriter
- ActiveFile: extracted from Writer for shared file management

Benchmark results (15min sustained load, non-timeout periods):

| Metric | Baseline | PipelinedWriter | Delta |
|---|---|---|---|
| Avg latency - median | 9,059 ms | 8,767 ms | -3.2% |
| P50 latency - median | 6,401 ms | 5,271 ms | -17.7% |
| P99 latency - median | 32,143 ms | 24,284 ms | -24.5% |
| P999 latency - median | 42,844 ms | 30,363 ms | -29.1% |
| Max latency - median | 44,810 ms | 32,941 ms | -26.5% |

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
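The Delta column matches the relative change `(PipelinedWriter - Baseline) / Baseline`. A small sketch for re-deriving it from the reported medians; the class and method names are hypothetical and not part of the PR:

```java
// Hypothetical helper for re-checking the Delta column; not code from the PR.
final class LatencyDeltaSketch {
    // Relative change of the candidate against the baseline, in percent.
    static double deltaPercent(double baselineMs, double candidateMs) {
        return (candidateMs - baselineMs) / baselineMs * 100.0;
    }

    public static void main(String[] args) {
        // Values copied from the benchmark table (milliseconds).
        double[][] rows = {
            {9059, 8767}, {6401, 5271}, {32143, 24284}, {42844, 30363}, {44810, 32941}
        };
        for (double[] row : rows) {
            System.out.printf("%.0f ms -> %.0f ms: %.1f%%%n", row[0], row[1], deltaPercent(row[0], row[1]));
        }
    }
}
```

A negative value means the PipelinedWriter latency is lower than the baseline.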
Pull request overview
This PR introduces a PipelinedWriter as an alternative to the existing lock-based Writer for the Inkless produce path. The key architectural change is separating record validation (done in parallel across N worker threads) from buffer writing (done by a single dedicated thread), connected via a bounded queue. This eliminates the global writer lock contention. A new ProduceWriter interface abstracts both implementations, and a configuration flag (produce.pipelined.enabled, default: false) controls which implementation is used.
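The two-stage shape described above can be sketched roughly as follows. This is a minimal illustration, not the PR's implementation: `PipelineSketch`, `Validated`, the queue capacity of 1024, and the string payloads are all assumptions made for the example.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical illustration; none of these names come from the PR.
final class PipelineSketch {
    // Stand-in for a validated request handed from stage 1 to stage 2.
    record Validated(String payload, CompletableFuture<Integer> result) {}

    private final BlockingQueue<Validated> queue = new ArrayBlockingQueue<>(1024); // capacity is an assumption
    private final ExecutorService validators;
    private final Thread writer;
    private final List<String> buffer = new ArrayList<>(); // touched only by the writer thread
    private volatile boolean running = true;

    PipelineSketch(int validationThreads) {
        validators = Executors.newFixedThreadPool(validationThreads);
        writer = new Thread(this::drainLoop, "buffer-writer");
        writer.start();
    }

    // Stage 1: validation runs in parallel and takes no writer lock.
    CompletableFuture<Integer> write(String payload) {
        CompletableFuture<Integer> result = new CompletableFuture<>();
        validators.execute(() -> {
            try {
                if (payload.isEmpty()) throw new IllegalArgumentException("empty payload");
                queue.put(new Validated(payload, result)); // blocks while the queue is full
            } catch (Throwable t) {
                result.completeExceptionally(t); // never leave the caller's future hanging
            }
        });
        return result;
    }

    // Stage 2: a single thread drains the queue in batches and owns the buffer.
    private void drainLoop() {
        List<Validated> batch = new ArrayList<>();
        while (running || !queue.isEmpty()) {
            try {
                Validated first = queue.poll(10, TimeUnit.MILLISECONDS);
                if (first == null) continue;
                batch.add(first);
                queue.drainTo(batch);
                for (Validated v : batch) {
                    buffer.add(v.payload());
                    v.result().complete(buffer.size() - 1); // position in the buffer
                }
                batch.clear();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return;
            }
        }
    }

    // Wait for in-flight validation first, then let the writer drain and stop.
    void close() throws InterruptedException {
        validators.shutdown();
        validators.awaitTermination(5, TimeUnit.SECONDS);
        running = false;
        writer.join();
    }
}
```

Because only the writer thread mutates `buffer`, no lock is needed around it; the bounded queue provides back-pressure when validation outpaces writing.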
Changes:
- Introduces `PipelinedWriter` with a SEDA architecture: parallel validation workers → bounded queue → single buffer writer thread, along with `ValidatedRequest` as the inter-stage data container and `ProduceWriter` as the common interface
- Extends `ActiveFile` with `addBatchDirect()` and `addAwaitingFuture()` methods to support the pipelined flow where validation happens separately from buffer writing
- Adds configuration options (`produce.pipelined.enabled`, `produce.pipelined.validation.threads`) and the factory logic in `AppendHandler` to select between writer implementations
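A rough sketch of how a config-driven factory could choose between the two implementations. The config keys are the ones named in this review; the types, the `Map`-based config, and the default of 4 validation threads are assumptions for illustration and do not reflect the actual `AppendHandler` or `InklessConfig` code.

```java
import java.util.Map;

// Hypothetical stand-ins for ProduceWriter, Writer and PipelinedWriter.
interface ProduceWriterSketch {
    String describe();
}

final class LockingWriterSketch implements ProduceWriterSketch {
    public String describe() { return "lock-based writer"; }
}

final class PipelinedWriterSketch implements ProduceWriterSketch {
    final int validationThreads;

    PipelinedWriterSketch(int validationThreads) {
        if (validationThreads < 1) throw new IllegalArgumentException("need at least one validation thread");
        this.validationThreads = validationThreads;
    }

    public String describe() { return "pipelined writer, " + validationThreads + " validation threads"; }
}

final class WriterFactorySketch {
    // Returns the lock-based writer unless the flag is explicitly enabled (default: false).
    static ProduceWriterSketch createWriter(Map<String, String> config) {
        boolean pipelined = Boolean.parseBoolean(config.getOrDefault("produce.pipelined.enabled", "false"));
        if (!pipelined) {
            return new LockingWriterSketch();
        }
        // The default thread count here is an assumption, not the PR's value.
        int threads = Integer.parseInt(config.getOrDefault("produce.pipelined.validation.threads", "4"));
        return new PipelinedWriterSketch(threads);
    }
}
```

Keeping the selection behind a common interface means callers do not need to know which writer is active.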
Reviewed changes
Copilot reviewed 11 out of 11 changed files in this pull request and generated 10 comments.
| File | Description |
|---|---|
| PipelinedWriter.java | Main SEDA-based writer with validation workers, buffer queue, and single buffer writer thread |
| ValidatedRequest.java | Immutable container transferring validated batches between stages |
| ProduceWriter.java | Interface abstracting Writer and PipelinedWriter |
| ActiveFile.java | New addBatchDirect() and addAwaitingFuture() methods for pipelined support |
| Writer.java | Now implements ProduceWriter, write() made public |
| AppendHandler.java | Factory method createWriter() selecting implementation based on config |
| InklessConfig.java | New produce.pipelined.enabled and produce.pipelined.validation.threads configs |
| docs/inkless/configs.rst | Documentation for new config options |
| PipelinedWriterTest.java | Tests for tick/size rotation, concurrent writes, lifecycle |
| ValidatedRequestTest.java | Tests for ValidatedRequest construction and accessors |
| ActiveFileTest.java | Tests for new pipelined support methods |
- Use UNKNOWN_OFFSET constant instead of hardcoded -1L
- Use Objects.requireNonNull for brokerTopicStats
- Create own RequestLocal per validation worker to avoid thread-safety issues
- Add exceptionally handler to runAsync to prevent hanging futures
- Extract ValidationResult record to avoid throwaway CompletableFuture
- Fix thread-safety in tickRotate by removing cross-thread activeFile.isEmpty() check
- Implement maybeTickRotate to schedule rotation when queue is empty
- Await validation tasks in close() before shutting down buffer writer
- Clean up stale development comments
- Add tests for producePipelinedEnabled and producePipelinedValidationThreads configs

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
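The "hanging futures" item refers to a general `CompletableFuture` pitfall: if a task started with `runAsync` throws before it completes the future handed to the caller, that future never finishes. A minimal sketch of the pattern, assuming a hypothetical `submit` helper (not the PR's code):

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.Executor;

// Hypothetical helper illustrating the pattern; names are not from the PR.
final class NoHangingFuturesSketch {
    static CompletableFuture<String> submit(Runnable validation, Executor executor) {
        CompletableFuture<String> result = new CompletableFuture<>();
        CompletableFuture.runAsync(() -> {
            validation.run();          // may throw
            result.complete("ok");     // skipped if validation throws
        }, executor).exceptionally(t -> {
            // runAsync wraps the failure in CompletionException; unwrap it for the caller.
            Throwable cause = (t instanceof CompletionException && t.getCause() != null) ? t.getCause() : t;
            result.completeExceptionally(cause);
            return null;
        });
        return result;
    }
}
```

Without the `exceptionally` stage, a caller blocked on `result` would wait forever after a validation failure.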
Pull request overview
Copilot reviewed 12 out of 12 changed files in this pull request and generated 5 comments.
- Fix double logging for InvalidProducerEpochException (add else clause)
- Fix topic config validation to fail if ANY topic is missing config (use allMatch)
- Add explicit isRotationTrigger flag to ValidatedRequest instead of relying on empty maps
- Replace Math.max with simple assignment + warning for request ID tracking
- Fix typo: 'millisecond' -> 'milliseconds' in configs.rst

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
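The topic config item comes down to the difference between "some topic has config" and "every topic has config". The sketch below contrasts the two with Java streams; the method names and the `Map`-based config lookup are hypothetical, and the commit does not state what the earlier check looked like, so `anyMatch` is only an assumed example of a too-lenient check.

```java
import java.util.List;
import java.util.Map;

// Hypothetical illustration of the allMatch fix; not the PR's validation code.
final class TopicConfigCheckSketch {
    // Too lenient: passes as soon as one topic has config, so a missing one slips through.
    static boolean anyTopicConfigured(List<String> topics, Map<String, String> configs) {
        return topics.stream().anyMatch(configs::containsKey);
    }

    // Strict: fails if ANY topic is missing config.
    static boolean allTopicsConfigured(List<String> topics, Map<String, String> configs) {
        return topics.stream().allMatch(configs::containsKey);
    }
}
```

Note that `allMatch` returns `true` for an empty stream, so an empty request needs separate handling if it should be rejected.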
Change "millisecond" to "milliseconds" in the documentation string. This fixes the CI check for uncommitted documentation changes. Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
Summary
Introduce PipelinedWriter as an alternative to Writer that separates request validation from file writing using a lock-free queue pattern.
Architecture:
- Validation phase: concurrent, lock-free request processing
- Writing phase: single-threaded batch consumption from queue
- Configurable via `produce.pipelined.writer.enabled` (default: false)

New components:
- `PipelinedWriter`: main orchestrator with validation/writing threads
- `ValidatedRequest`: immutable validated request container
- `ProduceWriter`: interface abstracting Writer and PipelinedWriter
- `ActiveFile`: extracted from Writer for shared file management

Benchmark results

15min sustained load:
Test plan
🤖 Generated with Claude Code