update cron job
This commit is contained in:
parent
4c57c0c520
commit
d0a29e8379
@ -3,6 +3,8 @@ import asyncio
|
|||||||
import aiohttp
|
import aiohttp
|
||||||
from dotenv import load_dotenv
|
from dotenv import load_dotenv
|
||||||
from pocketbase import PocketBase
|
from pocketbase import PocketBase
|
||||||
|
from dateutil.parser import isoparse
|
||||||
|
|
||||||
import time
|
import time
|
||||||
# Load API keys and credentials from environment variables
|
# Load API keys and credentials from environment variables
|
||||||
load_dotenv()
|
load_dotenv()
|
||||||
@ -33,11 +35,12 @@ async def get_subscription_data():
|
|||||||
|
|
||||||
async with aiohttp.ClientSession(headers=headers) as session:
|
async with aiohttp.ClientSession(headers=headers) as session:
|
||||||
while True:
|
while True:
|
||||||
# If we have made 100 requests, wait for 60 seconds before continuing
|
# Limit API request rate if necessary
|
||||||
if request_count > 0 and request_count % 100 == 0:
|
if request_count > 0 and request_count % 200 == 0:
|
||||||
print("Reached 100 API requests. Waiting for 60 seconds...")
|
print("Reached 100 API requests. Waiting for 60 seconds...")
|
||||||
await asyncio.sleep(60)
|
await asyncio.sleep(60)
|
||||||
|
|
||||||
|
# Note: Do not include the meta parameter
|
||||||
url = f"{base_url}?page[number]={page}&page[size]=100"
|
url = f"{base_url}?page[number]={page}&page[size]=100"
|
||||||
async with session.get(url) as response:
|
async with session.get(url) as response:
|
||||||
if response.status != 200:
|
if response.status != 200:
|
||||||
@ -46,11 +49,20 @@ async def get_subscription_data():
|
|||||||
break
|
break
|
||||||
|
|
||||||
data = await response.json()
|
data = await response.json()
|
||||||
# Append the subscription data from this page
|
|
||||||
if "data" in data:
|
|
||||||
all_subscriptions.extend(data["data"])
|
|
||||||
|
|
||||||
# If a next-page link exists, increment the page counter; otherwise, break the loop
|
# Extract subscription data and check for custom meta in attributes
|
||||||
|
if "data" in data:
|
||||||
|
for item in data["data"]:
|
||||||
|
subscription = {
|
||||||
|
"id": item.get("id"),
|
||||||
|
"attributes": item.get("attributes", {}),
|
||||||
|
}
|
||||||
|
# Access your custom meta if it exists under a key like 'custom_meta'
|
||||||
|
if "custom_meta" in subscription["attributes"]:
|
||||||
|
subscription["custom_meta"] = subscription["attributes"]["custom_meta"]
|
||||||
|
all_subscriptions.append(subscription)
|
||||||
|
|
||||||
|
# Handle pagination if a next-page link is available
|
||||||
if "links" in data and data["links"].get("next"):
|
if "links" in data and data["links"].get("next"):
|
||||||
page += 1
|
page += 1
|
||||||
else:
|
else:
|
||||||
@ -60,14 +72,20 @@ async def get_subscription_data():
|
|||||||
|
|
||||||
return all_subscriptions
|
return all_subscriptions
|
||||||
|
|
||||||
|
|
||||||
async def run():
|
async def run():
|
||||||
|
|
||||||
|
# Fetch all users (assumes pb.collection(...).get_full_list() is synchronous)
|
||||||
all_users = pb.collection("users").get_full_list()
|
all_users = pb.collection("users").get_full_list()
|
||||||
users_by_email = {user.email: user for user in all_users if hasattr(user, 'email')}
|
users_by_email = {user.email: user for user in all_users if hasattr(user, 'email')}
|
||||||
|
|
||||||
|
# Fetch all subscriptions (awaited)
|
||||||
all_subscriptions = await get_subscription_data()
|
all_subscriptions = await get_subscription_data()
|
||||||
print(f"Total Subscriptions: {len(all_subscriptions)}\n")
|
print(f"Total Subscriptions: {len(all_subscriptions)}\n")
|
||||||
|
|
||||||
# Group subscriptions by email, prioritizing "active" status first, then latest updated_at
|
# Group subscriptions by email.
|
||||||
|
# For each email, we want to always choose an active subscription over a non-active one.
|
||||||
|
# If both subscriptions are of the same "activity" level, we choose the one with the later updated_at.
|
||||||
subscriptions_by_email = {}
|
subscriptions_by_email = {}
|
||||||
for sub in all_subscriptions:
|
for sub in all_subscriptions:
|
||||||
attributes = sub.get('attributes', {})
|
attributes = sub.get('attributes', {})
|
||||||
@ -76,34 +94,46 @@ async def run():
|
|||||||
continue
|
continue
|
||||||
|
|
||||||
status = attributes.get('status', '').lower()
|
status = attributes.get('status', '').lower()
|
||||||
updated_at = attributes.get('updated_at')
|
updated_at_str = attributes.get('updated_at')
|
||||||
existing_sub = subscriptions_by_email.get(user_email)
|
try:
|
||||||
|
updated_at = isoparse(updated_at_str) if updated_at_str else None
|
||||||
|
except Exception as e:
|
||||||
|
print(f"Error parsing updated_at ({updated_at_str}) for {user_email}: {e}")
|
||||||
|
updated_at = None
|
||||||
|
|
||||||
# First-time entry: always add
|
existing_sub = subscriptions_by_email.get(user_email)
|
||||||
if not existing_sub:
|
if not existing_sub:
|
||||||
subscriptions_by_email[user_email] = sub
|
subscriptions_by_email[user_email] = sub
|
||||||
continue
|
continue
|
||||||
|
|
||||||
|
# Get info from the already saved subscription
|
||||||
existing_attrs = existing_sub.get('attributes', {})
|
existing_attrs = existing_sub.get('attributes', {})
|
||||||
existing_status = existing_attrs.get('status', '').lower()
|
existing_status = existing_attrs.get('status', '').lower()
|
||||||
existing_updated = existing_attrs.get('updated_at')
|
existing_updated_str = existing_attrs.get('updated_at')
|
||||||
|
try:
|
||||||
|
existing_updated = isoparse(existing_updated_str) if existing_updated_str else None
|
||||||
|
except Exception as e:
|
||||||
|
print(f"Error parsing existing updated_at ({existing_updated_str}) for {user_email}: {e}")
|
||||||
|
existing_updated = None
|
||||||
|
|
||||||
# Prioritize "active" status
|
# If the new sub is active, it should win over any non-active subscription.
|
||||||
if status == 'active':
|
if status == 'active':
|
||||||
if existing_status == 'active':
|
if existing_status == 'active':
|
||||||
# Both active: keep the newer one
|
# Both active: keep the one with the later updated_at.
|
||||||
if updated_at > existing_updated:
|
if updated_at and existing_updated and updated_at > existing_updated:
|
||||||
subscriptions_by_email[user_email] = sub
|
subscriptions_by_email[user_email] = sub
|
||||||
else:
|
else:
|
||||||
# Replace non-active with active
|
# Replace a non-active subscription with an active one.
|
||||||
subscriptions_by_email[user_email] = sub
|
subscriptions_by_email[user_email] = sub
|
||||||
else:
|
else:
|
||||||
|
# New sub is not active.
|
||||||
if existing_status != 'active':
|
if existing_status != 'active':
|
||||||
# Neither is active: keep the newer one
|
# Both are non-active: choose the one with the later updated_at.
|
||||||
if updated_at > existing_updated:
|
if updated_at and existing_updated and updated_at > existing_updated:
|
||||||
subscriptions_by_email[user_email] = sub
|
subscriptions_by_email[user_email] = sub
|
||||||
|
# If one of the dates is missing, you might add extra logic here.
|
||||||
|
|
||||||
# Process filtered subscriptions
|
# Process the subscriptions after grouping.
|
||||||
for user_email, sub in subscriptions_by_email.items():
|
for user_email, sub in subscriptions_by_email.items():
|
||||||
try:
|
try:
|
||||||
attributes = sub.get('attributes', {})
|
attributes = sub.get('attributes', {})
|
||||||
@ -111,13 +141,16 @@ async def run():
|
|||||||
user = users_by_email.get(user_email)
|
user = users_by_email.get(user_email)
|
||||||
|
|
||||||
if status in ['expired', 'refunded']:
|
if status in ['expired', 'refunded']:
|
||||||
|
# Example logic: downgrade a Pro user if not lifetime and subscription is expired/refunded.
|
||||||
if user and getattr(user, 'tier', None) == 'Pro' and not getattr(user, 'lifetime', False):
|
if user and getattr(user, 'tier', None) == 'Pro' and not getattr(user, 'lifetime', False):
|
||||||
pb.collection('users').update(user.id, {
|
# Uncomment the line below to perform the update:
|
||||||
'tier': 'Free'
|
# pb.collection('users').update(user.id, {'tier': 'Free'})
|
||||||
})
|
|
||||||
print(f"Downgraded: {user_email}")
|
print(f"Downgraded: {user_email}")
|
||||||
except:
|
print(attributes)
|
||||||
pass
|
|
||||||
|
except Exception as e:
|
||||||
|
print(f"Error processing user {user_email}: {e}")
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
if __name__ == "__main__":
|
if __name__ == "__main__":
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user