Skip to content

Conversation

@annamel
Copy link

@annamel annamel commented Jan 30, 2017

Periodic cleanup of YT aggregation table in order to decrease its size and thus the time of ttl_cleanup_sched time (Depends on PR366)

@annamel annamel force-pushed the ttl_sched_yt_cleanup branch from 4200bc8 to 4d334cf Compare February 10, 2017 15:03
yt_attempts = self.params.get('ttl_cleanup', {}).get('yt_attempts', 3)
yt_delay = self.params.get('ttl_cleanup', {}).get('yt_delay', 10)

from yt_worker import YqlWrapper
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Импорт YqlWrapper специально был перенесён в метод из-за проблем зависимостей пакетов yql и yt. Нам сейчас нужно это так и оставить, потому что мы не хотим слепо обновляться до последних версий этих библиотек, это может привести к проблемам в других компоненах, которые пользуются библиотеками yql.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

WHERE sum_expired_size >= {trigger};
"""

REPLACE_QUERY = """
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Это запрос нужно отформатировать таким же образом, которым отформатирован AGGREGATE_QUERY выше.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

А почему не таким же как PREAGGREGATE_QUERY?

Copy link
Contributor

@nobodyisme nobodyisme Feb 21, 2017

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Потому что PREAGGREGATE_QUERY отформатирован плохо, он нарушает структуру нативного кода.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

self.ttl_cleanup_timer = periodic_timer(
seconds=self.params.get('ttl_cleanup', {}).get('ttl_cleanup_period', 60 * 15))
self.ttl_cleanup_yt_timer = periodic_timer(
seconds=self.params.get('ttl_cleanup', {}).get('yt_cleanup_period', 60 * 60 * 24 * 10)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Вынеси, пожалуйста, yt_cleanup_period в examples/mastermind.conf, и оставь комментарий про 60 * 60 * 24 * 10.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

self._cleanup_aggregation_yt()

except LockFailedError:
logger.info('TTl cleanup planner or ttl yt cleaner is already running')
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Видимо, сообщение в лог внутри _ttl_cleanup_planner при LockFailedError теперь должно поменяться.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fixed in 85d74e4

except:
logger.exception('Failed to clean YT aggregation table')
finally:
logger.info('TTL YT cleaner finished')
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Почему всё время скачет регистр?
https://github.com/yandex/mastermind/pull/391/files#diff-68b1944151933048b6bb46ecf5bb688fR263
https://github.com/yandex/mastermind/pull/391/files#diff-68b1944151933048b6bb46ecf5bb688fR269

Остановись, пожалуйста, на одном варианте.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Вроде fixed

# But in the beginning we should get update from aggregation table
# (i.e. values that were recorded during our processing and thus that are absent in tmp_table)
replace_query = self.REPLACE_QUERY.format(
starting_time=time.time(), tmp_table=tmp_table_name, base_table=aggregate_table)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

  1. Почему starting_time начинается от текущего момента, хотя выборка производилась раньше (возможно, сильно раньше), когда выполнялся запрос https://github.com/yandex/mastermind/pull/391/files#diff-878f10083915ff8adfd73d2409039842R243
  2. Почему в tmp-таблицу добавляются записи, которые появились раньше starting_time, хотя в комментарии выше написано, что должны добавляться новые?
    https://github.com/yandex/mastermind/pull/391/files#diff-878f10083915ff8adfd73d2409039842R82
    Разъясни, пожалуйста, эту часть логики.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Была ошибка. Спасибо что обратил внимание. Починила в 85d74e4

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Есть ли тесты на этот кейс? Если тесты есть, почему по ним не было видно этой проблемы? Если нет, то будут ли?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ошибка, которую ты заметил, относится к ошибкам тестирования. Поясню. Поскольку в таблицу нет интенсивных записей (а мне не удавалось руками "поймать" момент, когда ф-ия колбасит данные и в параллель записать в таблицу; автоматизировать же этот код было лень), я на этапе каких-то последних тестов изменила знак неравенства с целью проверить, что "псевдо-новенькие" записи нормально перельются во временную таблицу. На этом "ручном" тесте я в частности отловила момент с несоответствием целочисленных типов. Но неравенство обратно поправить забыла. Я к тому, что не то чтобы этот код никогда не работал. Я его проверяла руками и смотрела таблички.

Автотестов на этот кейз нет и не представляется возможным. Да, мы можем сказать какие записи должны появиться в таблице в ходе работы теста по "основному" сценарию ttl_cleanup. Да, мы можем сказать, какие из этих записей точно должны удалиться в ходе yt_cleanup. Но поскольку и ttl_cleanup и yt_cleanup запускаются по своему расписанию и мы не контролируем их запуск, а в ходе теста мы уходим на относительно долгие операции (итерация по группам и проверка содержимого), то гарантированно проверить то, что запись была и запись удалилась, представляется маловероятным. Можно, конечно, в отдельном потоке, а лучше в отдельном worker-е, молотить по базе и смотреть что вот записи возникли и вот записи удалились. Но запрос на получение записей и анализ - тоже какое-то время. Время же жизни записи в таблице мы не можем контролировать. То есть даже в случае отдельного воркера тесты будут "вероятностные". Нормальным решением стало бы контролируемое время запуска yt_cleanup - тогда мы могли бы начиная с какой-то засечки ожидать исчезновения записей из таблицы, но такой вариант невозможен в силу текущей архитектуры мастермайнда

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Я правильно понимаю, что в случае бага мы бы получили дублирующиеся записи в агрегированной таблице? Почему бы не написать тесты на то, что после чистки таблицы в новой копии у нас нет дублей?

Copy link
Author

@annamel annamel Apr 10, 2017

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Тест добавлен. К сожалению, он наслоился на PR6. https://github.yandex-team.ru/indigo/mastermind-qa/pull/6. Это последний патч в стопке, и да он зависит от тех изменений

for couple_data in couples_data:
c = couple_data['couple']

if c == "None":
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Такого быть не должно, и обрабатывать это не имеет смысла. Откуда взялась такая проверка?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Проверка взялась от факта наличия такой записи. Откуда-то в монгу такая капла просочилось. Не знаю, есть ли такое в проде - в тестинге возникло. И продолжает возникать

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Покажи, пожалуйста, как выглядят эти записи в тестинге. Я таких не нашёл, ни в твоей монге, ни в тестовой. Вот, например, в твоей:
https://paste.yandex-team.ru/207942

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[Wed Feb 22 12:15:10 2017] [ERROR] app/mastermind2.26: [mm.planner] [11636] Damaged couple? {u'couple': u'None', u'_id': ObjectId('586146b4acb2e49fc62bc61f'), u'cleanup_ts': 1485535200}

 Эта запись в контейнере. sudo lxc-attach -n mastermind.aniam.lxc

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Тогда тебе нужно понять, откуда такая запись появляется. Это проблема не в тестинге, а на твоём тестовом стенде. Если в тестинге такого нет, похоже, что проблема в твоём коде. Таких записей в монге появляться не должно.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.


group_id = int(c.split(":")[0])

if group_id not in storage.groups:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Это условие не обязательно должно выполняться. couple_id в общем случае не обязан быть связан с номерами групп, поэтому проверять тут это не стоит.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

logger.error("Not valid group is within collection {}".format(group_id))
continue
group = storage.groups[group_id]
if not group.couple:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Couple можно попробовать получить напрямую из storage.couples, используя поле couple записи в монге. Делать поиск через группу не нужно.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Не вижу нужных изменения в коммите 85d74e4

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Моя недоработка. Сначала сделала как ты сказал, а потом откатила их. Дело в том, что мы все равно нуждаемся в group_id для результата. А если мы уже и так имеем группу, то ведь недурно проверить, что она все еще coupled. И там я уже не вижу принципиальной разницы - искать в storage.couples или в storage.groups. Может статься, что в storage.groups даже быстрее

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

С точки зрения здравого смысла ты получаешь из коллекции каплы, а значит и надо интерпретировать их как каплы. Группу в самом конце, где тебе нужно, ты можешь спокойно получить из капла, а танцы с бубном вокруг айдишника группы тут совершенно лишние. Тем более, получая капл, ты избавляешь себя от необходимости проверки, что у группы есть капл, — получая группу из капла ты автоматически получаешь и такую гарантию.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.


ns_settings = self.namespaces_settings.get(group.couple.namespace.id)
if ns_settings and not ns_settings.attributes.ttl.enable:
logger.debug("Skipping group {} cause ns '{}' no ttl".format(group_id, group.couple.namespace.id))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

cause -> because
no ttl -> no ttl enabled, или даже лучше полностью написать, что because ttl is not enabled for ns {}

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Обсудили в телеграмме, комментарий признан не критичным

for couple_data in couples_data:
c = couple_data['couple']

if c == "None":
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Покажи, пожалуйста, как выглядят эти записи в тестинге. Я таких не нашёл, ни в твоей монге, ни в тестовой. Вот, например, в твоей:
https://paste.yandex-team.ru/207942

logger.error("couple {} not in couples_hist {}".format(couple_id, couples_hist))
continue

if couples_hist[couple_id] < expiration:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Есть ли тесты на этот кейс? Если тесты есть, почему по ним не было видно этой проблемы? Если нет, то будут ли?

Я попросил тут оставить комментарий про значение None у couples_hist[couple_id], комментария в коммите не вижу.

converting_func = lambda r, i: "CAST({} AS {})".format(r, table.columns[i][1]) \
if table.columns[i][1] != "String" else ("'" + str(r) + "'")

ins_query += "(" + ", ".join(converting_func(r, row.index(r)) for r in row) + "),"
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Пока не придём, пулл-реквест принят не будет. Я не могу согласиться с такой реализаций этого метода:

  1. Запрос приходится собирать по кускам по всему коду метода вместо того, чтобы увидеть его сразу;
  2. Код предполагает, что специального форматирования потребует только тип "String", об остальных типах (какие они бывают, какие поддерживаются, и т.д.) из этого кода непонятно совсем;
  3. Не видно, с какими данными из таблицы работает код.

# But in the beginning we should get update from aggregation table
# (i.e. values that were recorded during our processing and thus that are absent in tmp_table)
replace_query = self.REPLACE_QUERY.format(
starting_time=time.time(), tmp_table=tmp_table_name, base_table=aggregate_table)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Есть ли тесты на этот кейс? Если тесты есть, почему по ним не было видно этой проблемы? Если нет, то будут ли?

logger.error("Not valid group is within collection {}".format(group_id))
continue
group = storage.groups[group_id]
if not group.couple:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Не вижу нужных изменения в коммите 85d74e4

@annamel annamel force-pushed the ttl_sched_yt_cleanup branch from 63704a1 to 96fc2fd Compare March 2, 2017 13:22
Anna Melekhova added 4 commits March 2, 2017 16:35
By old records we mean records with expiration_date < last ttl_cleanup_run. That is
records that have to be removed by ttl_cleanup
r = self.send_request(query)

try:
table = next(x for x in r.results)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Кажется, можно было бы просто написать next(r.results)?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

вроде как да. r.results объявлен как self.result = method(
full_url,
timeout=self.timeout,
headers=headers
)
https://a.yandex-team.ru/api/tree/blob/trunk/arcadia/kikimr/yql/library/python/yql/client/request.py/?rev=2606973
Но при этом попытка получить next приводила к exception-у (если я правильно помню что-то типа (object is not an iterator). Если это важно, могу воспроизвести и разобраться, почему все-таки был exception.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Тогда next(iter(r.results)) должен помочь.

try:
table = next(x for x in r.results)
except:
logger.exception("Incorrect results of query {}".format(query))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

В случае исключения, кажется, продолжать нельзя, нужен return.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ок

value_type = table.columns[value_idx][1]
if value_type == "String":
return "'{}'".format(value) # add extra '
if value_type == "Uint64" or value_type == "Int64" or value_type == "Int32" or value_type == "Uint32":
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if value_type in ("Uint64", "Int64", "Int32", "Uint32"):

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ок

return "'{}'".format(value) # add extra '
if value_type == "Uint64" or value_type == "Int64" or value_type == "Int32" or value_type == "Uint32":
return "CAST({} AS {})".format(value, value_type)
raise ValueError("Unsupported type in aggregation table")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Сюда можно было бы добавить этот самый неподдерживаемый тип и даже название колонки.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ок

ins_query = "INSERT INTO [{tmp_table_name}] ({values_names}) VALUES {values};".format(
tmp_table_name=tmp_table_name,
values_names=", ".join(cln[0] for cln in table.columns),
values=", ".join("({})".format(", ".join(convertion(r, row.index(r)) for r in row)) for row in values_to_insert))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

convertion(r, row.index(r)) for r in row)) лучше заменить на convertion(r, i) for i, r in enumerate(row). Операция index() тут не нужна, не говоря уже о том, что она дорогая.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Спасибо!

for couple_data in couples_data:
c = couple_data['couple']

if c == "None":
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Тогда тебе нужно понять, откуда такая запись появляется. Это проблема не в тестинге, а на твоём тестовом стенде. Если в тестинге такого нет, похоже, что проблема в твоём коде. Таких записей в монге появляться не должно.

logger.error("Not valid group is within collection {}".format(group_id))
continue
group = storage.groups[group_id]
if not group.couple:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

С точки зрения здравого смысла ты получаешь из коллекции каплы, а значит и надо интерпретировать их как каплы. Группу в самом конце, где тебе нужно, ты можешь спокойно получить из капла, а танцы с бубном вокруг айдишника группы тут совершенно лишние. Тем более, получая капл, ты избавляешь себя от необходимости проверки, что у группы есть капл, — получая группу из капла ты автоматически получаешь и такую гарантию.

logger.error("couple {} not in couples_hist {}".format(couple_id, couples_hist))
continue

if couples_hist[couple_id] < expiration:
Copy link
Contributor

@nobodyisme nobodyisme Mar 12, 2017

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Про тесты: баг, который я заметил, приводил бы к тому, что мы теряли все полезные данные из агрегированной таблицы. Так что меня абсолютно не устраивает, что тестов на этот кейс нет. Этот кейс обязательно надо покрыть.

Anna Melekhova added 2 commits March 14, 2017 17:05
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants