import sys
!{sys.executable} -m pip install toolforge
Collecting toolforge
  Using cached
Requirement already satisfied: requests in /srv/paws/lib/python3.6/site-packages (from toolforge)
Requirement already satisfied: pymysql in /srv/paws/lib/python3.6/site-packages (from toolforge)
Requirement already satisfied: chardet<3.1.0,>=3.0.2 in /srv/paws/lib/python3.6/site-packages (from requests->toolforge)
Requirement already satisfied: urllib3<1.24,>=1.21.1 in /srv/paws/lib/python3.6/site-packages (from requests->toolforge)
Requirement already satisfied: idna<2.8,>=2.5 in /srv/paws/lib/python3.6/site-packages (from requests->toolforge)
Requirement already satisfied: certifi>=2017.4.17 in /srv/paws/lib/python3.6/site-packages (from requests->toolforge)
Installing collected packages: toolforge
Successfully installed toolforge-4.1.0
import gzip
from collections import defaultdict, Counter
from typing import NamedTuple
import os
import random
from textwrap import dedent
import logging
import itertools
import io
import math

import toolforge
# Hourly per-project view counts (plain text, one project per line)
FILE = '/public/dumps/pageviews/2018/2018-09/projectviews-20180918-180000'
# Hourly per-page view counts for the same hour (gzipped)
FILE2 = '/public/dumps/pageviews/2018/2018-09/pageviews-20180918-180000.gz'
# The first space-separated token of each line is the project name
with open(FILE, 'r') as f:
    project_names = { line.split(' ')[0] for line in f.readlines() }
! zcat {FILE2} | head
aa Main_Page 3 0
aa Special:UncategorizedPages 1 0
aa User:C-M 1 0
aa User:Jedudedek 1 0
aa User:PPerviz 1 0
aa Wikipedia:Community_Portal 4 0
aa.b Main_Page 1 0
aa.d Main_Page 5 0
aa.d Special:Book 1 0
aa.d Special:ElectronPdf 1 0

gzip: stdout: Broken pipe
# These elements must be combined with the language
# (keys are the second label of a project name, e.g. "aa.d" -> "aawiktionary")
suffix_map = dict(
    z = 'wiki',
    d = 'wiktionary',
    b = 'wikibooks',
    n = 'wikinews',
    q = 'wikiquote',
    s = 'wikisource',
    v = 'wikiversity',
    voy = 'wikivoyage',
    m = 'wiki',
)

# These elements have no language
complete_map = {
    'w': 'mediawikiwiki',
    'wd': 'wikidatawiki',
}

# These are associated with "m" and have no language
wikimedia = set(['bd', 'dk', 'mx', 'nyc', 'rs', 'ua'])

# Special mappings for exceptional cases
special_map = { 'be_tarask': 'be_x_oldwiki' }

def database_from_project_name(project_name:str) -> str:
    """Find database name corresponding to project name

    This function takes a page views project name (as used in the
    pageviews/projectviews dump files) and returns a database name
    suitable for use with toolforge.connect().

    Returns:
        The database name, or None when the project cannot be mapped
        or the mapped name is not a known database (see ``smdn``).
    """
    labels = project_name.split('.')
    result = None
    if len(labels) == 2 and labels[0] in ['www', 'm', 'zero'] and labels[1] in complete_map:
        # Languageless projects, e.g. "www.wd" -> "wikidatawiki"
        result = complete_map[labels[1]]
    elif labels[-1] == 'm' and labels[0] in wikimedia and (len(labels) == 2 or labels[1] in ['m', 'zero']):
        # Wikimedia chapter wikis, e.g. "nyc.m" -> "nycwikimedia"
        result = labels[0] + 'wikimedia'
    else:
        # General case: "<language>[.<site>][.m|.zero]"
        prefix = labels[0].replace('-', '_')

        if len(labels) > 1 and labels[1] in ['m', 'zero']:
            # Drop the mobile/zero marker so labels[1] (if any) is the site code
            del labels[1]
        if prefix in special_map:
            result = special_map[prefix]
        else:
            # No explicit site code means the main wiki ("z")
            site = labels[1] if len(labels) > 1 else 'z'
            if site in suffix_map:
                result = prefix + suffix_map[site]

    # smdn (the set of known database names, defined later in this notebook)
    # acts as a final validity filter.
    if result is not None and result not in smdn:
        result = None

    return result
# Databases for every project seen in the projectviews file
database_names = { database_from_project_name(x) for x in project_names}
# Inspect which project names could not be mapped to a database
{ x for x in project_names if database_from_project_name(x) is None }
# Inspect the Wikidata-related project names
{ x for x in project_names if 'wd' in x}
{'m.wd', 'www.wd', 'zero.wd'}
def connect_to_database(dbname):
    """Open a connection to the named Wiki Replica database.

    Args:
        dbname: Database name, e.g. "enwiki", as accepted by
            ``toolforge.connect()``.

    Returns:
        A toolforge/pymysql connection object (used below as a context
        manager yielding a cursor).
    """
    # NOTE(review): the original line was truncated in this export
    # ("toolforge.connect(dbname, ..."); any extra keyword arguments
    # were lost — confirm whether a cluster/host argument is needed.
    conn = toolforge.connect(dbname)
    return conn
site_matrix = toolforge._fetch_sitematrix()['sitematrix']
def sm_database_names(data):
    """Yield every database name found in a sitematrix response.

    Numeric keys hold per-language entries whose 'site' member is a list
    of site records; the 'specials' key holds a flat list of site records.
    All other keys (e.g. 'count') are ignored.
    """
    for key, value in data.items():
        if key.isdigit():
            sites = value['site']
        elif key == 'specials':
            sites = value
        else:
            continue
        for record in sites:
            yield record['dbname']
# The set of all known database names; used to validate mapping results
smdn = set(sm_database_names(site_matrix))
# NOTE(review): duplicate import — defaultdict is already imported above
from collections import defaultdict

def chunk_and_partition(items, key, chunk_size=None, max_unprocessed=None, max_buckets=None):
    """Partitions items and processes in chunks.

    Suppose you have an iterable of items that you want to process in chunks,
    but in each chunk processed the items must all be "the same kind".
    This function splits items from the input iterator into buckets according to the
    partitioning key, and yields (key, items) pairs.

    It will yield the largest bucket whenever any of the following hold:
        * A bucket (necessarily the largest) reaches ``chunk_size``
        * The total number of unprocessed items reaches ``max_unprocessed``
        * The total number of buckets would exceed ``max_buckets``
    After the input has been exhausted, remaining items are yielded in arbitrary order.

    Chunking will work well if the input has large run-lengths by partition key
    (e.g. if it is already sorted, or mostly sorted),
    or if the ratio between ``max_unprocessed`` and ``chunk_size``
    (or the maximum number of buckets) is comparable
    to the number of distinct partition keys;
    otherwise the actual chunk size will tend to be smaller than ``chunk_size``.
    If none of ``chunk_size``, ``max_unprocessed``, and ``max_buckets`` is specified,
    then the entire input list will be partitioned before processing (cf ``itertools.groupby``).

    Args:
        items: Iterable of items to process
        key: Function that takes an item and returns a partitioning key (e.g. a string)
        chunk_size: Maximum number of items per chunk
        max_unprocessed: Maximum number of items to hold unprocessed
        max_buckets: Maximum number of buckets

    Yields:
        (partition, items) pairs where partition is a result of ``key``
        and items is a list of items from the input.
    """
    def check_optional_positive_integer(x, name):
        # isinstance checked first so non-numeric values fail the assertion
        # instead of raising TypeError on the comparison
        assert x is None or (isinstance(x, int) and x > 0), f"{name} must be either None or a positive integer"
    check_optional_positive_integer(chunk_size, "chunk_size")
    check_optional_positive_integer(max_unprocessed, "max_unprocessed")
    check_optional_positive_integer(max_buckets, "max_buckets")
    cache = defaultdict(list)
    n_unprocessed = 0 # Total length of lists in cache
    def pop_largest():
        """Remove and return the (key, items) pair with the most items."""
        nonlocal cache, n_unprocessed
        p, ii = max(cache.items(), key=lambda p_ii: len(p_ii[1]))
        del cache[p]
        n_unprocessed -= len(ii)
        return (p, ii)
    for item in items:
        partition = key(item)
        if max_buckets is not None and len(cache) == max_buckets and partition not in cache:
            # Make room before creating a new bucket
            yield pop_largest()
        # BUG FIX: this line was missing — the item must actually be
        # stored in its bucket before the size checks below.
        cache[partition].append(item)
        n_unprocessed += 1
        if chunk_size is not None and len(cache[partition]) == chunk_size:
            # we already know which one is largest
            yield (partition, cache[partition])
            del cache[partition]
            n_unprocessed -= chunk_size
        elif max_unprocessed is not None and n_unprocessed == max_unprocessed:
            yield pop_largest()
        # At this point:
        # * The maximum length is less than chunk_size
        # * The total length is less than max_unprocessed

    # Now process the remaining items in arbitrary order
    for p, ii in cache.items():
        yield (p, ii)
def chunks(iterable, size=10):
    """Breaks down an iterable into chunks.

    Yields lazy iterators of up to ``size`` items each.  Each chunk must
    be consumed before advancing to the next one, since they all share
    the same underlying iterator.
    """
    source = iter(iterable)
    while True:
        try:
            head = next(source)
        except StopIteration:
            return
        # The chunk is lazy: realize it (e.g. with list()) if the results
        # will be consumed out of order.
        yield itertools.chain([head], itertools.islice(source, size - 1))
def convert_titles_to_qids(dbname, titles):
    """Convert set of log entries into Wikidata ids.

    Args:
        dbname: Name of database suitable for passing to ``toolforge.connect()``
        titles: Iterable of page titles.

    Returns:
        qids: Parallel list of Wikidata ids (or None for titles with no match)
    """
    titles = list(titles) # reiterable
    title_set = set(titles) # O(1) membership tests in the result loop
    results = dict()
    logger = logging.getLogger(__name__)
    logger.info(f"dbname={dbname} titles={len(titles)}")

    def sql_list_of_strings(cursor, ss):
        """Returns SQL list of strings, appropriately escaped"""
        return "(" + ", ".join(cursor.connection.escape(s) for s in ss) + ")"

    def get_results(cursor, sql):
        """Given some sql for ``title`` and ``qid``,
        execute and add to ``results``."""
        nonlocal logger, results
        sql = dedent(sql)
        cursor.execute(f"USE {dbname}_p;")
        # NOTE(review): this execute of the query itself was missing from
        # the export; without it the cursor would iterate stale results.
        cursor.execute(sql)
        n_results = 0

        for title, qid in cursor:
            title = title.decode()
            # I don't know why, but there's a handful of items that have a lower-case "q".  Probably historical.
            qid = qid.decode().upper()
            if title not in title_set:
                logger.error(f"Unexpected title {title} for QID {qid}")
            results[title] = qid
            n_results += 1
        return n_results

    def get_results_direct(cursor, titles):
        """For some set of titles, try to get results for direct sitelinks"""
        # See this link for why we have to use a namespace filter
        sql = f"""
            SELECT page_title, pp_value 
            FROM page, page_props
            WHERE page_namespace = 0
            AND page_title IN {sql_list_of_strings(cursor, titles)}
            AND page_id = pp_page
            AND pp_propname = 'wikibase_item';
            """
        return get_results(cursor, sql)

    def get_results_redirect(cursor, titles):
        """For some set of titles, try to get results as a redirect"""
        # See this link for why we have to use a namespace filter
        sql = f"""
            SELECT p1.page_title, pp_value
            FROM page AS p1, redirect, page AS p2, page_props
            WHERE p1.page_namespace = 0
            AND p1.page_title IN {sql_list_of_strings(cursor, titles)}
            AND p1.page_is_redirect
            AND p1.page_id = rd_from
            AND rd_namespace = 0
            AND rd_interwiki = ""
            AND rd_fragment = ""
            AND p2.page_namespace = 0
            AND rd_title = p2.page_title
            AND p2.page_id = pp_page
            AND pp_propname = 'wikibase_item';
            """
        return get_results(cursor, sql)

    n_direct = 0
    n_redirect = 0
    with connect_to_database(dbname) as cursor:
        # First pass: direct sitelinks
        for chunk in chunks(titles, 10000):
            n_direct += get_results_direct(cursor, chunk)

        remaining = [ title for title in titles if title not in results ]

        # Second pass: resolve remaining titles through redirects
        if remaining:
            for chunk in chunks(remaining, 10000):
                n_redirect += get_results_redirect(cursor, chunk)

    logger.info(f"convert_titles_to_qids: converted {len(titles)} titles into {len(results)} QIDs "
                f"({n_direct} direct and {n_redirect} redirect)")

    return [ results.get(title) for title in titles ]
class LogEntry(NamedTuple):
    """Lightweight class representing a line from the log file"""
    # Pageviews project code, e.g. "aa" or "aa.d"
    project: str
    # Page title with underscores, e.g. "Main_Page"
    title: str
    # Number of views recorded on this log line
    views: int
    def dbname(self):
        # Map the project code to a replica database name (may be None)
        return database_from_project_name(self.project)

def read_log(file):
    """Read the gzipped logfile and yield log entries.

    Args:
        file: Path to a gzipped pageviews dump; each line has the form
            "<project> <title> <views> <bytes>".

    Yields:
        LogEntry(project, title, views) for each line.
    """
    with io.BufferedReader(gzip.GzipFile(file, 'r')) as f:
        # Iterate the file lazily instead of f.readlines(), which would
        # load the entire decompressed dump into memory at once.
        for line in f:
            project, title, views, _ = line.decode().split(' ')
            yield LogEntry(project, title, int(views))
display(list(itertools.islice(read_log(FILE2), 10)))
[LogEntry(project='aa', title='Main_Page', views=3),
 LogEntry(project='aa', title='Special:UncategorizedPages', views=1),
 LogEntry(project='aa', title='User:C-M', views=1),
 LogEntry(project='aa', title='User:Jedudedek', views=1),
 LogEntry(project='aa', title='User:PPerviz', views=1),
 LogEntry(project='aa', title='Wikipedia:Community_Portal', views=4),
 LogEntry(project='aa.b', title='Main_Page', views=1),
 LogEntry(project='aa.d', title='Main_Page', views=5),
 LogEntry(project='aa.d', title='Special:Book', views=1),
 LogEntry(project='aa.d', title='Special:ElectronPdf', views=1)]
log_entries = [ le for le in read_log(FILE2) if random.random() < 0.01 ]
def process_log_entries(log_entries):
    """Process log entries into (qid,views) pairs.

    NOTE(review): the original docstring said it "additionally yields two
    'logging' pairs"; the current code yields a single fake ("Q0", views)
    pair for all unconverted entries — confirm which behavior is intended.
    """
    unconverted_titles = 0
    unconverted_views = 0
    logger = logging.getLogger(__name__)
    # Group entries by database so each database is queried in large chunks
    for dbname, log_entries in chunk_and_partition(log_entries, key=lambda le: le.dbname(), 
                                                   max_buckets=3, chunk_size=10000):
        titles = (le.title for le in log_entries)
        if dbname == 'wikidatawiki':
            # Wikidata titles ARE the QIDs; no database lookup needed
            qids = [ title if title.startswith("Q") else None for title in titles ]
            logger.info(f"Wikidata special case: {len(log_entries)} converted to {sum(qid is not None for qid in qids)}")
        else:
            qids = convert_titles_to_qids(dbname, titles)
        for le, qid in zip(log_entries, qids):
            if qid is not None:
                yield (qid, le.views)
            else:
                unconverted_titles += 1
                unconverted_views += le.views
    logger.info(f"Failed to convert {unconverted_titles} titles representing {unconverted_views} views")
    yield ("Q0", unconverted_views) # File these under a fake id so they're in our total
%%prun -s cumulative processed = list(process_log_entries(itertools.islice(read_log(FILE2), 750000))) display(len(processed))
qid_views = Counter(process_log_entries(read_log(FILE2)))
CPU times: user 16min 13s, sys: 13.8 s, total: 16min 27s
Wall time: 34min 55s
# Aggregate views per QID.  Iterating the Counter yields its keys,
# which are (qid, views) pairs.
qv = dict()
for qid, views in qid_views:
    if qid in qv:
        qv[qid] += views
    else:
        # BUG FIX: this assignment previously ran unconditionally,
        # clobbering the accumulated sum on every iteration.
        qv[qid] = views
CPU times: user 4.32 s, sys: 160 ms, total: 4.48 s
Wall time: 4.51 s
The history saving thread hit an unexpected error (OperationalError('disk I/O error',)).History will not be written to the database.

# Number of distinct (qid, views) pairs — NOTE(review): qid_views is keyed
# by pairs, so this is NOT the number of distinct QIDs.
n_qids = len(qid_views)
# Largest multiplicity of any single (qid, views) pair
max(v for k,v in qid_views.items())
# All pairs observed for a few example QIDs
[ (qid, views) for qid,views in qid_views if qid in ['Q0', 'Q1', 'Q30']]
[('Q30', 1),
 ('Q1', 1),
 ('Q30', 38),
 ('Q30', 4),
 ('Q30', 91),
 ('Q1', 3),
 ('Q30', 6),
 ('Q30', 3),
 ('Q30', 2),
 ('Q1', 2),
 ('Q30', 11),
 ('Q30', 7),
 ('Q30', 14),
 ('Q1', 6),
 ('Q1', 5),
 ('Q30', 5),
 ('Q1', 16),
 ('Q1', 7),
 ('Q30', 28),
 ('Q1', 18),
 ('Q30', 25),
 ('Q1', 26),
 ('Q30', 8),
 ('Q1', 28),
 ('Q30', 103),
 ('Q30', 9),
 ('Q1', 32),
 ('Q30', 204),
 ('Q30', 10),
 ('Q30', 50),
 ('Q30', 32),
 ('Q30', 976),
 ('Q30', 69),
 ('Q30', 30),
 ('Q1', 114),
 ('Q30', 27),
 ('Q30', 13),
 ('Q30', 1165),
 ('Q30', 39),
 ('Q1', 95),
 ('Q30', 12),
 ('Q30', 184),
 ('Q30', 336),
 ('Q1', 97),
 ('Q30', 61),
 ('Q1', 4),
 ('Q30', 29),
 ('Q1', 13),
 ('Q30', 96),
 ('Q1', 23),
 ('Q30', 141),
 ('Q30', 18),
 ('Q1', 8),
 ('Q30', 16),
 ('Q30', 36),
 ('Q1', 11),
 ('Q30', 112),
 ('Q1', 12),
 ('Q30', 20),
 ('Q1', 9),
 ('Q30', 47),
 ('Q1', 17),
 ('Q30', 66),
 ('Q30', 99),
 ('Q1', 20),
 ('Q30', 125),
 ('Q30', 146),
 ('Q1', 33),
 ('Q1', 47),
 ('Q30', 156),
 ('Q1', 37),
 ('Q30', 37),
 ('Q30', 246),
 ('Q30', 23),
 ('Q30', 22),
 ('Q30', 21),
 ('Q0', 2027617)]
# NOTE(review): this iterates the Counter's KEYS, i.e. distinct
# (qid, views) pairs — pair multiplicities are ignored, so this may
# undercount the true total; verify against the aggregation above.
total_views = sum(v for k,v in qid_views)
# First twenty QIDs in iteration order
[ qid for qid, views in qid_views ][:20]
# Largest and smallest numeric QID seen
max(int(qid[1:]) for qid, views in qid_views if qid.startswith('Q'))
min(int(qid[1:]) for qid, views in qid_views if qid.startswith('Q'))
[('q10280218', 1),
 ('q13148453', 1),
 ('q13170459', 13),
 ('unconverted_titles', 877997),
 ('unconverted_views', 2044096)]
[('Q69431', 1),
 ('Q154705', 14),
 ('Q5997442', 4),
 ('Q30126774', 2),
 ('Q4115874', 2),
 ('Q20016478', 1),
 ('Q23738548', 1),
 ('Q833212', 1),
 ('Q353973', 2),
 ('Q6970139', 2),
 ('Q1569918', 1),
 ('Q2886342', 2),
 ('Q11149812', 1),
 ('Q2714680', 1),
 ('Q3842381', 3),
 ('Q10843661', 3),
 ('Q61215', 1),
 ('Q11542461', 1),
 ('Q516017', 1),
 ('Q2757102', 2),
 ('Q7440421', 1),
 ('Q4806573', 2),
 ('Q28457484', 1),
 ('Q5433964', 1),
 ('Q5178768', 2),
 ('Q11996992', 2),
 ('Q82652', 9),
 ('Q2565770', 3),
 ('Q3456475', 1),
 ('Q7187226', 1),
 ('Q5938707', 1),
 ('Q2031854', 1),
 ('Q1128259', 13),
 ('Q55617395', 1),
 ('Q5900712', 5),
 ('Q20648980', 3),
 ('Q1753224', 2),
 ('Q27983737', 5),
 ('Q665785', 19),
 ('Q4595892', 1),
 ('Q6639947', 1),
 ('Q34495', 1),
 ('Q59695', 1),
 ('Q4545836', 1),
 ('Q705141', 2),
 ('Q19866383', 2),
 ('Q8047733', 3),
 ('Q4435329', 4),
 ('Q3247863', 2),
 ('Q34147449', 2),
 ('Q7184951', 1),
 ('Q205258', 14),
 ('Q1349227', 13),
 ('Q55600357', 2),
 ('Q4880851', 8),
 ('Q7914695', 1),
 ('Q6178612', 1),
 ('Q6427144', 1),
 ('Q1638072', 1),
 ('Q17716087', 5),
 ('Q3054171', 2),
 ('Q2734309', 1),
 ('Q3635804', 4),
 ('Q9466090', 1),
 ('Q6595117', 1),
 ('Q2838998', 32),
 ('Q4143128', 1),
 ('Q16275726', 1),
 ('Q6640756', 1),
 ('Q17146837', 11),
 ('Q55074863', 1),
 ('Q19636089', 2),
 ('Q5039884', 1),
 ('Q4720591', 1),
 ('Q7546905', 1),
 ('Q5084448', 12),
 ('Q282089', 1),
 ('Q1049186', 1),
 ('Q1972648', 2),
 ('Q80306', 1),
 ('Q299663', 1),
 ('Q7724', 2),
 ('Q192073', 3),
 ('Q62788', 1),
 ('Q113095', 1),
 ('Q3821396', 1),
 ('Q1876405', 1),
 ('Q856', 6),
 ('Q169092', 6),
 ('Q175821', 5),
 ('Q3355098', 4),
 ('Q2301597', 3),
 ('Q2886357', 1),
 ('Q80892', 1),
 ('Q2995893', 1),
 ('Q51617', 3),
 ('Q49152', 2),
 ('Q28053796', 2),
 ('Q352218', 1),
 ('Q703982', 1),
 ('Q1702843', 1),
 ('Q4411116', 2),
 ('Q820575', 3),
 ('Q899673', 5),
 ('Q612323', 1),
 ('Q575178', 3),
 ('Q986', 45),
 ('Q180678', 2),
 ('Q1018260', 1),
 ('Q3232570', 1),
 ('Q127013', 1),
 ('Q318413', 3),
 ('Q1164919', 3),
 ('Q6875446', 1),
 ('Q346190', 1),
 ('Q2863767', 1),
 ('Q187192', 6),
 ('Q23492', 3),
 ('Q3550526', 1),
 ('Q160202', 1),
 ('Q525352', 1),
 ('Q637827', 5),
 ('Q6644673', 1),
 ('Q320394', 1),
 ('Q58378', 2),
 ('Q2357164', 3),
 ('Q1154008', 1),
 ('Q9089', 33),
 ('Q1889355', 2),
 ('Q4127450', 1),
 ('Q4414364', 1),
 ('Q1323594', 3),
 ('Q2371950', 1),
 ('Q25545042', 2),
 ('Q14712', 2),
 ('Q613254', 1),
 ('Q58027', 1),
 ('Q9141', 2),
 ('Q203910', 4),
 ('Q5806019', 1),
 ('Q42344', 1),
 ('Q483242', 2),
 ('Q17320101', 2),
 ('Q47223', 3),
 ('Q1371336', 4),
 ('Q161635', 5),
 ('Q1412139', 1),
 ('Q2739432', 1),
 ('Q7318', 3),
 ('Q16', 6),
 ('Q7681195', 1),
 ('Q211050', 1),
 ('Q1811163', 1),
 ('Q4345779', 4),
 ('Q9212', 1),
 ('Q178043', 1),
 ('unconverted_titles', 44),
 ('unconverted_views', 85)]
! zcat {FILE2} | wc -l
[('unconverted_titles', 8267), ('unconverted_views', 26617)]
sum(x[1] for x in processed)
import wdpv
ModuleNotFoundError                       Traceback (most recent call last)
<ipython-input-1-a15ed5682c17> in <module>()
----> 1 import wdpv

ModuleNotFoundError: No module named 'wdpv'
import sys
!{sys.executable} -m pip install -e /data/project/wdpv/git/bovlb/wikidata_pageviews
/data/project/wdpv/git/bovlb/wikidata_pageviews should either be a path to a local project or a VCS url beginning with svn+, git+, hg+, or bzr+
!become wdpv
/bin/sh: 1: become: not found