LibreOffice plugin to pipe whole Writer documents through Google Translate, that ought to keep most of the page formatting.

⌈⌋ branch:  PageTranslate


Artifact [329c6810ca]

Artifact 329c6810ca33edf271b4c73c33c68124cbc48764:

  • File pythonpath/translationbackends.py — part of check-in [c62b11cb0e] at 2021-06-10 14:53:13 on branch trunk — Change exception names, and use `LangSelection` for dialog (makes error more understandable). (user: mario size: 24701)

# encoding: utf-8
# api: pagetranslate
# type: classes
# category: language
# title: via_* translation backends
# description: hooks up the translation services (google, mymemory, deepl, ...)
# version: 1.9
# state: beta
# depends: python:requests (>= 2.5), python:langdetect, python:translate, python:deep-translator
# config:
#    { name: backend, type: str, value: "Google Translate", description: backend title }
#    { name: api_key, type: str, value: "", description: API key }
#    { name: email, type: str, value: "", description: MyMemory email }
#    { name: cmd, type: str, value: "translate-cli -o {text}", description: cli program }
#
# Different online service backends and http interfaces are now coalesced here.
# Each class handles sentence/blockwise transfer to one of the online machine
# translators to get text snippets transformed.
#
# The primary function is .translate(), with .linebreakwise() being used for
# table-cell snippets. Language from/to are passed through .__init__(params).
#
# translate-python or deep-translator are loaded on demand, as to not impose
# a dependency unless the according backends are actually used. (Configuration
# now uses params["backend"] with some fuzzy title mapping in assign_service().)
#


# modules
import re, json, time, uuid, html, sys
import os, subprocess, shlex
from random import randrange as rand
from httprequests import http, urlencode, quote, quote_plus
import logging as log
from traceback import format_exc


# regex
class rx:
    # Google Translate
    gtrans = re.compile('class="(?:t0|result-container)">(.+?)</div>', re.S)
    
    # text block splitting
    split1900 = re.compile("(.{1,1895}\.|.{1,1900}\s|.*$)", re.S)
    split500 = re.compile("(.{1,495}\.|.{1,500}\s|.*$)", re.S)
    
    # content detection
    empty = re.compile("^[\s\d,.:;§():-]+$")
    letters = re.compile("\w\w+", re.UNICODE)
    breakln = re.compile("\s?/\s?#\s?§\s?/\s?", re.UNICODE)


# Google Translate (default backend)
#
#  · calls mobile page http://translate.google.com/m?hl=en&sl=auto&q=TRANSLATE
#  · iterates over each 1900 characters
#
class google:

    def __init__(self, params={}):
        self.params = params  # config+argparse
        self.max_len = 1900
        self.rx_split = rx.split1900

    # request text translation from google
    def fetch(self, text, dst_lang="en", src_lang='auto'):
        # fetch translation page
        url = "https://translate.google.com/m?tl=%s&hl=%s&sl=%s&q=%s" % (
            dst_lang, dst_lang, src_lang, quote_plus(text.encode("utf-8"))
        )
        result = http.get(url).content.decode("utf-8")
        # extract content from text <div>
        m = rx.gtrans.search(result)
        if m:
            text = m.group(1)
            text = self.html_unescape(text)
        else:
            log.warning("NO TRANSLATION RESULT EXTRACTED: " + html)
            log.debug("ORIG TEXT: " + repr(text))
        return text

    # decode HTML entities
    def html_unescape(self, s):
        try:
            return html.unescape(s)
        except:
            return s.replace("&#39;", "'").replace("&amp;", "&").replace("&lt;", "<").replace("&gt;", ">").replace("&quot;", '"')
            
    # skip snippets that are empty-ish or too short for translating
    def skip(self, text):
        #log.debug("translate %d chars" % len(text))
        if len(text) < 2:
            log.debug("skipping/len<2")
            return True
        elif rx.empty.match(text):
            log.debug("skipping/empty")
            return True
        elif not rx.letters.search(text):
            log.debug("skipping/noletters")
            return True

    # language detection (if from==auto, try to deduce it; required by some backends)
    def lang(self, text, lang=None):
        lang = lang or self.params["from"]
        if lang in ("auto", "", "select"):
            try:
                import langdetect
                lang = langdetect.detect(text)
            except:
                log.warning("`pip install langdetect` for best results\n"+format_exc())
                lang = "en"
        return lang

    # iterate over text segments (1900 char limit)        
    def translate(self, text, lang="auto"):
        if lang == "auto":
            lang = self.params["lang"]
        if self.skip(text):
            return text
        elif len(text) >= self.max_len:
            log.debug("spliterate/%s+" % self.max_len)
            return " ".join(self.fetch(segment, lang) for segment in self.rx_split.findall(text))
        else:
            return self.fetch(text, lang)
            
    # translate w/ preserving paragraph breaks (meant for table cell content)
    def linebreakwise(self, text, lang="auto"):
        if not self.params.get("quick"):
            # split on linebreaks and translate each individually
            text = "\n\n".join(self.translate(text) for text in text.split("\n\n"))
        else:
            # use temporary placeholder `/#§/`
            text = self.translate(text.replace("\n\n", u"/#§/"))
            text = re.sub(rx.breakln, "\n\n", text)
        return text

# variant that uses the AJAX or API interface
class google_ajax(google):
    # request text translation from google
    def fetch(self, text, dst_lang="en", src_lang='auto'):
        r = http.get(
            url="https://translate.googleapis.com/translate_a/single",
            params={
                "client": "gtx",
                "sl": self.params["from"],
                "tl": self.params["lang"],
                "dt": "t",
                "q": text
            }
        )
        if r.status_code == 200:
            r = r.json()   # request result should be JSON, else client was probably blocked
            #log.debug("'" + text + "' ==> " + repr(r))
            text = "".join([s[0] for s in r[0]])  # result is usually wrapped in three lists [[[u"translated text", u"original", None, None, 3, None, None, [[]] → one per sentence
        else:
            log.debug("AJAX ERROR: " + repr(r))
        return text


# DeepL online translator
#  · will easily yield HTTP 429 Too many requests,
#    so probably not useful for multi-paragraph translation anyway (just text selections)
#  · uses some kind of json-rpc
#
# data origins:
#  · https://www.deepl.com/translator = nothing
#  · jsonrpcId = random integer
#  · sessionId = random client-side guid
#      (https://www.deepl.com/js/translator_glossary_late.min.js?v=… → generated in `function u()`)
#  · instanceId
#      (https://www.deepl.com/PHP/backend/clientState.php?request_type=jsonrpc&il=EN → "uid":"(.+=)")
#  · LMTBID cookie
#      (https://s.deepl.com/web/stats?request_type=jsonrpc ← jsonrpc+session+instId+clientinfos)
#
# translation requests:
#  < https://www2.deepl.com/jsonrpc
#    cookies: LMTBID: GUID...
#    referer: https://www.deepl.com/translator
# repsonse  body:
#  > result.translations[0].beams[0].postprocessed_sentence
#
class deepl_web(google):
    
    def __init__(self, params):
        self.params = params
        self.lang = params["lang"].upper()
        self.id_ = rand(202002000, 959009000) # e.g. 702005000, arbitrary, part of jsonrpc req-resp association
        self.sess = str(uuid.uuid4())    # e.g. 233beb7c-96bc-459c-ae20-157c0bebb2e4
        self.inst = ""   # e.g. ef629644-3d1b-41a4-a2de-0626d23c99ee
        
        # fetch homepage (redundant)
        html = http.get("https://www.deepl.com/translator").text  # should fetch us the cookie / No, it doesn't
        self.versions = dict(re.findall("([\w.]+)\?v=(\d+)", html))
        #print(html)
        
        # instanceId from clientState…
        j = http.post(
            "https://www.deepl.com/PHP/backend/clientState.php?request_type=jsonrpc&il=EN",
            data = json.dumps({"jsonrpc":"2.0", "method":"getClientState", "params":{"v":"20180814"}, "id":self.id()})
        ).json()
        self.inst = j.get("clientVars", {}).get("uid", "")
        print(j)
        
        # aquire LMTBID cookie (not sure if needed)
        h = http.post(
            "https://s.deepl.com/web/stats?request_type=jsonrpc",
            data = json.dumps({
                "jsonrpc":"2.0", "method":"WebAppPushStatistics", "id":self.id(),
                "params":{
                    "value":{
                        "instanceId":self.inst,
                        "sessionId":self.sess,
                        "event":"web/pageview",
                        "url":"https://www.deepl.com/translator",
                        "userAgent":"Mozilla/5.0 (X11; Ubuntu; Linux x86_64; rv:72.0) Gecko/20100101 Firefox/72.0",
                        "resolution":{"width":1920,"height":1080,"devicePixelRatio":1,"viewportWidth":1900,"viewportHeight":916},
                        "data":{"referrer":""}
                    }
                }
            })
        )
        print(h.headers)

    def id(self):
        self.id_ += 1
        return self.id_
        
    def rpc(self, text):
        return json.dumps({
           "jsonrpc" : "2.0",
           "method" : "LMT_handle_jobs",
           "id" : self.id(),
           "params" : {
              "lang" : {
                 "target_lang" : self.lang,
                 "user_preferred_langs" : [
                    self.lang,
                    "EN"
                 ],
                 "source_lang_user_selected" : "auto"
              },
              "timestamp" : int(time.time()*1000),
              "priority" : -1,
              "commonJobParams" : {},
              "jobs" : [
                 {
                    "raw_en_context_after" : [],
                    "raw_en_context_before" : [],
                    "kind" : "default",
                    "preferred_num_beams" : 4,
                    "raw_en_sentence" : text,
                    "quality" : "fast"
                 }
              ]
           }
        })

    def translate(self, text):
        # skip empty paragraph/table snippets
        if self.skip(text):
            return text
        
        # delay?
        time.sleep(rand(1, 15) / 10.0)
        
        # request
        r = http.post(
            "https://www2.deepl.com/jsonrpc",
            data=self.rpc(text),
            headers={"Referer": "https://www.deepl.com/translator", "Content-Type": "text/plain"}
        )
        if r.status_code != 200:
            log.error(repr(r.content))
            return text
            #return r, r.content
        
        # decode
        r = r.json()
        print(r)
        if r.get("result"):
            return r["result"]["translations"][0]["beams"][0]["postprocessed_sentence"]
        else:
            return text


# DeepL API
#
# So, there's a free API and the pro API now. This might make the _web scraping
# dancearound redundant. The free API is certainly more enticing for testing.
# In general, DeepL provides a more streamlined translation than GoogleTranslate.
# It's mostly in here because the API is quite simple.
#
# ENTIRELY UNTESTED
#    
class deepl_api(deepl_web):

    def __init__(self, params):
        self.params = params
        self.api_url = "https://api.deepl.com/v2/translate"
        
    def translate(self, text, preserve=0):
        # skip empty paragraph/table snippets
        if self.skip(text):
            return text

        # https://www.deepl.com/docs-api/translating-text/request/
        r = http.get(
            self.api_url, params={
                "auth_key": self.params["api_key"],
                "text": text,
                "target_lang": self.params["lang"],
                "split_sentences": "1",
                "preserve_formatting": str(preserve)
                #"tag_handling": "xml"
            }
        )
        if r.status_code == 200:
            r = r.json().get("translations")
            if r:
                return r[0]["text"]
        else:
            log.error(repr(r))
            if r.status_code == 403:
                r.status = "Authorization/API key invalid"
            if not hasattr(r, "status"):
                r.status = "???"
            raise ConnectionRefusedError(r.status_code, r.status, r.headers)
        return text
    
    def linebreakwise(self, text):
        return self.translate(text, preserve=1)

# DeepL free API
#
# Registration is broken (error 10040 or whatever, "contact support" lel), even though
# it seems to create an account regardless; but API yields SSL or connection errors.
# Thus STILL UNTESTED.
#
class deepl_free_api(deepl_api):
    def __init__(self, params):
        self.params = params
        self.api_url = "https://api-free.deepl.com/v2/translate"


# Translate-python
# requires `pip install translate`
#
#  · provides "microsoft" backend (requires OAuth secret in api_key)
#
#  · or "mymemory" (with email in `email` instead)
#
# https://translate-python.readthedocs.io/en/latest/
#
class translate_python(google):

    def __init__(self, params={}):
        self.params = params  # config+argparse
        #self.error = pagetranslate.MessageBox

        Translator = None
        try:
            from translate import Translator
        except:
            log.error(format_exc())
            raise ImportException("Run `pip install translate` to use this module.")
            
        # interestingly this backend function might just work as is.
        if re.search("mymemory", params.get("backend", ""), re.I):
            self.translate = Translator(
                provider="mymemory", to_lang=params["lang"], email=params.get("email", "")
            ).translate
        else:
            self.translate = Translator(
                provider="microsoft", to_lang=params["lang"], secret_access_key=params["api_key"]
            ).translate

        # though .linebreakwise has no equivalent, not sure if necessary,
        # or if formatting/linebreaks are preserved anyway
        # (or: we might just use the default google. implementation)
        #self.linebreakwise = self.translate

    translate = None
    #linebreakwise = None



# deep-translator
# requires `pip install deep-translator`
#  · more backends than pytranslate,
#    though PONS+Linguee are just dictionaries
#  → https://github.com/nidhaloff/deep-translator
#
class deep_translator(google):

    def __init__(self, params={}):
        # config+argparse
        self.params = params
        backend = params.get("backend", "Pons")
        langs = {
            "source": self.coarse_lang(params.get("from", "auto")),
            "target": self.coarse_lang(params.get("lang", "en")),
        }
        api_key = {
            "api_key": params["api_key"]
        }
        # import
        import functools
        import deep_translator
        # map to backends / uniform decorators
        backend = [
            id for id in ["linguee", "pons", "QCRI", "yandex", "deepl", "free", "microsoft", "papago"] if re.search(id, backend, re.I)
        ]
        log.info(backend)
        if "linguee" in backend:
            self.translate = self.from_words(deep_translator.LingueeTranslator(**langs).translate)
        elif "pons" in backend:
            self.translate = self.from_words(deep_translator.PonsTranslator(**langs).translate)
        elif "QCRI" in backend:
            self.translate = functools.partial(deep_translator.QCRI(**api_key).translate, **langs)
        elif "yandex" in backend:
            self.translate = functools.partial(deep_translator.YandexTranslator(**api_key).translate, **langs)
        elif "deepl" in backend:
            self.translate = deep_translator.DeepL(api_key=params["api_key"], use_free_api=("free" in backend), **langs).translate
        elif "microsoft" in backend:
            self.translate = deep_translator.MicrosoftTranslator(api_key=params["api_key"], **langs).translate
        elif "papago" in backend:
            client_id, secret_key = params["api_key"].split(":") # api_key must contain `clientid:clientsecret`
            self.translate = deep_translator.PapagoTranslator(client_id=client_id, secret_key=secret_key, **langs).translate
        else:
            self.translate = deep_translator.GoogleTranslator(**langs).translate

    # shorten language co-DE to just two-letter moniker
    def coarse_lang(self, id):
        if id.find("-") > 0:
            id = re.sub("(?<!zh)-\w+", "", id)
        return id
    
    # decorator to translate word-wise
    def from_words(self, fn):
        def translate(text):
            words = re.findall("(\w+)", text)
            words = { w: fn(w) for w in list(set(words)) }
            text = re.sub("(\w+)", lambda m: words.get(m[0], m[0]), text)
            return text
        return translate

    translate = None
    #linebreakwise = None


# MyMemory, only allows max 500 bytes input per API request. Therefore reusing
# the Google backend, but with a different rx_split.
#
# We kinda need the source language here, as mymem provides no "auto" detection.
# Thus importing langdetect here, else fall back to "en". The alternative would
# be fiddling with OOs paragraph locales again, and turning it into a full on
# usability nightmare.
#
# doc:
#   https://mymemory.translated.net/doc/spec.php
# errs:
#   'PLEASE SELECT TWO DISTINCT LANGUAGES'
#   'INVALID EMAIL PROVIDED'
#   'AUTO' IS AN INVALID SOURCE LANGUAGE . EXAMPLE: LANGPAIR=EN|IT USING 2 LETTER ISO OR RFC3066 LIKE ZH-CN. ALMOST ALL LANGUAGES SUPPORTED BUT SOME MAY HAVE NO CONTENT"
#   'SELECT' IS AN INVALID SOURCE LANGUAGE . EXAMPLE: LANGPAIR=EN|IT USING 2 LETTER ISO OR RFC3066 LIKE ZH-CN. ALMOST ALL LANGUAGES SUPPORTED BUT SOME MAY HAVE NO CONTENT"
#
class mymemory(google):

    def __init__(self, params={}):
        self.params = params  # config+argparse
        self.max_len = 500
        self.rx_split = rx.split500

    # API    
    def fetch(self, text, lang="en", src_lang="en"):
        src_lang = self.lang(text)
        if lang == src_lang:
            log.info("Skipping "+src_lang+"|"+lang)
            return text
        # call
        url = "https://api.mymemory.translated.net/get?q=%s&langpair=%s|%s&of=json&mt=1" % (
            quote_plus(text.encode("utf-8")), src_lang, lang
        )
        if self.params.get("email"):
            url = url + "&de=" + self.params["email"]
        # any exceptions are covered in main
        j = http.get(url).content.decode("utf-8")
        log.debug(j)
        j = json.loads(j)
        if j["responseStatus"] in ("200", 200):
            text = j["responseData"]["translatedText"]
            # or match[0]…
        else:
            raise RuntimeError(j)
        return text


# Because, why not?
# Invokes a commandline tool for translating texts.
# The "cmd" can be:
#
#    `translate-cli -t {text}`
# Or
#    `deep_translator -trans "google" -src "auto" -tg {lang} -txt {text}`
#
# Don't quote placeholders {}, {text} or {lang} in the command.
#
class cli(google):

    def __init__(self, params):
        self.params = params
        self.cmd = params.get("cmd", "translate-cli -o -f auto -t {lang} {text}")

    # pipe text through external program
    def translate(self, text):
        if self.skip(text):
            return text
        cmd = [self.repl(arg, text, self.params) for arg in shlex.split(self.cmd)]
        try:
            proc = subprocess.run(cmd, stdout=subprocess.PIPE)
            return proc.stdout.decode("utf-8")
        except AttributeError as e:
            proc = subprocess.Popen(cmd, stdout=subprocess.PIPE)
            proc.wait()
            return proc.stdout.read().decode("utf-8")

    # substitute placeholders: {}, {text} or $lang or %source%
    def repl(self, arg, text, params):
        repl = {
            "text|\}": text,
            "lang|target|to": params["lang"],
            "from|source": params["from"]
        }
        for k,v in repl.items():
            if re.match("""^["']?[\{%$]" + k + "[\}%$]?["']?$""", arg):
                return v
        return arg


# PONS text translation
#
# This is a mix of web scraping and API usage. It's not an official API,
# so unlikely to last. Unlike the PonsTranslator in D-L, this one uses
# the full text translation interface, not the dictionary.
#
class pons(google):
    init_url = "https://en.pons.com/text-translation"
    api_url = "https://api.pons.com/text-translation-web/v4/translate?locale=en"

    def __init__(self, params):
        self.params = params
        self.session = self.impressionId()

    # fetch from v4 api    
    def fetch(self, text, dst_lang="de", src_lang='en'):
        r = http.post(
            self.api_url,
            json = {
                "impressionId": self.session,
                "sourceLanguage": src_lang,
                "targetLanguage": dst_lang,
                "text": text
            }
        ).json()
        if r.get("serviceMessage"):
            raise RuntimeError(r)
        elif r.get("text"):
            #log.debug(f"'{text}' ==> {repr(r)} // {src_lang}→{dst_lang}")
            return r["text"]
        else:
            return text
    
    # detect language, and avoid non-text snippets
    def translate(self, text):
        if self.skip(text):
            return text
        r = self.fetch(text, self.params["lang"], self.lang(text))
        if r:
            return r
        else:
            return text

    # invoked once to get session identifier        
    def impressionId(self):
        html = http.get(self.init_url).text
        return re.findall(""" ["']?impressionId["']? \s*[:=]\s* ["'](\w+-[\w-]+-\w+)["'] """, html, re.X)[0]


# SYSTRAN Translate API
# · https://docs.systran.net/translateAPI/translation/
# · also requires an API key (seemingly not available in trial subscription)
#
class systran(google):
    url = "https://api-translate.systran.net/translation/text/translate?key=YOUR_API_KEY&input=&target=&source="
    #url = "/compatmode/google/language/translate/v2?q=..&target=lang"
    def fetch(self, text, target="en", source="auto"):
        r = http.post(
            url=self.url,
            params={
                "q": text,
                "target": target,
                "source": source,
                #"key": self.params["api_key"],
            },
            headers={
                "Authorization": "Bearer " + self.params["api_key"]
            }
        )
        data = r.json()   # if not JSON response, we probably ran into a HTTP/API error
        #log.debug(repr(data))
        if data.get("error"):
            raise ConnectionRefusedError(data["error"], r.status_code, r.headers) 
        else:
            text = data["outputs"][0]["output"]  # nested result structure
        return text

# ArgosTranslate
#
#  · offline translation package (OpenNMT)
#  · comes with a GUI to install readymade models
#  · only works with distro-supplied libreoffice+python binding, not any /opt/… setups
#
class argos(google):

    def chpath(self):
        pass # PYTHONPATH has no effect on numpy import errors, seems to work only with distro-bound python installs

    def translate(self, text):
        source, target = self.lang(text), self.params["lang"]
        if source == target:
            raise ValueError("Can't have same source and target language")
        pair = self.get_langpair(source, target)
        #self.translate = pair.translate
        return pair.translate(text)

    def get_langpair(self, source, target):
        import argostranslate.translate
        model = { m.code: m for m in argostranslate.translate.get_installed_languages() }
        try:
            return model[source].get_translation(model[target])
        except Exception as e:
            raise ValueError("Requested language model/pair ({}→{}) not found, use `argos-translate-gui` to download/install the combination".format(source, target))


# maps a pagetranslate.t.* object (in main module),
# according to configured backend (now a string)
def assign_service(params):
    w = params.get("backend", "Google")
    map = {
        "^google$ | ^google [\s\-_] translate$": google,
        "^google.*ajax": google_ajax,
        "^deepl [\s_] web": deepl_web,
        "^deepl [\s_] (api|pro)": deepl_api,
        "^deepl \s free": deepl_free_api,
        "^mymemory | translated\.net": mymemory,
        "^pons \s text": pons,
        "^systran": systran,
        "^argos": argos,
        "^command | ^CLI | tool | program": cli,
        "^microsoft | translate[_-]py | ^T-?P: | \(T-?P\)": translate_python,
        "linguee | pons\sdict | QCRI | yandex | ^D-?T: | \(D-?T\)": deep_translator,
    }
    for rx, cls in map.items():
        if re.search(rx, w, re.I|re.X):
            break
    else:
        cls = google
    return cls(params)