Commit 4a989056 authored by Fredrik Wendt

initial-ish commit

.env
venv/
__pycache__
.idea
What's in this repo?
====================
The first part of this solution is found in https://gitlab.wendt.io/platform/emailgw-wendt-io
That "first part" receives email from Gmail via Cloud Mailin, and publishes that JSON payload base64 encoded to redis.
This repo contains "part two," which consists of:
* a wrapper that SUBSCRIBEs to redis pubsub topic email
* the wrapper looks at the incoming email and determines what to do with it
* if it's from SDO, and it's about grading, then
* `snatcher.py` is executed, which will log in and try to grab assessment grading work
* notify `push.wendt.io` of the grading work snatched
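For reference, the message published on the `email` topic is the CloudMailin JSON payload, base64 encoded; the wrapper only inspects `headers.subject` and starts `snatcher.py` when the subject contains "III Grading Required". A minimal sketch of publishing such a payload by hand (the subject string and the localhost Redis address are illustrative, not fixed values):

```python
# Sketch: publish a fake "grading required" email payload to the topic the
# wrapper subscribes to. Subject text and Redis address are illustrative.
import base64
import json

import redis

payload = {"headers": {"subject": "PSFAKE III Grading Required: example"}}
encoded = base64.b64encode(json.dumps(payload).encode("utf-8"))

# Assumes a Redis instance reachable on localhost:6379 (e.g. via the SSH
# tunnel described under Development below).
redis.Redis(host="localhost", port=6379).publish("email", encoded)
```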
This whole thing is wrapped in a Docker container, set to restart on stop.
Any exception = sys.exit = restart.
There are two manual util tools for triggering this whole thing by hand:

* 1 - talks to Redis
* 2 - talks to emailgw.wendt.io
Development
-------------
On Mac OS X, to get access to `redis.wendt.vpn`, set up a TCP port forwarder with SSH:
```
ssh -L 6379:10.20.30.44:6379 water.wendt.io
```
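With the tunnel up, Redis is reachable on `localhost:6379`. A quick sanity check from the local machine (a minimal sketch using redis-py; it assumes the port forward from the command above is running):

```python
# Sketch: verify that the SSH-forwarded Redis connection works.
# Assumes the tunnel above maps localhost:6379 to redis.wendt.vpn.
import redis

client = redis.Redis(host="localhost", port=6379)
print(client.ping())  # prints True when the tunnel and Redis are up
```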
#!/usr/bin/env bash
# version 201911270815
set -e
function main {
    init
    prepare
    build
    setup_networkz
    redeploy
    notify_redeploy
}

function init {
    export COMPOSE_PROJECT_NAME=sdo-grading-job
    export COMPOSE_API_VERSION=auto
    export PROJECT=$(basename $(dirname "$(readlink -f "$0")"))
    export BASE_DIR=$(pwd)
    export TARGET_DIR=$BASE_DIR
}

function prepare {
    loggit "Preparing build & deployment environment"
    # make_clean_target_dir
    check_for_upstream_certificates
}

function make_clean_target_dir {
    cd $BASE_DIR
    if [ -d $TARGET_DIR ] ; then rm -rf $TARGET_DIR ; fi
    mkdir -p $TARGET_DIR
}
function build {
    loggit "Building"
    cd $TARGET_DIR
    if [ -f Dockerfile ] ; then
        echo "Building base image $COMPOSE_PROJECT_NAME:latest"
        docker build --no-cache -t $COMPOSE_PROJECT_NAME:latest -f Dockerfile .
    fi
    docker-compose pull
    docker-compose build
}

function setup_networkz {
    loggit "Setting up docker networking"
    # no good way of doing this without exit code != 0
    set +e
    docker network create internetz > /dev/null
    docker network ls
    set -e
}
function redeploy {
    loggit "Deploying"
    cd $TARGET_DIR
    echo
    echo =======================================================================
    echo Running docker containers BEFORE
    echo -----------------------------------------------------------------------
    docker ps
    sleep 2
    echo -----------------------------------------------------------------------
    echo "Starting new containers"
    docker-compose down
    docker-compose up -d
    sleep 5
    echo
    echo =======================================================================
    echo Running docker containers AFTER
    echo -----------------------------------------------------------------------
    docker ps
    sleep 2
    echo -----------------------------------------------------------------------
    echo
}

function notify_redeploy {
    loggit "Sending notification of deploy"
    cd $BASE_DIR
    GIT_MSG=$(git log -1 --pretty=%B)
    MSG="$GIT_MSG $GO_PIPELINE_COUNTER.$GO_STAGE_COUNTER"
    curl -m 5 -fsSL https://push.wendt.io/ \
        -d "message=$PROJECT redeployed: $MSG" \
        -d url=$PUSH_URL || echo "Notification failed"
}

function loggit {
    echo ">>>"
    echo -n ">>> STEP: "
    echo "$@"
    echo ">>>"
}
function inject_vpn_ip_into_docker_compose_file {
    loggit "Injecting VPN IP into Docker Compose file"
    HOST=$(hostname --short)
    VPNHOST=${HOST}.wendt.vpn
    VPNIP=$(host $VPNHOST | grep address | tail -n 1 | cut -d ' ' -f 4)
    if [ -z "$VPNIP" ] ; then
        echo "Couldn't resolve $VPNHOST to an IP address"
        exit 1
    fi
    if [ "$HOME" = "/home/ceda" ] ; then
        DEBUG=true
    fi
    if [ "$DEBUG" ] ; then
        clear
        echo
        echo "LOCAL DEVELOPMENT DETECTED"
        echo
        VPNIP=0.0.0.0
    fi
    echo -n "Creating build dir: "
    BUILD_DIR=build/${COMPOSE_PROJECT_NAME}
    mkdir -p $BUILD_DIR
    echo "$BUILD_DIR"
    echo -n "Prepping source files: "
    echo -n "${COMPOSE_PROJECT_NAME} config "
    cp -r src/docker/images/ $BUILD_DIR
    mkdir -p $BUILD_DIR/build
    echo -n "docker-compose.yml "
    sed -e "s/__VPNIP__/$VPNIP/" $BASE_DIR/src/docker/runtime/docker-compose.yml.template > $BUILD_DIR/docker-compose.yml
    echo "done"
    tree $BASE_DIR/$BUILD_DIR
    export TARGET_DIR=$BASE_DIR/$BUILD_DIR
}

function check_for_upstream_certificates {
    if [ -d upstream/ ] ; then
        echo "USING CERTIFICATES FROM UPSTREAM JOB (folder upstream)"
        cp -r upstream/certificates/* proxy/certificates/
    else
        echo "USING CERTIFICATES FROM git REPO - old?"
    fi
}
main
version: '3.8'
services:
  snatcher:
    build:
      context: .
      dockerfile: docker/Dockerfile
    restart: always
FROM mcr.microsoft.com/playwright/python:v1.52.0-noble
WORKDIR /app

# Install system dependencies
RUN apt-get update && apt-get install -y --no-install-recommends python3-venv && rm -rf /var/lib/apt/lists/*

# Create and activate virtual environment
RUN python3 -m venv /venv
ENV PATH="/venv/bin:$PATH"

COPY requirements.txt .
RUN pip install -r requirements.txt

COPY .env /app/
COPY *.py /app/

CMD ["python3", "wrapper.py"]
LOGIN_USER=some@one.place
LOGIN_PASS=abcDEF123
import base64
import os
import time
import redis
import json
if __name__ == '__main__':
    print("Manually causing a run")
    host = 'localhost' if os.getenv('PYCHARM_HOSTED') else 'redis.wendt.vpn'
    redis_client = redis.Redis(host=host, port=6379, decode_responses=True)
    ts = int(time.time())
    print(f"Sending {ts}")
    data = {
        'headers': {'subject': f'PSFAKE III Grading Required: fake-{ts}'}
    }
    json_data = json.dumps(data)
    base64_data = base64.b64encode(json_data.encode("utf-8"))
    redis_client.publish('email', base64_data)
import requests
if __name__ == '__main__':
    print("Manually causing a run")
    requests.post("https://emailgw.wendt.io/email", json={"test": True})
How to:
```
screen
python3 -m venv venv
source venv/bin/activate
pip install requests python-dotenv
echo "[]" > ids_seen.json
python3 job.py
```
```
checking for new email 2024-04-05T01:00:06.099477
Setting up new connection to gmail.com
found 7 emails
```
IMAP_USER=some@one.here
IMAP_PASS=someTHINGthere
API_46ELKS_USER=ua..........
API_46ELKS_PASS=B3..........
from dotenv import load_dotenv
import email
import imaplib
import json
import os
import time
import datetime
import requests
class Checker:
    def __init__(self):
        self.api_46elks_user = os.environ['API_46ELKS_USER']
        self.api_46elks_pass = os.environ['API_46ELKS_PASS']
        self.imap_user = os.environ['IMAP_USER']
        self.imap_pass = os.environ['IMAP_PASS']
        self.mail = None

    def notify_fredrik_via_sms(self, message='Level III Grading Opportunity\n\nGo to https://eco.scrum.org/grading/results'):
        print("new grading opportunity")
        fields = {
            'from': '46702778511',
            'to': '46702778511',
            'message': message
        }
        response = requests.post("https://api.46elks.com/a1/SMS", data=fields,
                                 auth=(self.api_46elks_user, self.api_46elks_pass))
        try:
            if response.json()['status'] == "created":
                print("SMS sent")
        except ValueError:
            print(f"SMS fail: {response.content}")

    # Fetch emails with a specific label and send an SMS for unseen ones
    def check_for_new_email(self):
        print(f"checking for new email {datetime.datetime.now().isoformat()}")
        if not self.mail:
            self.setup_mail_connection()
        (typ, [data]) = self.mail.select("\"" + "Scrum.org/Level III Grading" + "\"")
        if typ != "OK":
            raise Exception(f"Hmm {typ} was not 'OK'")
        count = int(data)
        print(f"found {count} emails")
        if count == 0:
            return
        # Search for emails in the selected label
        result, data = self.mail.search(None, "ALL")
        string_of_ids = data[0].decode("ascii", "ignore")
        ids = string_of_ids.split()
        ids_seen = set()
        new_ids = set()
        try:
            id_data = open("ids_seen.json").read()
            ids_seen.update(json.loads(id_data))
        except:
            pass
        for email_id in ids:
            result, data = self.mail.fetch(email_id, "(RFC822)")
            raw_email = data[0][1]
            msg = email.message_from_bytes(raw_email)
            msg_id = msg["Message-ID"]
            if msg_id in ids_seen:
                continue
            new_ids.add(msg_id)
        if len(new_ids) > 0:
            self.notify_fredrik_via_sms()
        ids_seen.update(new_ids)
        print(f"Saving {ids_seen}")
        open("ids_seen.json", "w").write(json.dumps(list(ids_seen)))
        self.mail.close()
        self.mail.logout()
        self.mail = None

    def run(self):
        error_count = 0
        while error_count < 3:
            try:
                self.check_for_new_email()
                error_count = 0
            except Exception as e:
                print(f"Error: {e}")
                try:
                    self.mail.close()
                except:
                    pass
                self.mail = None
                error_count += 1
                time.sleep(60 * (error_count + 1))
            time.sleep(60)
        self.notify_fredrik_via_sms('Exiting grading monitoring')
        print("3 errors, aborting")

    def setup_mail_connection(self):
        print("Setting up new connection to gmail.com")
        self.mail = imaplib.IMAP4_SSL('imap.gmail.com')
        self.mail.login(self.imap_user, self.imap_pass)


if __name__ == '__main__':
    load_dotenv()
    print("""How to:
screen
python3 -m venv venv
source venv/bin/activate
pip install requests python-dotenv
cp example.env .env
python3 job.py
""")
    Checker().run()
pipelines:
  sdo-grading-job-snatcher:
    group: Professional
    materials:
      source:
        git: ssh://git@gitlab.wendt.vpn:5522/professional/sdo-grading-job-snatcher.git
    stages:
      - deploy:
          resources:
            - trumpet
          tasks:
            - exec:
                command: ./deploy.sh
from dotenv import load_dotenv
import logging
import os
import sys
import requests
from playwright.sync_api import sync_playwright
import time
log = None
development_mode = os.getenv('PYCHARM_HOSTED')

load_dotenv()
login_user = os.environ['LOGIN_USER']
login_pass = os.environ['LOGIN_PASS']
def snatch_grading_work(headless=None):
    global log
    if log is None:
        log = logging.getLogger('snatcher')
    if headless is None:
        headless = False if development_mode else True
    log.info(f"Snatching grading work ({headless})")
    playwright = sync_playwright().start()
    browser = playwright.chromium.launch(headless=headless)
    page = browser.new_page()
    try:
        log.info("Going over to SDO")
        page.goto('https://eco.scrum.org/grading/results', wait_until='networkidle', timeout=30000)
        # Check for password input and perform login if present
        password_input = page.query_selector('input[id="password"]')
        if password_input:
            log.info("Logging in")
            page.fill('input[id="password"]', login_pass)
            page.fill('input[id="loginId"]', login_user)
            page.click('button:has-text("Log In")')
        # Wait for a specific XHR request to complete
        with page.expect_response('https://as-sdo-eco-api-prod-eastus.azurewebsites.net/grading/results', timeout=30000) as response_info:
            response = response_info.value
            log.info(f"XHR request completed with status: {response.status}")
        page.wait_for_selector('a:has-text("Grading Queue")', timeout=15000)
        page.wait_for_selector('span:has-text("Result ID")', timeout=15000)
        time.sleep(2)
        # Don't claim if we already have one claimed
        unclaim_buttons = page.query_selector_all('span:text-is("Release My Claim")')
        unclaim_count = len(unclaim_buttons)
        log.info(f"Release My Claim buttons: {unclaim_count}")
        if unclaim_count > 1:
            log.info("Not claiming yet another test")
        else:
            log.info("No grading jobs assigned to me, going ahead to snatch a new one")
            # Verify the number of "Claim" buttons
            claim_buttons = page.query_selector_all('span:text-is("Claim")')
            claim_count = len(claim_buttons)
            log.info(f"Claim buttons: {claim_count}")
            if claim_count == 0:
                log.info("No Claim button(s) found - nothing to do")
            else:
                log.info("Clicking Claim button 0")
                claim_buttons[0].click()
                # Wait for page to reload and number of "Claim" buttons to reduce by 1
                start_time = time.time()
                while True:
                    current_buttons = page.query_selector_all('span:text-is("Claim")')
                    if len(current_buttons) == claim_count - 1:
                        break
                    if time.time() - start_time > 30:  # Timeout after 30 seconds
                        break
                    time.sleep(0.5)  # Poll every 0.5 seconds
                log.info(f"Clicked first 'Claim' button; 'Claim' buttons reduced to {claim_count - 1}")
                requests.post("https://push.wendt.io", params={"message": "New grading work snatched!"})
    except Exception as e:
        log.error(f"Error: {e}")
    if development_mode:
        input("Press Enter to continue...")
    browser.close()
    playwright.stop()
    log.info("Done snatching grading work")


if __name__ == '__main__':
    logging.basicConfig(level=logging.INFO, stream=sys.stdout)
    snatch_grading_work()
import base64
import datetime
import json
import logging
import os
import redis
import sys
from snatcher import snatch_grading_work
development_mode = os.getenv('PYCHARM_HOSTED')
def main():
    log = logging.getLogger('wrapper')
    log.info("Running main")
    host = 'localhost' if development_mode else 'redis.wendt.vpn'
    try:
        client = redis.Redis(host=host, port=6379, decode_responses=True)
        pubsub = client.pubsub()
        pubsub.subscribe('email')
        log.info("Subscribed to 'email' topic")
        while True:
            now = datetime.datetime.now(datetime.timezone.utc).isoformat(sep=' ', timespec='seconds')
            log.info(f"Starting to wait for message at {now}")
            try:
                message = pubsub.get_message(ignore_subscribe_messages=True, timeout=60.0)
                if message:
                    log.info("Message received")
                    # Process message if received
                    if message['type'] == 'message':
                        payload = message['data']
                        if development_mode:
                            log.info(payload)
                        decoded_payload = base64.b64decode(payload)
                        data = json.loads(decoded_payload)
                        if "headers" in data:
                            if "subject" in data["headers"]:
                                subject = data["headers"]["subject"]
                                log.info(f"Subject: {subject}")
                                if "III Grading Required" in subject:
                                    log.info("Subject found - let's go snatching!")
                                    snatch_grading_work()
                                else:
                                    log.info("Ignoring this email")
            except Exception as e:
                log.error(f"Error processing message: {e}")
                sys.exit(1)
    except KeyboardInterrupt:
        log.info("Shutting down")
        sys.exit(0)
    except redis.RedisError as e:
        log.error(f"Redis connection error: {e}")
        sys.exit(1)
    except Exception as e:
        log.error(f"Unexpected error: {e}")
        sys.exit(1)


if __name__ == "__main__":
    logging.basicConfig(level=logging.INFO, stream=sys.stdout)
    main()