Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,11 @@ target/packer/out
.stignore
pyrightconfig.json
TODO
run.sh
run*.sh
.env
__pycache__/
*.py[co]
client/warcupload/testdata
upload-space/
*.tar
PIPELINES
2 changes: 1 addition & 1 deletion client/worker/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ RUN apt-get update \
&& apt-get clean

# 06fc825d3fbed9801110b2d3f562c44d72940862 is known to work.
RUN pip3 install --break-system-packages rethinkdb git+https://github.com/internetarchive/brozzler@06fc825d3fbed9801110b2d3f562c44d72940862 websockets doublethink yt-dlp aiofiles
RUN pip3 install --break-system-packages rethinkdb git+https://github.com/internetarchive/brozzler@06fc825d3fbed9801110b2d3f562c44d72940862 websockets doublethink yt-dlp aiofiles uuid_utils
RUN pip3 install --break-system-packages --upgrade rethinkdb

RUN mkdir /app
Expand Down
77 changes: 36 additions & 41 deletions client/worker/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -105,20 +105,24 @@ async def warcprox_cleanup():
except Exception:
pass

async def run_job(ws: Websocket, full_job: dict, url: str, warc_prefix: str, ua: str, custom_js: typing.Optional[str], info_url: str):
tries = full_job['_current_attempt']
id = full_job['id']
async def run_job(ws: Websocket, full_job: dict, info_url: str):
job_id = full_job['job_id']
page_id = full_job['page_id']
attempt_id = full_job['attempt_id']
assert "_" not in job_id
warc_prefix = "mnbot-brozzler-" + job_id.replace("-", "_")
#dedup_bucket = f"dedup-{id}-{tries}"
dedup_bucket = ""
stats_bucket = f"stats-{id}-{tries}"
stats_bucket = f"stats-{job_id}"
job = Job(
attempt_id = attempt_id,
full_job = full_job,
url = url,
url = full_job['payload'],
warc_prefix = warc_prefix,
dedup_bucket = dedup_bucket,
stats_bucket = stats_bucket,
ua = ua,
custom_js = custom_js,
ua = full_job['settings']['ua'],
custom_js = full_job['settings']['custom_js'],
cookie_jar = None,
mnbot_info_url = info_url
)
Expand All @@ -129,7 +133,10 @@ async def run_job(ws: Websocket, full_job: dict, url: str, warc_prefix: str, ua:
PYTHON,
os.path.join(os.path.dirname(sys.argv[0]), "browse.py"),
str(pwrite),
id, # useful for ps
# These arguments are passed only because they are useful for ps
job_id,
page_id,
attempt_id,
stdin = subprocess.PIPE,
stdout = subprocess.PIPE,
stderr = subprocess.STDOUT,
Expand Down Expand Up @@ -174,16 +181,8 @@ async def run_job(ws: Websocket, full_job: dict, url: str, warc_prefix: str, ua:
res = json.loads(res)
type = res['type']
payload = res['payload']
if type in ("status_code", "outlinks", "final_url", "requisites", "custom_js"):
await ws.store_result(id, type, tries, payload)
elif type == "screenshot":
# Don't tell the tracker to decode the thumbnail
# if there isn't a thumbnail
decode_fields = [k for k in ("full", "thumb") if payload[k]]
await ws.store_result(id, type, tries, payload, decode_fields)
elif type == "cjs_screenshot":
decode_fields = ["full"]
await ws.store_result(id, type, tries, payload, decode_fields)
if type in ("status_code", "outlinks", "final_url", "requisites", "custom_js", "screenshot", "cjs_screenshot"):
await ws.store_result(attempt_id, type, payload)
elif type == "error":
# Cancel tasks, since both stdout and pread are about to get closed.
# Failing to do this results in a "Task exception was never retrieved"
Expand Down Expand Up @@ -233,6 +232,7 @@ async def ping_occasionally():
pass
await asyncio.sleep(15)

# Don't change this without changing the hardcoded slot=0 below and the hardcoded num_slots=1 in tracker.py.
MAX_WORKERS = 1
workers: dict[asyncio.Task, tuple[TaskType, dict | None]] = dict()

Expand Down Expand Up @@ -266,41 +266,36 @@ def handle_sigint():
logger.debug("not spinning up new item as we are pending a stop")
continue
logger.debug("spinning up worker")
resp = await ws.claim_item()
resp = await ws.claim_item(0)
if resp:
item, info_url = resp
id = item['id']
logger.info(f"Starting task {id}")
url = item['item']
assert "_" not in id
prefix = "mnbot-brozzler-" + id.replace("-", "_")
claim, info_url = resp
print(claim, info_url)
attempt_id = claim['attempt_id']
job_id = claim['job_id']
page_id = claim['page_id']
logger.info(f"Starting claim {attempt_id} (for {job_id} : {page_id}")
task = asyncio.create_task(run_job(
ws,
item,
url,
prefix,
item['metadata']['ua'],
item['metadata']['custom_js'],
info_url
claim,
info_url,
))
task.set_name(id)
workers[task] = (TaskType.ITEM, item)
task.set_name(attempt_id)
workers[task] = (TaskType.ITEM, claim)
else:
to_sleep = random.randint(10, 30)
logger.info(f"No items found, blocking this worker for {to_sleep} seconds.")
logger.info(f"No tasks found, blocking this worker for {to_sleep} seconds.")
task = asyncio.create_task(asyncio.sleep(to_sleep))
workers[task] = (TaskType.SLEEP, None)
done: set[asyncio.Task] = (await asyncio.wait(workers, return_when = asyncio.FIRST_COMPLETED))[0]
for finished_task in done:
logger.debug(f"checking finished task {finished_task}")
task_type, item = workers[finished_task]
task_type, task_claim = workers[finished_task]
del workers[finished_task]
if task_type != TaskType.ITEM:
logger.debug("nevermind, not an item")
continue
# item can't be None at this point
id = item['id']
tries = item['_current_attempt']
# claim can't be None at this point
attempt_id = task_claim['attempt_id']
try:
_dedup_bucket, _stats_bucket = finished_task.result()
except Exception as e:
Expand All @@ -309,14 +304,14 @@ def handle_sigint():
fatal = e.fatal
else:
fatal = False
logger.exception(f"failed task {id}:")
logger.exception(f"failed task {attempt_id}:")
fmt = io.StringIO()
finished_task.print_stack(file = fmt)
message = f"Caught exception!\n{fmt.getvalue()}"
await ws.fail_item(id, message, tries, fatal)
await ws.fail_item(attempt_id, message, fatal)
else:
logger.info(f"task {id} was successful!")
await ws.finish_item(id)
await ws.finish_item(attempt_id)
logger.debug("creating cleanup task")
task = asyncio.create_task(warcprox_cleanup())
workers[task] = (TaskType.CLEANUP, None)
Expand Down
27 changes: 10 additions & 17 deletions client/worker/browse.py
Original file line number Diff line number Diff line change
Expand Up @@ -342,12 +342,12 @@ def _brozzle(self) -> Result:
ua = self._create_user_agent(version)

# Write job info to the WARC
logger.debug("writing item info")
logger.debug("writing job info")
self._write_warcprox_record(
"metadata:mnbot-job-metadata",
f"metadata:mnbot-metadata/{self.job.attempt_id}",
"application/json",
json.dumps({
"job": self.job.full_job,
"claim": self.job.full_job,
"version": VERSION,
"browser": {
"executable": self.chrome_exe,
Expand Down Expand Up @@ -403,7 +403,7 @@ def _brozzle(self) -> Result:

with self.websock_thread_lock:
r = Result(
id = self.job.full_job['id'],
id = self.job.attempt_id,
final_url = final_url,
outlinks = list(outlinks),
custom_js = custom_js_result,
Expand All @@ -413,10 +413,11 @@ def _brozzle(self) -> Result:
)
logger.debug("writing job result data")
self._write_warcprox_record(
"metadata:mnbot-job-result",
f"metadata:mnbot-result/{self.job.attempt_id}",
"application/json",
json.dumps({
"result": r.dict()
"result": r.dict(),
"attempt_id": self.job.attempt_id,
}).encode(),
self.job.warc_prefix
)
Expand Down Expand Up @@ -445,17 +446,9 @@ def write_message(type, payload):
write_message("requisites", [dataclasses.asdict(v) for v in result.requisites.values()])
write_message("status_code", result.status_code)

screenshot = browser.screenshot
thumbnail = browser.thumbnail
if screenshot:
screenshot = base64.b85encode(screenshot).decode()
if thumbnail:
thumbnail = base64.b85encode(thumbnail).decode()
if screenshot or thumbnail:
write_message("screenshot", {
"full": screenshot,
"thumb": thumbnail,
})
if browser.screenshot:
screenshot = base64.b85encode(browser.screenshot).decode()
write_message("screenshot", screenshot)

if result.custom_js_screenshot:
write_message("cjs_screenshot", {"full": result.custom_js_screenshot})
Expand Down
3 changes: 2 additions & 1 deletion client/worker/shared.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
# Update this whenever you make a change, cosmetic or not.
# During development you can ignore it, but when you actually
# push it to prod, it *must* be updated.
VERSION = "20260412.01"
VERSION = "20260626.01"

DEBUG = os.environ.get("DEBUG") == "1"
if DEBUG:
Expand All @@ -19,6 +19,7 @@
@dataclasses.dataclass
class Job:
full_job: dict
attempt_id: str
url: str
warc_prefix: str
dedup_bucket: str
Expand Down
54 changes: 29 additions & 25 deletions client/worker/tracker.py
Original file line number Diff line number Diff line change
@@ -1,15 +1,12 @@
### HEY YOU! Yeah, you!
### If you are making any change to the client, please
### update the version in meta.py.
### No two versions in prod should have the same version number.

############################# NOTE! ###############################
### If you are making any change to the client, please ###
### update the version in shared.py. ###
### No two versions in prod should have the same version number.###
############################ THANKS! ##############################

import asyncio, logging, json
import asyncio
import json
import logging
import typing

from shared import VERSION
Expand All @@ -19,6 +16,14 @@

import websockets
from websockets.asyncio.client import connect
import uuid_utils.compat as uuid

class CustomEncoder(json.JSONEncoder):
def default(self, obj):
if isinstance(obj, uuid.UUID):
return str(obj)
return super().default(obj)
encoder = CustomEncoder()

class Websocket:
def __init__(self, url: str, advisory_handler):
Expand All @@ -32,13 +37,14 @@ async def _send_once(self, type, payload=None) -> tuple[int, dict]:
async with self.lock:
if not self.conn:
logger.debug("creating connection")
self.conn = await connect(self.url)
await self.conn.send(json.dumps({"v": VERSION}))
conn = await connect(self.url)
await conn.send(encoder.encode({"v": VERSION, "p": 2, "num_slots": 1}))
self.conn = conn
self.seq += 1
seq = self.seq
message = {"type": type, "request": payload, "seq": seq}
logger.debug(f"sending message (keys: {list(message.keys())})")
await self.conn.send(json.dumps(message))
logger.debug(f"sending {type} message (seq = {seq})")
await self.conn.send(encoder.encode(message))
while True:
resp = json.loads(await self.conn.recv())
if resp['seq'] is None: # Advisory
Expand Down Expand Up @@ -79,41 +85,39 @@ async def _send(self, type, payload=None):
await asyncio.sleep(sleep)
tries += 1

async def claim_item(self) -> typing.Optional[tuple[dict, str]]:
status, resp = await self._send("Item:claim", {"pipeline_type": "brozzler"})
async def claim_item(self, slot: int) -> typing.Optional[tuple[dict, str]]:
status, resp = await self._send("Item:claim", {"slot": slot})
if status != 200:
raise RuntimeError(f"Bad response from server: {status} {resp}")
if resp['item']:
return resp['item'], resp['info_url']
return None

async def fail_item(self, id: str, reason: str, tries: int, fatal: bool):
async def fail_item(self, attempt_id: str, reason: str, fatal: bool):
status, resp = await self._send(
"Item:fail",
{
"id": id,
"attempt_id": attempt_id,
"message": reason,
"attempt": tries,
"fatal": fatal
"fatal": fatal,
}
)
if status != 204:
raise RuntimeError(f"Bad response from server: {status} {resp}")

async def finish_item(self, id: str):
status, resp = await self._send("Item:finish", {"id": id})
async def finish_item(self, attempt_id: str):
status, resp = await self._send("Item:finish", {"attempt_id": attempt_id})
if status != 204:
raise RuntimeError(f"Bad response from server: {status} {resp}")

async def store_result(self, id: str, result_type: str, tries: int, result, decode_fields = None):
async def store_result(self, attempt_id: str, result_type: str, result):
result_id = uuid.uuid7()
pl = {
"result_type": result_type,
"attempt": tries,
"result": result,
"id": id
"result_id": result_id,
"attempt_id": attempt_id,
"type": result_type,
"payload": result,
}
if decode_fields:
pl['decode_fields'] = decode_fields
status, resp = await self._send("Item:store", pl)
if status != 201:
raise RuntimeError(f"Bad response from server: {status} {resp}")
Expand Down
Loading