diff --git a/.gitmodules b/.gitmodules index 1af7ae06f5cf62803e94a26a152a60588c3c4689..2fcb9b5822dc56a3bce5f7dd21addd115da16b90 100644 --- a/.gitmodules +++ b/.gitmodules @@ -1,4 +1,7 @@ -[submodule "simulation_server/simulation/raps"] - path = simulation_server/simulation/raps +[submodule "raps"] + path = raps url = https://github.com/ExaDigiT/RAPS.git branch = main +[submodule "simulation_dashboard"] + path = simulation_dashboard + url = https://github.com/ExaDigiT/SimulationDashboard.git diff --git a/Dockerfile b/Dockerfile new file mode 100644 index 0000000000000000000000000000000000000000..4ab22bef9a7abc6a82f5b24e7bf68a70f0a5c075 --- /dev/null +++ b/Dockerfile @@ -0,0 +1,31 @@ +FROM python:3.12.11 + +RUN apt-get update \ + && apt-get install git libsnappy-dev \ + && rm -rf /var/lib/apt/lists/* + +RUN pip install --no-cache-dir --upgrade pip +RUN pip install --no-cache-dir uv +ENV UV_NO_CACHE=true + +WORKDIR /app + +# Install RAPS dependencies as first layer for caching +COPY raps/pyproject.toml /app/raps/ +RUN uv pip install --system -r /app/raps/pyproject.toml + +# Install server dependencies (including raps) for caching +COPY raps/ /app/raps/ +COPY pyproject.toml /app/ +# pip install expects README to exist +RUN touch /app/README.md +RUN uv pip install --system -r /app/pyproject.toml + +# Install simulation server +COPY druid_ingests/ /app/druid_ingests/ +COPY simulation_server/ /app/simulation_server/ +RUN uv pip install --system -e . 
+# Re-install RAPS as editable (TODO: RAPS currently doesn't work in non-editable mode) +RUN uv pip install --system -e ./raps + +CMD ["python", "-m", "simulation_server.server.main"] diff --git a/Dockerfile.server b/Dockerfile.server deleted file mode 100644 index 55e5d2cc3f2246f6a939076068b9262d51521ccc..0000000000000000000000000000000000000000 --- a/Dockerfile.server +++ /dev/null @@ -1,25 +0,0 @@ -FROM python:3.9 - -RUN apt-get update \ - && apt-get install -y libsnappy-dev \ - && rm -rf /var/lib/apt/lists/* - -RUN pip install --upgrade pip -RUN pip install hatch - -WORKDIR /app - -COPY pyproject.toml /app - -RUN hatch dep show requirements > /app/requirements.txt -# RUN hatch dep show requirements --feature=server >> /app/requirements.txt -RUN python3 -m pip install -r /app/requirements.txt -ENV RAPS_CONFIG=/app/simulation_server/simulation/raps/config - -COPY ["druid_ingests", "/app/druid_ingests/"] -COPY ["models", "/app/models"] -COPY ["simulation_server", "/app/simulation_server/"] -COPY ["README.md", "/app"] -RUN python3 -m pip install -e . 
- -CMD ["python3", "-m", "simulation_server.server.main"] diff --git a/Dockerfile.simulation b/Dockerfile.simulation deleted file mode 100644 index 0160fe66f9451779257ea2979fffa2f8f59cc1cd..0000000000000000000000000000000000000000 --- a/Dockerfile.simulation +++ /dev/null @@ -1,24 +0,0 @@ -FROM ubuntu:22.04 - -RUN apt-get update \ - && apt-get install -y python3 python3-pip git libsnappy-dev \ - && rm -rf /var/lib/apt/lists/* - -RUN pip install --upgrade pip -RUN pip install hatch - -WORKDIR /app - -COPY pyproject.toml /app - -RUN hatch dep show requirements > /app/requirements.txt -# RUN hatch dep show requirements --feature=simulation >> /app/requirements.txt -RUN python3 -m pip install -r /app/requirements.txt -ENV RAPS_CONFIG=/app/simulation_server/simulation/raps/config - -COPY ["simulation_server", "/app/simulation_server/"] -COPY ["models", "/app/models"] -COPY ["README.md", "/app"] -RUN python3 -m pip install -e . - -CMD ["python3", "-m", "simulation_server.simulation.main"] diff --git a/README.md b/README.md index 1a4c80959af29b42a7825aad211254d9cf9777e3..1a8e6287fc7eafcc73f7e474b2e33785783d6d10 100644 --- a/README.md +++ b/README.md @@ -2,41 +2,62 @@ REST API that allows running and querying the results from the ExaDigit simulation and RAPS. -## Loading RAPS submodule +## Loading RAPS and Dashboard submodules This uses [RAPS](https://github.com/ExaDigiT/RAPS) to run the simulation, which is loaded as a -submodule. Make sure to run +submodule. The [Simulation Dashboard](https://github.com/ExaDigiT/SimulationDashboard) is also in a +separate repo and loaded as a submodule. Make to load the submodules by running: ``` git submodule update --init --recursive ``` -to load the submodule. ## Downloading FMU models +The Frontier FMU models aren't currently publicly available. To run Frontier simulations with cooling enabled, use this +command to download them (if you have access to the fmu-models repo). 
+``` +cd ./raps +make fetch-fmu-models +``` + You can run the job and power simulation without downloading any FMU models. But to use the cooling simulation you'll need to download FMU models into the `models` directory. You can download `Simulator_olcf5_base.fmu` from https://code.ornl.gov/exadigit/fmu-models if you have access. (The FMU models aren't currently publicly available.) +## Running locally +To run a local version of the server run +```bash +docker compose up --wait +``` +The API server will be hosted on http://localhost:8081. The dashboard will be hosted on http://localhost:8080. + +You'll need at least 32 GiB of RAM for druid and RAPS to run smoothly. -## Deploying -To deploy the server, run +If you want to run replay data locally, you'll need to download the datasets and then ingest them in +Druid. You can fetch the datasets with `./scripts/fetch_data.sh`, and use the `./scripts/submit_data_ingests.py` +script to ingest them into druid. + +View the server logs with: ```bash -./scripts/deploy.sh prod +docker compose logs -f --no-log-prefix simulation-server ``` -This will build both the server and simulation docker images, and push them to Slate. +To shut down the server run: +```bash +docker compose down +``` -## Running locally -To run a local version of the server run +Use this if you want to wipe all the database data as well: ```bash -./scripts/launch_local.sh +docker compose down --volumes ``` -The server will be hosted on http://localhost:8080 -You'll need at least 16 GiB of RAM, preferably 32 GiB for druid to run smoothly. +## Deploying +To deploy the server, run +```bash +./scripts/deploy.sh prod +``` -If you want to run replay data locally, you'll need to download the datasets (see ./scripts/fetch.sh) -and then ingest them in Druid. After launching, you can access the Druid UI at http://localhost:8888 -and submit druid ingests for the system you want. +This will build both the server and simulation docker images, and push them to Slate. 
## API Docs You can view the API docs and the `openapi.json` with the API specification at diff --git a/deployment.yaml b/deployment.yaml index 2c3e37ea257557ee68dbbca197a2983991c056f9..1d9cf47209f5911ece42eeebcd37d74f9ad10335 100644 --- a/deployment.yaml +++ b/deployment.yaml @@ -41,6 +41,8 @@ objects: env: - name: EXADIGIT_ENV value: ${ENV} + - name: EXADIGIT_HTTP_PORT + value: "8080" - name: EXADIGIT_ROOT_PATH value: "/exadigit/api" - name: EXADIGIT_DEBUG_MODE diff --git a/docker-compose.yml b/docker-compose.yml index 706c5166915722d673f868458f85264265e53451..3a9dcd4012d9741e61b6ce589aa8057db70083a8 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -2,23 +2,28 @@ # Docker UI will be available from http://localhost:8888 volumes: - metadata_data: {} - middle_var: {} - historical_var: {} - broker_var: {} - coordinator_var: {} - router_var: {} + postgres_data: {} + zookeeper_data: {} + zookeeper_datalog: {} + zookeeper_logs: {} + druid_middlemanager_var: {} + druid_historical_var: {} + druid_broker_var: {} + druid_coordinator_var: {} + druid_router_var: {} druid_shared: {} - + kafka_secrets: {} + kafka_config: {} + kafka_data: {} services: postgres: container_name: postgres - image: postgres:latest + image: postgres:17.6-trixie # ports: # - "5432:5432" volumes: - - metadata_data:/var/lib/postgresql/data + - postgres_data:/var/lib/postgresql/data environment: - POSTGRES_PASSWORD=FoolishPassword - POSTGRES_USER=druid @@ -31,13 +36,17 @@ services: # - "2181:2181" environment: - ZOO_MY_ID=1 + volumes: + - zookeeper_data:/data + - zookeeper_datalog:/datalog + - zookeeper_logs:/logs druid-coordinator: - image: apache/druid:30.0.1 + image: apache/druid:34.0.0 container_name: druid-coordinator volumes: - druid_shared:/opt/shared - - coordinator_var:/opt/druid/var + - druid_coordinator_var:/opt/druid/var - ./data:/data depends_on: - zookeeper @@ -51,13 +60,15 @@ services: healthcheck: test: ["CMD-SHELL", "wget -q -O - http://localhost:8081/status/health || exit 1"] 
interval: 10s - retries: 10 + retries: 3 + start_interval: 1s + start_period: 5m druid-broker: - image: apache/druid:30.0.1 + image: apache/druid:34.0.0 container_name: druid-broker volumes: - - broker_var:/opt/druid/var + - druid_broker_var:/opt/druid/var - ./data:/data depends_on: - zookeeper @@ -72,14 +83,16 @@ services: healthcheck: test: ["CMD-SHELL", "wget -q -O - http://localhost:8082/druid/broker/v1/readiness || exit 1"] interval: 10s - retries: 10 + retries: 3 + start_interval: 1s + start_period: 5m druid-historical: - image: apache/druid:30.0.1 + image: apache/druid:34.0.0 container_name: druid-historical volumes: - druid_shared:/opt/shared - - historical_var:/opt/druid/var + - druid_historical_var:/opt/druid/var - ./data:/data depends_on: - zookeeper @@ -94,14 +107,16 @@ services: healthcheck: test: ["CMD-SHELL", "wget -q -O - http://localhost:8083/druid/historical/v1/readiness || exit 1"] interval: 10s - retries: 10 + retries: 3 + start_interval: 1s + start_period: 5m druid-middlemanager: - image: apache/druid:30.0.1 + image: apache/druid:34.0.0 container_name: druid-middlemanager volumes: - druid_shared:/opt/shared - - middle_var:/opt/druid/var + - druid_middlemanager_var:/opt/druid/var - ./data:/data depends_on: - zookeeper @@ -116,10 +131,10 @@ services: - druid-environment.txt druid-router: - image: apache/druid:30.0.1 + image: apache/druid:34.0.0 container_name: druid-router volumes: - - router_var:/opt/druid/var + - druid_router_var:/opt/druid/var # - ./data:/data depends_on: - zookeeper @@ -134,7 +149,9 @@ services: healthcheck: test: ["CMD-SHELL", "wget -q -O - http://localhost:8888/status/health || exit 1"] interval: 10s - retries: 10 + retries: 3 + start_interval: 1s + start_period: 5m kafka: image: apache/kafka:3.7.1 @@ -143,9 +160,14 @@ services: # - 9092:9092 healthcheck: test: ["CMD-SHELL", "/opt/kafka/bin/kafka-topics.sh --bootstrap-server kafka:9092 --list || exit 1"] - interval: 15s - retries: 20 - timeout: 3s + interval: 10s + retries: 
3 + start_interval: 1s + start_period: 5m + volumes: + - kafka_secrets:/etc/kafka/secrets + - kafka_config:/mnt/shared/config + - kafka_data:/var/lib/kafka/data environment: # Overriding any configs wipes the defaults, so most of this is copied from /opt/kafka/config/server.properties - KAFKA_PROCESS_ROLES=broker,controller @@ -162,7 +184,7 @@ services: - KAFKA_SOCKET_SEND_BUFFER_BYTES=102400 - KAFKA_SOCKET_RECEIVE_BUFFER_BYTES=102400 - KAFKA_SOCKET_REQUEST_MAX_BYTES=104857600 - - KAFKA_LOG_DIRS=/tmp/kraft-combined-logs + - KAFKA_LOG_DIRS=/var/lib/kafka/data - KAFKA_NUM_PARTITIONS=1 - KAFKA_NUM_RECOVERY_THREADS_PER_DATA_DIR=1 - KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR=1 @@ -173,10 +195,13 @@ services: - KAFKA_LOG_RETENTION_CHECK_INTERVAL_MS=300000 simulation-server: + pull_policy: build image: exadigit-simulation-server + build: . container_name: simulation-server + command: ["python", "-m", "simulation_server.server.main"] ports: - - "8080:8080" + - "8081:8081" depends_on: druid-coordinator: condition: service_healthy @@ -191,9 +216,40 @@ services: kafka: condition: service_healthy environment: + - EXADIGIT_HTTP_PORT=8081 - EXADIGIT_ENV=dev # - EXADIGIT_ROOT_PATH - EXADIGIT_DEBUG_MODE=true # - EXADIGIT_JOB_IMAGE + - EXADIGIT_ALLOW_ORIGINS=["http://localhost:8080"] - DRUID_SERVICE_URL=http://druid-router:8888 - KAFKA_BOOTSTRAP=kafka:9092 + healthcheck: + test: ["CMD-SHELL", "wget -q -O - http://localhost:8081/openapi.json || exit 1"] + interval: 10s + retries: 3 + start_interval: 1s + start_period: 1m + + simulation-dashboard: + pull_policy: build + image: exadigit-simulation-dashboard + build: + context: ./simulation_dashboard + args: + VITE_PORT: "8080" + VITE_AUTH_URL: "" + VITE_BASE_PATH: "http://localhost:8080" + VITE_API_PATH: "http://localhost:8081" + container_name: simulation-dashboard + ports: + - "8080:8080" + depends_on: + simulation-server: + condition: service_healthy + healthcheck: + test: ["CMD-SHELL", "curl --fail -s http://localhost:8080/index.html 
|| exit 1"] + interval: 10s + retries: 3 + start_interval: 1s + start_period: 10s diff --git a/druid_ingests/data-fugaku.json b/druid_ingests/data-fugaku.json index f287e474545c847f96340ba5f8c4fd28a9a4a2df..8c3b30d16e944b3e8f269963d841431c215b7e88 100644 --- a/druid_ingests/data-fugaku.json +++ b/druid_ingests/data-fugaku.json @@ -25,7 +25,8 @@ "type": "dynamic" }, "maxNumConcurrentSubTasks": 2, - "maxRowsInMemory": 100000 + "maxRowsInMemory": 100000, + "awaitSegmentAvailabilityTimeoutMillis": 1800000 }, "dataSchema": { "dataSource": "svc-ts-exadigit-data-fugaku", @@ -39,36 +40,6 @@ "name": "__time", "type": "expression", "expression": "timestamp_parse(sdt)" - }, - { - "name": "adt", - "type": "expression", - "expression": "timestamp_format(timestamp_parse(adt))" - }, - { - "name": "qdt", - "type": "expression", - "expression": "timestamp_format(timestamp_parse(qdt))" - }, - { - "name": "schedsdt", - "type": "expression", - "expression": "timestamp_format(timestamp_parse(schedsdt))" - }, - { - "name": "deldt", - "type": "expression", - "expression": "timestamp_format(timestamp_parse(deldt))" - }, - { - "name": "sdt", - "type": "expression", - "expression": "timestamp_format(timestamp_parse(sdt))" - }, - { - "name": "edt", - "type": "expression", - "expression": "timestamp_format(timestamp_parse(edt))" } ] }, diff --git a/druid_ingests/data-lassen-allocation-history.json b/druid_ingests/data-lassen-allocation-history.json index 43086bdd0c7ee91632e3735b3ad7d97d2097181e..076b2e0e2e485c7b68214320f54131626a5ea9d7 100644 --- a/druid_ingests/data-lassen-allocation-history.json +++ b/druid_ingests/data-lassen-allocation-history.json @@ -25,7 +25,8 @@ "type": "dynamic" }, "maxNumConcurrentSubTasks": 2, - "maxRowsInMemory": 100000 + "maxRowsInMemory": 100000, + "awaitSegmentAvailabilityTimeoutMillis": 1800000 }, "dataSchema": { "dataSource": "svc-ts-exadigit-data-lassen-allocation-history", diff --git a/druid_ingests/data-lassen-node-history.json 
b/druid_ingests/data-lassen-node-history.json index 191c5292aed593a0dd439f4f471531b725e8af20..a9a0e24424f3870698d5d462286c6a662ef0672b 100644 --- a/druid_ingests/data-lassen-node-history.json +++ b/druid_ingests/data-lassen-node-history.json @@ -25,7 +25,8 @@ "type": "dynamic" }, "maxNumConcurrentSubTasks": 2, - "maxRowsInMemory": 100000 + "maxRowsInMemory": 100000, + "awaitSegmentAvailabilityTimeoutMillis": 1800000 }, "dataSchema": { "dataSource": "svc-ts-exadigit-data-lassen-node-history", diff --git a/druid_ingests/data-lassen-step-history.json b/druid_ingests/data-lassen-step-history.json index b9c3dc34c814eed7f994db8911a93f9ad27e99a1..b3c445ed2d661474275c7b06933c4ae4fba6f8c4 100644 --- a/druid_ingests/data-lassen-step-history.json +++ b/druid_ingests/data-lassen-step-history.json @@ -25,10 +25,11 @@ "type": "dynamic" }, "maxNumConcurrentSubTasks": 2, - "maxRowsInMemory": 100000 + "maxRowsInMemory": 100000, + "awaitSegmentAvailabilityTimeoutMillis": 1800000 }, "dataSchema": { - "dataSource": "svc-ts-exadigit-data-fugaku-lassen-step-history", + "dataSource": "svc-ts-exadigit-data-lassen-step-history", "timestampSpec": { "column": "!!!_no_such_column_!!!", "missingValue": "2010-01-01T00:00:00Z" diff --git a/druid_ingests/data-marconi100.json b/druid_ingests/data-marconi100.json index fc8b5c7d0d8979df78dca8ac32514d9af2ac88ee..1e89d82e3963dee46ffee7c306d8aab9c0a52527 100644 --- a/druid_ingests/data-marconi100.json +++ b/druid_ingests/data-marconi100.json @@ -24,7 +24,8 @@ "type": "dynamic" }, "maxNumConcurrentSubTasks": 2, - "maxRowsInMemory": 100000 + "maxRowsInMemory": 100000, + "awaitSegmentAvailabilityTimeoutMillis": 1800000 }, "dataSchema": { "dataSource": "svc-ts-exadigit-data-marconi100", diff --git a/models/README.md b/models/README.md deleted file mode 100644 index c2e932cd375cbc5d2f3f46323543dfb7f0bb22ab..0000000000000000000000000000000000000000 --- a/models/README.md +++ /dev/null @@ -1,2 +0,0 @@ -# FMU Models -Place FMU models here. 
diff --git a/pyproject.toml b/pyproject.toml index f120dbad29217257c101cbe8e038851898504ea8..460ff2680bc0a62a47c2d2b74d7e535a1c9ccd77 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -14,44 +14,28 @@ readme = "README.md" # license = {file = "LICENSE.txt"} dependencies = [ - "pydantic==2.9.2", - "pydantic-settings==2.5.2 ", - "loguru==0.7.2", - "SQLAlchemy==2.0.27", - "pydruid==0.6.6", - "PyYAML==6.0.1", - "kafka-python==2.0.2", - "python-snappy==0.6.1", - "jsonpath-ng==1.6.1", - "fastapi==0.115.2", - "gunicorn==23.0.0", - "uvicorn==0.32.0", - "sqlparse==0.5.1", - "kubernetes==29.0.0", - "matplotlib==3.7.2", - "numpy==1.23.5", - "rich==13.6.0", - "fmpy==0.3.19", - "pandas==2.0.3", - "scipy==1.10.1", - "pyarrow==15.0.1", - "uncertainties==3.2.1", - "ClusterShell==1.9.2", - "elasticsearch==7.13.4", - "elasticsearch-dbapi==0.2.11", - "tqdm==4.66.5", - "requests==2.32.3", + "pydantic==2.11.9", + "pydantic-settings==2.10.1", + "loguru==0.7.3", + "SQLAlchemy==2.0.43", + "pydruid==0.6.9", + "python-snappy==0.7.3", + "jsonpath-ng==1.7.0", + "fastapi==0.116.2", + "uvicorn==0.35.0", + "sqlparse==0.5.3", + "kubernetes==33.1.0", + "tenacity==9.1.2", + "elasticsearch==7.17.12", + "requests==2.32.5", + "orjson==3.11.3", + "confluent_kafka==2.11.1", + "pyjson5==2.0.0", + "psutil==7.1.0", + "raps@{root:uri}/raps", ] [project.optional-dependencies] dev = [ "pytest", ] - -# [project.scripts] -# spam-cli = "spam:main_cli" - -# [project.entry-points."spam.magical"] -# tomatoes = "spam:main_tomatoes" - - \ No newline at end of file diff --git a/raps b/raps new file mode 160000 index 0000000000000000000000000000000000000000..7c9840f8c4272fdae1e926978eb1ae64d67ab350 --- /dev/null +++ b/raps @@ -0,0 +1 @@ +Subproject commit 7c9840f8c4272fdae1e926978eb1ae64d67ab350 diff --git a/scripts/deploy.sh b/scripts/deploy.sh index 02aaea0cdf18dba44b3ca60b239687cbb74e28eb..4f9db50066a0aa716ece13a0e442fe298072ce49 100755 --- a/scripts/deploy.sh +++ b/scripts/deploy.sh @@ -1,31 +1,28 @@ #!/bin/bash # 
Deploy the pod. Pass the environment (prod or stage) you want to deploy to set -e # Exit if any commmand fails +BASE_DIR=$(realpath $(dirname "${BASH_SOURCE[0]}")/..) +cd "$BASE_DIR" REGISTRY="registry.apps.marble.ccs.ornl.gov/stf218-app" -BASE_DIR=$(realpath $(dirname "${BASH_SOURCE[0]}")/..) -cd "$BASE_DIR" ENV=$1 if [ "$ENV" != "prod" ] && [ "$ENV" != "stage" ]; then echo 'You need to pass either "prod" or "stage"' exit fi -SERVER_IMAGE_STREAM="$REGISTRY/exadigit-simulation-server" -JOB_IMAGE_STREAM="$REGISTRY/exadigit-simulation-server-simulation-job" -docker build -t $SERVER_IMAGE_STREAM:latest -f Dockerfile.server . -docker build -t $JOB_IMAGE_STREAM:latest -f Dockerfile.simulation . +SERVER_IMAGE_STREAM="$REGISTRY/exadigit-simulation-server" +docker build -t $SERVER_IMAGE_STREAM:latest -f Dockerfile . docker push $SERVER_IMAGE_STREAM:latest -docker push $JOB_IMAGE_STREAM:latest SERVER_IMAGE=$(docker inspect --format='{{index .RepoDigests 0}}' $SERVER_IMAGE_STREAM:latest) -JOB_IMAGE=$(docker inspect --format='{{index .RepoDigests 0}}' $JOB_IMAGE_STREAM:latest) +echo "$SERVER_IMAGE" # Scale down so pod gets recreated and uses new image. Allow error if pod doesn't exist oc --namespace stf218-app scale deploy -l env=$ENV,app=exadigit-simulation-server --replicas=0 || true # Process template and apply oc process -f ./deployment.yaml -o yaml \ - --param=ENV=$ENV --param=SERVER_IMAGE="$SERVER_IMAGE" --param=JOB_IMAGE="$JOB_IMAGE" \ + --param=ENV=$ENV --param=SERVER_IMAGE="$SERVER_IMAGE" --param=JOB_IMAGE="$SERVER_IMAGE" \ | oc apply -f - diff --git a/scripts/fetch.sh b/scripts/fetch.sh deleted file mode 100755 index 5df0b8a7a1ef8565b18b73f6bbdd10e1dcb059d1..0000000000000000000000000000000000000000 --- a/scripts/fetch.sh +++ /dev/null @@ -1,24 +0,0 @@ -set -e - -mkdir data -cd data - -# lassen -git clone https://github.com/LLNL/LAST/ lassen-repo -cd lassen-repo -git lfs pull -cd .. 
-mkdir lassen -mv lassen-repo/Lassen-Supercomputer-Job-Dataset/*.csv lassen -rm -rf lassen-repo -python3 ../scripts/preprocess_lassen.py lassen - -# marconi -wget https://zenodo.org/api/records/10127767/files-archive -O marconi100.zip -unzip marconi100.zip -d marconi100 -rm marconi100.zip - -# fugaku -wget https://zenodo.org/api/records/11467483/files-archive -O fugaku.zip -unzip fugaku.zip -d fugaku -rm fugaku/*.csv diff --git a/scripts/fetch_data.sh b/scripts/fetch_data.sh new file mode 100755 index 0000000000000000000000000000000000000000..7e129846b9a7159bcf91a5c305b461da09af3da3 --- /dev/null +++ b/scripts/fetch_data.sh @@ -0,0 +1,16 @@ +#!/bin/bash +set -e + +mkdir data + +raps download --system lassen --dest ./data/lassen +mv ./data/lassen/Lassen-Supercomputer-Job-Dataset/* ./data/lassen +rm -rf ./data/lassen/Lassen-Supercomputer-Job-Dataset +python3 ./scripts/preprocess_lassen.py ./data/lassen + +raps download --system marconi100 --dest ./data/marconi100 + +raps download --system fugaku --dest ./data/fugaku +python3 ./scripts/preprocess_fugaku.py ./data/fugaku + +raps download --system adastraMI250 --dest ./data/adastraMI250 diff --git a/scripts/launch_local.sh b/scripts/launch_local.sh deleted file mode 100755 index 871da5f3208b3b6646cddb6d30018d3544bc67af..0000000000000000000000000000000000000000 --- a/scripts/launch_local.sh +++ /dev/null @@ -1,15 +0,0 @@ -#!/bin/bash -# Launch local version -set -e # Exit if any commmand fails - -BASE_DIR=$(realpath $(dirname "${BASH_SOURCE[0]}")/..) -cd "$BASE_DIR" - -docker build -t exadigit-simulation-server:latest -f Dockerfile.server . -# docker build -t exadigit-simulation-server-simulation-job:latest -f Dockerfile.simulation . 
- -# trap 'docker compose down' SIGINT SIGTERM EXIT - -docker stop simulation-server >/dev/null 2>&1 || true -docker compose up -d -docker compose logs -f --no-log-prefix simulation-server diff --git a/scripts/preprocess_fugaku.py b/scripts/preprocess_fugaku.py new file mode 100755 index 0000000000000000000000000000000000000000..739773eef253f21b972649fe8c4140b23ec2410a --- /dev/null +++ b/scripts/preprocess_fugaku.py @@ -0,0 +1,34 @@ +#!/usr/bin/env python3 +""" +Split up the large fugaku parquets so druid doesn't choke on them when ingesting. +""" + +from pathlib import Path +import pandas as pd +import sys +from collections.abc import Iterable +from pyarrow.parquet import ParquetFile +import pyarrow as pa + +def read_parquet_chunked(file, chunk_size) -> Iterable[pd.DataFrame]: + pf = ParquetFile(file) + for chunk in pf.iter_batches(batch_size = chunk_size): + yield chunk.to_pandas() + +if __name__ == "__main__": + data_path = Path(sys.argv[1]) + files = list(data_path.glob("*.parquet")) + + for file in files: + for chunk_df in read_parquet_chunked(file, 100_000): + chunk_df['date'] = pd.to_datetime(chunk_df['sdt']).dt.strftime("%Y-%m-%d") + # fugaku dataset is indexed by submission date + for date, date_df in chunk_df.groupby('date'): + day_dir = data_path / Path(f"date={date}") + day_dir.mkdir(exist_ok = True) + num = max([int(p.stem) for p in day_dir.glob("*.parquet")], default=-1) + 1 + date_df.to_parquet(day_dir / f"{num:03}.parquet") + + # Delete the old parquets + for file in data_path.glob("*.parquet"): + file.unlink() diff --git a/scripts/submit_data_ingests.py b/scripts/submit_data_ingests.py new file mode 100755 index 0000000000000000000000000000000000000000..962500c73a9c2928c053c908a8a5778fae3ab669 --- /dev/null +++ b/scripts/submit_data_ingests.py @@ -0,0 +1,97 @@ +#!/usr/bin/env python3 +""" +Submits the replay data ingests to druid. 
+""" +from pathlib import Path +import urllib.parse +from typing import Any +import time, os +import pyjson5 +import requests +import getpass, argparse +from loguru import logger + + +class DruidApi: + def __init__(self, url: str, user: str | None = None, password: str | None = None) -> None: + self.url = url.removesuffix("/") + self.user = user + self.password = password + + def request(self, method, url, **kwargs) -> Any: + url = urllib.parse.urljoin(self.url, url) + if self.user and self.password: + auth = (self.user, self.password) + else: + auth = None + + response = requests.request(method, url, timeout = 5 * 60, auth = auth, **kwargs) + if not response.ok: + raise Exception(f"Request {url} failed with {response.status_code}: {response.text}") + + if response.text.strip(): + return response.json() + else: # Some druid endpoints return empty response + return None + + + +def submit_ingest(druid: DruidApi, file): + logger.info(f"Submitting ingest for {file}...") + ingest = pyjson5.loads(Path(file).read_text()) # using yaml as hack to allow comments + ingest_type = ingest['type'] + + if ingest_type == "kafka": + response = druid.request("POST", "/druid/indexer/v1/supervisor", json = ingest) + logger.info(f"Supervisor for {file} submitted") + logger.info(f"See {druid.url}/unified-console.html to view the streaming ingest.") + else: + response = druid.request("POST", "/druid/indexer/v1/task", json = ingest) + task_id = response['task'] + + logger.info(f"See {druid.url}/unified-console.html#tasks/task_id~{task_id} to view ingest progress.") + logger.info(f"Waiting for ingest{task_id} to complete...") + + status = "RUNNING" + while status == "RUNNING": + time.sleep(5) + response = druid.request("GET", f"/druid/indexer/v1/task/{task_id}/status") + status = response['status']['statusCode'] + if status != "SUCCESS": + raise ValueError(f"Ingest for {file} failed!") + else: + logger.info(f"Ingest for {file} finished.") + + +if __name__ == "__main__": + parser = 
argparse.ArgumentParser( + description = __doc__.strip(), + formatter_class = argparse.RawDescriptionHelpFormatter, + ) + parser.add_argument("ingests", type = Path, nargs = "*", help = "List of druid ingests") + args = parser.parse_args() + + if not args.ingests: + ingests = sorted(Path("./druid_ingests").resolve().glob("data-*.json")) + else: + ingests = [Path(p).resolve() for p in args.ingests] + + druid_url = os.environ.get("DRUID_URL") + if not druid_url: + druid_url = input("Druid URL (http://localhost:8888): ") + druid_url = druid_url.strip() or "http://localhost:8888" + + druid_username = os.environ.get("DRUID_USERNAME") + if not druid_username: + druid_username = input("Druid Username: ").strip() or None + + druid_password = os.environ.get("DRUID_PASSWORD") + if not druid_password: + druid_password = getpass.getpass("Druid Password: ").strip() or None + + druid = DruidApi(druid_url, druid_username, druid_password) + + for ingest in ingests: + submit_ingest(druid, ingest) + + logger.info("Done!") diff --git a/simulation_dashboard b/simulation_dashboard new file mode 160000 index 0000000000000000000000000000000000000000..4de3411f04d978f0fc24ba79d7352bff5633df26 --- /dev/null +++ b/simulation_dashboard @@ -0,0 +1 @@ +Subproject commit 4de3411f04d978f0fc24ba79d7352bff5633df26 diff --git a/simulation_server/models/output.py b/simulation_server/models/output.py index fb1c619ef695631ba7ac07984fae4ada0a368bed..55187d4b66554ae1af6402de369079ffcb54348e 100644 --- a/simulation_server/models/output.py +++ b/simulation_server/models/output.py @@ -27,7 +27,7 @@ class SchedulerSimJob(BaseModel): nodes: Optional[list[str]] = None """ - The nodes the job is running on ['x2307c3s0b1', 'x2408c5s2b1'] + The nodes the job is running on e.g. ['x2307c3s0b1', 'x2408c5s2b1'] """ # Removing these for now, they are constant and just what you set in the input. 
@@ -143,6 +143,10 @@ class CoolingSimCDU(BaseModel): col: Optional[int] = None """ Col index of the cdu (Note this is the col of the neighboring cabinet.)""" + # TODO: RAPS supports any number of racks per CDU, while this is still hard-coded to the 3 in + # Frontier. This will work for any system with 3 or less. We need to rethink how the racks are + # stored in the DB, maybe a separate table. Or use an Array type for the field, but that makes + # timeseries aggregation queries harder. rack_1_power: Optional[float] = None rack_2_power: Optional[float] = None rack_3_power: Optional[float] = None @@ -255,6 +259,7 @@ COOLING_CEP_FIELD_SELECTORS = { class SystemInfo(BaseModel): + name: str peak_flops: float peak_power: float g_flops_w_peak: float diff --git a/simulation_server/models/sim.py b/simulation_server/models/sim.py index 104ba4a44be9790861f0a8b267c14cc566120c4c..766bc333ace3a89de26d84ed469959f76e811827 100644 --- a/simulation_server/models/sim.py +++ b/simulation_server/models/sim.py @@ -1,19 +1,16 @@ from __future__ import annotations -from typing import Optional, Literal, Annotated as A, get_args -from datetime import timedelta, datetime, timezone -import json, math -from pydantic import AwareDatetime, model_validator, field_validator, Field +from typing import Optional, Literal, Annotated as A +import json +from pathlib import Path +from pydantic import AwareDatetime, Field, model_validator, BeforeValidator +from raps import SingleSimConfig +from raps.utils import AutoAwareDatetime, ResolvedPath from .base import BaseModel -from .job_state import JobStateEnum from ..util.misc import omit from ..util.api_queries import filter_params, sort_params -SimSystem = Literal["frontier", "fugaku", "lassen", "marconi100"] -SIM_SYSTEMS: tuple[str] = get_args(SimSystem) - - class Sim(BaseModel): """ Represents a single simulation run """ @@ -23,7 +20,7 @@ class Sim(BaseModel): user: Optional[str] = None """ User who launched the simulation """ - system: SimSystem + 
system: Optional[str] = None state: Optional[Literal['running', 'success', 'fail']] = None @@ -99,92 +96,35 @@ SIM_SORT = sort_params(omit(SIM_API_FIELDS, ['progress', 'progress_date', 'confi ]) -class SimConfig(BaseModel): - start: AwareDatetime - end: AwareDatetime - - system: SimSystem - scheduler: A[SchedulerSimConfig, Field(default_factory = lambda: SchedulerSimConfig())] - cooling: A[CoolingSimConfig, Field(default_factory = lambda: CoolingSimConfig())] - - @model_validator(mode='after') - def validate_model(self): - if self.end <= self.start: - raise ValueError("Start must be less than end") - - if not any(m.enabled for m in [self.scheduler, self.cooling]): - raise ValueError("Must enable one simulation") - if self.cooling.enabled and not self.scheduler.enabled: - raise ValueError("Currently can't run cooling simulation without the scheduler") - return self - - @field_validator("start", mode="after") - @classmethod - def trunc_start(cls, v: datetime, info): - return v.fromtimestamp(math.floor(v.timestamp()), tz=timezone.utc) - - @field_validator("end", mode="after") - @classmethod - def trunc_end(cls, v: datetime, info): - return v.fromtimestamp(math.ceil(v.timestamp()), tz=timezone.utc) - - -class SchedulerSimConfig(BaseModel): - """ - Config for RAPS job simulation. - There are 3 main "modes" for how to run the jobs. - - replay: Replay data based on the real jobs run on Frontier during start/end - - custom: Pass your own set of jobs to submit in the simulation in `jobs` - - random: Run random jobs. You can pass `seed` and `num_jobs` to customize it. - """ - - enabled: bool = False - down_nodes: list[int] = [] # List of hostnames. TODO: allow parsing from xnames - - jobs_mode: Literal['replay', 'custom', 'random', 'test'] = 'random' - schedule_policy: Literal['fcfs', 'sjf', 'prq'] ='fcfs' - """" - Policy to use when scheduling jobs. - Replay mode will ignore this and use the real time jobs were scheduled unless you also set - reschedule to true. 
- """ - reschedule: bool = False - """ If true, will apply schedule_policy in replay mode """ - - jobs: Optional[list[SchedulerSimCustomJob]] = None - """ - The list of jobs. - Only applicable if jobs_mode is "custom" - """ - - seed: Optional[int] = None - """ - Random seed for consistent random job generation. - Only applicable if jobs_mode is "random" - """ - num_jobs: Optional[int] = None - """ - Number of random jobs to generate. - Only applicable if jobs_mode is "random" - """ - - -class SchedulerSimCustomJob(BaseModel): - # This is mostly a subset of the SchedulerSimJob - name: str - allocation_nodes: int - """ Number of nodes required """ - time_submission: AwareDatetime - time_limit: timedelta - - cpu_util: float - gpu_util: float - cpu_trace: list[float] - gpu_trace: list[float] - - end_state: JobStateEnum - """ Slurm state job will end in """ - - -class CoolingSimConfig(BaseModel): - enabled: bool = False +class ServerSimConfig(SingleSimConfig): + start: AutoAwareDatetime # make start required + """ Start of the simulation """ + + replay: A[list[ResolvedPath] | None, + BeforeValidator(lambda r: ['database'] if r else None, bool)] = None + """ Whether to enable job replay. Pulls data from the database """ + # RAPS replay expects a list of paths, but that's not relevant when we are pulling the data from + # the database. We accept true/false as input and just put a dummy value in for the list. 
+ + def __init__(self, /, **data): + # Override context to set base_path + RAPS_PATH = (Path(__file__) / '../../../raps').resolve() + self.__pydantic_validator__.validate_python( + data, + self_instance=self, + context={ + "base_path": RAPS_PATH, + "force_under_base_path": True, + } + ) + + @model_validator(mode = "before") + def _validate_server_sim_config(cls, data): + data = {**data} + # Force these options regardless of input + data['noui'] = True + data['output'] = "none" + if data.get("workload") == "replay" and 'replay' not in data: + data['replay'] = True + + return data diff --git a/simulation_server/server/config.py b/simulation_server/server/config.py index b33aacd338f0a7e171e00ffc6433d7237df1cee8..5b0d53e6726ec7613d67e5877e04dc21965c12d1 100644 --- a/simulation_server/server/config.py +++ b/simulation_server/server/config.py @@ -4,7 +4,7 @@ from pydantic import StringConstraints from pydantic_settings import BaseSettings, SettingsConfigDict from fastapi import Depends import sqlalchemy as sqla -from kafka import KafkaProducer +from confluent_kafka import Producer from ..util.kafka import get_kafka_producer as _get_kafka_producer from ..util.druid import get_druid_engine as _get_druid_engine @@ -18,7 +18,7 @@ class AppSettings(BaseSettings): root_path: str = "" """ The root path of the application if you are behind a proxy """ - http_port: int = 8080 + http_port: int = 8081 allow_origins: list[str] = [] @@ -43,7 +43,7 @@ DruidDep = A[sqla.Engine, Depends(get_druid_engine)] @functools.cache def get_kafka_producer(): return _get_kafka_producer() -KafkaProducerDep = A[KafkaProducer, Depends(get_kafka_producer)] +KafkaProducerDep = A[Producer, Depends(get_kafka_producer)] class AppDeps_(NamedTuple): diff --git a/simulation_server/server/endpoints.py b/simulation_server/server/endpoints.py index cec31ffeec60cb9c7e99d079b7b2a0f8b18ce941..a892e7f03cfa958941f87a4481c2cabe9b985ea6 100644 --- a/simulation_server/server/endpoints.py +++ 
b/simulation_server/server/endpoints.py @@ -10,14 +10,14 @@ from ..models.output import ( CoolingSimCDU, COOLING_CDU_FILTERS, COOLING_CDU_FIELD_SELECTORS, CoolingSimCEP, COOLING_CEP_FIELD_SELECTORS, ) -from ..models.sim import Sim, SIM_FIELD_SELECTORS, SIM_FILTERS, SIM_SORT, SimConfig, SimSystem +from ..models.sim import Sim, SIM_FIELD_SELECTORS, SIM_FILTERS, SIM_SORT, ServerSimConfig from ..models.output import SystemInfo from ..util.api_queries import Granularity, granularity_params, Filters, Sort, get_selectors from .config import AppDeps from .service import ( run_simulation, query_sims, query_cooling_sim_cdu, query_scheduler_sim_jobs, query_scheduler_sim_system, query_scheduler_sim_power_history, query_cooling_sim_cep, - get_system_info, + get_systems, get_system_info, ) router = APIRouter(tags=["simulation"]) @@ -27,10 +27,10 @@ GranularityDep = A[Granularity, Depends(granularity_params(default_granularity=t @router.post("/simulation/run", response_model=Sim) -def run(*, sim_config: A[SimConfig, Body()], deps: AppDeps): +def run(*, sim_config: A[ServerSimConfig, Body()], deps: AppDeps): """ Start running a simulation in the background. POST the configuration for the simulation. Returns - a Sim object containing an id you can use to query the results as they are generated. Foo + a Sim object containing an id you can use to query the results as they are generated. 
""" return run_simulation(sim_config, deps) @@ -212,6 +212,11 @@ def scheduler_system(*, return result -@router.get("/system-info/{system}", response_model=SystemInfo) -def system_info(system: SimSystem): +@router.get("/system/list", response_model=list[SystemInfo]) +def system_list(): + return get_systems() + + +@router.get("/system/{system}", response_model=SystemInfo) +def system_info(system: str): return get_system_info(system = system) diff --git a/simulation_server/server/main.py b/simulation_server/server/main.py index bfb1d05595b50603a8a173c9ef7abc18dfadd3e3..174ac44888bd9c5aa4cd06e13557041cb46da5ed 100644 --- a/simulation_server/server/main.py +++ b/simulation_server/server/main.py @@ -1,6 +1,6 @@ """ A simple REST API for triggering and querying the results from the digital twin """ from pathlib import Path -import subprocess, asyncio, functools, os, json +import asyncio, functools, os, json from contextlib import asynccontextmanager from starlette.exceptions import HTTPException from starlette.requests import Request @@ -15,17 +15,19 @@ from loguru import logger from ..util.druid import submit_ingest from .service import cleanup_jobs from .config import AppSettings, get_app_settings, get_druid_engine, get_kafka_producer +from ..util.kafka import get_kafka_admin +from confluent_kafka.admin import NewTopic settings = AppSettings() def repeat_task(func, seconds): - if not asyncio.iscoroutinefunction(func): - func = functools.partial(run_in_threadpool, func) - async def loop() -> None: while True: - await func() + try: + await func() + except Exception as e: + logger.exception(f"Background task failed: {e}") await asyncio.sleep(seconds) return asyncio.create_task(loop()) @@ -38,20 +40,39 @@ async def lifespan(api: FastAPI): for dep in deps: api.dependency_overrides.get(dep, dep)() - # TODO: Should add cleanup handler for local as well - background_task_loop = None - if settings.env == 'prod' and 'KUBERNETES_SERVICE_HOST' in os.environ: - background_task_loop 
= repeat_task( - lambda: cleanup_jobs(druid_engine = get_druid_engine(), kafka_producer = get_kafka_producer()), - seconds = 5 * 60, + async def background_task(): + cleanup_jobs( + druid_engine = get_druid_engine(), + kafka_producer = get_kafka_producer(), + settings = get_app_settings(), ) + background_task_loop = repeat_task(background_task, seconds = 2 * 60) if settings.env == 'dev': + kafka_admin = get_kafka_admin() + topic_config = {"compression.type": "snappy"} + topics = [ + NewTopic("svc-event-exadigit-sim", 1, 1, config = topic_config), + NewTopic("svc-ts-exadigit-schedulersimsystem", 4, 1, config = topic_config), + NewTopic("svc-event-exadigit-schedulersimjob", 2, 1, config = topic_config), + NewTopic("svc-ts-exadigit-coolingsimcdu", 4, 1, config = topic_config), + NewTopic("svc-ts-exadigit-coolingsimcep", 2, 1, config = topic_config), + NewTopic("svc-ts-exadigit-jobpowerhistory", 4, 1, config = topic_config), + ] + existing_topics = set(kafka_admin.list_topics().topics.keys()) + new_topics = [t for t in topics if t.topic not in existing_topics] + if new_topics: + logger.info(f"Creating kafka topics {', '.join(t.topic for t in new_topics)}") + kafka_admin.create_topics(new_topics) + druid_ingests_dir = Path(__file__).parent.parent.parent.resolve() / 'druid_ingests' ingests = [ - "cooling-sim-cdu", "cooling-sim-cep", "scheduler-job-power-history", - "scheduler-sim-job", "scheduler-sim-system", "sim", + "scheduler-sim-system", + "scheduler-sim-job", + "cooling-sim-cdu", + "cooling-sim-cep", + "scheduler-job-power-history", ] for ingest in ingests: @@ -59,13 +80,12 @@ async def lifespan(api: FastAPI): yield - # if background_task_loop: - # background_task_loop.cancel() + background_task_loop.cancel() app = FastAPI( title = "ExaDigiT Simulation Server", - version = "0.1.0", + version = "1.0.0", # Simplify ids and names in generated clients a bit # NOTE: This means we need one tag defined (or inherited from the APIRouter object) on every route 
generate_unique_id_function = lambda route: f"{route.tags[0]}_{route.name}", @@ -110,22 +130,17 @@ if settings.allow_origins: allow_credentials=True, allow_methods=["*"], allow_headers=["*"], -) + ) + if "*" not in settings.allow_origins: + logger.info(f"Frontend hosted at {' '.join(settings.allow_origins)}") from .endpoints import router app.include_router(router) if __name__ == "__main__": - if settings.debug_mode: - uvicorn.run(app, - host='0.0.0.0', - port=settings.http_port, - reload=False, - ) - else: - subprocess.run(["gunicorn", - "simulation_server.server.main:app", - "--bind", f"0.0.0.0:{settings.http_port}", - "--worker-class", "uvicorn.workers.UvicornWorker", - ], check=True) + uvicorn.run(app, + host='0.0.0.0', + port=settings.http_port, + reload=False, + ) diff --git a/simulation_server/server/service.py b/simulation_server/server/service.py index c71422bbd02f2186f3d22e52241eac54738953cb..7be5aaef56f03c679ea65c1cf33bf0fd90df2193 100644 --- a/simulation_server/server/service.py +++ b/simulation_server/server/service.py @@ -1,10 +1,13 @@ from typing import Optional, Any from datetime import datetime, timedelta, timezone +import psutil +import functools import uuid, time, json, base64, os, sys, subprocess import sqlalchemy as sqla from loguru import logger from pydantic import ValidationError -from ..models.sim import Sim, SimConfig, SIM_FILTERS, SIM_FIELD_SELECTORS, SimSystem +from fastapi import HTTPException +from ..models.sim import Sim, ServerSimConfig, SIM_FILTERS, SIM_FIELD_SELECTORS from ..models.base import ResponseFormat from ..models.output import ( COOLING_CDU_API_FIELDS, COOLING_CDU_FIELD_SELECTORS, @@ -13,14 +16,14 @@ from ..models.output import ( SCHEDULER_SIM_JOB_POWER_HISTORY_API_FIELDS, SCHEDULER_SIM_JOB_POWER_HISTORY_FIELD_SELECTORS, ) from ..util.misc import pick, omit -from ..util.k8s import submit_job, get_job, get_job_state, get_job_end_time +from ..util.k8s import submit_job, get_k8s_job, get_k8s_job_state, get_k8s_job_end_time 
from ..util.druid import to_timestamp, any_value, latest, execute_ignore_missing from ..util.api_queries import ( Filters, Sort, QuerySpan, Granularity, expand_field_selectors, DatetimeValidator, DEFAULT_FIELD_TYPES, ) from . import orm -from .config import AppDeps, AppSettings +from .config import AppDeps def wait_until_exists(stmt: sqla.Select, *, timeout: timedelta = timedelta(minutes=1), druid_engine: sqla.Engine): @@ -51,12 +54,12 @@ def wait_until_exists(stmt: sqla.Select, *, timeout: timedelta = timedelta(minut -def run_simulation(sim_config: SimConfig, deps: AppDeps): +def run_simulation(sim_config: ServerSimConfig, deps: AppDeps): sim = Sim( # Random sim id, use base32 to make it a bit shorter id = base64.b32encode(uuid.uuid4().bytes).decode().rstrip('=').lower(), user = "unknown", # TODO pull this from cookie/auth header - system = sim_config.system, + system = sim_config.system_configs[0].system_name, state = "running", start = sim_config.start, end = sim_config.end, @@ -67,7 +70,7 @@ def run_simulation(sim_config: SimConfig, deps: AppDeps): config = sim_config.model_dump(mode = 'json'), ) logger.info(f"Launching simulation {sim.id}") - deps.kafka_producer.send("svc-event-exadigit-sim", value = sim.serialize_for_druid()) + deps.kafka_producer.produce("svc-event-exadigit-sim", sim.serialize_for_druid()) deps.kafka_producer.flush() if 'KUBERNETES_SERVICE_HOST' in os.environ: # We're running on k8s @@ -83,7 +86,7 @@ def run_simulation(sim_config: SimConfig, deps: AppDeps): { "name": "main", "image": deps.settings.job_image, - "command": ['python3', "-m", "simulation_server.simulation.main", "background-job"], + "command": ['python3', "-m", "simulation_server.simulation.main"], "env": [ {"name": "SIM", "value": sim.model_dump_json()}, ], @@ -104,7 +107,7 @@ def run_simulation(sim_config: SimConfig, deps: AppDeps): }) else: # Running locally, just use a subprocess proc = subprocess.Popen( - args = [sys.executable, "-m", "simulation_server.simulation.main", 
"background-job"], + args = [sys.executable, "-m", "simulation_server.simulation.main"], env = { "SIM": sim.model_dump_json(), **os.environ, @@ -119,64 +122,56 @@ def run_simulation(sim_config: SimConfig, deps: AppDeps): return sim -_sim_jobs_cache: dict[str, tuple[Any, datetime]] = {} -_sim_job_cache_expire = timedelta(minutes=5) -def get_sim_job(sim_id: str): - now = datetime.now() - # Expire old entries - for cid in list(_sim_jobs_cache.keys()): - if (now - _sim_jobs_cache[cid][1]) > _sim_job_cache_expire: - del _sim_jobs_cache[cid] - - if sim_id not in _sim_jobs_cache: - _sim_jobs_cache[sim_id] = (get_job(f"exadigit-simulation-server-{sim_id}"), now) - - return _sim_jobs_cache[sim_id][0] - - -def cleanup_jobs(druid_engine, kafka_producer): +def cleanup_jobs(druid_engine, kafka_producer, settings): """ If a simulation job dies unexpectedly (e.g. OOM error), it won't be able to send the kafka message marking the sim as complete, leaving the sim stuck as running. This task checks all running sim jobs and cleans them up if their job is dead. 
""" + if 'KUBERNETES_SERVICE_HOST' in os.environ and settings.env != 'prod': + # Skip job cleanup on stage/dev k8s deployments to avoid multiple instances of the server + # trying to cancel jobs + return logger.info(f"Checking for stuck jobs") now = datetime.now(timezone.utc) - threshold = timedelta(minutes=5) + # threshold after job has ended before sending a cancel (incase the job did send its own + # cancel message and it just hasn't shown up in Druid yet) + threshold = timedelta(minutes=2) - sims, _ = query_sims( + running_sims, _ = query_sims( filters=SIM_FILTERS(state = ["eq:running"]), - fields = ["id"], + fields = ["all"], limit = 1000, # If somehow there's more than that we'll just get them next trigger druid_engine = druid_engine, ) - - stuck_ids = [] - for sim in sims: - job = get_sim_job(sim.id) - job_state = get_job_state(job) - if job_state != 'running' and (not job or get_job_end_time(job) < now - threshold): - stuck_ids.append(sim.id) - - if stuck_ids: - stuck_sims, _ = query_sims( - filters = SIM_FILTERS(id = [f'one_of:{",".join(stuck_ids)}']), - fields = ['all'], - limit = len(stuck_ids), - druid_engine = druid_engine, - ) - - for sim in stuck_sims: + + running_jobs = set() + if 'KUBERNETES_SERVICE_HOST' in os.environ: + for sim in running_sims: + job = get_k8s_job(f"exadigit-simulation-server-{sim.id}") + # Add a little bit of threshold to avoid potentially sending duplicate fail messages + if get_k8s_job_state(job) == "running" or (job and now - get_k8s_job_end_time(job) < threshold): + running_jobs.add(sim.id) + else: + for proc in psutil.Process().children(): + try: + if 'simulation_server.simulation.main' in proc.cmdline(): + sim_id = json.loads(proc.environ()["SIM"])['id'] + if proc.is_running(): + running_jobs.add(sim_id) + except (psutil.Error): + pass + + for sim in running_sims: + if sim.id not in running_jobs and now - sim.execution_start > threshold: sim.state = 'fail' sim.execution_end = now sim.error_messages = "Simulation crashed" 
logger.warning(f"Marking stuck sim {sim.id} as failed") - kafka_producer.send("svc-event-exadigit-sim", - value = sim.serialize_for_druid() - ) + kafka_producer.produce("svc-event-exadigit-sim", sim.serialize_for_druid()) - for sim in stuck_sims: + # Block until saved to make sure we don't double-send stmt = ( sqla.select(orm.sim.c.id) .where(orm.sim.c.id == sim.id, orm.sim.c.state == 'fail') @@ -696,7 +691,21 @@ def build_scheduler_sim_power_history_query(*, ) -def get_system_info(system: SimSystem): - from ..simulation.simulation import get_scheduler - sc = get_scheduler(system = system) - return sc.get_gauge_limits() +@functools.cache +def get_systems(): + from raps.system_config import list_systems + return [get_system_info(s) for s in list_systems()] + + +@functools.cache +def get_system_info(system: str): + from raps.system_config import list_systems + from raps import Engine, SingleSimConfig + from raps.stats import get_gauge_limits + if system not in list_systems(): + raise HTTPException(status_code=404, detail=f"System {system} not found") + engine = Engine(SingleSimConfig(system = system)) + return { + "name": system, + **get_gauge_limits(engine), + } diff --git a/simulation_server/simulation/dataloaders.py b/simulation_server/simulation/dataloaders.py deleted file mode 100644 index ab35d7275db9df0e90946be69d4ee73cf5addcbc..0000000000000000000000000000000000000000 --- a/simulation_server/simulation/dataloaders.py +++ /dev/null @@ -1,177 +0,0 @@ -import pandas as pd -import numpy as np -import sqlalchemy as sqla -from loguru import logger -from datetime import datetime, timedelta -from .raps.raps.telemetry import Telemetry -from ..models.sim import SimConfig -from ..util.druid import get_druid_engine, get_table, to_timestamp -from ..util.es import get_nccs_cadence_engine -from . 
import SimException - - -def fetch_frontier_data(sim_config: SimConfig, raps_config: dict): - """ - Fetch and parse real telemetry data - """ - # TODO: Should consider using LVA API instead of directly querying the DB for this - nccs_cadence_engine = get_nccs_cadence_engine() - druid_engine = get_druid_engine() - start, end = sim_config.start, sim_config.end - - job_query = sqla.text(""" - SELECT - "allocation_id", "job_id", "slurm_version", "account", "group", "user", "name", - "time_limit", "time_submission", "time_eligible", "time_start", "time_end", "time_elapsed", - "node_count", xnames_str AS "xnames", "state_current", "state_reason", - "time_snapshot" - FROM "stf218.frontier.job-summary" - WHERE - (time_start IS NOT NULL AND time_start <= CONVERT(:end, TIMESTAMP)) AND - (time_end IS NULL OR time_end > CONVERT(:start, TIMESTAMP)) - """).bindparams( - start = start.isoformat(), end = end.isoformat(), - ) - job_data = pd.read_sql_query(job_query, nccs_cadence_engine, parse_dates=[ - "time_snapshot", "time_submission", "time_eligible", "time_start", "time_end", - ]) - # TODO: Even with sqlStringifyArrays: false, multivalue columns are returned as json strings. - # And single rows are returned as raw strings. When we update Druid we can use ARRAYS and remove - # this. Moving the jobs table to postgres would also fix this (and other issues). 
- job_data['xnames'] = job_data['xnames'].map(lambda x: x.split(",") if x else []) - - job_profile_tbl = get_table("pub-ts-frontier-job-profile", druid_engine) - job_profile_query = ( - sqla.select( - job_profile_tbl.c['__time'].label("timestamp"), - job_profile_tbl.c.allocation_id, - job_profile_tbl.c.sum_cpu0_power, - job_profile_tbl.c.sum_gpu_power, - ) - .where( - to_timestamp(start) <= job_profile_tbl.c['__time'], - job_profile_tbl.c['__time'] < to_timestamp(end), - ) - ) - job_profile_data = pd.read_sql(job_profile_query, druid_engine, parse_dates=[ - "timestamp", - ]) - - if (job_data.empty or job_profile_data.empty): - raise SimException(f"No telemetry data for {start.isoformat()} -> {end.isoformat()}") - - telemetry = Telemetry(system = "frontier", config = raps_config) - jobs = telemetry.load_data_from_df(job_data, job_profile_data, - min_time = start, - reschedule = sim_config.scheduler.reschedule, - config = raps_config, - ) - return jobs - - -def query_time_range( - tbl_name: str, start: datetime, end: datetime, end_col: str, *, - druid_engine, parse_dates: list[str], -): - tbl = get_table(tbl_name, druid_engine) - query = ( - sqla.select(sqla.text("*")) - .where( - (tbl.c['__time'] <= to_timestamp(end)) & - (tbl.c['__time'] >= to_timestamp(start - timedelta(days=3))) & - (tbl.c[end_col] >= to_timestamp(start)) - ) - ) - data = pd.read_sql(query, druid_engine, parse_dates=parse_dates) - return data - - -def split_list(x): - x = x.split(",") if x else [] - return np.array([int(x) for x in x]) - - -def fetch_fugaku_data(sim_config: SimConfig, raps_config: dict): - druid_engine = get_druid_engine() - start, end = sim_config.start, sim_config.end - - data = query_time_range( - "svc-ts-exadigit-data-fugaku", start, end, 'edt', - druid_engine = druid_engine, - parse_dates = ["adt", "qdt", "schedsdt", "deldt", "sdt", "edt"], - ) - telemetry = Telemetry(system = "fugaku", config = raps_config) - jobs = telemetry.load_data_from_df(data, - min_time = start, - 
reschedule = sim_config.scheduler.reschedule, - config = raps_config, - ) - return jobs - - -def fetch_marconi100_data(sim_config: SimConfig, raps_config: dict): - druid_engine = get_druid_engine() - start, end = sim_config.start, sim_config.end - - data = query_time_range( - "svc-ts-exadigit-data-marconi100", start, end, 'end_time', - druid_engine = druid_engine, - parse_dates = ["submit_time", "start_time", "end_time", "eligible_time"], - ) - - data['nodes'] = data['nodes'].map(split_list) - data['node_power_consumption'] = data['node_power_consumption'].map(split_list) - data['mem_power_consumption'] = data['mem_power_consumption'].map(split_list) - data['cpu_power_consumption'] = data['cpu_power_consumption'].map(split_list) - - telemetry = Telemetry(system = "marconi100", config = raps_config) - jobs = telemetry.load_data_from_df(data, - min_time = start, - reschedule = sim_config.scheduler.reschedule, - config = raps_config, - ) - return jobs - - -def fetch_lassen_data(sim_config: SimConfig, raps_config: dict): - druid_engine = get_druid_engine() - start, end = sim_config.start, sim_config.end - - allocation_df = query_time_range( - "svc-ts-exadigit-data-lassen-allocation-history", start, end, 'end_time', - druid_engine = druid_engine, - parse_dates = ["begin_time", "end_time", "job_submit_time"], - ) - - tbl = get_table("svc-ts-exadigit-data-lassen-node-history", druid_engine) - node_query = ( - sqla.select(sqla.text("*")) - .where( - (tbl.c['__time'] <= to_timestamp(end)) & - (tbl.c['__time'] >= to_timestamp(start - timedelta(days=3))) - ) - ) - node_df = pd.read_sql(node_query, druid_engine) - - step_df = query_time_range( - "svc-ts-exadigit-data-fugaku-lassen-step-history", start, end, 'end_time', - druid_engine = druid_engine, - parse_dates = ["begin_time", "end_time"], - ) - - telemetry = Telemetry(system = "lassen", config = raps_config) - jobs = telemetry.load_data_from_df( - allocation_df = allocation_df, node_df = node_df, step_df = step_df, - 
min_time = start, - reschedule = sim_config.scheduler.reschedule, - config = raps_config, - ) - return jobs - - -DATA_LOADERS = { - "frontier": fetch_frontier_data, - "fugaku": fetch_fugaku_data, - "marconi100": fetch_marconi100_data, - "lassen": fetch_lassen_data, -} \ No newline at end of file diff --git a/simulation_server/simulation/dataloaders/frontier.py b/simulation_server/simulation/dataloaders/frontier.py new file mode 100644 index 0000000000000000000000000000000000000000..3b006cdd060cdb2f4600447d89793dba53062c64 --- /dev/null +++ b/simulation_server/simulation/dataloaders/frontier.py @@ -0,0 +1,67 @@ +from ...util.druid import get_druid_engine, get_table, to_timestamp +from ...util.dataloader import query_time_range +from ...util.es import get_nccs_cadence_es, es_sql_query +from ...models.sim import ServerSimConfig +from .. import SimException +import sqlalchemy as sqla +import pandas as pd + +# Re-use these from the raps dataloader +from raps.dataloaders.frontier import load_data_from_df, node_index_to_name, cdu_index_to_name, cdu_pos + + +def load_data(_paths, **kwargs): + # TODO: Should consider using LVA API instead of directly querying the DB for this + druid_engine = get_druid_engine() + es = get_nccs_cadence_es() + + sim_config: ServerSimConfig = kwargs['sim_config'] + start, end = sim_config.start, sim_config.end + + job_query = """ + SELECT + "allocation_id", "job_id", "slurm_version", "account", "group", "user", "name", + "time_limit", "time_submission", "time_eligible", "time_start", "time_end", "time_elapsed", + "node_count", xnames_str AS "xnames", "state_current", "state_reason", + "time_snapshot" + FROM "stf218.frontier.job-summary" + WHERE + (time_end IS NULL OR time_end > CONVERT(?, TIMESTAMP)) AND + (time_start IS NOT NULL AND time_start <= CONVERT(?, TIMESTAMP)) + """ + job_query_params = [start.isoformat(), end.isoformat()] + job_data = es_sql_query(es, job_query, job_query_params, fetch_size=500) + + job_df = pd.DataFrame(job_data) + 
job_df['time_snapshot'] = pd.to_datetime(job_df['time_snapshot']) + job_df["time_submission"] = pd.to_datetime(job_df["time_submission"]) + job_df["time_eligible"] = pd.to_datetime(job_df["time_eligible"]) + job_df["time_start"] = pd.to_datetime(job_df["time_start"]) + job_df["time_end"] = pd.to_datetime(job_df["time_end"]) + job_df['xnames'] = job_df['xnames'].map(lambda x: x.split(",") if x else []) + + job_profile_tbl = get_table("pub-ts-frontier-job-profile", druid_engine) + job_profile_query = ( + sqla.select( + job_profile_tbl.c['__time'].label("timestamp"), + job_profile_tbl.c.allocation_id, + job_profile_tbl.c.sum_cpu0_power, + job_profile_tbl.c.sum_gpu_power, + ) + .where( + to_timestamp(start) <= job_profile_tbl.c['__time'], + job_profile_tbl.c['__time'] < to_timestamp(end), + ) + ) + job_profile_df = pd.read_sql(job_profile_query, druid_engine, parse_dates=[ + "timestamp", + ]) + + from loguru import logger + logger.info(f"job_df {job_df}") + logger.info(f"job_profile_df {job_profile_df}") + + if (job_df.empty or job_profile_df.empty): + raise SimException(f"No telemetry data for {start.isoformat()} -> {end.isoformat()}") + + return load_data_from_df(job_df, job_profile_df, **kwargs) diff --git a/simulation_server/simulation/dataloaders/fugaku.py b/simulation_server/simulation/dataloaders/fugaku.py new file mode 100644 index 0000000000000000000000000000000000000000..87b103c3c44c3a5c1ff497e0c8b4559cc3865ce3 --- /dev/null +++ b/simulation_server/simulation/dataloaders/fugaku.py @@ -0,0 +1,18 @@ +from ...util.druid import get_druid_engine +from ...util.dataloader import query_time_range +from ...models.sim import ServerSimConfig + +# Re-use these from the raps dataloader +from raps.dataloaders.fugaku import load_data_from_df, node_index_to_name, cdu_index_to_name, cdu_pos + + +def load_data(_paths, **kwargs): + druid_engine = get_druid_engine() + sim_config: ServerSimConfig = kwargs['sim_config'] + start, end = sim_config.start, sim_config.end + df = 
query_time_range( + "svc-ts-exadigit-data-fugaku", start, end, 'sdt', 'edt', + druid_engine = druid_engine, + parse_dates = ["adt", "qdt", "schedsdt", "deldt", "sdt", "edt"], + ) + return load_data_from_df(df, **kwargs) diff --git a/simulation_server/simulation/dataloaders/lassen.py b/simulation_server/simulation/dataloaders/lassen.py new file mode 100644 index 0000000000000000000000000000000000000000..be1e52dff840590d8f09de1aa049571e7ba3049f --- /dev/null +++ b/simulation_server/simulation/dataloaders/lassen.py @@ -0,0 +1,51 @@ +import pandas as pd +import sqlalchemy as sqla +from datetime import timedelta + +from ...util.druid import get_druid_engine, get_table, to_timestamp +from ...util.dataloader import query_time_range +from ...models.sim import ServerSimConfig + +# Re-use these from the raps dataloader +from raps.dataloaders.lassen import load_data_from_df, node_index_to_name, cdu_index_to_name, cdu_pos + + +def load_data(_paths, **kwargs): + druid_engine = get_druid_engine() + sim_config: ServerSimConfig = kwargs['sim_config'] + start, end = sim_config.start, sim_config.end + + allocation_df = query_time_range( + "svc-ts-exadigit-data-lassen-allocation-history", + start, end, 'begin_time', 'end_time', + druid_engine = druid_engine, + parse_dates = ["begin_time", "end_time", "job_submit_time"], + ) + # load_data_from_df expects naive datetimes + allocation_df["begin_time"] = allocation_df["begin_time"].dt.tz_localize(None) + allocation_df["end_time"] = allocation_df["end_time"].dt.tz_localize(None) + allocation_df["job_submit_time"] = allocation_df["job_submit_time"].dt.tz_localize(None) + + tbl = get_table("svc-ts-exadigit-data-lassen-node-history", druid_engine) + node_query = ( + sqla.select(sqla.text("*")) + .where( + (tbl.c['__time'] <= to_timestamp(end)) & + (tbl.c['__time'] >= to_timestamp(start - timedelta(days=3))) + ) + ) + node_df = pd.read_sql(node_query, druid_engine) + + # step_df doesn't appear to actually be used by load_data_from_df? 
+ step_df = query_time_range( + "svc-ts-exadigit-data-lassen-step-history", start, end, 'begin_time', 'end_time', + druid_engine = druid_engine, + parse_dates = ["begin_time", "end_time"], + ) + step_df["begin_time"] = step_df["begin_time"].dt.tz_localize(None) + step_df["end_time"] = step_df["end_time"].dt.tz_localize(None) + + return load_data_from_df( + allocation_df = allocation_df, node_df = node_df, step_df = step_df, + **kwargs, + ) diff --git a/simulation_server/simulation/dataloaders/marconi100.py b/simulation_server/simulation/dataloaders/marconi100.py new file mode 100644 index 0000000000000000000000000000000000000000..e4fcad0ec83820f11809c0f9779c8233065ebcc2 --- /dev/null +++ b/simulation_server/simulation/dataloaders/marconi100.py @@ -0,0 +1,24 @@ +from ...util.druid import get_druid_engine +from ...util.dataloader import query_time_range, split_list +from ...models.sim import ServerSimConfig + +# Re-use these from the raps dataloader +from raps.dataloaders.marconi100 import load_data_from_df, node_index_to_name, cdu_index_to_name, cdu_pos + + +def load_data(_paths, **kwargs): + druid_engine = get_druid_engine() + sim_config: ServerSimConfig = kwargs['sim_config'] + start, end = sim_config.start, sim_config.end + df = query_time_range( + "svc-ts-exadigit-data-marconi100", + start, end, 'start_time', 'end_time', + druid_engine = druid_engine, + parse_dates = ["submit_time", "start_time", "end_time", "eligible_time"], + ) + df['nodes'] = df['nodes'].map(split_list) + df['node_power_consumption'] = df['node_power_consumption'].map(split_list) + df['mem_power_consumption'] = df['mem_power_consumption'].map(split_list) + df['cpu_power_consumption'] = df['cpu_power_consumption'].map(split_list) + + return load_data_from_df(df, **kwargs) diff --git a/simulation_server/simulation/main.py b/simulation_server/simulation/main.py index 01c09ddd5f6b9a47a002dc52d2622600bf5b1115..3b83d2a72dfaf9a82d558e55ddebd412171146a0 100644 --- 
a/simulation_server/simulation/main.py +++ b/simulation_server/simulation/main.py @@ -1,98 +1,111 @@ """ A script to run the ExaDigiT simulation """ -import argparse, os, json +from collections.abc import Iterable +import argparse, os, orjson from pathlib import Path from datetime import datetime, timezone from loguru import logger import yaml -from ..models.sim import Sim, SimConfig +from ..models.sim import Sim, ServerSimConfig from .simulation import run_simulation from ..util.kafka import get_kafka_producer -def cli_run(config: SimConfig): - for i, data in enumerate(run_simulation(config)): - print(f"TICK {i}") - - -def background_job(sim: Sim): +def run_simulation_serialized(sim: Sim) -> Iterable[dict[str, list[bytes]]]: sim = sim.model_copy() - kafka_producer = get_kafka_producer() - def output_rows(topic, rows): - for row in rows: - value = json.dumps({"sim_id": sim.id, **row.model_dump(mode='json')}).encode() - kafka_producer.send(topic=topic, value=value) + def serialize_rows(rows): + return [ + orjson.dumps({"sim_id": sim.id, **row.model_dump(mode='json')}) + for row in rows + ] - logger.info(f"Starting simulation {sim.model_dump_json()}") - config = SimConfig.model_validate(sim.config) + logger.info(f"Starting simulation: {sim.model_dump_json(indent = 4)}") + config = ServerSimConfig.model_validate(sim.config) progress_date = sim.start try: for data in run_simulation(config): - output_rows("svc-ts-exadigit-schedulersimsystem", data.scheduler_sim_system) - output_rows("svc-event-exadigit-schedulersimjob", data.scheduler_sim_jobs) - output_rows("svc-ts-exadigit-coolingsimcdu", data.cooling_sim_cdus) - output_rows("svc-ts-exadigit-coolingsimcep", data.cooling_sim_cep) - output_rows("svc-ts-exadigit-jobpowerhistory", data.power_history) + yield { + "svc-ts-exadigit-schedulersimsystem": serialize_rows(data.scheduler_sim_system), + "svc-event-exadigit-schedulersimjob": serialize_rows(data.scheduler_sim_jobs), + "svc-ts-exadigit-coolingsimcdu": 
serialize_rows(data.cooling_sim_cdus), + "svc-ts-exadigit-coolingsimcep": serialize_rows(data.cooling_sim_cep), + "svc-ts-exadigit-jobpowerhistory": serialize_rows(data.power_history), + } progress_date = data.timestamp + if data.timestamp.second == 0: + logger.info(f"progress: {data.timestamp.isoformat()} / {sim.end.isoformat()}") except BaseException as e: sim.state = "fail" sim.execution_end = datetime.now(timezone.utc) sim.error_messages = str(e) sim.progress_date = progress_date - kafka_producer.send("svc-event-exadigit-sim", value = sim.serialize_for_druid()) - kafka_producer.close() # Close and wait for messages to be sent + yield {"svc-event-exadigit-sim": [sim.serialize_for_druid()]} logger.info(f"Simulation {sim.id} failed") raise e sim.state = "success" sim.execution_end = datetime.now(timezone.utc) sim.progress_date = sim.end - kafka_producer.send(topic = "svc-event-exadigit-sim", value = sim.serialize_for_druid()) - kafka_producer.close() # Close and wait for messages to be sent + yield {"svc-event-exadigit-sim": [sim.serialize_for_druid()]} logger.info(f"Simulation {sim.id} finished") +def write_sim_to_kafka(sim: Sim): + kafka_producer = get_kafka_producer({ + 'bootstrap.servers': os.environ['KAFKA_BOOTSTRAP'], + 'linger.ms': 2 * 1000, + 'batch.size': 65536, + "compression.type": "snappy", + }) + + try: + for data in run_simulation_serialized(sim): + # kafka_producer does its own buffering of output so we don't need to worry about batching + for topic, messages in data.items(): + for message in messages: + kafka_producer.poll(0) + kafka_producer.produce(topic, message) + finally: + kafka_producer.flush() + + +def write_sim_to_disk(sim: Sim, dest: str): + Path(dest).mkdir(exist_ok=True) + files = {} + try: + for data in run_simulation_serialized(sim): + for topic, rows in data.items(): + if topic not in files: + files[topic] = open(Path(dest) / f"{topic}.jsonl", 'ab') + files[topic].writelines(l + b"\n" for l in rows) + finally: + for file in 
files.values(): + file.close() + + if __name__ == "__main__": parser = argparse.ArgumentParser( description = __doc__.strip(), allow_abbrev = False, formatter_class = argparse.RawDescriptionHelpFormatter, ) - subparsers = parser.add_subparsers(required=True, dest="action") - parser_cli_run = subparsers.add_parser('run') - parser_cli_run.add_argument("--config", type=str, help="JSON config string") - parser_cli_run.add_argument("--config-file", type=Path, help="Path to a yaml or json file contain the config") - - parser_cli_run = subparsers.add_parser('background-job') - parser_cli_run.add_argument("--sim", type=str, help="JSON config string") + parser.add_argument("--sim", type=str, help="Sim json") + parser.add_argument("--dest", default=None) args = parser.parse_args() + if args.sim: + sim = args.sim + elif os.environ.get("SIM"): + sim = os.environ["SIM"] + else: + raise Exception("No configuration passed") + + sim = Sim.model_validate(yaml.safe_load(sim)) - if args.action == "run": - if args.config and args.config_file: - raise Exception("You can only specify either config or config-file") - - if args.config: - config = yaml.safe_load(args.config) - elif args.config_file: - config = yaml.safe_load(args.config_file.read_text()) - elif "SIM_CONFIG" in os.environ: - config = yaml.safe_load(os.environ["SIM_CONFIG"]) - elif "SIM_CONFIG_FILE" in os.environ: - config = yaml.safe_load(Path(os.environ["SIM_CONFIG_FILE"]).read_text()) - else: - raise Exception("No configuration passed") - config = SimConfig.model_validate(config) - cli_run(config) - elif args.action == "background-job": - if args.sim: - sim = yaml.safe_load(args.sim) - elif "SIM" in os.environ: - sim = yaml.safe_load(os.environ["SIM"]) - else: - raise Exception("No sim passed") - sim = Sim.model_validate(sim) - background_job(sim) + if args.dest: + write_sim_to_disk(sim, args.dest) + else: + write_sim_to_kafka(sim) diff --git a/simulation_server/simulation/raps b/simulation_server/simulation/raps deleted 
file mode 160000 index 945f878039b82207fca96182544536914809a302..0000000000000000000000000000000000000000 --- a/simulation_server/simulation/raps +++ /dev/null @@ -1 +0,0 @@ -Subproject commit 945f878039b82207fca96182544536914809a302 diff --git a/simulation_server/simulation/simulation.py b/simulation_server/simulation/simulation.py index 60b00fdfd813569d3958a4b4070c352a0f6f282b..bb68f4bf19ee76840d6c8afd93294a785c64db94 100644 --- a/simulation_server/simulation/simulation.py +++ b/simulation_server/simulation/simulation.py @@ -1,37 +1,22 @@ from typing import NamedTuple from datetime import datetime, timedelta -from pathlib import Path -import random, math, functools -import numpy as np +import functools, itertools +import importlib, importlib.util +import orjson from loguru import logger -from .raps.raps.config import ConfigManager -from .raps.raps.cooling import ThermoFluidsModel -from .raps.raps.power import PowerManager -from .raps.raps.flops import FLOPSManager -from .raps.raps.scheduler import Scheduler -from .raps.raps.telemetry import Telemetry -from .raps.raps.workload import Workload -from ..models.sim import SimConfig, SimSystem +from raps import Engine +from raps.job import Job as RapsJob +from raps.stats import RunningStats +from ..models.sim import ServerSimConfig from ..models.output import ( JobStateEnum, SchedulerSimJob, SchedulerSimJobPowerHistory, SchedulerSimSystem, CoolingSimCDU, CoolingSimCEP, ) from ..util.misc import nest_dict from . 
import SimException -from .dataloaders import DATA_LOADERS -PKG_PATH = Path(__file__).parent.parent.parent - - -def _offset_to_time(start, offset): - if offset is not None: - return start + timedelta(seconds=offset) - else: - return None - - -class SimOutput(NamedTuple): +class SimTickOutput(NamedTuple): timestamp: datetime scheduler_sim_system: list[SchedulerSimSystem] scheduler_sim_jobs: list[SchedulerSimJob] @@ -40,263 +25,240 @@ class SimOutput(NamedTuple): power_history: list[SchedulerSimJobPowerHistory] -def get_scheduler( - system: SimSystem, - down_nodes = [], cooling_enabled = False, replay = False, - schedule_policy = 'fcfs', -): - if cooling_enabled and system != "frontier": - raise SimException("Cooling sim only supported for frontier") - - raps_config = ConfigManager(system_name = system).get_config() - if "FMU_PATH" in raps_config: - raps_config['FMU_PATH'] = str(PKG_PATH / raps_config['FMU_PATH']) - - down_nodes = [*raps_config['DOWN_NODES'], *down_nodes] - - power_manager = PowerManager(**raps_config) - flops_manager = FLOPSManager(**raps_config) - if cooling_enabled: - cooling_model = ThermoFluidsModel(**raps_config) - cooling_model.initialize() - else: - cooling_model = None - - return Scheduler( - power_manager = power_manager, - flops_manager = flops_manager, - cooling_model = cooling_model, - debug = False, replay = replay, - schedule = schedule_policy, - config = raps_config, - ) - - -def get_job_state_hash(job: SchedulerSimJob): +def get_job_hash(job: RapsJob): """ Return string that can be used to check if any meaningful state changed """ - return job.model_dump_json(exclude={"time_snapshot"}) - - -def run_simulation(sim_config: SimConfig): - sample_scheduler_sim_system = timedelta(seconds = 1).total_seconds() - # Sample CDU as fast as it is available - sample_cooling = timedelta(seconds = 1).total_seconds() - - # Keep record of how many power history steps we've emitted for each job - power_history_counts: dict[int, int] = {} - prev_jobs: 
dict[str, str] = {} + return orjson.dumps([ + str(job.id), + job.name, + job.nodes_required, + job.submit_time, + job.time_limit, + job.start_time, + job.end_time, + job.current_state.name, + # Node list shouldn't change once set so just do len instead of serializing the large list + len(job.scheduled_nodes) if job.scheduled_nodes else None, + ], option=orjson.OPT_SERIALIZE_NUMPY) + + +def snap_sample_rate(desired_rate: int, actual_rate: int): + """ + Returns a sample rate close to desired_rate, but is still divisible by actual_rate. + E.g. if power is being ticked every 3 seconds, but desired sample rate is 10, round it to 9. + """ + if actual_rate >= desired_rate: + return actual_rate + else: + return int(desired_rate / actual_rate) * actual_rate - if sim_config.scheduler.enabled: - if sim_config.scheduler.seed: - # TODO: This is globabl and should probably be done in RAPS - random.seed(sim_config.scheduler.seed) - np.random.seed(sim_config.scheduler.seed) - timesteps = math.ceil((sim_config.end - sim_config.start).total_seconds()) +def run_simulation(sim_config: ServerSimConfig): + if sim_config.replay: + if not isinstance(sim_config.system, str): + raise SimException(f"replay is not supported for custom systems") + dataloader = f"simulation_server.simulation.dataloaders.{sim_config.system}" + if not importlib.util.find_spec(dataloader): + raise SimException(f"{sim_config.system} does not support replay") + sim_config = sim_config.model_copy(update = {"dataloader": dataloader}) - sc = get_scheduler( - system = sim_config.system, - down_nodes = sim_config.scheduler.down_nodes, - cooling_enabled = sim_config.cooling.enabled, - replay = (sim_config.scheduler.jobs_mode == "replay"), - schedule_policy = sim_config.scheduler.schedule_policy, - ) - telemetry = Telemetry(system = sim_config.system, config = sc.config) + engine = Engine(sim_config) + running_stats = RunningStats(engine) - # Memoized function to convert raps indexes into node names. 
- # Memo increases performance since it gets called on snapshots of the same job multiple times. - @functools.lru_cache(maxsize = 65_536) - def _parse_nodes(node_indexes: tuple[int]): - return [telemetry.node_index_to_name(i) for i in node_indexes] + sample_system = 1 + sample_power = snap_sample_rate(5, int(sim_config.time_delta.total_seconds())) + sample_cooling = snap_sample_rate(5, int(sim_config.time_delta.total_seconds())) - if sim_config.scheduler.jobs_mode == "random": - num_jobs = sim_config.scheduler.num_jobs if sim_config.scheduler.num_jobs is not None else 1000 - workload = Workload(**sc.config) - jobs = workload.random(num_jobs=num_jobs) - elif sim_config.scheduler.jobs_mode == "test": - workload = Workload(**sc.config) - jobs = workload.test() - elif sim_config.scheduler.jobs_mode == "replay": - if sim_config.system not in DATA_LOADERS: - raise SimException(f"Replay not supported for {sim_config.system}") - logger.info("Fetching telemetry data...") - jobs = DATA_LOADERS[sim_config.system](sim_config, sc.config) - if len(jobs) == 0: - raise SimException(f"No data for {sim_config.system} {sim_config.start.isoformat()} -> {sim_config.end.isoformat()}") - logger.info(f"Fetched {len(jobs)} jobs") - elif sim_config.scheduler.jobs_mode == "custom": - raise SimException("Custom not supported") + def offset_to_time(offset): + if offset is not None: + return engine.start + timedelta(seconds=offset - engine.timestep_start) else: - raise SimException(f'Unknown jobs_mode "{sim_config.scheduler.jobs_mode}"') - - for data in sc.run_simulation(jobs, timesteps=timesteps): - timestamp: datetime = _offset_to_time(sim_config.start, data.current_time) - is_last_tick = (timestamp + timedelta(seconds=1) == sim_config.end) - - unix_timestamp = int(timestamp.timestamp()) - - scheduler_sim_system: list[SchedulerSimSystem] = [] - if unix_timestamp % sample_scheduler_sim_system == 0 or is_last_tick: - down_nodes = _parse_nodes(tuple(data.down_nodes)) - stats = sc.get_stats() - - 
scheduler_sim_system = [SchedulerSimSystem.model_validate(dict( + return None + + # Memoized function to convert raps indexes into node names. + # Memo increases performance since it gets called on snapshots of the same job multiple times. + @functools.lru_cache(maxsize = 65_536) + def parse_nodes(node_indexes: tuple[int]): + return [engine.telemetry.node_index_to_name(i) for i in node_indexes] + + @functools.lru_cache(maxsize = 16384) + def cdu_info(cdu_index: int): + cdu_name = engine.telemetry.cdu_index_to_name(cdu_index) + row, col = engine.telemetry.cdu_pos(cdu_index) + return cdu_name, row, col + + def parse_job(job: RapsJob, timestamp: datetime): + # Output jobs only if something changed + time_end = offset_to_time(job.end_time) + # end_time is set to its planned end once its scheduled. Set it to None for + # unfinished jobs here + if time_end is not None and (job.start_time is None or time_end > timestamp): + time_end = None + return SchedulerSimJob.model_validate({ + "job_id": str(job.id), + "name": job.name, + "node_count": job.nodes_required, + "time_snapshot": timestamp, + "time_submission": offset_to_time(job.submit_time), + "time_limit": job.time_limit, + "time_start": offset_to_time(job.start_time), + "time_end": time_end, + "state_current": JobStateEnum(job.current_state.name), + "nodes": parse_nodes(tuple(job.scheduled_nodes)) if job.scheduled_nodes else None, + # How does the new job.power attribute work? Is it total_energy? + # Or just the current wattage? 
+ # power = job.power, + }) + + job_hashes: dict[int, bytes] = {} + # Keep record of how many power history steps we've emitted for each job + job_power_history_counts: dict[int, int] = {} + + for tick in engine.run_simulation(): + timestamp: datetime = offset_to_time(tick.current_timestep) + unix_timestamp = int(timestamp.timestamp()) + is_last_tick = (timestamp + timedelta(seconds=1) >= sim_config.end) + + scheduler_sim_system: list[SchedulerSimSystem] = [] + if unix_timestamp % sample_system == 0 or is_last_tick: + down_nodes = parse_nodes(tuple(tick.down_nodes)) + stats = running_stats.get_stats() + + scheduler_sim_system = [SchedulerSimSystem.model_validate({ + "timestamp": timestamp, + "down_nodes": down_nodes, + "num_samples": stats['num_samples'], + + "jobs_completed": engine.jobs_completed, + "jobs_running": len(tick.running), + "jobs_pending": len(tick.queue), + "throughput": stats["throughput"], + + "average_power": stats['average_power'] * 1_000_000, + "min_loss": stats['min_loss'] * 1_000_000, + "average_loss": stats['average_loss'] * 1_000_000, + "max_loss": stats['max_loss'] * 1_000_000, + "system_power_efficiency": stats['system_power_efficiency'], + "total_energy_consumed": stats['total_energy_consumed'], + "carbon_emissions": stats['carbon_emissions'], + "total_cost": stats['total_cost'], + + "p_flops": tick.p_flops, + "g_flops_w": tick.g_flops_w, + "system_util": tick.system_util, + })] + + scheduler_sim_jobs: list[SchedulerSimJob] = [] + power_history: list[SchedulerSimJobPowerHistory] = [] + + # Only output running jobs when the state changes + for job in tick.queue: + # Just use a constant as hash for queued jobs to avoid computing the hash repeatedly for + # them. 
This assumes queued jobs don't change any meaningful state until they run + job_hash = b"queued" + if is_last_tick or job_hashes.get(job.id) != job_hash: + scheduler_sim_jobs.append(parse_job(job, timestamp)) + job_hashes[job.id] = job_hash + for job in tick.running: + job_hash = get_job_hash(job) + if is_last_tick or job_hashes.get(job.id) != job_hash: + scheduler_sim_jobs.append(parse_job(job, timestamp)) + job_hashes[job.id] = job_hash + for job in itertools.chain(tick.completed, tick.killed): + scheduler_sim_jobs.append(parse_job(job, timestamp)) + job_hashes.pop(job.id, None) + + for job in itertools.chain(tick.running, tick.completed, tick.killed): + if job_power_history_counts.get(job.id, 0) < len(job.power_history): + power_history.append(SchedulerSimJobPowerHistory( timestamp = timestamp, - down_nodes = down_nodes, - # TODO: Update sc.get_stats to return more easily parsable data - num_samples = int(stats['num_samples']), - - jobs_completed = int(stats['jobs completed']), - jobs_running = len(stats['jobs still running']), - jobs_pending = len(stats['jobs still in queue']), - - throughput = float(stats['throughput'].split(' ')[0]), - average_power = float(stats['average power'].split(' ')[0]) * 1_000_000, - min_loss = float(stats['min loss'].split(' ')[0]) * 1_000_000, - average_loss = float(stats['average loss'].split(' ')[0]) * 1_000_000, - max_loss = float(stats['max loss'].split(' ')[0]) * 1_000_000, - system_power_efficiency = float(stats['system power efficiency']), - total_energy_consumed = float(stats['total energy consumed'].split(' ')[0]), - carbon_emissions = float(stats['carbon emissions'].split(' ')[0]), - total_cost = float(stats['total cost'].removeprefix("$")), - - p_flops = data.p_flops, - g_flops_w = data.g_flops_w, - system_util = data.system_util, - ))] - - scheduler_sim_jobs: list[SchedulerSimJob] = [] - curr_jobs = {} - data_jobs = data.completed + data.running + data.queue - for job in data_jobs: - # end_time is set to its planned end 
once its scheduled. Set it to None for unfinished jobs here - if job.start_time is not None: - time_end = _offset_to_time(sim_config.start, job.end_time) - else: - time_end = None - parsed_job = SchedulerSimJob.model_validate(dict( job_id = str(job.id), - name = job.name, - node_count = job.nodes_required, - time_snapshot = timestamp, - time_submission = _offset_to_time(sim_config.start, job.submit_time), - time_limit = job.wall_time, - time_start = _offset_to_time(sim_config.start, job.start_time), - time_end = time_end, - state_current = JobStateEnum(job.state.name), - nodes = _parse_nodes(tuple(job.scheduled_nodes)), - # How does the new job.power attribute work? Is it total_energy? - # Or just the current wattage? - # power = job.power, + power = job.power_history[-1], )) - job_state_hash = get_job_state_hash(parsed_job) - - # Output jobs if something other than time_snapshot changed - if is_last_tick or prev_jobs.get(parsed_job.job_id) != job_state_hash: - scheduler_sim_jobs.append(parsed_job) - curr_jobs[parsed_job.job_id] = job_state_hash - prev_jobs = curr_jobs - - power_history: list[SchedulerSimJobPowerHistory] = [] - for job in data_jobs: - if job.id and power_history_counts.get(job.id, 0) < len(job.power_history): - power_history.append(SchedulerSimJobPowerHistory( - timestamp = timestamp, - job_id = str(job.id), - power = job.power_history[-1], - )) - power_history_counts[job.id] = len(job.power_history) - - cooling_sim_cdus: list[CoolingSimCDU] = [] - cooling_sim_cep: list[CoolingSimCEP] = [] - - cooling_sim_cdu_map: dict[int, dict] = {} - if data.power_df is not None and (unix_timestamp % sample_cooling == 0 or is_last_tick): - for i, point in data.power_df.iterrows(): - cooling_sim_cdu_map[int(point['CDU'])] = dict( - rack_1_power = point['Rack 1'], - rack_2_power = point['Rack 2'], - rack_3_power = point['Rack 3'], - total_power = point['Sum'], - - rack_1_loss = point['Loss 1'], - rack_2_loss = point['Loss 2'], - rack_3_loss = point['Loss 3'], - 
total_loss = point['Loss'], - ) - - if data.fmu_outputs: - fmu_data = nest_dict({**data.fmu_outputs}) - - # CDU columns are output in the dict with keys like this: - # "simulator[1].datacenter[1].computeBlock[1].cdu[1].summary.m_flow_prim" - # "simulator[1].datacenter[1].computeBlock[1].cdu[1].summary.V_flow_prim_GPM" - # "simulator[1].datacenter[1].computeBlock[2].cdu[1].summary.m_flow_prim" - # "simulator[1].datacenter[1].computeBlock[2].cdu[1].summary.V_flow_prim_GPM" - # etc. - - cdus_data = fmu_data['simulator'][1]['datacenter'][1]['computeBlock'] - for cdu, cdu_data in cdus_data.items(): - cdu_data = cdu_data['cdu'][1]['summary'] - cooling_sim_cdu_map[cdu].update( - work_done_by_cdup = cdu_data['W_flow_CDUP_kW'], - rack_return_temp = cdu_data['T_sec_r_C'], - rack_supply_temp = cdu_data['T_sec_s_C'], - rack_supply_pressure = cdu_data['p_sec_s_psig'], - rack_return_pressure = cdu_data['p_sec_r_psig'], - rack_flowrate = cdu_data['V_flow_sec_GPM'], - facility_return_temp = cdu_data["T_prim_r_C"], - facility_supply_temp = cdu_data['T_prim_s_C'], - facility_supply_pressure = cdu_data['p_prim_s_psig'], - facility_return_pressure = cdu_data['p_prim_r_psig'], - facility_flowrate = cdu_data['V_flow_prim_GPM'], - ) - - cep_data = fmu_data['simulator'][1]['centralEnergyPlant'][1] - cooling_sim_cep = [CoolingSimCEP.model_validate(dict( - timestamp = timestamp, - htw_flowrate = cep_data['hotWaterLoop'][1]['summary']['V_flow_htw_GPM'], - ctw_flowrate = cep_data['coolingTowerLoop'][1]['summary']['V_flow_ctw_GPM'], - htw_return_pressure = cep_data['hotWaterLoop'][1]['summary']['p_fac_htw_r_psig'], - htw_supply_pressure = cep_data['hotWaterLoop'][1]['summary']['p_fac_htw_s_psig'], - ctw_return_pressure = cep_data['coolingTowerLoop'][1]['summary']['p_fac_ctw_r_psig'], - ctw_supply_pressure = cep_data['coolingTowerLoop'][1]['summary']['p_fac_ctw_s_psig'], - htw_return_temp = cep_data['hotWaterLoop'][1]['summary']['T_fac_htw_r_C'], - htw_supply_temp = 
cep_data['hotWaterLoop'][1]['summary']['T_fac_htw_s_C'], - ctw_return_temp = cep_data['coolingTowerLoop'][1]['summary']['T_fac_ctw_r_C'], - ctw_supply_temp = cep_data['coolingTowerLoop'][1]['summary']['T_fac_ctw_s_C'], - power_consumption_htwps = cep_data['hotWaterLoop'][1]['summary']['W_flow_HTWP_kW'], - power_consumption_ctwps = cep_data['coolingTowerLoop'][1]['summary']['W_flow_CTWP_kW'], - power_consumption_fan = cep_data['coolingTowerLoop'][1]['summary']['W_flow_CT_kW'], - htwp_speed = cep_data['hotWaterLoop'][1]['summary']['N_HTWP'], - nctwps_staged = cep_data['coolingTowerLoop'][1]['summary']['n_CTWPs'], - nhtwps_staged = cep_data['hotWaterLoop'][1]['summary']['n_HTWPs'], - pue_output = fmu_data['pue'], - nehxs_staged = cep_data['hotWaterLoop'][1]['summary']['n_EHXs'], - ncts_staged = cep_data['coolingTowerLoop'][1]['summary']['n_CTs'], - facility_return_temp = cep_data['hotWaterLoop'][1]['summary']['T_fac_htw_r_C'], - cdu_loop_bypass_flowrate = fmu_data['simulator'][1]['datacenter'][1]['summary']['V_flow_bypass_GPM'], - ))] - - - for cdu_index, cdu_data in cooling_sim_cdu_map.items(): - cdu_name = telemetry.cdu_index_to_name(cdu_index) - row, col = telemetry.cdu_pos(cdu_index) - cdu_data.update( - timestamp = timestamp, - name = cdu_name, - row = row, - col = col, - ) - cooling_sim_cdus.append(CoolingSimCDU.model_validate(cdu_data)) - - yield SimOutput( - timestamp = timestamp, - scheduler_sim_system = scheduler_sim_system, - scheduler_sim_jobs = scheduler_sim_jobs, - cooling_sim_cdus = cooling_sim_cdus, - cooling_sim_cep = cooling_sim_cep, - power_history = power_history, - ) - else: - raise SimException("No simulations specified") + job_power_history_counts[job.id] = len(job.power_history) + + cooling_sim_cdus: list[CoolingSimCDU] = [] + cooling_sim_cep: list[CoolingSimCEP] = [] + + cooling_sim_cdu_map: dict[int, dict] = {} + if tick.power_df is not None and (is_last_tick or unix_timestamp % sample_power == 0): + # TODO: RAPS supports any number of racks 
per CDU, while this is still hard-coded to the + # 3 in Frontier. This will work for any system with 3 or less. We need to rethink how + # the racks are stored in the DB, maybe a separate table + for i, point in tick.power_df.iterrows(): + cooling_sim_cdu_map[int(point['CDU'])] = { + "rack_1_power": point.get('Rack 1'), + "rack_2_power": point.get('Rack 2'), + "rack_3_power": point.get('Rack 3'), + "total_power": point['Sum'], + "rack_1_loss": point.get('Loss 1'), + "rack_2_loss": point.get('Loss 2'), + "rack_3_loss": point.get('Loss 3'), + "total_loss": point['Loss'], + } + + if tick.fmu_outputs and (is_last_tick or unix_timestamp % sample_cooling == 0): + # CDU columns are output in the dict with keys like this: + # "simulator[1].datacenter[1].computeBlock[1].cdu[1].summary.m_flow_prim" + # "simulator[1].datacenter[1].computeBlock[1].cdu[1].summary.V_flow_prim_GPM" + # "simulator[1].datacenter[1].computeBlock[2].cdu[1].summary.m_flow_prim" + # "simulator[1].datacenter[1].computeBlock[2].cdu[1].summary.V_flow_prim_GPM" + # nest_dict will un-flatten it + fmu_data = nest_dict({**tick.fmu_outputs}) + + cdus_data = fmu_data['simulator'][1]['datacenter'][1]['computeBlock'] + for cdu, cdu_data in cdus_data.items(): + cdu_data = cdu_data['cdu'][1]['summary'] + cooling_sim_cdu_map[cdu] = { + **cooling_sim_cdu_map.get(cdu, {}), + "work_done_by_cdup": cdu_data['W_flow_CDUP_kW'], + "rack_return_temp": cdu_data['T_sec_r_C'], + "rack_supply_temp": cdu_data['T_sec_s_C'], + "rack_supply_pressure": cdu_data['p_sec_s_psig'], + "rack_return_pressure": cdu_data['p_sec_r_psig'], + "rack_flowrate": cdu_data['V_flow_sec_GPM'], + "facility_return_temp": cdu_data["T_prim_r_C"], + "facility_supply_temp": cdu_data['T_prim_s_C'], + "facility_supply_pressure": cdu_data['p_prim_s_psig'], + "facility_return_pressure": cdu_data['p_prim_r_psig'], + "facility_flowrate": cdu_data['V_flow_prim_GPM'], + } + + cep_data = fmu_data['simulator'][1]['centralEnergyPlant'][1] + cooling_sim_cep = 
[CoolingSimCEP.model_validate({ + "timestamp": timestamp, + "htw_flowrate": cep_data['hotWaterLoop'][1]['summary']['V_flow_htw_GPM'], + "ctw_flowrate": cep_data['coolingTowerLoop'][1]['summary']['V_flow_ctw_GPM'], + "htw_return_pressure": cep_data['hotWaterLoop'][1]['summary']['p_fac_htw_r_psig'], + "htw_supply_pressure": cep_data['hotWaterLoop'][1]['summary']['p_fac_htw_s_psig'], + "ctw_return_pressure": cep_data['coolingTowerLoop'][1]['summary']['p_fac_ctw_r_psig'], + "ctw_supply_pressure": cep_data['coolingTowerLoop'][1]['summary']['p_fac_ctw_s_psig'], + "htw_return_temp": cep_data['hotWaterLoop'][1]['summary']['T_fac_htw_r_C'], + "htw_supply_temp": cep_data['hotWaterLoop'][1]['summary']['T_fac_htw_s_C'], + "ctw_return_temp": cep_data['coolingTowerLoop'][1]['summary']['T_fac_ctw_r_C'], + "ctw_supply_temp": cep_data['coolingTowerLoop'][1]['summary']['T_fac_ctw_s_C'], + "power_consumption_htwps": cep_data['hotWaterLoop'][1]['summary']['W_flow_HTWP_kW'], + "power_consumption_ctwps": cep_data['coolingTowerLoop'][1]['summary']['W_flow_CTWP_kW'], + "power_consumption_fan": cep_data['coolingTowerLoop'][1]['summary']['W_flow_CT_kW'], + "htwp_speed": cep_data['hotWaterLoop'][1]['summary']['N_HTWP'], + "nctwps_staged": cep_data['coolingTowerLoop'][1]['summary']['n_CTWPs'], + "nhtwps_staged": cep_data['hotWaterLoop'][1]['summary']['n_HTWPs'], + "pue_output": fmu_data['pue'], + "nehxs_staged": cep_data['hotWaterLoop'][1]['summary']['n_EHXs'], + "ncts_staged": cep_data['coolingTowerLoop'][1]['summary']['n_CTs'], + "facility_return_temp": cep_data['hotWaterLoop'][1]['summary']['T_fac_htw_r_C'], + "cdu_loop_bypass_flowrate": fmu_data['simulator'][1]['datacenter'][1]['summary']['V_flow_bypass_GPM'], + })] + + for cdu_index, cdu_data in cooling_sim_cdu_map.items(): + cdu_name, row, col = cdu_info(cdu_index) + cdu_data.update(timestamp = timestamp, name = cdu_name, row = row, col = col) + cooling_sim_cdus.append(CoolingSimCDU.model_validate(cdu_data)) + + yield SimTickOutput( + 
timestamp = timestamp, + scheduler_sim_system = scheduler_sim_system, + scheduler_sim_jobs = scheduler_sim_jobs, + cooling_sim_cdus = cooling_sim_cdus, + cooling_sim_cep = cooling_sim_cep, + power_history = power_history, + ) diff --git a/simulation_server/util/dataloader.py b/simulation_server/util/dataloader.py new file mode 100644 index 0000000000000000000000000000000000000000..b494ca1f0adbb4c445a18f232ca2f5b2271676e3 --- /dev/null +++ b/simulation_server/util/dataloader.py @@ -0,0 +1,35 @@ +from datetime import datetime, timedelta +import pandas as pd +import numpy as np +import sqlalchemy as sqla +from .druid import to_timestamp, get_table +from ..simulation import SimException + + +def query_time_range( + tbl_name: str, + start: datetime, end: datetime, + start_col: str, end_col: str, *, + druid_engine, parse_dates: list[str] = [], +) -> pd.DataFrame: + """ Queries a time range in druid. Returns a dataframe, throws if empty. """ + tbl = get_table(tbl_name, druid_engine) + query = ( + sqla.select(sqla.text("*")) + .where( + # __time is submission time + (tbl.c['__time'] <= to_timestamp(end)) & + (tbl.c['__time'] >= to_timestamp(start - timedelta(days=7))) & + (tbl.c[start_col] <= to_timestamp(end)) & + (tbl.c[end_col] >= to_timestamp(start)) + ) + ) + df = pd.read_sql(query, druid_engine, parse_dates=parse_dates) + if len(df) == 0: + raise SimException(f"No data found for {start.isoformat()} -> {end.isoformat()}") + return df + + +def split_list(x): + x = x.split(",") if x else [] + return np.array([int(x) for x in x]) diff --git a/simulation_server/util/druid.py b/simulation_server/util/druid.py index 2e49231172512e432716a38c226a00b66b9c4387..52377d76c11ee21b30927e640892fd51bb22a015 100644 --- a/simulation_server/util/druid.py +++ b/simulation_server/util/druid.py @@ -8,7 +8,6 @@ from loguru import logger import sqlalchemy as sqla from sqlalchemy.sql import ColumnElement from .misc import to_iso_duration -# from ..config import get_app_settings def 
get_druid_engine(**kwargs): @@ -123,6 +122,23 @@ earliest = _size_func(sqla.func.earliest) earliest_py = _size_func(sqla.func.earliest_py) +def table_is_ready(engine, tbl: str): + """ + sqla.inspect(engine) returns tables that have streaming ingestion but no data yet and so + still cause errors. Only workaround to check for this I've found is to just check for the error. + + This only seems to be an issue on older versions of Druid + """ + try: + return sqla.inspect(engine).has_table(tbl) + except Exception as e: + # druid throws errors like "has_table() got an unexpected keyword argument 'info_cache'" + if "has_table" in str(e): + return False + else: + raise e + + def execute_ignore_missing(conn, stmt) -> sqla.CursorResult: """ Wrapper conn.execute that handles missing tables. @@ -130,17 +146,18 @@ def execute_ignore_missing(conn, stmt) -> sqla.CursorResult: cursor. Note this may have unexpected results if you have joins/aggregations/etc that would have returned data with an empty table. 
""" - # try: - return conn.execute(stmt) - # except Exception as e: - # existing_tables = set(sqla.inspect(conn.engine).get_table_names()) - # stmt_tables = set([t.name for t in stmt.get_final_froms()]) - # missing_tables = stmt_tables - existing_tables - # if missing_tables: - # logger.info(f"table(s) {', '.join(stmt_tables)} missing, returning empty result") - # return conn.execute(sqla.text("SELECT 1 FROM (VALUES (1)) AS tbl(a) WHERE 1 != 1")) - # else: - # raise e + try: + return conn.execute(stmt) + except Exception as e: + try: + stmt_tables = set([t.name for t in stmt.get_final_froms()]) + missing_tables = [tbl for tbl in stmt_tables if not table_is_ready(conn.engine, tbl)] + if missing_tables: + logger.info(f"table(s) {', '.join(missing_tables)} missing, returning empty result") + return conn.execute(sqla.text("SELECT 1 FROM (VALUES (1)) AS tbl(a) WHERE 1 != 1")) + except: + pass # Just raise the original error + raise e def submit_ingest(ingest: dict): diff --git a/simulation_server/util/es.py b/simulation_server/util/es.py index 32caa98f54dbc34db2f7b8fb0c994278aa988fc5..979441a962235a99cc400c738f7635c57e64849e 100644 --- a/simulation_server/util/es.py +++ b/simulation_server/util/es.py @@ -1,45 +1,53 @@ """ Connection to Cadence ES """ -import os, json -import urllib.parse -from datetime import datetime -import sqlalchemy as sqla -from sqlalchemy.engine import Engine, create_engine +import os from elasticsearch import Elasticsearch -from es.elastic.sqlalchemy import ESDialect +import tenacity -def get_nccs_cadence_engine(**kwargs) -> Engine: - import sqlalchemy.types as types - from sqlalchemy.ext.compiler import compiles - - # For some reason sqla/pydruid renders `cast(col, sqla.TIMESTAMP)` to `CAST(col AS LONG)`. This - # is a manual override to make sqla render them properly. 
- cast_fixes = { - types.TIMESTAMP: "TIMESTAMP", - } - - for (sqla_type, override) in cast_fixes.items(): - compiles(sqla_type, "elasticsearch")(lambda type_, compiler, override=override, **kw: override) - - # We need to set retry_on_status to work around intermittent 401 errors from Cadence ES. - # The query params will get passed to the Elasticsearch client, but only some specific - # ones get parsed and the rest are left as strings. This monkey patch hacks elasticsearch-dbapi - # to parse retry_on_status. We can remove this if the AM team fixes the auth errors - import es.basesqlalchemy - es.basesqlalchemy.BaseESDialect._map_parse_connection_parameters['retry_on_status'] = json.loads - - URL = urllib.parse.urlparse(os.environ["NCCS_CADENCE_URL"]) - HOST, PORT = URL.netloc.split(":") +def get_nccs_cadence_es(): + URL = os.environ["NCCS_CADENCE_URL"] USER = os.environ["NCCS_CADENCE_USER"] PASSWORD = os.environ["NCCS_CADENCE_PASSWORD"] - # These get passed through to the internal Elasticsearch instance - QUERY_PARAMS = 'use_ssl=false&ssl_show_warn=false&verify_certs=false&retry_on_status=[502,503,504,401]' - - engine = create_engine(f'elasticsearch+{URL.scheme}://{USER}:{PASSWORD}@{HOST}:{PORT}{URL.path}?{QUERY_PARAMS}', **kwargs) - return engine - - -def to_timestamp(val: datetime): - return sqla.func.convert(val.isoformat(), sqla.literal_column('TIMESTAMP')) + return Elasticsearch( + URL, + http_auth=(USER, PASSWORD), + # TODO: we need to fix the self-signed certs on ES + use_ssl=False, + ssl_show_warn=False, + verify_certs=False, + ) + + +def es_sql_query(client: Elasticsearch, query: str, params: list = [], fetch_size = 100): + """ + Runs an SQL query against ES. Use `?` format for SQL params. 
+ """ + # Cadence ES is a bit flaky with intermittent 401 errors + @tenacity.retry( + stop = tenacity.stop_after_attempt(5), + wait = tenacity.wait_exponential(multiplier=0.5, min=1, max=30), + reraise = True, + ) + def _retry_query(query, params, cursor = None): + body = { + "query": query, + "params": params, + "fetch_size": fetch_size, + } + if cursor: + body["cursor"] = cursor + return client.sql.query(format = 'json', body = body) + + response = _retry_query(query, params) + rows = response['rows'] + cursor = response.get("cursor") + columns = [c['name'] for c in response['columns']] + while cursor: + response = _retry_query(query, params, cursor) + rows.extend(response['rows']) + cursor = response.get("cursor") + + rows = [dict(zip(columns, row)) for row in rows] + return rows diff --git a/simulation_server/util/k8s.py b/simulation_server/util/k8s.py index 58c8fcf0cf68f9157006feefc307c7e4636b4e3a..bf67a38192a3b223a8119206ab7e72ef17ab7d3e 100644 --- a/simulation_server/util/k8s.py +++ b/simulation_server/util/k8s.py @@ -17,7 +17,7 @@ def submit_job(job: dict): return get_batch_api().create_namespaced_job(namespace = get_namespace(), body = job) -def get_job(name: str): +def get_k8s_job(name): try: return get_batch_api().read_namespaced_job(namespace = get_namespace(), name = name) except k8s.client.ApiException as e: @@ -27,7 +27,7 @@ def get_job(name: str): raise e -def get_job_state(job): +def get_k8s_job_state(job): if job: if job.status.succeeded: return 'success' @@ -39,6 +39,6 @@ def get_job_state(job): return 'deleted' -def get_job_end_time(job): +def get_k8s_job_end_time(job): # completion_time for failed jobs is null return job.status.completion_time or job.status.conditions[-1].last_transition_time diff --git a/simulation_server/util/kafka.py b/simulation_server/util/kafka.py index af3e510583984468756312e4af6a62b6d1a66db3..b366195368f5ddc23a8e9ab8f43e2fc46cc259f5 100644 --- a/simulation_server/util/kafka.py +++ b/simulation_server/util/kafka.py @@ 
-1,31 +1,32 @@ import os -from kafka import KafkaProducer, KafkaConsumer -import functools +from confluent_kafka import Producer, Consumer +from confluent_kafka.admin import AdminClient -@functools.cache -def get_kafka_producer(**configs): - env_configs = { +def _get_kafka_config(): + env_config = { # Pick-up credentials from the context - 'bootstrap_servers': [os.environ['KAFKA_BOOTSTRAP']], - 'sasl_mechanism': os.environ.get('KAFKA_SASL_MECHANISM'), - 'sasl_plain_username': os.environ.get('KAFKA_SASL_USERNAME'), - 'sasl_plain_password': os.environ.get('KAFKA_SASL_PASSWORD'), - 'security_protocol': os.environ.get('KAFKA_SECURITY_PROTOCOL'), + 'bootstrap.servers': os.environ['KAFKA_BOOTSTRAP'], + 'sasl.mechanism': os.environ.get('KAFKA_SASL_MECHANISM'), + 'security.protocol': os.environ.get('KAFKA_SECURITY_PROTOCOL'), + 'sasl.username': os.environ.get('KAFKA_SASL_USERNAME'), + 'sasl.password': os.environ.get('KAFKA_SASL_PASSWORD'), } - env_configs = {k: v for k, v in env_configs.items() if v is not None} - return KafkaProducer(**{**env_configs, **configs}) + return {k: v for k, v in env_config.items() if v is not None} -@functools.cache -def get_kafka_consumer(*topics, **configs): - env_configs = { - # Pick-up credentials from the context - 'bootstrap_servers': [os.environ['KAFKA_BOOTSTRAP']], - 'sasl_mechanism': os.environ.get('KAFKA_SASL_MECHANISM'), - 'sasl_plain_username': os.environ.get('KAFKA_SASL_USERNAME'), - 'sasl_plain_password': os.environ.get('KAFKA_SASL_PASSWORD'), - 'security_protocol': os.environ.get('KAFKA_SECURITY_PROTOCOL'), - } - env_configs = {k: v for k, v in env_configs.items() if v is not None} - return KafkaConsumer(*topics, **{**env_configs, **configs}) +def get_kafka_producer(config = {}): + # Use confluent_kafka as it has significantly better producer performance + # I think that kafka.KafkaProducer sends messages in a background thread so it still blocks the + # GIL, while confluent_kafka is using some kind of C bindings internally which 
avoid that. + return Producer({**_get_kafka_config(), **config}) + + +def get_kafka_consumer(*topics, config = {}): + consumer = Consumer({**_get_kafka_config(), **config}) + consumer.subscribe(list(topics)) + return consumer + + +def get_kafka_admin(config = {}): + return AdminClient({**_get_kafka_config(), **config})