diff --git a/src/django_project/core/settings.py b/src/django_project/core/settings.py index 551472d..60b4ad1 100644 --- a/src/django_project/core/settings.py +++ b/src/django_project/core/settings.py @@ -289,6 +289,9 @@ GEMINI_API_KEY: str | None = env.str("GEMINI_API_KEY", None) GOOGLE_ANALYTICS_TAG_ID: str | None = env.str("GOOGLE_ANALYTICS_TAG_ID", None) LINKEDIN_ACCESS_TOKEN: str | None = env.str("LINKEDIN_ACCESS_TOKEN", None) +LINKEDIN_CLIENT_ID: str | None = env.str("LINKEDIN_CLIENT_ID", None) +LINKEDIN_CLIENT_SECRET: str | None = env.str("LINKEDIN_CLIENT_SECRET", None) LINKEDIN_ORGANIZATION_URN: str | None = env.str("LINKEDIN_ORGANIZATION_URN", None) +LINKEDIN_REFRESH_TOKEN: str | None = env.str("LINKEDIN_REFRESH_TOKEN", None) SPUG_API_TOKEN: str | None = env.str("SPUG_API_TOKEN", None) SPUG_API_URL: str | None = env.str("SPUG_API_URL", None) diff --git a/src/django_project/core/urls.py b/src/django_project/core/urls.py index dd58d19..3f37384 100644 --- a/src/django_project/core/urls.py +++ b/src/django_project/core/urls.py @@ -18,10 +18,12 @@ from django.conf import settings from django.contrib import admin from django.urls import include, path +from web.views import linkedin_oauth_callback urlpatterns: list = [ # Django provided URLs path("console/", admin.site.urls), + path("accounts/oidc/linkedin/login/callback/", linkedin_oauth_callback, name="linkedin_oauth_callback"), path("accounts/", include("django.contrib.auth.urls")), # 3rd party URLs path("handyhelpers/", include("handyhelpers.urls")), diff --git a/src/django_project/tests/unit/core/views/test_views.py b/src/django_project/tests/unit/core/views/test_views.py index a29a771..c4c42b0 100644 --- a/src/django_project/tests/unit/core/views/test_views.py +++ b/src/django_project/tests/unit/core/views/test_views.py @@ -35,3 +35,24 @@ def test_post(self) -> None: """verify call to HostView view""" response: HttpResponse = self.client.post(reverse("host")) self.assertEqual(response.status_code, 405) + + +class LinkedInOAuthCallbackTests(TestCase): + def test_get_with_code(self) -> None: + response: HttpResponse = self.client.get( + reverse("linkedin_oauth_callback"), + {"code": "example-code", "state": "example-state"}, + ) + self.assertEqual(response.status_code, 200) + self.assertEqual(response["content-type"], "text/plain") + self.assertIn("code: example-code", response.content.decode("utf-8")) + self.assertIn("state: example-state", response.content.decode("utf-8")) + + def test_get_with_error(self) -> None: + response: HttpResponse = self.client.get( + reverse("linkedin_oauth_callback"), + {"error": "access_denied", "error_description": "user denied"}, + ) + self.assertEqual(response.status_code, 400) + self.assertEqual(response["content-type"], "text/plain") + self.assertIn("error: access_denied", response.content.decode("utf-8")) diff --git a/src/django_project/tests/unit/web/test_linkedin_notifier.py b/src/django_project/tests/unit/web/test_linkedin_notifier.py new file mode 100644 index 0000000..077d54d --- /dev/null +++ b/src/django_project/tests/unit/web/test_linkedin_notifier.py @@ -0,0 +1,163 @@ +import os +import tempfile +import unittest +from pathlib import Path +from unittest.mock import Mock, patch + +import requests + +BASE_DIR = Path(__file__).parents[4] +os.environ.setdefault("DJANGO_SETTINGS_MODULE", "core.settings") +os.environ.setdefault("ENV_PATH", f"{BASE_DIR}/envs/.env.test") + +from web.utilities.notifiers.linkedin import LinkedInOrganizationClient + + +class DummyCredentialManager: + def __init__(self, credential): + self.credential = credential + + def select_for_update(self): + return self + + def get(self, pk): + return self.credential + + +class DummyCredential: + objects = None + + def __init__(self): + self.pk = 1 + self.access_token = "old-token" + self.refresh_token = "refresh-token" + self.access_token_expires_at = None + self.refresh_token_expires_at = None + self.save = Mock() + + +class LinkedInOrganizationClientTests(unittest.TestCase): + def build_client(self, env_path: str | None = None, access_token: str | None = "old-token") -> LinkedInOrganizationClient: + return LinkedInOrganizationClient( + access_token=access_token, + organization_urn="urn:li:organization:107506588", + client_id="client-id", + client_secret="client-secret", + refresh_token="refresh-token", + env_path=env_path, + ) + + def test_refresh_access_token_updates_settings_and_env_file(self): + with tempfile.TemporaryDirectory() as temp_dir: + temp_env = Path(temp_dir) / ".env.test" + temp_env.write_text( + 'LINKEDIN_ACCESS_TOKEN="old-token"\nLINKEDIN_REFRESH_TOKEN="refresh-token"\n', + encoding="utf-8", + ) + client = self.build_client(env_path=str(temp_env)) + + response = Mock() + response.json.return_value = { + "access_token": "new-token", + "refresh_token": "new-refresh-token", + } + response.raise_for_status.return_value = None + + with ( + patch("web.utilities.notifiers.linkedin.requests.post", return_value=response) as mock_post, + patch("web.utilities.notifiers.linkedin.settings") as mock_settings, + ): + client.refresh_access_token() + + self.assertEqual(client.access_token, "new-token") + self.assertEqual(client.refresh_token, "new-refresh-token") + self.assertIn('LINKEDIN_ACCESS_TOKEN="new-token"', temp_env.read_text(encoding="utf-8")) + self.assertIn('LINKEDIN_REFRESH_TOKEN="new-refresh-token"', temp_env.read_text(encoding="utf-8")) + mock_post.assert_called_once() + self.assertEqual(mock_settings.LINKEDIN_ACCESS_TOKEN, "new-token") + self.assertEqual(mock_settings.LINKEDIN_REFRESH_TOKEN, "new-refresh-token") + + def test_post_retries_once_after_auth_failure(self): + client = self.build_client() + + auth_failure_response = Mock(status_code=401) + auth_failure_response.raise_for_status.side_effect = requests.HTTPError(response=auth_failure_response) # type: ignore[name-defined] + + refresh_response = Mock() + refresh_response.json.return_value = { + "access_token": "new-token", + "refresh_token": "new-refresh-token", + } + refresh_response.raise_for_status.return_value = None + + success_response = Mock(status_code=201) + success_response.raise_for_status.return_value = None + + with ( + patch( + "web.utilities.notifiers.linkedin.requests.post", + side_effect=[auth_failure_response, refresh_response, success_response], + ) as mock_post, + patch("web.utilities.notifiers.linkedin.settings"), + ): + response = client.post_organization_post("hello world") + + self.assertIs(response, success_response) + self.assertEqual(client.access_token, "new-token") + self.assertEqual(mock_post.call_count, 3) + + def test_refresh_access_token_updates_db_credential_when_present(self): + credential = DummyCredential() + credential.__class__.objects = DummyCredentialManager(credential) + + client = LinkedInOrganizationClient( + access_token="old-token", + organization_urn="urn:li:organization:107506588", + client_id="client-id", + client_secret="client-secret", + refresh_token="refresh-token", + credential=credential, + ) + + refresh_response = Mock() + refresh_response.json.return_value = { + "access_token": "new-token", + "refresh_token": "new-refresh-token", + "expires_in": 3600, + "refresh_token_expires_in": 7200, + } + refresh_response.raise_for_status.return_value = None + + with ( + patch("web.utilities.notifiers.linkedin.requests.post", return_value=refresh_response), + patch("web.utilities.notifiers.linkedin.settings"), + ): + client.refresh_access_token() + + self.assertEqual(credential.access_token, "new-token") + self.assertEqual(credential.refresh_token, "new-refresh-token") + credential.save.assert_called_once() + + def test_missing_tokens_can_be_bootstrapped_from_refresh_credentials(self): + client = self.build_client(access_token=None) + + refresh_response = Mock() + refresh_response.json.return_value = { + "access_token": "new-token", + "refresh_token": "new-refresh-token", + } + refresh_response.raise_for_status.return_value = None + + success_response = Mock(status_code=201) + success_response.raise_for_status.return_value = None + + with ( + patch( + "web.utilities.notifiers.linkedin.requests.post", + side_effect=[refresh_response, success_response], + ), + patch("web.utilities.notifiers.linkedin.settings"), + ): + client.post_organization_post("hello world") + + self.assertEqual(client.access_token, "new-token") diff --git a/src/django_project/tests/unit/web/test_linkedin_oauth_command.py b/src/django_project/tests/unit/web/test_linkedin_oauth_command.py new file mode 100644 index 0000000..409bc81 --- /dev/null +++ b/src/django_project/tests/unit/web/test_linkedin_oauth_command.py @@ -0,0 +1,79 @@ +import os +import unittest +from pathlib import Path +from unittest.mock import Mock, patch + +BASE_DIR = Path(__file__).parents[4] +os.environ.setdefault("DJANGO_SETTINGS_MODULE", "core.settings") +os.environ.setdefault("ENV_PATH", f"{BASE_DIR}/envs/.env.test") + +from web.management.commands.linkedin_oauth import Command + + +class LinkedInOAuthCommandTests(unittest.TestCase): + def test_show_url_outputs_authorization_url(self): + command = Command() + command.stdout = Mock() + + with ( + patch("web.management.commands.linkedin_oauth.apps.get_model") as mock_get_model, + patch("web.management.commands.linkedin_oauth.settings") as mock_settings, + patch("web.management.commands.linkedin_oauth.LinkedInOrganizationClient") as mock_client_class, + ): + mock_settings.LINKEDIN_CLIENT_ID = "client-id" + mock_settings.LINKEDIN_CLIENT_SECRET = "client-secret" + mock_settings.LINKEDIN_ACCESS_TOKEN = None + mock_settings.LINKEDIN_ORGANIZATION_URN = "urn:li:organization:107506588" + mock_settings.LINKEDIN_REFRESH_TOKEN = None + mock_settings.ENV_PATH = "/tmp/.env.test" + + mock_credential = Mock(access_token=None, refresh_token=None) + mock_get_model.return_value.objects.get_or_create.return_value = (mock_credential, True) + + mock_client = mock_client_class.return_value + mock_client.build_authorization_url.return_value = "https://www.linkedin.com/oauth/v2/authorization?x=1" + + command.handle(redirect_uri="https://example.com/callback", scope=command.default_scope, state=None, code=None) + + command.stdout.write.assert_any_call("Open this URL in a browser and complete the LinkedIn consent flow:") + command.stdout.write.assert_any_call("https://www.linkedin.com/oauth/v2/authorization?x=1") + + def test_code_exchange_stores_tokens(self): + command = Command() + command.stdout = Mock() + + with ( + patch("web.management.commands.linkedin_oauth.apps.get_model") as mock_get_model, + patch("web.management.commands.linkedin_oauth.settings") as mock_settings, + patch("web.management.commands.linkedin_oauth.LinkedInOrganizationClient") as mock_client_class, + ): + mock_settings.LINKEDIN_CLIENT_ID = "client-id" + mock_settings.LINKEDIN_CLIENT_SECRET = "client-secret" + mock_settings.LINKEDIN_ACCESS_TOKEN = None + mock_settings.LINKEDIN_ORGANIZATION_URN = "urn:li:organization:107506588" + mock_settings.LINKEDIN_REFRESH_TOKEN = None + mock_settings.ENV_PATH = "/tmp/.env.test" + + mock_credential = Mock(access_token=None, refresh_token=None) + mock_get_model.return_value.objects.get_or_create.return_value = (mock_credential, True) + + mock_client = mock_client_class.return_value + mock_client.exchange_authorization_code.return_value = { + "access_token": "new-token", + "refresh_token": "new-refresh-token", + "expires_in": 5184000, + "refresh_token_expires_in": 31536000, + } + + command.handle( + redirect_uri="https://example.com/callback", + scope=command.default_scope, + state=None, + code="auth-code", + show_url=False, + ) + + mock_client.exchange_authorization_code.assert_called_once_with( + code="auth-code", + redirect_uri="https://example.com/callback", + ) diff --git a/src/django_project/web/admin.py b/src/django_project/web/admin.py index 51691c7..f9e6cf4 100644 --- a/src/django_project/web/admin.py +++ b/src/django_project/web/admin.py @@ -1,7 +1,14 @@ from django.contrib import admin # import models -from web.models import Event, Link, SocialPlatform, Tag, TechGroup +from web.models import ( + Event, + IntegrationCredential, + Link, + SocialPlatform, + Tag, + TechGroup, +) class TagAdmin(admin.ModelAdmin): @@ -20,6 +27,18 @@ class SocialPlatformAdmin(admin.ModelAdmin): list_filter = ["enabled"] +class IntegrationCredentialAdmin(admin.ModelAdmin): + list_display = [ + "id", + "provider", + "access_token_expires_at", + "refresh_token_expires_at", + "created_at", + "updated_at", + ] + search_fields = ["id", "provider"] + + class TechGroupAdmin(admin.ModelAdmin): list_display = ["id", "name", "description", "enabled", "platform", "icon", "image", "created_at", "updated_at"] search_fields = ["id", "name", "description", "icon", "image"] @@ -61,5 +80,6 @@ class EventAdmin(admin.ModelAdmin): admin.site.register(Tag, TagAdmin) admin.site.register(Link, LinkAdmin) admin.site.register(SocialPlatform, SocialPlatformAdmin) +admin.site.register(IntegrationCredential, IntegrationCredentialAdmin) admin.site.register(TechGroup, TechGroupAdmin) admin.site.register(Event, EventAdmin) diff --git a/src/django_project/web/management/__init__.py b/src/django_project/web/management/__init__.py new file mode 100644 index 0000000..8b13789 --- /dev/null +++ b/src/django_project/web/management/__init__.py @@ -0,0 +1 @@ + diff --git a/src/django_project/web/management/commands/__init__.py b/src/django_project/web/management/commands/__init__.py new file mode 100644 index 0000000..8b13789 --- /dev/null +++ b/src/django_project/web/management/commands/__init__.py @@ -0,0 +1 @@ + diff --git a/src/django_project/web/management/commands/linkedin_oauth.py b/src/django_project/web/management/commands/linkedin_oauth.py new file mode 100644 index 0000000..72cdcdc --- /dev/null +++ b/src/django_project/web/management/commands/linkedin_oauth.py @@ -0,0 +1,77 @@ +from django.apps import apps +from django.conf import settings +from django.core.management.base import BaseCommand, CommandError +from web.utilities.notifiers.linkedin import LinkedInOrganizationClient + + +class Command(BaseCommand): + help = "Generate a LinkedIn OAuth authorization URL or exchange an authorization code for tokens." + + default_scope = "w_organization_social" + + def add_arguments(self, parser) -> None: + parser.add_argument("--code", help="Authorization code returned by LinkedIn.") + parser.add_argument( + "--redirect-uri", + required=True, + help="Redirect URI registered in the LinkedIn developer app.", + ) + parser.add_argument( + "--scope", + default=self.default_scope, + help=f"OAuth scopes to request. Defaults to: {self.default_scope}", + ) + parser.add_argument("--state", help="Optional OAuth state value for the authorization URL.") + parser.add_argument( + "--show-url", + action="store_true", + help="Print the authorization URL even when --code is also provided.", + ) + + def handle(self, *args, **options) -> str: + client_id = settings.LINKEDIN_CLIENT_ID + client_secret = settings.LINKEDIN_CLIENT_SECRET + env_path = getattr(settings, "ENV_PATH", None) + + if not client_id or not client_secret: + raise CommandError("LINKEDIN_CLIENT_ID and LINKEDIN_CLIENT_SECRET must be configured.") + + credential_model = apps.get_model("web", "IntegrationCredential") + credential, _ = credential_model.objects.get_or_create(provider="linkedin") + + client = LinkedInOrganizationClient( + access_token=credential.access_token or settings.LINKEDIN_ACCESS_TOKEN, + organization_urn=settings.LINKEDIN_ORGANIZATION_URN or "", + client_id=client_id, + client_secret=client_secret, + refresh_token=credential.refresh_token or settings.LINKEDIN_REFRESH_TOKEN, + env_path=env_path, + credential=credential, + ) + + redirect_uri = options["redirect_uri"] + scope = options["scope"] + state = options.get("state") + code = options.get("code") + show_url = options.get("show_url", False) + + if show_url or not code: + auth_url = client.build_authorization_url(redirect_uri=redirect_uri, scope=scope, state=state) + self.stdout.write("Open this URL in a browser and complete the LinkedIn consent flow:") + self.stdout.write(auth_url) + if not code: + self.stdout.write("Re-run this command with --code once LinkedIn redirects back with ?code=...") + return auth_url + + token_data = client.exchange_authorization_code(code=code, redirect_uri=redirect_uri) + self.stdout.write(self.style.SUCCESS("LinkedIn tokens stored successfully.")) + self.stdout.write(f"Access token expires in: {token_data.get('expires_in')}") + if token_data.get("refresh_token_expires_in") is not None: + self.stdout.write(f"Refresh token expires in: {token_data.get('refresh_token_expires_in')}") + elif not token_data.get("refresh_token"): + self.stdout.write( + self.style.WARNING( + "LinkedIn did not return a refresh token. Programmatic refresh may not be enabled for this app." + ) + ) + return "LinkedIn OAuth token exchange completed." diff --git a/src/django_project/web/migrations/0006_integrationcredential.py b/src/django_project/web/migrations/0006_integrationcredential.py new file mode 100644 index 0000000..488f0aa --- /dev/null +++ b/src/django_project/web/migrations/0006_integrationcredential.py @@ -0,0 +1,26 @@ +import encrypted_fields.fields +from django.db import migrations, models + + +class Migration(migrations.Migration): + + dependencies = [ + ("web", "0005_alter_event_location_fields_nullable"), + ] + + operations = [ + migrations.CreateModel( + name="IntegrationCredential", + fields=[ + ("id", models.BigAutoField(auto_created=True, primary_key=True, serialize=False, verbose_name="ID")), + ("created_at", models.DateTimeField(auto_now_add=True)), + ("updated_at", models.DateTimeField(auto_now=True)), + ("provider", models.CharField(max_length=64, unique=True)), + ("access_token", encrypted_fields.fields.EncryptedTextField(blank=True, null=True)), + ("refresh_token", encrypted_fields.fields.EncryptedTextField(blank=True, null=True)), + ("access_token_expires_at", models.DateTimeField(blank=True, null=True)), + ("refresh_token_expires_at", models.DateTimeField(blank=True, null=True)), + ], + options={"ordering": ["provider"]}, + ), + ] diff --git a/src/django_project/web/models.py b/src/django_project/web/models.py index 43b09d2..1cd9105 100644 --- a/src/django_project/web/models.py +++ b/src/django_project/web/models.py @@ -81,6 +81,22 @@ def __str__(self) -> str: return self.name +class IntegrationCredential(HandyHelperBaseModel): + """Stores third-party integration credentials shared across app processes.""" + + provider = models.CharField(max_length=64, unique=True) + access_token = EncryptedTextField(blank=True, null=True) + refresh_token = EncryptedTextField(blank=True, null=True) + access_token_expires_at = models.DateTimeField(blank=True, null=True) + refresh_token_expires_at = models.DateTimeField(blank=True, null=True) + + class Meta: + ordering = ["provider"] + + def __str__(self) -> str: + return self.provider + + class Tag(HandyHelperBaseModel): """A tag that describes attributes of a Event or a TechGroup""" diff --git a/src/django_project/web/scripts/ingest_events.py b/src/django_project/web/scripts/ingest_events.py index 789b614..dd4dbb5 100644 --- a/src/django_project/web/scripts/ingest_events.py +++ b/src/django_project/web/scripts/ingest_events.py @@ -15,7 +15,7 @@ def get_eventbright_events(): def get_meetup_events() -> None: - tech_group_list = TechGroup.objects.filter(enabled=True, platform__name="Meetup") + tech_group_list = TechGroup.objects.filter(enabled=True, platform__name="Meetup", name__icontains="python") for group in tech_group_list: print("INFO: getting upcoming events for ", group.name) job = ingest_future_meetup_events.s(group.pk) @@ -23,5 +23,5 @@ def get_meetup_events() -> None: def run(): - get_eventbright_events() + # get_eventbright_events() get_meetup_events() diff --git a/src/django_project/web/tasks.py b/src/django_project/web/tasks.py index f48f187..801c338 100644 --- a/src/django_project/web/tasks.py +++ b/src/django_project/web/tasks.py @@ -11,7 +11,7 @@ from django.conf import settings from django.db.models.manager import BaseManager from django.utils import timezone -from web.models import Event, Link, Tag, TechGroup +from web.models import Event, IntegrationCredential, Link, Tag, TechGroup from web.utilities.dt_utils import convert_to_pacific from web.utilities.notifiers.discord import DiscordNotifier from web.utilities.notifiers.linkedin import LinkedInOrganizationClient @@ -206,14 +206,25 @@ def post_event_to_linkedin(event_pk: int, is_new: bool) -> str: if not event: return f"Event with pk {event_pk} not found." - # Ensure LinkedIn API credentials are set - if not settings.LINKEDIN_ACCESS_TOKEN or not settings.LINKEDIN_ORGANIZATION_URN: + linkedin_credential = IntegrationCredential.objects.filter(provider="linkedin").first() + access_token = linkedin_credential.access_token if linkedin_credential else settings.LINKEDIN_ACCESS_TOKEN + refresh_token = linkedin_credential.refresh_token if linkedin_credential else settings.LINKEDIN_REFRESH_TOKEN + + if not settings.LINKEDIN_ORGANIZATION_URN: + return "LinkedIn organization URN not configured in settings. Skipping post." + + if not access_token and not (refresh_token and settings.LINKEDIN_CLIENT_ID and settings.LINKEDIN_CLIENT_SECRET): return "LinkedIn API credentials not configured in settings. Skipping post." # Initialize LinkedIn client linkedin_client = LinkedInOrganizationClient( - access_token=settings.LINKEDIN_ACCESS_TOKEN, + access_token=access_token, organization_urn=settings.LINKEDIN_ORGANIZATION_URN, + client_id=settings.LINKEDIN_CLIENT_ID, + client_secret=settings.LINKEDIN_CLIENT_SECRET, + refresh_token=refresh_token, + env_path=getattr(settings, "ENV_PATH", None), + credential=linkedin_credential, ) linkedin_client.post_event( diff --git a/src/django_project/web/utilities/notifiers/linkedin.py b/src/django_project/web/utilities/notifiers/linkedin.py index 13e6961..681bc3e 100644 --- a/src/django_project/web/utilities/notifiers/linkedin.py +++ b/src/django_project/web/utilities/notifiers/linkedin.py @@ -1,10 +1,14 @@ import json -from typing import Any, Optional +import logging +from datetime import timedelta +from pathlib import Path +from typing import TYPE_CHECKING, Any, Optional import requests from bs4 import BeautifulSoup +from django.conf import settings +from django.db import transaction from django.utils import timezone -from web.models import Event from web.utilities.ai.gemini import generate_post_content from web.utilities.ai.prompts import ( create_event_reminder_prompt, @@ -12,16 +16,34 @@ ) from web.utilities.dt_utils import convert_to_pacific +if TYPE_CHECKING: + from web.models import Event + + +logger = logging.getLogger(__name__) + class LinkedInOrganizationClient: def __init__( self, - access_token: str, + access_token: Optional[str], organization_urn: str, + client_id: Optional[str] = None, + client_secret: Optional[str] = None, + refresh_token: Optional[str] = None, + env_path: Optional[str] = None, + credential: Optional[Any] = None, ) -> None: - self.access_token: str = access_token + self.access_token = access_token self.organization_urn: str = organization_urn + self.client_id = client_id + self.client_secret = client_secret + self.refresh_token = refresh_token + self.env_path = Path(env_path) if env_path else None + self.credential = credential self.post_url = "https://api.linkedin.com/rest/posts" + self.access_token_url = "https://www.linkedin.com/oauth/v2/accessToken" # nosec B105 + self.authorization_url = "https://www.linkedin.com/oauth/v2/authorization" self.set_headers() def set_headers(self) -> None: @@ -32,6 +54,162 @@ def set_headers(self) -> None: "X-Restli-Protocol-Version": "2.0.0", } + def can_refresh_access_token(self) -> bool: + return bool(self.refresh_token and self.client_id and self.client_secret) + + def ensure_access_token(self) -> None: + if self.access_token: + return + if not self.can_refresh_access_token(): + raise ValueError("LinkedIn access token is missing and refresh credentials are not fully configured.") + self.refresh_access_token() + + def refresh_access_token(self) -> None: + if self.credential is not None and getattr(self.credential, "pk", None): + self._refresh_access_token_with_credential() + return + if not self.can_refresh_access_token(): + raise ValueError("LinkedIn refresh token flow is not fully configured.") + + token_data = self._request_token_refresh(self.refresh_token) + self._apply_token_data(token_data) + self._persist_tokens(token_data) + + def build_authorization_url(self, redirect_uri: str, scope: str, state: Optional[str] = None) -> str: + if not self.client_id: + raise ValueError("LinkedIn client ID is required to build the authorization URL.") + + query_params = { + "response_type": "code", + "client_id": self.client_id, + "redirect_uri": redirect_uri, + "scope": scope, + } + if state: + query_params["state"] = state + + prepared_request: requests.PreparedRequest = requests.Request( + "GET", self.authorization_url, params=query_params + ).prepare() + if prepared_request.url is None: + raise ValueError("LinkedIn authorization URL could not be generated.") + return prepared_request.url + + def exchange_authorization_code(self, code: str, redirect_uri: str) -> dict[str, Any]: + if not self.client_id or not self.client_secret: + raise ValueError("LinkedIn client ID and client secret are required to exchange an authorization code.") + + response = requests.post( + self.access_token_url, + data={ + "grant_type": "authorization_code", + "code": code, + "redirect_uri": redirect_uri, + "client_id": self.client_id, + "client_secret": self.client_secret, + }, + headers={"Content-Type": "application/x-www-form-urlencoded"}, + timeout=15, + ) + response.raise_for_status() + token_data = response.json() + + self._apply_token_data(token_data) + self._persist_tokens(token_data) + return token_data + + def _request_token_refresh(self, refresh_token: Optional[str]) -> dict[str, Any]: + response = requests.post( + self.access_token_url, + data={ + "grant_type": "refresh_token", + "refresh_token": refresh_token, + "client_id": self.client_id, + "client_secret": self.client_secret, + }, + headers={"Content-Type": "application/x-www-form-urlencoded"}, + timeout=15, + ) + response.raise_for_status() + return response.json() + + def _apply_token_data(self, token_data: dict[str, Any]) -> None: + self.access_token = token_data["access_token"] + self.refresh_token = token_data.get("refresh_token", self.refresh_token) + self.set_headers() + + def _refresh_access_token_with_credential(self) -> None: + if self.credential is None or getattr(self.credential, "pk", None) is None: + raise ValueError("LinkedIn credential record is missing and cannot be refreshed with locking.") + + credential_model = self.credential.__class__ + with transaction.atomic(): + locked_credential = credential_model.objects.select_for_update().get(pk=self.credential.pk) + self.access_token = locked_credential.access_token + self.refresh_token = locked_credential.refresh_token + if not self.can_refresh_access_token(): + raise ValueError("LinkedIn refresh token flow is not fully configured.") + token_data = self._request_token_refresh(self.refresh_token) + self._apply_token_data(token_data) + self.credential = locked_credential + self._persist_tokens(token_data) + + def _persist_tokens(self, token_data: Optional[dict[str, Any]] = None) -> None: + setattr(settings, "LINKEDIN_ACCESS_TOKEN", self.access_token) + setattr(settings, "LINKEDIN_REFRESH_TOKEN", self.refresh_token) + + if self.credential is not None: + self.credential.access_token = self.access_token + self.credential.refresh_token = self.refresh_token + if token_data: + if token_data.get("expires_in") is not None: + self.credential.access_token_expires_at = timezone.now() + timedelta( + seconds=int(token_data["expires_in"]) + ) + if token_data.get("refresh_token_expires_in") is not None: + self.credential.refresh_token_expires_at = timezone.now() + timedelta( + seconds=int(token_data["refresh_token_expires_in"]) + ) + self.credential.save( + update_fields=[ + "access_token", + "refresh_token", + "access_token_expires_at", + "refresh_token_expires_at", + "updated_at", + ] + ) + return + + if not self.env_path: + logger.info("LinkedIn tokens refreshed in memory only; ENV_PATH is not configured.") + return + if not self.env_path.exists(): + logger.warning("LinkedIn tokens refreshed but env file does not exist: %s", self.env_path) + return + + lines = self.env_path.read_text(encoding="utf-8").splitlines() + replacements = { + "LINKEDIN_ACCESS_TOKEN": self.access_token, + "LINKEDIN_REFRESH_TOKEN": self.refresh_token, + } + + for key, value in replacements.items(): + serialized_value = json.dumps(value) if value is not None else '""' + for index, line in enumerate(lines): + if line.startswith(f"{key}="): + lines[index] = f"{key}={serialized_value}" + break + else: + lines.append(f"{key}={serialized_value}") + + self.env_path.write_text("\n".join(lines) + "\n", encoding="utf-8") + + def _is_auth_failure(self, response: Optional[requests.Response]) -> bool: + if response is None: + return False + return response.status_code in {401, 403} + def post_organization_post( self, commentary: str, @@ -39,6 +217,7 @@ def post_organization_post( article_title: Optional[str] = None, article_description: Optional[str] = None, ) -> requests.Response: + self.ensure_access_token() payload: dict[str, Any] = { "author": self.organization_urn, "commentary": commentary, @@ -57,13 +236,24 @@ def post_organization_post( } payload_json: str = json.dumps(payload) - response: requests.Response = requests.post(self.post_url, headers=self.headers, data=payload_json, timeout=15) - response.raise_for_status() - return response + response = requests.post(self.post_url, headers=self.headers, data=payload_json, timeout=15) + + try: + response.raise_for_status() + return response + except requests.HTTPError: + if not self._is_auth_failure(response) or not self.can_refresh_access_token(): + raise + + logger.info("LinkedIn post received %s; refreshing access token and retrying once.", response.status_code) + self.refresh_access_token() + retry_response = requests.post(self.post_url, headers=self.headers, data=payload_json, timeout=15) + retry_response.raise_for_status() + return retry_response def build_event_commentary( self, - event: Event, + event: "Event", is_new: bool = True, ) -> str: """ @@ -110,7 +300,7 @@ def build_event_commentary( def post_event( self, - event: Event, + event: "Event", is_new: bool = True, ) -> requests.Response: """ diff --git a/src/django_project/web/views.py b/src/django_project/web/views.py index 2141eba..0ea03b2 100644 --- a/src/django_project/web/views.py +++ b/src/django_project/web/views.py @@ -1,3 +1,4 @@ +from django.http import HttpRequest, HttpResponse from django.utils import timezone from handyhelpers.views.calendar import HtmxCalendarView from handyhelpers.views.htmx import ( @@ -9,6 +10,43 @@ from web.models import Event, TechGroup +def linkedin_oauth_callback(request: HttpRequest) -> HttpResponse: + """Display LinkedIn OAuth callback parameters for manual token exchange flows.""" + code = request.GET.get("code") + state = request.GET.get("state") + error = request.GET.get("error") + error_description = request.GET.get("error_description") + + if error: + body = ["LinkedIn OAuth callback received an error.", "", f"error: {error}"] + if error_description: + body.append(f"error_description: {error_description}") + return HttpResponse("\n".join(body), content_type="text/plain", status=400) + + if not code: + return HttpResponse( + "LinkedIn OAuth callback did not include a code parameter.", + content_type="text/plain", + status=400, + ) + + body = [ + "LinkedIn OAuth callback received successfully.", + "", + f"code: {code}", + ] + if state: + body.append(f"state: {state}") + body.extend( + [ + "", + "Use this code with:", + "python manage.py linkedin_oauth --redirect-uri --code ''", + ] + ) + return HttpResponse("\n".join(body), content_type="text/plain") + + class AboutContentView(HtmxOptionView): """Render the 'about' page"""