Hopp til hovedinnhold

Du har trent maskinlæringsmodellen din, nå ønsker du å få denne opp i skyen, slik at du kan få kontinuerlige prediksjoner på ny data i sanntid. I dette eksempelet tar vi utgangspunkt i et reellt case hvor vi skal gjøre prediksjoner på kategorier av kundeservicehenvendelser.

Kontinuerlig prediksjoner 🏗️

Koden som brukes som eksempel kan sees i mer detaljer i dette Github repoet.

Det er vel og bra å ha fått gode testresultater på klassifisering av henvendelser, men for å se hvordan modellen faktisk fungerer i den virkelige verden, ønsker vi å klassifisere helt ny og usett data. Dataen henter vi kontinuerlig fra et eksternt API. Typisk vil data som vi henter ikke være klar for analyse direkte og må derfor gjennom en form for preprosessering. Dette kan innebære å vaske dataen for ekstremverdier eller nullverdier, eller å lage features som samsvarer med de modellen er trent på. For tekstlig data kan det innebære å fjerne stoppord, punktum eller komma. Når dataen er tilstrekkelig prosessert, kan den sendes til modellen for prediksjon.

Dersom vi har distribuert treningen vår, må vi sørge for at modellen vi benytter til enhver tid er den som er best egnet for å gjøre prediksjonene. Prediksjonene ønsker vi å lagre. Når vi etter hvert får inn fasit for prediksjonene vi har gjort, kan vi sammenligne disse med faktiske verdier, slik at vi kan monitorere og evaluere modellen vår i sanntid. Denne prosessen kan automatiseres, slik at vi kontinuerlig får inn prediksjoner i sanntid og ny data som vi kan bruke til trening på et senere tidspunkt. 📈

ETL (Extract Transform Load)

Vi definerer en ETL-funksjon som henter ut data fra et API og laster opp rådataen til tabellen vår i BigQuery. I noen tilfeller er det nødvendig å gjøre enkel preprosessering av dataen før den lastes opp, slik som i dette tilfellet, hvor dataen anonymiseres.

Extract transform load pipeline fra API til database
ETL funksjon

Å laste opp data til BigQuery programmatisk er relativt enkelt. Vi trenger ID-ene til prosjektet vårt, datasettet tabellen skal lagres i, og selve tabellen hvor dataen skal lagres. Vi laster opp dataen ved å omgjøre den til en Pandas DataFrame, som kan dumpes i BigQuery-tabellen. Dersom tabellen ikke allerede eksisterer, vil den automatisk detektere skjemaet basert på kolonnene i DataFrame-en.

import logging
import pandas as pd
from google.cloud import bigquery, storage

logging.basicConfig(level=logging.INFO)

def fetch_data_from_api()
		#TODO Denne vil avhenge av din datakilde
		return 

def save_to_bq(df, project_id, dataset_id, table_id):
    try:
        client = bigquery.Client(project=project_id)
        table_ref = client.dataset(dataset_id).table(table_id)

        job_config = bigquery.LoadJobConfig(
            write_disposition=bigquery.WriteDisposition.WRITE_APPEND,
            autodetect=True  
        )

        # Last opp data til BigQuery 
        job = client.load_table_from_dataframe(df, table_ref, job_config=job_config)
        job.result() 

        logging.info(f"Data successfully loaded into BigQuery table: {table_id}")
    except Exception as e:
        logging.error(f"Error saving data to BigQuery: {str(e)}")
        raise
        
def run_etl():
    try:
        data = fetch_data_from_api()
        df = pd.DataFrame(data)
        save_to_bq(df, PROJECT_ID, DATASET_ID, TABLE_ID)
        logging.info("ETL process completed successfully.")
    except Exception as e:
        logging.error(f"ETL process failed: {str(e)}")
        raise

Preprosessering

Det neste steget i pipelinen vår blir å preprosessere dataen slik at den er i riktig format før vi sender den til maskinlæringsmodellen. Vi henter ut nødvendig data fra BigQuery-tabellen vi lagret i ETL-steget.

import logging
import pandas as pd
from google.cloud import bigquery, storage

def load_data_from_bq(project_id, dataset_id, table_id, columns):
    full_table_id = f"{project_id}.{dataset_id}.{table_id}"
    query = f"SELECT `{column}` FROM `{full_table_id}`"
    
    try:
        df = client.query(query).to_dataframe()
        return df
    except Exception as e:
        logging.error(f"Error loading data from BigQuery: {str(e)}")
        raise

I preprosesseringen klargjør vi dataen for analyse. Hvis vi bruker kundeservicehenvendelser som eksempel, kan dette være typiske NLP-teknikker, som å fjerne tegn som punktum eller komma, og gjøre resten til små bokstaver.

Preprossessering av tekst. Skriften blir til liten tekst, og enkelte ord fjernes.
Preprosessering funksjon

Prediksjoner

Vi gjør prediksjoner på dataen som vi har klargjort, og lagrer denne i en egen tabell, slik at vi kan monitorere resultatene våre over tid.

mlflow.set_tracking_uri(MLFLOW_TRACKING_URI)
model_version, _ = get_best_model_version(MODEL_REGISTRY_NAME, "status", "best")
model = mlflow.keras.load_model(model_version.source)
predictions = model.predict(data)


Under modelltreningen tagger vi den beste modellen slik at den dynamisk kan hentes ut og brukes til prediksjoner i pipelinen. Hvordan du gjør dette vil avhenge av rammeverket du foretrekker. For eksempel kan du bruke skybaserte tjenester som Google Vertex AI, eller open-source-rammeverk som MLflow, som er det vi benytter her. Dersom du ønsker å komme i gang med MLflow for versjonering og evaluering av maskinlæringsmodellen din, kan du ta en titt på dette innlegget: Open Source MLOps – Fra lokalt til skyen.

Siden vi samtidig henter ut modellversjonen, kan vi lagre denne sammen med prediksjonene vi gjør, samt tidspunktet for prediksjonen. På denne måten har vi full kontroll over hva som predikeres og hvordan det utvikler seg over tid.

Automatisert i en Workflow

Disse tre tjenestene kan vi deploye som Cloud Functions til skyen. Cloud Functions kan trigges basert på enkelthendelser eller som en del av en sekvensiell pipeline, noe som passer godt til vårt formål! De tre stegene legges inn i en workflow, hvor oppstarten av en prosess avhenger av at den foregående fullføres. Slik vet vi at dataen som vi oppretter i ett steg av prosessen, er klar før vi begynner på neste steg.

resource "google_workflows_workflow" "etl_process_predict_pipeline" {
  name     = "etl_process_predict_pipeline"

  description = "Prediction pipeline with ETL, Pre-process, and Predict steps"

  source_contents = <<-EOT
    main:
      steps:
        - run_etl:
            call: http.post
            args:
              url: https://${var.region}-${var.project_id}.cloudfunctions.net/${google_cloudfunctions2_function.etl.name}
              auth:
                type: OIDC
        - run_pre_process:
            call: http.post
            args:
              url: https://${var.region}-${var.project_id}.cloudfunctions.net/${google_cloudfunctions2_function.pre_process.name}
              auth:
                type: OIDC
        - run_predict:
            call: http.post
            args:
              url: https://${var.region}-${var.project_id}.cloudfunctions.net/${google_cloudfunctions2_function.predict.name}
              auth:
                type: OIDC
        - return_value:
            return: "Prediction Workflow Completed"
    EOT
}

Siden vi ønsker å automatisere denne prosessen, kan vi sette opp en Cloud Scheduler som angir når og hvor ofte vi skal kjøre workflowen vår (Cron-job).

resource "google_cloud_scheduler_job" "etl_process_predict_job" {
  name   = "etl_process_predict_job"
  region = var.region

  schedule  = "0 0 * * *" # Hver natt ved midnatt
  time_zone = "Etc/UTC"

  http_target {
    http_method = "POST"
    uri         = "https://workflowexecutions.googleapis.com/v1/projects/${var.project_id}/locations/${var.region}/workflows/${google_workflows_workflow.etl_process_predict_pipeline.name}/executions"

    oauth_token {
      service_account_email = var.default_service_account_email
    }
  }
}

Nå som vi har satt opp kjøringen, kan pipelinen jobbe av seg selv, uten at vi trenger å starte noen prosesser manuelt. Når du trener nye, forbedrede modeller, vil disse automatisk bli tatt i bruk. Ettersom dataen lagres i hvert steg, kan denne senere brukes som treningsdata etter hvert som fasiten til prediksjonene blir tilgjengelig.

Med andre ord har du nå en egen MLOps pipeline for å gjøre automatiske prediksjoner i skyen med din maskinlæringsmodellen 📈.

Pipeline hvor data går fra rådata, prosessert til prediksjon.

Liker du innlegget?

Del gjerne med kollegaer og venner