
High-Throughput Batch Pipelines

1. What this document is about

This document describes how to design and operate a production-grade batch file processing pipeline in .NET capable of handling high throughput under strict memory and reliability constraints.

It applies to systems that:

  • Process large volumes of files concurrently
  • Operate under bounded memory
  • Require predictable throughput, safe replay, and auditable lineage
  • Must tolerate malformed, oversized, or adversarial inputs

It does not apply to:

  • Small ad-hoc batch jobs
  • One-off ETL scripts
  • Workloads where files can be fully loaded into memory without risk
  • Systems where throughput, replayability and operational control are non-requirements

2. Why this matters in real systems

This problem emerges naturally when systems evolve beyond simple batch jobs.

Typical pressures include:

  • Growth in file volume and size
  • Skewed distributions (many small files, a few very large ones)
  • Horizontal scaling with multiple workers
  • Downstream systems (databases, brokers) that throttle or degrade under load

When this is ignored:

  • Memory usage becomes unbounded
  • Throughput collapses under backpressure
  • Failures require full reprocessing
  • Operators lose visibility into partial progress

Simpler approaches fail because they assume:

  • Files are small
  • Failures are rare
  • Memory is effectively infinite
  • Processing can be restarted from scratch

None of those assumptions hold in long-lived production systems.


3. Core concept (mental model)

The correct mental model is a bounded, staged flow of work, not a loop over files.

Think in terms of:

  • Ingress: files are claimed and validated
  • Decomposition: files are incrementally broken into work units
  • Processing: work units flow through constrained stages
  • Egress: results are persisted and optionally published

At every boundary:

  • Capacity is finite
  • Progress is observable
  • Failure is isolated
  • Memory usage is predictable

The system behaves more like a factory conveyor belt with pressure valves than a task scheduler.


4. How it works (step-by-step)

Step 1 — File discovery and leasing

Workers discover candidate files from object storage.

A lease or lock is acquired per file to ensure:

  • Exactly one worker processes a file at a time
  • Crashes release ownership
  • Horizontal scaling does not cause duplication

Invariant: a file is processed by at most one worker concurrently.
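The leasing backend varies by platform (blob leases, a database row with expiry, a distributed lock service). As a sketch of the semantics only, here is a hypothetical in-memory lease manager; the type and method names are illustrative, and a real implementation must keep lease state somewhere every worker replica can see it:

```csharp
using System;
using System.Collections.Concurrent;

// Hypothetical sketch of lease semantics: at most one owner per file, and
// expired leases become reclaimable (crash recovery). Production code backs
// this with blob storage or a database row, not process memory.
public sealed class InMemoryLeaseManager
{
    private sealed record Lease(string Owner, DateTimeOffset ExpiresAt);

    private readonly ConcurrentDictionary<string, Lease> _leases = new();
    private readonly TimeSpan _duration;

    public InMemoryLeaseManager(TimeSpan duration) => _duration = duration;

    // Returns true if the caller now owns the file.
    public bool TryAcquire(string storageKey, string owner, DateTimeOffset now)
    {
        var candidate = new Lease(owner, now + _duration);
        if (_leases.TryAdd(storageKey, candidate))
            return true;

        // Reclaim only if the current lease has expired (its owner crashed).
        if (_leases.TryGetValue(storageKey, out var current) && current.ExpiresAt <= now)
            return _leases.TryUpdate(storageKey, candidate, current);

        return false;
    }

    // Heartbeat: only the current owner may extend the lease.
    public bool Renew(string storageKey, string owner, DateTimeOffset now) =>
        _leases.TryGetValue(storageKey, out var current)
        && current.Owner == owner
        && _leases.TryUpdate(storageKey, current with { ExpiresAt = now + _duration }, current);

    public void Release(string storageKey, string owner)
    {
        if (_leases.TryGetValue(storageKey, out var current) && current.Owner == owner)
            _leases.TryRemove(new KeyValuePair<string, Lease>(storageKey, current));
    }
}
```

The compare-and-swap style (`TryAdd`/`TryUpdate` against the observed lease) is what preserves the invariant under concurrent acquirers.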


Step 2 — Streaming read with decomposition

Files are opened as streams and read incrementally.

Depending on workload characteristics, decomposition happens by:

  • File (coarse)
  • Chunk (byte or line ranges)
  • Record (individual logical entries)

No stage is allowed to accumulate unbounded data.

Invariant: memory usage is bounded by configured limits, not by input size.


Step 3 — Bounded work queues

Each stage feeds the next through bounded channels or blocks.

Backpressure ensures:

  • Upstream slows when downstream degrades
  • No stage overwhelms the next
  • The system fails predictably, not catastrophically

Invariant: queue depth is always observable and bounded.
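This bounded-queue behavior is exactly what System.Threading.Channels provides: when a bounded channel configured with FullMode = Wait is full, WriteAsync does not complete until the consumer catches up. A minimal self-contained sketch:

```csharp
using System.Threading.Channels;
using System.Threading.Tasks;

public static class BackpressureDemo
{
    public static async Task<int> RunAsync()
    {
        // Capacity 2: at most two items are ever resident in the queue.
        var channel = Channel.CreateBounded<int>(new BoundedChannelOptions(2)
        {
            FullMode = BoundedChannelFullMode.Wait // producer waits instead of dropping
        });

        var producer = Task.Run(async () =>
        {
            for (var i = 0; i < 10; i++)
                await channel.Writer.WriteAsync(i); // suspends while the channel is full
            channel.Writer.TryComplete();           // signals "no more work"
        });

        var consumed = 0;
        await foreach (var _ in channel.Reader.ReadAllAsync())
        {
            await Task.Delay(10); // slow consumer: the producer is throttled to this pace
            consumed++;
        }

        await producer;
        return consumed;
    }
}
```

The producer never gets more than two items ahead of the consumer, so a slow downstream stage slows the whole pipeline instead of inflating memory.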


Step 4 — Processing and persistence

Records or batches are processed and written to the database.

Key properties:

  • Writes are idempotent
  • Batching is used to reduce roundtrips
  • Transactions define the consistency boundary

If integration events are required, they are emitted after persistence, optionally via the Outbox pattern.


Step 5 — Checkpoint and completion

Progress is checkpointed using:

  • Byte offsets
  • Line numbers
  • Chunk identifiers

On failure:

  • Work resumes from the last safe checkpoint
  • Partial progress is preserved
  • Reprocessing cost is bounded
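A checkpoint store can be as small as one row per (file, version) pair. The sketch below is a hypothetical in-memory stand-in that shows the contract the pipeline relies on: read the last committed position on start, advance it only after a durable commit, and key it by ETag so a re-uploaded file restarts from zero.

```csharp
using System.Collections.Concurrent;

// Hypothetical in-memory checkpoint store; production code persists this,
// ideally in the same database that holds the ingested records.
public sealed class InMemoryCheckpointStore
{
    private readonly ConcurrentDictionary<(string Key, string ETag), long> _offsets = new();

    // Returns 0 for an unseen file *or* for a new version of a known file
    // (different ETag), so stale progress never applies to revised content.
    public long GetOffset(string storageKey, string etag) =>
        _offsets.TryGetValue((storageKey, etag), out var offset) ? offset : 0L;

    // Called only after the corresponding batch has been durably committed.
    public void SetOffset(string storageKey, string etag, long offset) =>
        _offsets[(storageKey, etag)] = offset;
}
```

Keying by (StorageKey, ETag) rather than by StorageKey alone is what makes replay safe across file revisions.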

5. Minimal concrete example (.NET)

This section intentionally goes beyond "toy code". The goal is a small but credible skeleton that demonstrates the core mechanics you actually need in production:

  • Per-file leasing (avoid double processing across replicas)
  • Streaming read (no full-file loading)
  • Chunk → record decomposition with explicit boundaries
  • Backpressure via bounded channels
  • Controlled parallelism (separate IO vs CPU vs DB)
  • Checkpoint (offset-based, resumable)
  • Idempotent persistence (dedup key per record)
  • Poison handling (quarantine and stop-the-bleeding)
  • Minimal observability hooks (correlation identifiers)

Scenario

  • Files live in Blob/S3-like storage.
  • Each file uses a newline-delimited format (CSV/JSONL/fixed-width also work with a different parser).
  • The pipeline:
    1. claims a file lease
    2. streams it and emits chunks
    3. parses chunks into records
    4. batches records and persists idempotently
    5. checkpoints progress

Key types (data contracts)


public sealed record FileDescriptor(string StorageKey, long Length, string ETag);

public sealed record FileLease(FileDescriptor File, string LeaseId);

public readonly record struct Chunk(
    string StorageKey,
    long StartOffset,
    ReadOnlyMemory<byte> Data,
    bool IsFinal);

public sealed record RecordEnvelope(
    string StorageKey,
    long RecordIndex,
    string IdempotencyKey, // stable: hash(fileId + record payload)
    object Payload);

Pipeline configuration

A "minimal" production pipeline still needs explicit limits. These are not tuning knobs for later; they are correctness boundaries.


public sealed record PipelineOptions(
    int MaxConcurrentFiles,
    int ChunkBytes,
    int ChunkQueueCapacity,
    int RecordQueueCapacity,
    int ParseDegree,
    int DbWriters,
    int DbBatchSize,
    TimeSpan LeaseRenewInterval,
    int MaxPoisonRecordsPerFile);

Core orchestration

This uses file-level concurrency plus a bounded multi-stage pipeline per file. That is a common and robust baseline.


public sealed class BatchIngestionService
{
    private readonly PipelineOptions _options;
    private readonly IFileSource _fileSource;       // list files
    private readonly ILeaseManager _leaseManager;   // acquire/renew/release
    private readonly ICheckpointStore _checkpoint;  // read/write offsets
    private readonly IQuarantineSink _quarantine;   // poison handling
    private readonly IRecordWriter _writer;         // idempotent DB write

    public BatchIngestionService(
        PipelineOptions options,
        IFileSource fileSource,
        ILeaseManager leaseManager,
        ICheckpointStore checkpoint,
        IQuarantineSink quarantine,
        IRecordWriter writer)
    {
        _options = options;
        _fileSource = fileSource;
        _leaseManager = leaseManager;
        _checkpoint = checkpoint;
        _quarantine = quarantine;
        _writer = writer;
    }

    public async Task RunAsync(CancellationToken cancellationToken)
    {
        var files = await _fileSource.ListPendingAsync(cancellationToken);

        await Parallel.ForEachAsync(
            files,
            new ParallelOptions
            {
                MaxDegreeOfParallelism = _options.MaxConcurrentFiles,
                CancellationToken = cancellationToken
            },
            async (file, ct) =>
            {
                var lease = await _leaseManager.TryAcquireAsync(file, ct);

                if (lease is null)
                    return;

                try
                {
                    await ProcessFileAsync(lease, ct);
                }
                catch (Exception ex)
                {
                    await _quarantine.QuarantineFileAsync(file, reason: "unhandled", details: ex.ToString(), ct);
                }
                finally
                {
                    await _leaseManager.ReleaseAsync(lease, ct);
                }
            });
    }

    private async Task ProcessFileAsync(FileLease lease, CancellationToken cancellationToken)
    {
        // Correlation anchors: batchId/fileId should flow through logs/traces
        var file = lease.File;

        // Resume from the last committed checkpoint
        var startOffset = await _checkpoint.GetOffsetAsync(file.StorageKey, file.ETag, cancellationToken);

        // Stage 1: chunks (bounded)
        var chunks = Channel.CreateBounded<Chunk>(new BoundedChannelOptions(_options.ChunkQueueCapacity)
        {
            FullMode = BoundedChannelFullMode.Wait,
            SingleWriter = true,  // a single reader task produces chunks
            SingleReader = false
        });

        // Stage 2: records (bounded)
        var records = Channel.CreateBounded<RecordEnvelope>(new BoundedChannelOptions(_options.RecordQueueCapacity)
        {
            FullMode = BoundedChannelFullMode.Wait,
            SingleWriter = false,
            SingleReader = false
        });

        // Lease renew loop (keeps the claim alive while processing).
        using var leaseRenewCts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken);
        var renewTask = RenewLeaseLoopAsync(lease, leaseRenewCts.Token);

        // Start pipeline tasks
        var readTask = ReadChunksAsync(file, startOffset, chunks.Writer, cancellationToken);

        // Parsing: multiple consumers of chunks.Reader writing to records.Writer
        var parseTasks = Enumerable.Range(0, _options.ParseDegree)
            .Select(_ => ParseChunksAsync(file, chunks.Reader, records.Writer, cancellationToken))
            .ToArray();

        // DB writing: multiple writers, batching records, idempotent persistence
        var writeTasks = Enumerable.Range(0, _options.DbWriters)
            .Select(_ => WriteBatchesAsync(file, records.Reader, cancellationToken))
            .ToArray();

        // Await reader and parsers in dependency order
        await readTask;
        await Task.WhenAll(parseTasks);

        // Close the record channel after parsers finish
        records.Writer.TryComplete();

        // Await DB writers
        await Task.WhenAll(writeTasks);

        // Stop the lease renew loop
        leaseRenewCts.Cancel();
        await renewTask;

        // The file is considered complete only after persistence + final checkpoint
        await _checkpoint.MarkCompleteAsync(file.StorageKey, file.ETag, cancellationToken);
    }

    private async Task RenewLeaseLoopAsync(FileLease lease, CancellationToken cancellationToken)
    {
        while (!cancellationToken.IsCancellationRequested)
        {
            try
            {
                await Task.Delay(_options.LeaseRenewInterval, cancellationToken);
                await _leaseManager.RenewAsync(lease, cancellationToken);
            }
            catch (OperationCanceledException) { }
            catch
            {
                // If renewal fails repeatedly, stop processing to avoid duplicate ownership.
                // Minimal example: ignore; production code: fail fast with escalation.
            }
        }
    }
}

Streaming chunk reader

This chunk reader demonstrates the principle: bounded output + offset tracking. It does not accumulate the file.


private async Task ReadChunksAsync(
    FileDescriptor file,
    long startOffset,
    ChannelWriter<Chunk> writer,
    CancellationToken cancellationToken)
{
    try
    {
        await using var stream = await _fileSource.OpenReadAsync(file.StorageKey, startOffset, cancellationToken);

        var buffer = new byte[_options.ChunkBytes];
        long currentOffset = startOffset;

        while (true)
        {
            var read = await stream.ReadAsync(buffer.AsMemory(0, buffer.Length), cancellationToken);
            if (read == 0) break;

            // Copy only what was read. In a tuned system, you'd use ArrayPool/MemoryPool.
            var data = buffer.AsMemory(0, read).ToArray(); // keep the example simple and explicit
            await writer.WriteAsync(new Chunk(file.StorageKey, currentOffset, data, IsFinal: false), cancellationToken);

            currentOffset += read;
        }

        await writer.WriteAsync(new Chunk(file.StorageKey, currentOffset, ReadOnlyMemory<byte>.Empty, IsFinal: true), cancellationToken);
        writer.TryComplete();
    }
    catch (Exception ex)
    {
        writer.TryComplete(ex);
        throw;
    }
}

Why this exists

  • ChunkBytes defines peak memory per in-flight chunk.
  • ChunkQueueCapacity defines the maximum number of chunks resident in memory.
  • Together they define a hard upper bound on memory from this stage.

Chunk → record parser with boundary handling

This is where chunk-level partitioning becomes real: you must handle records split across chunks.

This version uses a rolling "carry" buffer for partial lines. It is intentionally explicit; you can swap it for System.IO.Pipelines + SequenceReader<byte> for higher performance once the behavior is correct.


private async Task ParseChunksAsync(
    FileDescriptor file,
    ChannelReader<Chunk> chunks,
    ChannelWriter<RecordEnvelope> records,
    CancellationToken cancellationToken)
{
    var carry = new ArraySegment<byte>(Array.Empty<byte>(), 0, 0);
    long recordIndex = 0;
    int poisonCount = 0;

    await foreach (var chunk in chunks.ReadAllAsync(cancellationToken))
    {
        if (chunk.IsFinal)
            break;

        // Concatenate carry + chunk.Data for parsing.
        // Production: avoid allocations using pooled buffers or Pipelines.
        var combined = Combine(carry, chunk.Data.Span);

        int lineStart = 0;

        for (int i = 0; i < combined.Length; i++)
        {
            if (combined[i] == (byte)'\n')
            {
                var line = combined.AsSpan(lineStart, i - lineStart);
                lineStart = i + 1;

                // Trim CR if present
                if (line.Length > 0 && line[^1] == (byte)'\r') line = line[..^1];

                if (line.Length == 0)
                    continue;

                try
                {
                    var payload = ParseRecord(line); // decode + validate schema
                    var idKey = ComputeIdempotencyKey(file, recordIndex, line);

                    var envelope = new RecordEnvelope(
                        file.StorageKey,
                        recordIndex,
                        idKey,
                        payload);

                    await records.WriteAsync(envelope, cancellationToken);
                    recordIndex++;
                }
                catch (Exception ex)
                {
                    poisonCount++;
                    await _quarantine.QuarantineRecordAsync(
                        file.StorageKey,
                        recordIndex,
                        reason: "parse/validation",
                        details: ex.Message,
                        cancellationToken);

                    if (poisonCount >= _options.MaxPoisonRecordsPerFile)
                        throw new InvalidOperationException($"Poison threshold exceeded for file {file.StorageKey}.");
                }
            }
        }

        // Anything after the last newline becomes carry.
        carry = SliceToCarry(combined, lineStart);

        // Checkpoint strategy: for parsers, checkpoint at *safe* boundaries (e.g. after a batch is committed).
        // This example checkpoints in the writer stage after DB commit (preferred).
    }

    // If the file ends without a newline, process the remaining carry as the final line (optional policy).
    if (carry.Count > 0)
    {
        try
        {
            var line = carry.AsSpan();
            var payload = ParseRecord(line);
            var idKey = ComputeIdempotencyKey(file, recordIndex, line);

            await records.WriteAsync(new RecordEnvelope(file.StorageKey, recordIndex, idKey, payload), cancellationToken);
        }
        catch (Exception ex)
        {
            await _quarantine.QuarantineRecordAsync(file.StorageKey, recordIndex, "parse/validation-final", ex.Message, cancellationToken);
        }
    }
}

private static byte[] Combine(ArraySegment<byte> carry, ReadOnlySpan<byte> next)
{
    var result = new byte[carry.Count + next.Length];

    if (carry.Count > 0)
        Buffer.BlockCopy(carry.Array!, carry.Offset, result, 0, carry.Count);

    next.CopyTo(result.AsSpan(carry.Count));
    return result;
}

private static ArraySegment<byte> SliceToCarry(byte[] combined, int start)
{
    if (start >= combined.Length) return new ArraySegment<byte>(Array.Empty<byte>(), 0, 0);
    return new ArraySegment<byte>(combined, start, combined.Length - start);
}

What this demonstrates

  • Chunk boundaries are not record boundaries.
  • Poison behavior is explicit (quarantine + threshold).
  • Records are emitted to a bounded channel, so parsing cannot outrun DB writes indefinitely.

Idempotent DB writer with batching + checkpointing

This is where most pipelines fail in real life: they can parse fast, but they either overwhelm the DB or cannot resume safely.

Key ideas:

  • Batch records to reduce roundtrips
  • Persist idempotently (unique key on IdempotencyKey)
  • Commit a checkpoint only after the DB commit succeeds

private async Task WriteBatchesAsync(
    FileDescriptor file,
    ChannelReader<RecordEnvelope> records,
    CancellationToken cancellationToken)
{
    var batch = new List<RecordEnvelope>(_options.DbBatchSize);
    long maxRecordIndexCommitted = -1;

    await foreach (var record in records.ReadAllAsync(cancellationToken))
    {
        batch.Add(record);

        if (batch.Count >= _options.DbBatchSize)
        {
            maxRecordIndexCommitted = await FlushAsync(file, batch, cancellationToken);
            batch.Clear();

            // Record-index checkpoint (or byte-offset if you track offsets through parsing).
            // With multiple writers, the store should only ever advance this value (monotonic max).
            await _checkpoint.SetRecordIndexAsync(file.StorageKey, file.ETag, maxRecordIndexCommitted, cancellationToken);
        }
    }

    if (batch.Count > 0)
    {
        maxRecordIndexCommitted = await FlushAsync(file, batch, cancellationToken);
        await _checkpoint.SetRecordIndexAsync(file.StorageKey, file.ETag, maxRecordIndexCommitted, cancellationToken);
    }
}

private async Task<long> FlushAsync(FileDescriptor file, List<RecordEnvelope> batch, CancellationToken cancellationToken)
{
    // Retries should be selective (transient only) and bounded.
    var policy = Polly.Policy
        .Handle<TimeoutException>()
        .Or<IOException>()
        .WaitAndRetryAsync(3, attempt => TimeSpan.FromMilliseconds(100 * attempt));

    await policy.ExecuteAsync(async () =>
    {
        // Implementation detail: use COPY/Bulk/TVP/upsert pattern depending on the DB.
        // The essential requirement: idempotency at the DB boundary.
        await _writer.WriteIdempotentAsync(batch, cancellationToken);
    });

    // Return the highest committed record index to support checkpointing.
    return batch.Max(x => x.RecordIndex);
}


Idempotency in practice

  • The DB must enforce it, not your code.
  • Typical patterns:
    • Unique constraint on IdempotencyKey
    • Upsert (INSERT ... ON CONFLICT DO NOTHING / MERGE with care)
    • Separate ingestion table keyed by idempotency, then downstream merge
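Assuming PostgreSQL as an illustrative backend, the unique-constraint-plus-upsert pattern can be sketched as below. The ingested_records table and its columns are hypothetical; SQL Server would use MERGE (with care) instead of ON CONFLICT.

```csharp
// Illustrative only: a hypothetical PostgreSQL schema where the database,
// not application code, enforces idempotency. Replaying the same batch after
// a crash inserts nothing the second time.
public static class IdempotentSql
{
    public const string CreateTable = """
        CREATE TABLE IF NOT EXISTS ingested_records (
            idempotency_key text PRIMARY KEY,  -- uniqueness enforced by the DB
            storage_key     text   NOT NULL,
            record_index    bigint NOT NULL,
            payload         jsonb  NOT NULL
        );
        """;

    public const string InsertRecord = """
        INSERT INTO ingested_records (idempotency_key, storage_key, record_index, payload)
        VALUES (@idempotencyKey, @storageKey, @recordIndex, @payload)
        ON CONFLICT (idempotency_key) DO NOTHING;
        """;
}
```

A duplicate delivery then silently no-ops at the constraint, which is what makes at-least-once ingestion safe.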

Checkpointing in practice

  • Checkpoint after DB commit, not after parse.
  • Checkpoint granularity is a trade-off: more frequent means less replay, but more checkpoint writes.
  • Checkpoint identity should include a file version marker (ETag) to avoid mixing progress across file revisions.

Mapping back to the concept

This example is intentionally designed to make the key constraints unavoidable:

  • Bounded memory: only bounded channels hold in-flight chunks/records.
  • Backpressure: parsing slows if the DB slows, because channels fill.
  • Partitioning:
    • file-level concurrency (across workers and within a worker)
    • chunk-level decomposition (streaming units)
    • record-level processing (logical units for persistence)
  • Reliability:
    • leasing prevents concurrent double-processing
    • idempotency at DB boundary supports at-least-once ingestion
    • checkpointing makes restart bounded-cost, not full replay
  • Safety:
    • poison quarantine is explicit
    • there is a hard poison threshold to stop silent corruption

This is the smallest skeleton that still respects production reality.


6. Design trade-offs

Approach                 | Gains                              | Costs
File-level parallelism   | Simplicity                         | Poor utilization with large files
Chunk-level processing   | Balanced throughput and memory     | Boundary handling complexity
Record-level processing  | Maximum parallelism                | High overhead, strict backpressure required
Channels                 | Fine-grained control, low overhead | More manual wiring
Dataflow                 | Declarative pipelines              | Higher abstraction cost

Every choice trades simplicity for control, or vice versa.
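For contrast with the channel wiring in section 5, the same bounded two-stage shape in TPL Dataflow might look like the sketch below (requires the System.Threading.Tasks.Dataflow package). BoundedCapacity plays the role of channel capacity, and PropagateCompletion replaces the manual TryComplete calls; the stage logic here is a placeholder.

```csharp
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public static class DataflowDemo
{
    public static async Task<int> RunAsync()
    {
        var processed = 0;

        // Parse stage: bounded and parallel (placeholder transform).
        var parse = new TransformBlock<string, int>(
            line => line.Length,
            new ExecutionDataflowBlockOptions { BoundedCapacity = 16, MaxDegreeOfParallelism = 4 });

        // Write stage: bounded and serialized (placeholder sink).
        var write = new ActionBlock<int>(
            _ => processed++,
            new ExecutionDataflowBlockOptions { BoundedCapacity = 16, MaxDegreeOfParallelism = 1 });

        // PropagateCompletion wires shutdown through the pipeline declaratively.
        parse.LinkTo(write, new DataflowLinkOptions { PropagateCompletion = true });

        for (var i = 0; i < 100; i++)
            await parse.SendAsync($"record-{i}"); // SendAsync honors BoundedCapacity (backpressure)

        parse.Complete();
        await write.Completion;
        return processed;
    }
}
```

The wiring is shorter than hand-rolled channels, which is exactly the "higher abstraction cost vs declarative pipelines" trade-off in the table.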


7. Common mistakes and misconceptions

"Async automatically means scalable" Async without bounds still exhausts memory.

"Parallel.ForEach is enough" It provides concurrency, not flow control.

"We can retry the whole batch" This becomes untenable as batch sizes grow.

"Ordering is always required" Ordering is often a self-imposed constraint with real cost.

"Idempotency in code is enough" If the database doesn't enforce it, you don't have idempotency.


8. Operational and production considerations

Expect stress.

Monitor:

  • Queue depths per stage (chunks/records)
  • Throughput per stage
  • p95/p99 latency per stage
  • Allocation and GC pressure (Gen0 rate, LOH growth, pauses)
  • Retry rate and retry cause distribution
  • Poison counts and quarantine volume
  • Lease renew failures / lease contention

What degrades first:

  • Databases under write pressure
  • Message brokers under fan-out
  • Memory when backpressure is misconfigured
  • CPU when parsing/validation becomes the bottleneck

Graceful shutdown must:

  • Stop ingress (stop acquiring new leases)
  • Drain in-flight work (channels)
  • Flush batches
  • Persist checkpoint
  • Release leases
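The shutdown order above can be sketched against a channel-based stage: stop producing, complete the channel, then await consumers so everything already accepted is drained. A minimal illustration, with the batch-flush and checkpoint steps elided as comments:

```csharp
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;

public static class ShutdownDemo
{
    public static async Task<(int Produced, int Drained)> RunAsync()
    {
        using var ingressCts = new CancellationTokenSource();
        var channel = Channel.CreateBounded<int>(8);
        var produced = 0;
        var drained = 0;

        var producer = Task.Run(async () =>
        {
            // 1. Stop ingress: honor cancellation between items, never mid-item.
            while (!ingressCts.IsCancellationRequested)
            {
                await channel.Writer.WriteAsync(produced);
                produced++;
                await Task.Delay(1);
            }
            // 2. Signal "no more work" so consumers can drain and exit.
            channel.Writer.TryComplete();
        });

        var consumer = Task.Run(async () =>
        {
            // 3. Drain everything already accepted; nothing in flight is dropped.
            await foreach (var _ in channel.Reader.ReadAllAsync())
                drained++;
            // 4. Flushing the final batch and persisting the checkpoint go here,
            //    followed by lease release.
        });

        await Task.Delay(50);
        ingressCts.Cancel();        // begin graceful shutdown
        await Task.WhenAll(producer, consumer);
        return (produced, drained); // equal counts: no loss, no duplication
    }
}
```

Cancelling only the ingress token while letting the channel drain is what distinguishes graceful shutdown from abrupt cancellation of the whole pipeline.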

Anything else risks duplication, partial application, or lost progress.


9. When NOT to use this

Do not use this approach when:

  • Files are few and small
  • Memory pressure is irrelevant
  • Failures can safely restart everything
  • Operational overhead outweighs benefits
  • You do not need replay, idempotency or observability guarantees

This architecture is intentionally heavy. Use it when the alternative is operational chaos.


10. Key takeaways

  • Treat bounded queues as correctness boundaries, not "performance tuning".
  • Choose partitioning (file/chunk/record) based on skew, bottlenecks and restart cost, not preference.
  • Put idempotency where it belongs: at the persistence boundary, enforced by the DB.
  • Checkpoint after commit, not after parsing.
  • Make poison handling explicit (quarantine + thresholds) or you will corrupt data quietly.
  • Backpressure is the difference between slow and dead under downstream throttling.
  • Observability isn't optional: queue depth and stage latency are first-class signals of correctness under stress.

11. High-Level Overview

Visual representation of a high-throughput batch pipeline, highlighting streaming, work partitioning, bounded backpressure, idempotent persistence, and checkpointed progress in production-scale systems.
