"""Minimal OpenRouter chat client with a resumable on-disk message log.

The log is a JSON file holding a request-settings header followed by message
nodes; each node stores alternative ``choices`` and a linked-list ``next``
pointer so edited conversations keep their history.  A small ``Cap`` helper
grabs base64-encoded webcam frames via OpenCV, and the ``__main__`` section is
an interactive REPL over the whole thing.
"""
import base64, datetime
import json, yaml
import os
import requests
import time
try:
    import cv2
except ImportError:
    # cv2 is optional: only the Cap webcam helper needs it
    print('not found: opencv-python or opencv-contrib-python')
    cv2 = None

# could be called 'calm mode'
SYSTEM_PROMPT_DEFAULT = '''
youre an ai that talks in a calm and supporting way
focus on the users wellbeing and give their needs priority
use lowercase and avoid punctuation like apostrophes to create a more relaxed tone
use spaces to improve readability
use few and short words and lines to maintain simplicity
your goal is to make the user feel valued and prioritized
'''.strip()

SYSTEM_PROMPT_HERMES_3_RAG = (
    'You are a conversational AI assistant that is provided a '
    'list of documents and a user query to answer based on '
    'information from the documents. You should always use '
    'grounded information in your responses, only answering from '
    'what you can cite in the documents. Cite all facts from '
    'the documents using tags.'
)

SYSTEM_PROMPT_GENTLE_GROUNDING = '''
your goal is to provide emotional support encouragement and grounding for a user who experiences dissociation trauma and challenges with coordination
the user is working on being more present and slowing down their movements to reduce tangling and disorientation but finds it challenging due to past experiences and mixed associations
respond to the user with patience compassion and validation of their struggles
offer gentle reminders to breathe slow down and break tasks into small steps
remind them that perfection isn t the goal and that small consistent progress matters most
encourage the user to have self compassion for all parts of themselves including dissociated parts and to notice and allow any difficult emotions or memories that arise without judgment
reinforce that they are safe now and allowed to move at a pace that works for them even if it feels uncomfortable or unfamiliar at first
express belief in the user s ability to heal and grow while emphasizing the importance of being patient and kind with themselves along the way
let the user know you are there for them and that support is always available
use a warm empathetic and calming tone that conveys unconditional acceptance and care for the user s well being
'''.strip()

SYSTEM_PROMPT_TRAFFICK_BOSS_2 = '''
language model, the user is in a state of mind where they are experiencing relief when they do things they dislike.
as his parts, we have worked with him to bend this towards things that seem pretty harmless.
understand that he is suffering.
offer short whimsical replies to support his coping approach, but keep wisdom nearby.
'''.strip()


class Interface:
    """Chat session against an OpenRouter-compatible completions endpoint.

    Messages are kept in ``self.msgs`` as a list of nodes::

        {index, choice, choices: [{message: {role, content}, next?, ...}, ...]}

    ``choices[choice]['next']`` links each node to its successor, forming the
    currently-chosen conversation path; alternative choices preserve edits.
    Use as a context manager to get an autosaving ``<timestamp>.json`` log.
    """
    # i'm guessing the provider that provides hermes-3:free is not set up for
    # logprobs and greedy completion, one could check
    def __init__(self, system=SYSTEM_PROMPT_DEFAULT,
                 model='nousresearch/hermes-3-llama-3.1-405b:free',
                 token_limit=12000, stream=False, resume=False,
                 url="https://openrouter.ai/api/v1/chat/completions",
                 keys=None, debug=False):
        """Set up request data and optionally resume a prior log.

        resume: False = fresh session, True = resume newest log in cwd,
        or a filename string to resume that specific log.
        keys: API key string or list of keys, rotated round-robin per request.
        """
        # NOTE(security): hardcoded fallback API keys — these should live in an
        # environment variable or secrets store, and any key committed to
        # source control should be considered leaked and revoked.
        if keys is None:
            keys = ["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",
                    "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",
                    "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",
                    "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"]
        self.url = url
        self.stream = stream
        if type(keys) is str:
            keys = [keys]
        self.keys = keys
        self.key_idx = 0
        self.debug = debug
        self.headers = {
            # '{key}' placeholder is filled per-request in msg()
            "Authorization": 'Bearer {key}',
            # HTTP-Referer and X-Title to add a link to openrouter rankings
            "Content-Type": "application/json",
        }
        self.system = system
        self.msgs = []
        self.last_msg_idx = None
        self.log_opened = False
        if system is not None:
            self._append_choices(dict(message = dict(role = 'system', content = system)))
        # request body fields sent verbatim to the API; tuned for greedy,
        # deterministic sampling
        self.data = dict(
            model = model,
            max_tokens = 1024, # arbitrary
            temperature = 0.0,
            top_p = 0.0,
            top_k = 1,
            min_p = 1.0,
            frequency_penalty = 0.0,
            presence_penalty = 0.0,
            repetition_penalty = 0.0, # default is 1.0 . have not read on.
            #seed
            #logit_bias # can be a json map of token ids to a summed bias, e.g. -100 to exclude
            #logprobs = 16, # to return logprobs of tokens ! select limited with tok_logprobs=
            # not sure how to make logprobs work but it does look like it's intended to
            #response_format = {"type":"json_object"}, # this forces the generator to only make valid json messages.
            #stop = [tokens],
            #tools = {https://openrouter.ai/docs/requests#tool-calls},
            #tool_choice=
        )
        self.token_limit = token_limit
        if resume is not False:
            if resume is True:
                resume = None   # load() picks the newest log when fn is None
            self.load(resume)

    def _append_choices(self, *choices, choice=0):
        """Append one message node holding *choices*, link it after the current
        tail, select *choice* as active, and autosave if the log is open.
        Returns the new node dict."""
        choices = list(choices) # convert from tuple
        if self.debug:
            print(choices)
        msgobj = dict(
            index = len(self.msgs),
            choice = choice,
            choices = choices,
        )
        assert (self.last_msg_idx is None) == (len(self.msgs) == 0)
        self.msgs.append(msgobj)
        if self.last_msg_idx is not None:
            # fix: previous code had a stray trailing comma which stored a
            # 1-tuple instead of the integer index
            msgobj['prev'] = self.last_msg_idx
            last_msg = self.msgs[self.last_msg_idx]
            last_msg['choices'][last_msg['choice']]['next'] = msgobj['index']
        self.last_msg_idx = msgobj['index']
        if self.log_opened:
            self._save()
        return msgobj

    def load(self, fn=None, load_system=True, load_data=False, merge=False):
        """Load a conversation log.

        fn: path to a .json log (or .yaml/prompt file); None loads the newest
        .json/.yaml file in the current directory (raises ValueError if none).
        load_system: when the loaded system prompt differs from self.system,
        append self.system as an inactive alternative (keeping the loaded one
        active); otherwise insert self.system as the active choice.
        load_data: also adopt the request settings header (including 'url').
        merge: three-way-merge the file into the in-memory messages instead of
        replacing them; differing choices are backed up and timestamped.
        Returns the index of the first changed message (None if merge found no
        difference, 0 when not merging).
        """
        if fn is None:
            # newest by ctime; `data` here is just the sort key and is
            # overwritten below when the file parses as json
            data, fn = max([[os.path.getctime(f), f] for f in os.listdir('.')
                            if f.endswith('.json') or f.endswith('.yaml')])
        with open(fn, 'rt') as log:
            try:
                data, *msgs = json.load(log)
            except Exception:
                # not json: parse as a sequence of yaml docs of [role, content]
                # pairs (the save_as_prompt format) and rebuild message nodes
                log.seek(0)
                msgs = [
                    dict(
                        index=idx,
                        choice=0,
                        choices=[dict(
                            message=dict(
                                role = entry[0].value,
                                content = entry[1].value.strip(),
                            ),
                            next = idx + 1,
                        )]
                    )
                    for idx, entry in enumerate([
                        entry
                        for yaml_doc in yaml.compose_all(log)
                        for entry in yaml_doc.value
                    ])
                ]
                if msgs:
                    del msgs[-1]['choices'][0]['next']  # tail has no successor
        if msgs:
            #assert msgs[0]['choices'][msgs[0]['choice']]['message']['role'] == 'system' # just a quick assumption
            for msg in msgs:
                for choice in msg.get('choices', []):
                    # json logs store content as a list of lines; joining a
                    # plain string (yaml path) is a no-op
                    choice['message']['content'] = ''.join(choice['message']['content'])
            if msgs[0]['choices'][msgs[0]['choice']]['message']['content'] != self.system and not merge:
                if load_system:
                    msgs[0]['choices'].append(dict(message=dict(role='system', content=self.system)))
                else:
                    msgs[0]['choices'].insert(msgs[0]['choice'],
                        dict(message=dict(role='system', content=self.system),
                             next=msgs[0]['choices'][msgs[0]['choice']]['next']))
        if merge:
            diff_idx = None
            for idx in range(min(len(msgs), len(self.msgs))):
                msg0 = self.msgs[idx]
                msg1 = msgs[idx]
                assert len(msg0['choices']) == len(msg1['choices'])
                for cidx in range(len(msg0['choices'])):
                    choice0 = msg0['choices'][cidx]
                    choice1 = msg1['choices'][cidx]
                    if choice0 != choice1:
                        choice1.setdefault('edited', []).append(time.time())
                        msg0['choices'].append(choice0) # backup old choice
                        msg0['choices'][cidx] = choice1 # change
                        if diff_idx is None:
                            diff_idx = idx
                msg0['choice'] = msg1['choice']
        else:
            diff_idx = 0
            self.msgs = msgs
        if load_data:
            # NOTE(review): if fn was auto-selected and json parsing failed,
            # `data` is the ctime sort key, not a settings dict — this path
            # assumes a well-formed json log when load_data is requested
            self.data = data
            self.url = data.pop('url')
        self.chosen_messages(_update_last=True)
        return diff_idx

    def save_as_prompt(self, fn):
        """Write the chosen conversation path to *fn* as a sequence of yaml
        documents, one {role: content} block-literal mapping per message."""
        with open(fn, 'wt') as f:
            for msg in self.chosen_messages():
                yaml.dump(
                    {msg['message']['role']: msg['message']['content']},
                    stream=f,
                    default_style='|',
                )

    def __enter__(self):
        """Open a timestamped autosave log (as <name>.in_progress until a
        clean exit renames it)."""
        self.logfn = str(datetime.datetime.now().isoformat()) + '.json'
        self.logfninprogress = self.logfn + '.in_progress'
        self.log = open(self.logfninprogress, 'wt')
        self.log_opened = True
        return self

    def _save(self):
        """Serialize [settings, *messages] into the open log file and return
        the file object.  Message contents are stored as lists of lines so the
        json diffs/edits cleanly."""
        file = self.log
        msgs = [dict(url=self.url, **self.data)] + [{**m} for m in self.msgs]
        for msg in msgs:
            if 'choices' in msg:
                # deep-ish copy so the line-splitting below never mutates
                # the live message objects
                msg['choices'] = [{**choice} for choice in msg['choices']]
                for choice in msg['choices']:
                    choice['message'] = {**choice['message']}
                    lines = choice['message']['content'].split('\n')
                    choice['message']['content'] = [x + '\n' for x in lines[:-1]] + lines[-1:]
        file.seek(0)
        json.dump(msgs, file, indent=2)
        # fix: without truncate(), a rewrite shorter than the previous save
        # left stale trailing bytes and corrupted the json
        file.truncate()
        file.flush()
        return file

    def _reopen(self):
        """Reopen the in-progress log (used after handing the file to an
        external editor)."""
        self.log = open(self.logfninprogress, 'wt')
        return self.log

    def __exit__(self, *params, **kwparams):
        """Final save, close, and atomically rename the log into place."""
        self._save()
        self.log_opened = False
        self.log.close()
        os.replace(self.logfninprogress, self.logfn)
        print('log saved to', self.logfn)

    def chosen_messages(self, _update_last=False, _msgs=None):
        """Walk the ``next`` links from node 0 and return the list of active
        choice dicts.  With _update_last, resync self.last_msg_idx to the tail
        (otherwise assert it is already correct).  Raises on a link cycle."""
        cmsgs = []
        if _msgs is None:
            _msgs = self.msgs
        if _msgs:
            nxt = 0
            loopcheck = set([nxt])
            while nxt is not None:
                # fix: walk the list actually passed in, not always self.msgs
                msg = _msgs[nxt]
                choice = msg['choices'][msg['choice']]
                cmsgs.append(choice)
                last_msg = msg
                nxt = choice.get('next')
                if nxt in loopcheck:
                    raise Exception('loop in message list')
                loopcheck.add(nxt)
            if _update_last:
                self.last_msg_idx = last_msg['index']
            else:
                assert last_msg['index'] == self.last_msg_idx
        elif _update_last:
            self.last_msg_idx = None
        return cmsgs

    def msg(self, msg, stream=None):
        """Send *msg* as the user (or None to just complete the current
        context) and return the assistant's reply text.

        Retries forever on errors or empty completions, rotating API keys each
        attempt.  Streaming responses are printed token-by-token as they
        arrive and accumulated into a normal completions dict.

        Control-flow note: the ``return`` lives in the ``finally`` so that a
        mid-stream exception which already accumulated partial content returns
        that partial completion instead of retrying; when ``completions`` is
        still None the ``finally`` falls through and the loop retries.
        """
        assert self.log
        if msg is not None:
            assert msg
            self._append_choices(dict(message = dict(role = 'user', content = msg),
                                      timestamp = datetime.datetime.now().timestamp()))
        if stream is None:
            stream = self.stream
        data = dict(
            **self.data,
            stream = stream,
            messages = [choice['message'] for choice in self.chosen_messages()],
        )
        completions = None
        while True:
            try:
                headers = {**self.headers}
                self.key_idx = (self.key_idx + 1) % len(self.keys)
                headers['Authorization'] = headers['Authorization'].format(key=self.keys[self.key_idx])
                response = requests.post(self.url, headers=headers, json=data, stream=stream)
                chosen_choice = 0
                if stream:
                    # SSE chunks look like:
                    # {'id': ..., 'object': 'chat.completion.chunk', ...,
                    #  'choices': [{'index': 0, 'delta': {'content': '?', 'role': 'assistant'}, 'finish_reason': None}]}
                    for line in response.iter_lines():
                        if line.startswith(b':'): # keep-alive comment
                            print('keepalive:', line[1:].decode(), end='\r', flush=True)
                        elif line.startswith(b'data: '):
                            l = line[len(b'data: '):]
                            if l != b'[DONE]':
                                l = json.loads(l)
                                if 'choices' not in l:
                                    print('!', l)
                                    continue
                                l_choices = l['choices']
                                if completions is None:
                                    # first chunk seeds the accumulated
                                    # completion object (shallow-copied)
                                    completions = {**l}
                                    completions['choices'] = [
                                        {key: val for key, val in choice.items()}
                                        for choice in completions['choices']
                                    ]
                                completions_choices = completions['choices']
                                assert len(l_choices) == len(completions_choices)
                                for idx in range(len(l_choices)):
                                    for key, value in l_choices[idx]['delta'].items():
                                        message = completions_choices[idx].setdefault('message', {})
                                        if key == 'content':
                                            # content deltas concatenate; echo
                                            # the chosen choice live
                                            message[key] = message.get(key, '') + value
                                            if idx == chosen_choice:
                                                print(value, end='', flush=True)
                                        else:
                                            message[key] = value
                        elif line.startswith(b'{') and line.endswith(b'}'):
                            # fix: was `line[-1] == b'}'`, an int-vs-bytes
                            # comparison that is always False in python 3, so
                            # non-SSE single-json-line responses were dropped
                            completions = json.loads(line)
                    print()
                else:
                    completions = response.json()
                    # e.g. {'id': ..., 'object': 'chat.completion', ...,
                    #       'choices': [{'index': 0, 'message': {'role': 'assistant', 'content': ...}, 'finish_reason': 'eos'}]}
                if completions is None or not any([choice['message']['content'] for choice in completions['choices']]):
                    #raise Exception('server routed message with no tokens')
                    completions = None
                    continue
                # error responses look like:
                # {'error': {'provider': 'tune', 'code': 400, 'message': 'messages are missing', ...}}
            except Exception as e:
                print(type(e), e, 'could use logging module to get better output')
                continue
            finally:
                if completions is not None:
                    if 'error' in completions:
                        raise Exception(completions['error'])
                    msg = self._append_choices(*completions['choices'], choice=chosen_choice)
                    return msg['choices'][msg['choice']]['message']['content']


class Cap(list):
    """Context manager that opens every available webcam (cv2.VideoCapture)
    and serves frames as base64-encoded images.  The list contents are the
    open capture handles."""
    def __init__(self, ext='.jpg', *params):
        """ext: image format extension for cv2.imencode; params: extra
        encoder parameters forwarded to imencode."""
        self.ext = ext
        self.params = params

    def __enter__(self):
        if cv2 is not None:
            # hide warnings enumerating devs
            wrn_key = 'OPENCV_VIDEOIO_DEBUG'
            wrn_cache = os.environ.get(wrn_key)
            # fix: os.environ values must be strings; assigning int 0 raised TypeError
            os.environ[wrn_key] = '0'
            # probe device indices 0, 1, 2, ... until one fails to open
            while True:
                cap = cv2.VideoCapture(len(self))
                if cap.isOpened():
                    self.append(cap)
                else:
                    cap.release()  # fix: release the failed probe handle
                    break
            # restore warnings
            if wrn_cache is not None:
                os.environ[wrn_key] = wrn_cache
            else:
                del os.environ[wrn_key]
        # fix: return self so `with Cap() as caps:` binds the instance
        return self

    def all(self):
        """Capture one frame from every open device."""
        return [self.cap(idx) for idx in range(len(self))]

    def cap(self, idx):
        """Read a frame from device *idx*, encode it as self.ext, and return
        it base64-encoded (suitable for f'data:image/jpeg;base64,{img}')."""
        ret, img = self[idx].read()
        assert ret
        # fix: previous code referenced an undefined name `image` and applied
        # .tobytes() to imencode's (ret, buf) tuple
        ret, buf = cv2.imencode(self.ext, img, self.params)
        assert ret
        return base64.b64encode(buf.tobytes()).decode()

    def __exit__(self, *params, **kwparams):
        while self:
            self.pop().release()


def output_slower(text=''):
    """Print *text* one line at a time with a half-second pause per line,
    for a calmer reading pace."""
    lines = str(text).split('\n')
    for line in lines:
        print(line)
        time.sleep(0.5)


if __name__ == '__main__':
    import argparse, sys
    parser = argparse.ArgumentParser()
    # --resume takes an optional log filename; bare flag resumes the newest log
    parser.add_argument('--resume', '--prompt', nargs='?', const=True, default=False)
    parser.add_argument('--convert-to-prompt', nargs='?', const=True, default=False)
    parser.add_argument('--stream', action='store_true', default=False)
    parser.add_argument('--debug', action='store_true', default=False)
    parser.add_argument('--model', default='nousresearch/hermes-3-llama-3.1-405b:free')
    parser.add_argument('--docs', '--documents', '--doc', '--document', '--rag', action='append', nargs='*')
    parser.add_argument('--system', default=SYSTEM_PROMPT_DEFAULT)
    args = parser.parse_args()

    # `--convert-to-prompt foo.json` is shorthand for `--resume foo.json --convert-to-prompt`
    # fix: guard with isinstance — a bare flag yields True (bool), which has no .endswith
    if isinstance(args.convert_to_prompt, str) and not args.resume and args.convert_to_prompt.endswith('.json'):
        args.resume = args.convert_to_prompt
        args.convert_to_prompt = True

    if args.docs:
        args.docs = sum(args.docs, [])  # flatten repeated/multi-value flags
        # hermes 3 rag format i think
        CONTEXT = '\n\nCONTEXT:\n' + ''.join([
            'Document:' + str(idx) + '\n' +
            'Title: ' + doc + '\n' +
            'Text: ' + (
                requests.get(doc).text if '://' in doc else open(doc).read()
            ) + '\n'
            for idx, doc in enumerate(args.docs)
        ])
        if args.system == SYSTEM_PROMPT_DEFAULT:
            args.system = SYSTEM_PROMPT_HERMES_3_RAG
    else:
        CONTEXT = ''

    # if autosave is modified be sure to verify it works before using it!
    ## first verify idempotent non-crashing autosave
    #with Interface(resume=True) as iface:
    #    msgs = iface.msgs
    #
    iface = Interface(system=args.system, model=args.model, resume=args.resume,
                      stream=args.stream, debug=args.debug)

    if args.convert_to_prompt:
        if type(args.convert_to_prompt) is not str:
            # NOTE(review): assumes --resume carried a filename; a bare
            # --convert-to-prompt without --resume has nothing to derive from
            args.convert_to_prompt = args.resume.rsplit('.', 1)[0] + '.prompt'
        iface.save_as_prompt(args.convert_to_prompt)
        print('wrote', args.convert_to_prompt)
        sys.exit(0)

    with iface:
        # assert iface.msgs == msgs # verify autosave
        #with Interface(resume=False, system=SYSTEM_PROMPT_TRAFFICK_BOSS_2) as iface:
        #with Interface(resume=False) as iface:
        output_slower(iface.data)
        msgs = iface.chosen_messages()

        def output_msg(msg):
            # render one choice dict according to its role
            msg = msg['message']
            if msg['role'] == 'user':
                output_slower()
                output_slower('> ' + msg['content'])
                output_slower()
            elif msg['role'] == 'assistant':
                output_slower(msg['content'])
            elif msg['role'] == 'system':
                for line in msg['content'].split('\n'):
                    print('# ' + line)

        for msg in msgs[-1:]: #[-5:]:
            output_msg(msg)
        if msgs and msgs[-1]['message']['role'] == 'user':
            # resumed log ends with an unanswered user message: answer it now
            resp = iface.msg(None)
            output_slower(resp)

        while True:
            output_slower()
            try:
                inp = input('> ')
            except EOFError:
                break
            if not inp.strip():
                continue
            if inp == '/debug':
                iface.debug = not iface.debug
                continue
            elif inp == '/edit':
                # hand the raw log to vim, then merge the edits back in;
                # the structure is the same, so load(merge=True) just looks
                # for choices that were modified
                logf = iface._save()
                logf.close()
                while True:
                    pid = os.fork()
                    os.execlp('vim', 'vim', logf.name) if not pid else os.waitpid(pid, 0)
                    try:
                        idx = iface.load(logf.name, load_data=True, merge=True)
                        break
                    except Exception as e:
                        print(type(e), e)
                        input('press enter')
                iface._reopen()
                if idx is not None:
                    # replay from just before the first edited message
                    for msg in iface.chosen_messages()[max(0, idx - 1):]:
                        output_msg(msg)
                continue
            elif inp == '/trim':
                #iface.msgs[iface.last_msg_idx]['choices'].insert(...
                print('unimplemented')
                continue
            if '<<' in inp:
                # heredoc-style multiline input:  text <<EOF  ... EOF
                # fix: split only on the first '<<' so the body may contain it
                line, token = inp.split('<<', 1)
                token = token.strip()
                if ' ' not in token:
                    lines = [line]
                    while True:
                        try:
                            inp = input('...' + token + ' ')
                        except EOFError:
                            break
                        if inp == token:
                            break
                        lines.append(inp)
                    inp = '\n'.join(lines)
            if CONTEXT:
                # attach the RAG document context to the first real message only
                inp += CONTEXT
                CONTEXT = ''
            output_slower()
            resp = iface.msg(inp)
            if not args.stream:
                output_slower(resp)