[Python] My Indexing Service Worker Script - Open Source & Method Reveal [No Support, all DIY]

Bugfisch

I worked on my own indexing service until January, using a somewhat (I guess) unusual method, but stopped development on the project. The beta is basically still running, but is not being developed further.
Since this method will no longer work as of 12/23, because Google is discontinuing the API required for it, I have decided to open source the code of my worker (i.e. the script that does the actual work in the backend).
The method is relatively simple and perhaps also good to know for people without coding knowledge:
We abuse the Mobile Friendly Test, which is also a great way to uncloak sites ;)
We can do this via the Google Webmaster API - a simple API key is sufficient, so no annoying authentication fuss when generating accounts if you want to scale. The API sends a legitimate Googlebot to the page in real time, so to speak, which also prepares the page for the index (I cross-checked Google cache times).

Search Console API keys have a request limit, but no IP limit - so you can create several for yourself.
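For anyone who just wants to see the core trick before digging into the full worker below, a minimal single-URL sketch could look like this (the API key and URL are placeholders; the endpoint is the same one the worker uses):

Python:
import requests

# Placeholder key - generate your own in the Google Cloud console
API_KEY = "yourApiKey"

resp = requests.post(
    "https://searchconsole.googleapis.com/v1/urlTestingTools/mobileFriendlyTest:run",
    params={"key": API_KEY},
    json={"url": "https://example.com/page-to-index"},
)
data = resp.json()
# testStatus "COMPLETE" means the live test ran, i.e. a real Googlebot fetched the page
print(data.get("testStatus", {}).get("status"))
print(data.get("mobileFriendliness"))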

The whole thing serves more as training material and is not directly executable, because the whole database structure is missing - but anyone with programming knowledge can adapt it for themselves and use the functions they need.

I won't give much support here either - I'll just throw the whole thing into the room and leave it to you to play around with ;)
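Since the database structure is missing, here is the rough document shape you can infer from the worker's queries - the field values are illustrative, not the original schema:

Python:
# Inferred from the find()/update_one() calls in the worker - adjust to taste
exampleUrlDoc = {
    "url": "https://example.com/page-to-index",
    "status": "pending",        # pending -> success / failed
    "type": "standard",         # priority tiers: premium, standard, spam
    "timeSubmitted": 1672531200.0,
    "failed": 0,                # retry counter; marked failed after the third attempt
    "cost": 1,
    "userId": "someUserId",     # references the "user" collection (credits get adjusted on final failure)
}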

Python:
# -*- coding: utf-8 -*-
import random
import json
import time
import asyncio
import aiohttp
from pymongo import MongoClient

############
# SETTINGS
############

def settings(category, data):
    settings = {
        "basic" : {
            "googleAPIKeys" : ["apiCode1", "apiCode2"],
            "urlCount" : 5,
            "processes" : "auto"
        },
        "db" : {
            "DatabaseServer": "mongoDBServer",
            "DatabasePort": 27017,
            "DatabaseUser": "mongoDBUser",
            "DatabasePassword": "mongoDBPW"
            "DatabaseDB": "yourDatabase"
            "DatabaseCollection": "yourCollection"
        }
    }
    return(settings[category][data])

############
# SCRIPT
############

def main():
    running = True
    while running == True:
        timer = time.perf_counter() + 10
        print("Getting items")
        urls = getUrlItems()
        if len(urls) > 0:
            print("Starting Tasks")
            asyncio.run(startMultiprocess(urls))
        while time.perf_counter() < timer:
            print("waiting...")
            time.sleep(5)

#################################
# FUNCTIONS DATABASE
def mongoDbConnection(db, collection):
    client = MongoClient(f'mongodb://{settings("db", "DatabaseUser")}:{settings("db", "DatabasePassword")}@{settings("db", "DatabaseServer")}:{settings("db", "DatabasePort")}/')
    db = client[db]
    collection = db[collection]
    return(collection)

############
# _mainFunctions
############

def getUrlItems():
    maxUrls = len(settings("basic", "googleAPIKeys")) * settings("basic", "urlCount")
    urls = []
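    # Fill the available slots in priority order: premium first, then standard, then spam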
    for t in ["premium", "standard", "spam"]:
        slots = maxUrls - len(urls)
        urls.extend(getSubmittedUrls(t, slots))
        print(str(len(urls)))
    return(spreadApiCodes(urls))

def spreadApiCodes(items):
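    # Round-robin the configured API keys across the queued URLs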
    urls = []
    i = 0
    for u in items:
        if i == len(settings("basic", "googleAPIKeys")):
            i = 0
        u.update({"api": settings("basic", "googleAPIKeys")[i]})
        urls.append(u)
        i += 1
    return(urls)

def getSubmittedUrls(_type, _max):
    urls = []
    MongoCollection = mongoDbConnection(settings("db", "DatabaseDB"), settings("db", "DatabaseCollection"))
    docs = MongoCollection.find({"status": "pending", "type": _type}).sort("timeSubmitted", 1).limit(_max)
    for row in docs:
        urls.append({"url": row["url"], "id": row["_id"]})
    return(urls)

def updateSubmittedUrls(_id, state):
    MongoCollection = mongoDbConnection(settings("db", "DatabaseDB"), settings("db", "DatabaseCollection"))
    if state:
        r = MongoCollection.update_one({"_id": _id}, {"$set":{"status":"success", "timeFinished": time.time()}}, upsert=False)
        return()
    docs = MongoCollection.find({"_id": _id})
    for row in docs:
        if row["failed"] == 2:
            r = MongoCollection.update_one({"_id": _id}, {"$set":{"status":"failed", "timeFinished": time.time(), "cost":0}}, upsert=False)
            MongoCollection = mongoDbConnection(settings("db", "DatabaseDB"), "user")
            r = MongoCollection.update_one({"_id": row["userId"]}, {"$inc":{"credits":-1}}, upsert=False)
            return()
        r = MongoCollection.update_one({"_id": _id}, {"$inc":{"failed": 1}}, upsert=False)
        return()

async def startMultiprocess(urls):
    async with aiohttp.ClientSession() as session:
        tasks = []
        for url in urls:
            task = asyncio.ensure_future(requestGoogleAPI(session, url))
            tasks.append(task)
        await asyncio.gather(*tasks, return_exceptions=True)

def poolCount(urls):
    if (settings("basic", "processes") == "auto"):
        pool = len(settings("basic", "googleAPIKeys")) * settings("basic", "urlCount")
        if len(urls) < pool:
            return(len(urls))
        # if pool > 100:
        #     pool = 100
        return(pool)
    if len(urls) < settings("basic", "processes"):
        return(len(urls))
    return(settings("basic", "processes"))

async def requestGoogleAPI(session, task):
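    # Run the Mobile Friendly Test for one URL; status COMPLETE means a live Googlebot fetch happened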
    startTime = time.time()
    async with session.post('https://searchconsole.googleapis.com/v1/urlTestingTools/mobileFriendlyTest:run', params=apiCodeToRequest(task["api"]), headers=generateApiHeaders(), json=urlToRequest(task["url"])) as response:
        raw = await response.text()
        print(f'+++ {decodeApiResponse(raw)} ||| {task["url"]} |@|{task["api"]} ||| {str(round(time.time()-startTime))} SEC')
        if decodeApiResponse(raw) == "FAILED":
            print(raw)
            return(False)
        if decodeApiResponse(raw) == "COMPLETE":
            updateSubmittedUrls(task["id"], True)
        else:
            updateSubmittedUrls(task["id"], False)
        return(True)

def decodeApiResponse(raw):
    try:
        json_data = json.loads(raw)
        status = json_data['testStatus']['status']
        return(status)
    except Exception as e:
        print(e)
        return("FAILED")

def decodeApiResponseMobility(raw):
    try:
        json_data = json.loads(raw)
        status = json_data['mobileFriendliness']
        return(status)
    except Exception as e:
        print(e)
        return("FAILED")

def apiCodeToRequest(apiCode):
    data = {'key': apiCode}
    return(data)

def generateApiHeaders():
    headers = {'Content-Type': 'application/json'}
    return(headers)

def urlToRequest(url):
    data = {"url": url}
    return(data)

def setMongo(server, db, collection):
    client = MongoClient(server)
    db = client[db]
    collection = db[collection]
    return(collection)

def rdmNoStr(fm, to):
    no = str(random.randrange(fm, to))
    return(no)

if __name__ == "__main__":
    main()
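If you want to feed the worker, you could queue a document like this - a sketch assuming the inferred schema above, with the same placeholder connection values as in settings():

Python:
from pymongo import MongoClient
import time

client = MongoClient("mongodb://mongoDBUser:mongoDBPW@mongoDBServer:27017/")
col = client["yourDatabase"]["yourCollection"]

# The worker picks up documents with status "pending", oldest first
col.insert_one({
    "url": "https://example.com/new-page",
    "status": "pending",
    "type": "standard",
    "timeSubmitted": time.time(),
    "failed": 0,
})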
 
Does this force Google to index a website with a new domain? Am I getting it, or do I need to copy this and paste it into ChatGPT for a fourth-grade reading-level summary??
 
Does this force Google to index a website with a new domain?
Well, that depends on what you call indexing. It will force a Googlebot onto your page, and that bot will index it, yes. But this doesn't mean it will actually land in the SERPs. Google has more than one index layer - basically, the bot just saves everything it sees in an internal index, so that other algos can run over it, analyze the content, etc.
We can basically only force this first layer, and that is what this method does.
 
Great share @Bugfisch!

I updated the code so it loads both the API keys and the links from local files, and saves the submitted URLs to a file.
I didn't test this yet, so it might need some adjustments.
You'll need to place the links in links.txt and the API keys in apis.txt; submitted URLs will be saved to submitted.txt (you can change the file locations at the top of the script). Here's the code:
Python:
import random
import json
import time
import asyncio
import aiohttp

# Define the file paths at the top of the script
API_KEYS_FILE = "apis.txt"
URLS_FILE = "links.txt"
SUBMITTED_URLS_FILE = "submitted.txt"
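# Expected file formats (assumed from how the files are read below):
#   apis.txt  - one API key per line
#   links.txt - one URL per line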

# Load API keys from apis.txt
with open(API_KEYS_FILE, 'r') as f:
    googleAPIKeys = [line.strip() for line in f]

############
# SETTINGS
############

def settings(category, data):
    settings = {
        "basic" : {
            "googleAPIKeys" : googleAPIKeys,
            "urlCount" : 5,
            "processes" : "auto"
        }
    }
    return(settings[category][data])

############
# SCRIPT
############

def main():
    running = True
    while running == True:
        timer = time.perf_counter() + 10
        print("Getting items")
        urls = getUrlItems()
        if len(urls) > 0:
            print("Starting Tasks")
            asyncio.run(startMultiprocess(urls))
        while time.perf_counter() < timer:
            print("waiting...")
            time.sleep(5)

############
# _mainFunctions
############

def getUrlItems():
    maxUrls = len(settings("basic", "googleAPIKeys")) * settings("basic", "urlCount")
    with open(URLS_FILE, 'r') as f:
        lines = f.readlines()
    # Take at most maxUrls non-empty links from the top of the file
    urls = [{"url": line.strip(), "id": i} for i, line in enumerate(lines) if i < maxUrls and line.strip()]
    return(spreadApiCodes(urls))

def spreadApiCodes(items):
    urls = []
    i = 0
    for u in items:
        if i == len(settings("basic", "googleAPIKeys")):
            i = 0
        u.update({"api": settings("basic", "googleAPIKeys")[i]})
        urls.append(u)
        i += 1
    return(urls)

def updateSubmittedUrls(url, state):
    # Match by URL rather than line index - indices shift once lines are removed,
    # and several async tasks finish at different times.
    # Note: state is ignored here; failed URLs are moved to submitted.txt as well.
    with open(URLS_FILE, 'r') as f:
        lines = f.readlines()
    with open(URLS_FILE, 'w') as f:
        for line in lines:
            if line.strip() != url:
                f.write(line)
    with open(SUBMITTED_URLS_FILE, 'a') as f:
        f.write(url + "\n")

async def startMultiprocess(urls):
    async with aiohttp.ClientSession() as session:
        tasks = []
        for url in urls:
            task = asyncio.ensure_future(requestGoogleAPI(session, url))
            tasks.append(task)
        await asyncio.gather(*tasks, return_exceptions=True)

def poolCount(urls):
    if (settings("basic", "processes") == "auto"):
        pool = len(settings("basic", "googleAPIKeys")) * settings("basic", "urlCount")
        if len(urls) < pool:
            return(len(urls))
        return(pool)
    if len(urls) < settings("basic", "processes"):
        return(len(urls))
    return(settings("basic", "processes"))

async def requestGoogleAPI(session, task):
    startTime = time.time()
    async with session.post('https://searchconsole.googleapis.com/v1/urlTestingTools/mobileFriendlyTest:run', params=apiCodeToRequest(task["api"]), headers=generateApiHeaders(), json=urlToRequest(task["url"])) as response:
        raw = await response.text()
        print(f'+++ {decodeApiResponse(raw)} ||| {task["url"]} |@|{task["api"]} ||| {str(round(time.time()-startTime))} SEC')
        if decodeApiResponse(raw) == "FAILED":
            print(raw)
            return(False)
        if decodeApiResponse(raw) == "COMPLETE":
            updateSubmittedUrls(task["url"], True)
        else:
            updateSubmittedUrls(task["url"], False)
        return(True)

def decodeApiResponse(raw):
    try:
        json_data = json.loads(raw)
        status = json_data['testStatus']['status']
        return(status)
    except Exception as e:
        print(e)
        return("FAILED")

def decodeApiResponseMobility(raw):
    try:
        json_data = json.loads(raw)
        status = json_data['mobileFriendliness']
        return(status)
    except Exception as e:
        print(e)
        return("FAILED")

def apiCodeToRequest(apiCode):
    data = {'key': apiCode}
    return(data)

def generateApiHeaders():
    headers = {'Content-Type': 'application/json'}
    return(headers)

def urlToRequest(url):
    data = {"url": url}
    return(data)

def rdmNoStr(fm, to):
    no = str(random.randrange(fm, to))
    return(no)

if __name__ == "__main__":
    main()
 