Dataimport med Akka.net

Vi hadde før sommeren behov for å importere ca 50.000 kunder i forbindelse med produksjonssetting av en ny nettbutikk. Dette var en jobb som ville bli kjørt maks noen få ganger, så det var liten vilje til å investere mye tid og energi i å lage et verktøy.

Samtidig hadde vi en sterk følelse av at behovet for parallellisering var til stede, noe som fort kan komplisere jobben og kreve mer tid enn det som er ønskelig.

Akka.NET, med sin modell for å skrive samtidig kode, gjorde det enkelt å skrive robust kode som også lot seg optimere fra en første importtid på 6 dager ned til siste kjøring som tok noen få timer.

Hvorfor Akka.NET?

Akka.NET har en programmeringsmodell som støtter samtidig kode og har mekanismer for den type parallellisering som vi trenger. Det var naturlig å bruke dette rammeverket i stedet for å bruke ekstra tid og energi på å skrive denne type kode selv.

Litt om hva Akka.NET er, sakset fra getakka.net:

Akka.NET is a toolkit and runtime for building highly concurrent, distributed, and fault tolerant event-driven applications on .NET & Mono.

Akka.NET bygger på actor-modellen:

The Actor Model provides a higher level of abstraction for writing concurrent and distributed systems. It alleviates the developer from having to deal with explicit locking and thread management, making it easier to write correct concurrent and parallel systems.

Modellering

En actor kan i grunnen representere hva som helst, men det vil som oftest ikke lønne seg å plassere all kode i en actor. Noen nyttige ting å tenke på når man skal identifisere actors, er:

  • Kommunikasjon med 3. part.
  • Monitorering
  • Orkestrering
  • Logging

For dataimporten vår vet vi at vi skal kommunisere med to eksterne systemer, ERP og nettbutikken. Disse kommer garantert til å feile før eller siden, så de får hver sin actor.

Kommunikasjonen mellom disse to må koordineres, og fremdriften rapporteres så vi vet hvor langt vi er kommet i importen. Dette er en actor til.

Vi vil også logge alle feil slik at vi kan analysere dem. Noen feil kan vi ignorere eller fikse ved å sende en melding på nytt, mens andre krever kodeendringer. Dette blir vår siste actor.

Konsollapplikasjonen vår, sammen med identifiserte actors og kommunikasjonsbehovet kan modelleres i et sekvensdiagram:

import-customers

Implementering

Sekvensdiagrammet over kan omsettes til følgende kode og struktur i Visual Studio:

visual-studio-struktur

Oppstart

Importprogrammet starter opp med å sette opp actor-systemet, initiere støtteklasser for 3. partskommunikasjon, og starte importen av alle kunder:

// Program.cs

private static void Run(ImportSettings settings)
{
    var customerReader = new ErpCustomerReader();
    var customerWriter = new WebshopCustomerWriter();
    var client = new ConsoleReporter();

    using (var system = ActorSystem.Create("customer-import"))
    {
        var importer =
            system.ActorOf(Importer.Create(client,
                                           customerReader,
                                           customerWriter));

        importer.Tell(new ImportCustomers(settings.Customers));

        client.WaitForTermination();

        system.Terminate().Wait();
    }
}

Koordinering

Actor for koordinering settes opp til å håndtere disse meldingene:

 // Importer.cs

protected override void OnReceive(object message)
{
	message.Match()
		   .With<ImportCustomers>(ImportAllCustomers)
		   .With<CustomerFound>(UpdateCustomer)
		   .With<CustomerNotFound>(ReportProgress)
		   .With<ReportUpdate>(ReportProgress)
		   .With<ReportReadFailure>(ReportFailure)
		   .With<ReportWriteFailure>(ReportFailure);
}

Å håndtere den første meldingen, å importere alle kunder, gjøres enkelt slik:

 // Importer.cs

private void ImportAllCustomers(ImportCustomers customers)
{
    remainingCount = customers.TotalCount();

    foreach (var number in customers.CustomerNumbers())
    {
        erp.Tell(new ImportCustomer(number));
    }
}

Etter hvert som en kunde er funnet, sendes den videre for oppdatering:

 // Importer.cs

private void UpdateCustomer(CustomerFound found)
{
    webshop.Tell(new UpdateCustomer(found));
}

Når kunden er oppdatert, rapporteres fremdriften til konsollet:

 // Importer.cs

private void ReportProgress(ReportUpdate report)
{
    failures.Remove(report.CustomerNumber);
    var updateStatus = new UpdateStatus(report,
                                        --remainingCount,
                                        failures.Count);
    client.ReportProgress(updateStatus);
}

Eventuelle feil logges og rapporteres til konsollet og fil:

 // Importer.cs

private void ReportFailure(ReportReadFailure report)
{
    failures.Add(report.CustomerNumber);
    client.ReportFailure(new FailureStatus(report, failures.Count));
    logger.Tell(report);
}

Lese kunde fra ERP

Actor for å lese kunder fra ERP er implementert noe annerledes enn den over, siden vi ønsker å ta hensyn til at en ekstern tjeneste kan feile, sporadisk eller permanent. Vi antar her at alle feil løser seg før eller siden, derfor vil vi vente en kort periode før vi prøver igjen. Dette håndteres med to tilstander, Processing og Paused.

Processing er den normale tilstanden som leser kunden fra ERP og sender ny melding dersom den er funnet. Dersom det har skjedd feil tidligere, kan det finnes meldinger som er køet opp. Disse må sendes om igjen.

Ved feil settes tilstanden over til Paused. En feilbeskrivelse sendes tilbake til avsender, orginal-melding sendes på nytt, og normal operasjon gjenopptas etter 5 sekunder:

 // Erp.cs

protected override void PreStart()
{
    Become(Processing);
}

private void Processing(object message)
{
    message.Match()
           .With<ImportCustomer>(ReadCustomer);
}

private void ReadCustomer(ImportCustomer message)
{
    try
    {
        var stopwatch = Stopwatch.StartNew();
        var customer = erpCustomers.Read(message.CustomerNumber);
        stopwatch.Stop();
        var readTime = stopwatch.Elapsed;

        if (customer == null)
        {
            Sender.Tell(new CustomerNotFound(message.CustomerNumber));
        }
        else
        {
            Sender.Tell(new CustomerFound(customer, readTime));
        }
    }
    catch (Exception e)
    {
        Become(Paused);
        ResumeAfterSeconds(2);

        Stash.Stash();
        Sender.Tell(new ReportReadFailure(message.CustomerNumber, e));
    }
}

I tilstanden paused køes meldinger opp med Stash(). Normal operasjon gjenopptas når det blir bedt om det:

// Erp.cs

private void Paused(object message)
{
    message.Match()
           .With<ImportCustomer>(m => Stash.Stash())
           .With<Resume>(m => Resume());
}

private void Resume()
{
    Stash.UnstashAll();
    Become(Processing);
}

Oppdatere kunde i nettbutikk

Å oppdatere kunde er implementert så å si likt som å lese kunde fra ERP:

 // Webshop.cs

private void Processing(object message)
{
    message.Match()
           .With<UpdateCustomer>(CreateOrUpdateCustomer);
}

private void CreateOrUpdateCustomer(UpdateCustomer message)
{
    var customer = message.Customer;

    if (customer.Email == null) return;

    try
    {
        var stopwatch = Stopwatch.StartNew();
        customers.Write(customer);
        stopwatch.Stop();

        var readTime = message.ReadTime;
        var writeTime = stopwatch.Elapsed;

        Sender.Tell(new ReportUpdate(customer, readTime, writeTime));
    }
    catch (Exception e)
    {
        Become(Paused);
        ResumeAfterSeconds(2);

        Stash.Stash();
        Sender.Tell(new ReportWriteFailure(customer, e));
    }
}

Optimere importtid

Koden som beskrevet over leser alle kunder en og en fra ERP i en operasjon, og oppdaterer alle kunder i nettbutikken, en og en, i en annen parallell operasjon. Med de mengdene som skal importeres, vil det ta ca 4 timer å lese alle kunder og 6 dager å oppdatere dem.

Dette er alt for tregt, og den eneste måten dette kan gjøres raskere på, i dette tilfellet, er å lese og skrive i flere parallelle operasjoner. Men vi kan ikke åpne et ubegrenset antall forbindelser mot hver av tjenestene, det vil være det samme som å sette i gang et ddos-angrep.

Etter noe prøving og feiling fant vi ut at 100 samtidige skriveoperasjoner er godt nok. Da kan vi oppdatere alle kunder i løpet av 90 minutter. Dette forutsetter at vi leser kundene fort nok inn, og 10 samtidige leseoperasjoner i dette tilfellet er mer enn godt nok.

Og dette ordnes i Akka.NET med noen få kodelinjer:

// Importer.cs

public Importer(IReportToClient client,
                IReadCustomers customerReader,
                IWriteCustomers customerWriter)
{
    var erpPool = new RoundRobinPool(10);
    var webshopPool = new RoundRobinPool(100);

    erp = Context.ActorOf(Erp.Create(customerReader)
                             .WithRouter(erpPool));

    webshop = Context.ActorOf(Webshop.Create(customerWriter)
                                     .WithRouter(webshopPool));

    logger = Context.ActorOf(Logger.Create());
    this.client = client;
}

Thomas Eyde
S
enior .NET-utvikler og arkitekt i Ciber

Thomas har jobbet med C# og .NET siden versjon 1.0 og har vært konsulent omtrent like lenge. Er opptatt av god kode og arkitektur, og har vært hekta på Test Driven Development i den større delen av yrkeskarrieren. I den senere tid har han latt seg inspirere av prinsippene bak CQRS.

Legg igjen en kommentar

Fyll inn i feltene under, eller klikk på et ikon for å logge inn:

WordPress.com-logo

Du kommenterer med bruk av din WordPress.com konto. Logg ut / Endre )

Twitter picture

Du kommenterer med bruk av din Twitter konto. Logg ut / Endre )

Facebookbilde

Du kommenterer med bruk av din Facebook konto. Logg ut / Endre )

Google+ photo

Du kommenterer med bruk av din Google+ konto. Logg ut / Endre )

Kobler til %s