// Example: Orchestrated Bulk Workflow with Event Dispatch
// Demonstrates a full pipeline: schema sync, orchestrator execution, and observer dispatch
using Core.Data.EF.Model.Model;
using Core.Data.EF.Context.Context;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging;
using Microsoft.EntityFrameworkCore;
using DataArc;
using DataArc.ObservR.Abstractions;
using DataArc.OrchestratR;
using DataArc.OrchestratR.Abstractions;
using DataArc.Abstractions;
using DataArc.ObservR;
using DataArc.Abstractions.Pipelines;
using DataArc.OrchestratR.Pipelines;
internal class Program
{
// Services resolved once from the dependency-injection container in the static constructor.
static readonly IDatabaseDefinitionBuilder _databaseDefinitionService;
static readonly IOrchestratorHandler _orchestratorHandler;
static readonly IObservableEventHandler _observableEventHandler;
static readonly IDatabasePipelineFactory _databasePipelineFactory;
// Entities sent through the bulk pipeline; starts empty and is replaced with generated data in ExecuteMain.
static List<ForeignEntityExample> foreignEntityExampleList = new();
// Batch size handed to the orchestrator request, which forwards it to each AddBulk call.
private static int batchSize = 100000;
/// <summary>
/// Output of the bulk-operation orchestrator: the SQL result (when one was supplied)
/// together with the events queued for later dispatch.
/// </summary>
public class BulkOperationResponse : IOrchestratorOutput
{
    /// <summary>Creates an empty response: no SQL result and no events.</summary>
    public BulkOperationResponse() { }

    /// <summary>Creates a response that wraps <paramref name="result"/>.</summary>
    /// <param name="result">Result produced by executing the pipeline commands.</param>
    public BulkOperationResponse(SqlResult result) => SqlResult = result;

    /// <summary>
    /// Events attached to this response. Exposed as a get-only property instead of a
    /// public field so callers can add to the list but cannot replace it or set it to null.
    /// </summary>
    public List<object> Events { get; } = new();

    /// <summary>
    /// SQL result passed to the constructor; keeps its default value when the
    /// parameterless constructor is used.
    /// </summary>
    public SqlResult SqlResult { get; }
}
/// <summary>Input handed to <see cref="BulkOperationCommandOrchestror"/>.</summary>
/// <param name="foreignEntityExampleList">Entities the orchestrator passes to every <c>AddBulk</c> call.</param>
/// <param name="batchSize">Batch size the orchestrator passes alongside the entities to every <c>AddBulk</c> call.</param>
public record BulkOperationRequest(List<ForeignEntityExample> foreignEntityExampleList, int batchSize) : IOrchestratorInput;
/// <summary>
/// Event payload built from a <c>SqlResult</c>: one <see cref="BulkContextResult"/>
/// for each entry in the result's <c>ContextResults</c>.
/// </summary>
public class BulkOperationCompletedDomainEvent
{
    /// <summary>Per-context summaries, materialized once in the constructor.</summary>
    public IReadOnlyCollection<BulkContextResult> ContextResults { get; }

    /// <summary>
    /// Copies the name, counters, execution time and transactional flag of every
    /// context result in <paramref name="sqlResult"/> into a new list.
    /// </summary>
    /// <param name="sqlResult">Result whose <c>ContextResults</c> are projected.</param>
    public BulkOperationCompletedDomainEvent(SqlResult sqlResult)
    {
        var summaries =
            from contextResult in sqlResult.ContextResults
            select new BulkContextResult(
                contextResult.ContextName,
                contextResult.TotalOperations,
                contextResult.TotalEntitiesAffected,
                contextResult.ExecutionTime,
                contextResult.IsTransactional);

        // Materialize immediately so the event holds a fixed snapshot rather than a deferred query.
        ContextResults = summaries.ToList();
    }
}
/// <summary>
/// Observer for <see cref="BulkOperationCompletedDomainEvent"/> that only reports
/// on the console that the event arrived.
/// </summary>
public class BulkOperationCompletedObserver : IEventObserver<BulkOperationCompletedDomainEvent>
{
    /// <summary>Writes a fixed notification line and completes synchronously.</summary>
    /// <param name="evt">The received event; its contents are not inspected.</param>
    /// <returns>An already-completed task.</returns>
    public Task HandleAsync(BulkOperationCompletedDomainEvent evt)
    {
        const string notification = "Bulk operation event fired!";

        Console.WriteLine(notification);

        return Task.CompletedTask;
    }
}
/// <summary>
/// Per-context summary carried by <see cref="BulkOperationCompletedDomainEvent"/>.
/// Each value is copied from the like-named member of a SQL context result.
/// </summary>
/// <param name="ContextName">Name reported for the context.</param>
/// <param name="TotalOperations">Operation count reported for the context.</param>
/// <param name="TotalEntitiesAffected">Affected-entity count reported for the context.</param>
/// <param name="ExecutionTime">Execution time reported for the context.</param>
/// <param name="IsTransactional">Presumably whether the context's work ran inside a transaction — TODO confirm against the SqlResult documentation.</param>
public record BulkContextResult(
string ContextName,
int TotalOperations,
int TotalEntitiesAffected,
TimeSpan ExecutionTime,
bool IsTransactional);
/// <summary>
/// Pipeline orchestrator that queues one <c>Add</c> and one <c>AddBulk</c> against each of
/// four command contexts, builds the commands and runs them with <c>ExecuteParallelAsync</c>.
/// </summary>
public class BulkOperationCommandOrchestror(IDatabasePipelineFactory databasePipelineFactory)
    : PipelineOrchestrator<BulkOperationRequest, BulkOperationResponse>(databasePipelineFactory)
{
    /// <summary>Builds and executes the bulk command pipeline for <paramref name="input"/>.</summary>
    /// <param name="input">Entities and batch size forwarded to every <c>AddBulk</c> call.</param>
    /// <param name="output">Response returned unchanged when the execution does not report success.</param>
    /// <returns>
    /// A new response wrapping the SQL result and carrying one
    /// <see cref="BulkOperationCompletedDomainEvent"/> when the result reports success;
    /// otherwise <paramref name="output"/> as received.
    /// </returns>
    public override async Task<BulkOperationResponse> ExecuteAsync(BulkOperationRequest input, BulkOperationResponse output)
    {
        // NOTE(review): _databasePipelineFactory is not declared in this class. It presumably binds to a
        // member inherited from PipelineOrchestrator; if the base exposes none it would bind to the
        // enclosing Program's static field of the same name — confirm which one is intended.
        var databasePipeline = await _databasePipelineFactory.CreateDatabasePipelineAsync();
        var sqlPipeline = await databasePipeline.UseSqlPipelineAsync();

        // NOTE(review): the same entity list instance is handed to all four contexts — confirm that
        // this is safe together with ExecuteParallelAsync.
        var commands = await sqlPipeline
            .UseCommandContext<DatabaseContextReadCommitted>()
                .Add(NewPrimaryEntity())
                .AddBulk(input.foreignEntityExampleList, input.batchSize)
            .UseCommandContext<DatabaseContextReadUncommitted>()
                .Add(NewPrimaryEntity())
                .AddBulk(input.foreignEntityExampleList, input.batchSize)
            .UseCommandContext<DatabaseContextSerializable>()
                .Add(NewPrimaryEntity())
                .AddBulk(input.foreignEntityExampleList, input.batchSize)
            .UseCommandContext<DatabaseContextRepeatableRead>()
                .Add(NewPrimaryEntity())
                .AddBulk(input.foreignEntityExampleList, input.batchSize)
            .BuildAsync();

        var sqlResult = await commands.ExecuteParallelAsync();

        if (sqlResult.Success)
        {
            var response = new BulkOperationResponse(sqlResult);
            response.Events.Add(new BulkOperationCompletedDomainEvent(sqlResult));
            return response;
        }

        return output;

        // Each context receives its own freshly created primary entity with the same placeholder values.
        static PrimaryEntityExample NewPrimaryEntity() =>
            new() { Name = "Name", Description = "Description" };
    }
}
/// <summary>
/// Builds the service provider (logging, DataArc core with its pooled DbContexts,
/// orchestrators and the observer) and resolves the services this example uses.
/// </summary>
/// <remarks>
/// Logging is registered through <c>AddLogging()</c> only. No stand-alone
/// <c>ILoggerFactory</c> is created here: nothing consumed one, and an undisposed
/// factory would only leak its console provider.
/// <c>GetRequiredService</c> throws when a service is not registered; because this is
/// a static constructor, such a failure surfaces as a <c>TypeInitializationException</c>.
/// </remarks>
static Program()
{
    var dataProvider = new ServiceCollection()
        .AddLogging()
        .AddDataArcCore(ctx =>
        {
            ctx.AddDbContextPool<DatabaseContextReadCommitted>(...);
            ctx.AddDbContextPool<DatabaseContextReadUncommitted>(...);
            ctx.AddDbContextPool<DatabaseContextRepeatableRead>(...);
            ctx.AddDbContextPool<DatabaseContextSerializable>(...);
            ctx.AddDbContextPool<DatabaseContextSnapshot>(...);
        })
        .AddDataArcOrchestration(orc =>
        {
            orc.AddOrchestrator<NotificationsOrchestrator>();
        })
        .AddDataArcPipelineOrchestration(orc =>
        {
            orc.AddOrchestrator<BulkOperationCommandOrchestror>();
        })
        .AddDataArcObserver(obv =>
        {
            obv.AddObserver<BulkOperationCompletedObserver>();
        })
        .BuildServiceProvider();

    // The provider is intentionally kept alive for the lifetime of the process:
    // the static fields below hold services resolved from it.
    _databaseDefinitionService = dataProvider.GetRequiredService<IDatabaseDefinitionBuilder>();
    _orchestratorHandler = dataProvider.GetRequiredService<IOrchestratorHandler>();
    _observableEventHandler = dataProvider.GetRequiredService<IObservableEventHandler>();
    _databasePipelineFactory = dataProvider.GetRequiredService<IDatabasePipelineFactory>();
}
/// <summary>
/// Runs the example end to end: builds the database definition for four contexts,
/// executes its drop and create steps, generates seed entities, runs the bulk
/// orchestrator and dispatches the events attached to its response.
/// </summary>
static async Task ExecuteMain()
{
    // Schema definition covering the four contexts the orchestrator writes to.
    var schema = _databaseDefinitionService
        .UseContext<DatabaseContextReadCommitted>()
        .UseContext<DatabaseContextReadUncommitted>()
        .UseContext<DatabaseContextSerializable>()
        .UseContext<DatabaseContextRepeatableRead>()
        .Build(applyChanges: true, generateScripts: true);

    schema.ExecuteDrop();
    schema.ExecuteCreate();

    // Stored in the static field, exactly as before, then handed to the request.
    foreignEntityExampleList = SeedDataGenerator.GenerateForeignEntityExamples(1000);

    var request = new BulkOperationRequest(foreignEntityExampleList, batchSize);
    var initialResponse = new BulkOperationResponse();

    var response = await _orchestratorHandler
        .OrchestrateAsync<BulkOperationCommandOrchestror, BulkOperationResponse>(request, initialResponse);

    await _observableEventHandler.DispatchAsync(response.Events);
}
/// <summary>
/// Process entry point. Declared as an asynchronous <c>Main</c> so the workflow is
/// awaited instead of being blocked on with <c>GetAwaiter().GetResult()</c>.
/// </summary>
/// <param name="args">Command-line arguments; not used.</param>
/// <returns>A task that completes when the example workflow has finished.</returns>
private static async Task Main(string[] args)
{
    await ExecuteMain();
}
}