feat(produce): add PipelinedWriter to eliminate lock contention #526

Draft
Mwea wants to merge 4 commits into main from tchary/pipelined-writer-v2
@Mwea Mwea commented Mar 3, 2026

Summary

Introduce PipelinedWriter as an alternative to Writer that separates request validation from file writing using a lock-free queue pattern.

Architecture:

  • Validation phase: concurrent, lock-free request processing
  • Writing phase: single-threaded batch consumption from queue
  • Configurable via produce.pipelined.enabled (default: false)

New components:

  • PipelinedWriter: main orchestrator with validation/writing threads
  • ValidatedRequest: immutable validated request container
  • ProduceWriter: interface abstracting Writer and PipelinedWriter
  • ActiveFile: extracted from Writer for shared file management
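
The two phases described above can be illustrated with a minimal sketch. This is not the PR's PipelinedWriter: the class name PipelineSketch, the Validated record, the queue capacity of 1024, and the in-memory list standing in for the file are all assumptions made for illustration. It only shows the general shape: concurrent validation, a bounded queue, and a single consumer thread that owns the file.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch of a validate-then-write pipeline; not the PR's code.
final class PipelineSketch implements AutoCloseable {
    // Immutable hand-off between stages (a stand-in for ValidatedRequest).
    record Validated(String payload, CompletableFuture<Long> offset) {}

    // Capacity is an arbitrary placeholder.
    private final BlockingQueue<Validated> queue = new ArrayBlockingQueue<>(1024);
    private final ExecutorService validators;
    private final Thread writer;
    // Stand-in for the active file; mutated only by the writer thread.
    private final List<String> file = new ArrayList<>();
    private volatile boolean running = true;

    PipelineSketch(int validationThreads) {
        validators = Executors.newFixedThreadPool(validationThreads);
        writer = new Thread(this::drainLoop, "buffer-writer");
        writer.start();
    }

    // Stage 1: validation runs concurrently and touches no shared mutable state.
    CompletableFuture<Long> write(String payload) {
        CompletableFuture<Long> result = new CompletableFuture<>();
        validators.execute(() -> {
            try {
                if (payload == null || payload.isEmpty()) {
                    throw new IllegalArgumentException("empty payload");
                }
                queue.put(new Validated(payload, result)); // blocks while the queue is full
            } catch (Exception e) {
                result.completeExceptionally(e); // never leave the caller's future hanging
            }
        });
        return result;
    }

    // Stage 2: a single consumer drains batches, so appends need no lock.
    private void drainLoop() {
        List<Validated> batch = new ArrayList<>();
        try {
            while (running || !queue.isEmpty()) {
                Validated first = queue.poll(10, TimeUnit.MILLISECONDS);
                if (first == null) {
                    continue;
                }
                batch.add(first);
                queue.drainTo(batch); // pick up everything else that is already queued
                for (Validated v : batch) {
                    file.add(v.payload());
                    v.offset().complete((long) file.size() - 1);
                }
                batch.clear();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    // Call only after close(); the join in close() makes the writer's updates visible.
    int writtenCount() {
        return file.size();
    }

    // Wait for in-flight validation first, then let the writer drain and stop.
    @Override
    public void close() {
        validators.shutdown();
        try {
            validators.awaitTermination(5, TimeUnit.SECONDS);
            running = false;
            writer.join();
        } catch (InterruptedException e) {
            running = false;
            Thread.currentThread().interrupt();
        }
    }
}
```

In this sketch the only cross-thread hand-off is the queue itself, which is why the append loop can run without a writer lock. A bounded queue also applies backpressure: validation threads block in put() when the writer falls behind.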

Benchmark results

15min sustained load:

| Metric | Baseline | PipelinedWriter | Delta |
|---|---|---|---|
| Avg latency - median | 9,059 ms | 8,767 ms | -3.2% |
| P50 latency - median | 6,401 ms | 5,271 ms | -17.7% |
| P99 latency - median | 32,143 ms | 24,284 ms | -24.5% |
| P999 latency - median | 42,844 ms | 30,363 ms | -29.1% |
| Max latency - median | 44,810 ms | 32,941 ms | -26.5% |

Test plan

  • Unit tests for PipelinedWriter, ValidatedRequest, ActiveFile
  • Integration testing with feature flag enabled

🤖 Generated with Claude Code

Introduce PipelinedWriter as an alternative to Writer that separates
request validation from file writing using a lock-free queue pattern.

Architecture:
- Validation phase: concurrent, lock-free request processing
- Writing phase: single-threaded batch consumption from queue
- Configurable via produce.pipelined.enabled (default: false)

New components:
- PipelinedWriter: main orchestrator with validation/writing threads
- ValidatedRequest: immutable validated request container
- ProduceWriter: interface abstracting Writer and PipelinedWriter
- ActiveFile: extracted from Writer for shared file management

Benchmark results (15min sustained load, non-timeout periods):
┌───────────────────────┬───────────┬─────────────────┬──────────┐
│        Metric         │ Baseline  │ PipelinedWriter │  Delta   │
├───────────────────────┼───────────┼─────────────────┼──────────┤
│ Avg latency - median  │ 9,059 ms  │ 8,767 ms        │ -3.2%    │
├───────────────────────┼───────────┼─────────────────┼──────────┤
│ P50 latency - median  │ 6,401 ms  │ 5,271 ms        │ -17.7%   │
├───────────────────────┼───────────┼─────────────────┼──────────┤
│ P99 latency - median  │ 32,143 ms │ 24,284 ms       │ -24.5%   │
├───────────────────────┼───────────┼─────────────────┼──────────┤
│ P999 latency - median │ 42,844 ms │ 30,363 ms       │ -29.1%   │
├───────────────────────┼───────────┼─────────────────┼──────────┤
│ Max latency - median  │ 44,810 ms │ 32,941 ms       │ -26.5%   │
└───────────────────────┴───────────┴─────────────────┴──────────┘

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
Mwea added a commit that referenced this pull request Mar 3, 2026
@Mwea Mwea requested a review from Copilot March 5, 2026 15:52
Copilot AI left a comment

Pull request overview

This PR introduces a PipelinedWriter as an alternative to the existing lock-based Writer for the Inkless produce path. The key architectural change is separating record validation (done in parallel across N worker threads) from buffer writing (done by a single dedicated thread), connected via a bounded queue. This eliminates the global writer lock contention. A new ProduceWriter interface abstracts both implementations, and a configuration flag (produce.pipelined.enabled, default: false) controls which implementation is used.

Changes:

  • Introduces PipelinedWriter with a SEDA architecture: parallel validation workers → bounded queue → single buffer writer thread, along with ValidatedRequest as the inter-stage data container and ProduceWriter as the common interface
  • Extends ActiveFile with addBatchDirect() and addAwaitingFuture() methods to support the pipelined flow where validation happens separately from buffer writing
  • Adds configuration options (produce.pipelined.enabled, produce.pipelined.validation.threads) and the factory logic in AppendHandler to select between writer implementations
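
As a rough illustration of the flag-based selection described above, the following sketch picks an implementation from a plain configuration map. The interface and class names ending in Sketch are hypothetical, the write signature is invented for the example, and the fallback of 4 validation threads is a placeholder (the review does not state the real default). Only the two configuration keys come from the review text.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;

// Hypothetical stand-in for the common interface; the real ProduceWriter's methods are not shown here.
interface ProduceWriterSketch extends AutoCloseable {
    CompletableFuture<Void> write(String record);

    @Override
    void close();
}

// Stand-in for the existing lock-based writer.
final class LockingWriterSketch implements ProduceWriterSketch {
    @Override
    public synchronized CompletableFuture<Void> write(String record) {
        return CompletableFuture.completedFuture(null);
    }

    @Override
    public void close() {}
}

// Stand-in for the pipelined writer; only records how many validation threads were requested.
final class PipelinedWriterSketch implements ProduceWriterSketch {
    final int validationThreads;

    PipelinedWriterSketch(int validationThreads) {
        this.validationThreads = validationThreads;
    }

    @Override
    public CompletableFuture<Void> write(String record) {
        return CompletableFuture.completedFuture(null);
    }

    @Override
    public void close() {}
}

final class WriterFactorySketch {
    // Assumed placeholder; the real default is not stated in this PR page.
    static final String ASSUMED_DEFAULT_THREADS = "4";

    static ProduceWriterSketch createWriter(Map<String, String> config) {
        boolean pipelined = Boolean.parseBoolean(
            config.getOrDefault("produce.pipelined.enabled", "false"));
        if (!pipelined) {
            return new LockingWriterSketch(); // default path: existing behavior is unchanged
        }
        int threads = Integer.parseInt(
            config.getOrDefault("produce.pipelined.validation.threads", ASSUMED_DEFAULT_THREADS));
        return new PipelinedWriterSketch(threads);
    }
}
```

Because callers in this sketch depend only on the interface, the flag can be flipped without touching call sites, which is what makes a default of false a low-risk rollout.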

Reviewed changes

Copilot reviewed 11 out of 11 changed files in this pull request and generated 10 comments.

| File | Description |
|---|---|
| PipelinedWriter.java | Main SEDA-based writer with validation workers, buffer queue, and single buffer writer thread |
| ValidatedRequest.java | Immutable container transferring validated batches between stages |
| ProduceWriter.java | Interface abstracting Writer and PipelinedWriter |
| ActiveFile.java | New addBatchDirect() and addAwaitingFuture() methods for pipelined support |
| Writer.java | Now implements ProduceWriter, write() made public |
| AppendHandler.java | Factory method createWriter() selecting implementation based on config |
| InklessConfig.java | New produce.pipelined.enabled and produce.pipelined.validation.threads configs |
| docs/inkless/configs.rst | Documentation for new config options |
| PipelinedWriterTest.java | Tests for tick/size rotation, concurrent writes, lifecycle |
| ValidatedRequestTest.java | Tests for ValidatedRequest construction and accessors |
| ActiveFileTest.java | Tests for new pipelined support methods |

@Mwea Mwea requested a review from Copilot March 5, 2026 16:27
- Use UNKNOWN_OFFSET constant instead of hardcoded -1L
- Use Objects.requireNonNull for brokerTopicStats
- Create own RequestLocal per validation worker to avoid thread-safety issues
- Add exceptionally handler to runAsync to prevent hanging futures
- Extract ValidationResult record to avoid throwaway CompletableFuture
- Fix thread-safety in tickRotate by removing cross-thread activeFile.isEmpty() check
- Implement maybeTickRotate to schedule rotation when queue is empty
- Await validation tasks in close() before shutting down buffer writer
- Clean up stale development comments
- Add tests for producePipelinedEnabled and producePipelinedValidationThreads configs

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
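
One item in the commit above, adding an exceptionally handler to runAsync, addresses a general CompletableFuture pitfall: if the async task throws before completing the caller's future, that future never completes. The sketch below shows the pattern in isolation. The class and method names are hypothetical and the validation step is reduced to a Runnable; it is not the code from this commit.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;

// Hypothetical sketch of the "exceptionally on runAsync" pattern.
final class AsyncFailureSketch {
    static CompletableFuture<String> submit(Runnable validation, Executor executor) {
        CompletableFuture<String> result = new CompletableFuture<>();
        CompletableFuture.runAsync(() -> {
            validation.run();        // may throw
            result.complete("ok");   // reached only when validation succeeds
        }, executor).exceptionally(t -> {
            // runAsync wraps the failure in a CompletionException; pass on the original cause.
            Throwable cause = t.getCause() != null ? t.getCause() : t;
            result.completeExceptionally(cause);
            return null;
        });
        return result;
    }
}
```

Without the exceptionally stage, a throwing validation step would fail only the internal future returned by runAsync, and anything waiting on result would block indefinitely.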
Copilot AI left a comment

Pull request overview

Copilot reviewed 12 out of 12 changed files in this pull request and generated 5 comments.


Mwea and others added 2 commits March 6, 2026 08:39
- Fix double logging for InvalidProducerEpochException (add else clause)
- Fix topic config validation to fail if ANY topic is missing config (use allMatch)
- Add explicit isRotationTrigger flag to ValidatedRequest instead of relying on empty maps
- Replace Math.max with simple assignment + warning for request ID tracking
- Fix typo: 'millisecond' -> 'milliseconds' in configs.rst

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
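
The topic config fix in the commit above comes down to a quantifier choice. A check that passes when any topic has a config lets requests through with some topics unconfigured, whereas allMatch passes only when every topic has one. A minimal sketch, with a hypothetical class name and a plain map standing in for the real topic config lookup:

```java
import java.util.Map;
import java.util.Set;

// Hypothetical sketch; the real validation code is not shown in this PR page.
final class TopicConfigCheckSketch {
    // Returns true only when every requested topic has a config entry.
    static boolean allTopicsHaveConfig(Set<String> topics, Map<String, ?> configs) {
        return topics.stream().allMatch(configs::containsKey);
    }
}
```

Note that allMatch returns true for an empty stream, so in this sketch a request with no topics passes the check; whether that is desirable depends on how empty requests are handled elsewhere.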
Change "millisecond" to "milliseconds" in the documentation string.
This fixes the CI check for uncommitted documentation changes.

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>